You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

115 lines
3.7 KiB

<?php
namespace JobLock;
use DateTime;
use Doctrine\ORM\EntityManager;
/**
* Simple service that persist lock information into database. Used when application with multiple
* instances needs to run a job or some other procedure only once and avoid other instances spawning
* concurrent processes.
*
* @author Jan Pavlíček <jan@pavlicek.dev>
*/
class JobLockService
{
// determines default expiration date for locks - after this date, locks are not considered valid even if not released
const MAXIMUM_LOCK_DURATION = '23 hours 50 minutes';
// determines how long are expired locks left in the database
const MAXIMUM_LOCK_LIFETIME = '3 days';
// determines minimum sleep duration when the job is invoked - used to avoid running the job multiple times
// from different containers, along with WAIT_MS_MAX - a value is assigned randomly between those two boundaries
// to create artifical inacurracy and give enough time for locks to be created in other processes
const WAIT_MS_MIN = 0;
// upper boundry, same as WAIT_MS_MIN but it is also used as a timeout between job finish and lock release
const WAIT_MS_MAX = 20000;
protected EntityManager $em;
public function __construct(EntityManager $em)
{
$this->em = $em;
}
/**
* Create a lock for a job with identifier. If the job is already locked, throw an exception.
* Uses a random short sleep duration to avoid concurrent runs not seeing each others locks.
*
* @throws Common\JobLock\JobLockedException
*/
public function acquireLock(string $identifier) : void
{
// random sleep is used to avoid too much accuracy which can lead to other instance starting
// the job while the lock is being created
usleep(rand(self::WAIT_MS_MIN * 1000, self::WAIT_MS_MAX * 1000));
// check for current locks that have not expired yet
$current_lock = $this->em->createQuery('SELECT l FROM ' . JobLock::class . ' l
WHERE l.identifier = :identifier
AND l.expires > CURRENT_DATE()
AND l.released IS NULL')
->setParameter('identifier', $identifier)
->getResult();
if ($current_lock) {
throw new JobLockedException;
}
// persist the new lock
$lock = new JobLock;
$lock->setIdentifier($identifier);
$lock->setExpires(new DateTime('+' . self::MAXIMUM_LOCK_DURATION));
$lock->setAcquired(new DateTime);
$this->em->persist($lock);
$this->em->flush();
}
/**
* find all unreleased locks for given identifier and release them
*/
public function releaseLock(string $identifier) : void
{
// sleep for maximum amount before releasing the lock to avoid another process with higher
// wait time to start after the lock is released
usleep(self::WAIT_MS_MAX * 1000);
$locks = $this->em->getRepository(JobLock::class)->findBy(['identifier' => $identifier, 'released' => null]);
$now = new DateTime;
foreach ($locks as $lock) {
$lock->setReleased($now);
}
$this->em->flush();
$this->cleanupExpiredLocks();
}
/**
* Removes all locks that have expiration date further in the past than the maximum lock lifetime
* config constant from the database
*/
public function cleanupExpiredLocks() : void
{
$threshold = new DateTime('-' . self::MAXIMUM_LOCK_LIFETIME);
$this->em->createQueryBuilder()
->delete(JobLock::class, 'l')
->where('l.expires <= :threshold')
->setParameter('threshold', $threshold)
->getQuery()
->execute();
}
}