*/
class JobLockService
{
    /**
     * Default validity of a newly acquired lock (relative date string appended to '+').
     * After this time a lock is no longer considered valid, even if it was never released.
     */
    public const MAXIMUM_LOCK_DURATION = '23 hours 50 minutes';

    /**
     * How long an expired lock is kept in the database before cleanupExpiredLocks() deletes it
     * (relative date string appended to '-').
     */
    public const MAXIMUM_LOCK_LIFETIME = '3 days';

    /**
     * Lower bound (in milliseconds) of the random sleep performed before acquiring a lock.
     * Together with WAIT_MS_MAX it adds artificial inaccuracy so that the same job started
     * from several containers at once has time to see locks created by the other processes.
     */
    public const WAIT_MS_MIN = 0;

    /**
     * Upper bound (in milliseconds) of the random sleep performed before acquiring a lock.
     * Also used as the fixed delay between a job finishing and its lock being released.
     */
    public const WAIT_MS_MAX = 20000;

    protected EntityManager $em;

    public function __construct(EntityManager $em)
    {
        $this->em = $em;
    }

    /**
     * Create a lock for the job with the given identifier. If the job is already locked,
     * throw an exception.
     *
     * Sleeps for a random duration between WAIT_MS_MIN and WAIT_MS_MAX first, to reduce the
     * chance of concurrent runs not seeing each other's locks. The existence check and the
     * insert are separate statements, so this only lowers the probability of a double run;
     * it does not make the acquisition atomic.
     *
     * @param string $identifier identifier of the job to lock
     *
     * @throws Common\JobLock\JobLockedException when an unreleased, unexpired lock exists
     */
    public function acquireLock(string $identifier): void
    {
        // usleep() takes microseconds, the constants are in milliseconds
        usleep(rand(self::WAIT_MS_MIN * 1000, self::WAIT_MS_MAX * 1000));

        // Look for an unreleased lock that has not expired yet. The comparison uses the exact
        // current time from PHP (the same clock that produced `expires` below); a date-only
        // comparison would keep treating a lock as valid for the rest of the day it expired on.
        $activeLocks = $this->em
            ->createQuery(
                'SELECT l FROM ' . JobLock::class . ' l'
                . ' WHERE l.identifier = :identifier AND l.expires > :now AND l.released IS NULL'
            )
            ->setParameter('identifier', $identifier)
            ->setParameter('now', new DateTime)
            // only the existence of a lock matters, no need to hydrate more than one row
            ->setMaxResults(1)
            ->getResult();

        if ($activeLocks) {
            throw new JobLockedException;
        }

        // persist the new lock
        $lock = new JobLock;
        $lock->setIdentifier($identifier);
        $lock->setExpires(new DateTime('+' . self::MAXIMUM_LOCK_DURATION));
        $lock->setAcquired(new DateTime);

        $this->em->persist($lock);
        $this->em->flush();
    }

    /**
     * Find all unreleased locks for the given identifier and mark them as released,
     * then remove locks that are past their maximum lifetime.
     *
     * @param string $identifier identifier of the job whose locks are released
     */
    public function releaseLock(string $identifier): void
    {
        // Sleep for the maximum wait before releasing, so that another process which drew a
        // longer random wait in acquireLock() does not start right after the lock is released.
        usleep(self::WAIT_MS_MAX * 1000);

        $locks = $this->em
            ->getRepository(JobLock::class)
            ->findBy(['identifier' => $identifier, 'released' => null]);

        // all locks released in this call share the same release timestamp
        $now = new DateTime;
        foreach ($locks as $lock) {
            $lock->setReleased($now);
        }

        $this->em->flush();

        $this->cleanupExpiredLocks();
    }

    /**
     * Delete every lock whose expiration date lies further in the past than
     * MAXIMUM_LOCK_LIFETIME, regardless of whether it was released.
     */
    public function cleanupExpiredLocks(): void
    {
        $threshold = new DateTime('-' . self::MAXIMUM_LOCK_LIFETIME);

        $this->em->createQueryBuilder()
            ->delete(JobLock::class, 'l')
            ->where('l.expires <= :threshold')
            ->setParameter('threshold', $threshold)
            ->getQuery()
            ->execute();
    }
}