123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- <?php declare(strict_types=1);
- namespace Packagist\WebBundle\Service;
- use Predis\Client as Redis;
- use Psr\Log\LoggerInterface;
- use Symfony\Bridge\Doctrine\RegistryInterface;
- use Packagist\WebBundle\Entity\Job;
- use Seld\Signal\SignalHandler;
- use Packagist\WebBundle\Service\LogResetter;
/**
 * Long-running worker that pulls job ids from the "jobs" redis list and from the
 * scheduled jobs known to the Job repository, and hands each job to the processor
 * registered for its type.
 */
class QueueWorker
{
    /** @var LogResetter used to reset log handlers before and after each job */
    private $logResetter;

    /** @var Redis client used to pop job ids and to store job results */
    private $redis;

    /** @var LoggerInterface */
    private $logger;

    /** @var RegistryInterface */
    private $doctrine;

    /** @var array job processors indexed by job type */
    private $jobWorkers;

    /** @var int counter compared against the limit passed to processMessages() */
    private $processedJobs = 0;

    /**
     * @param array $jobWorkers job processors indexed by the job type they handle
     */
    public function __construct(LogResetter $logResetter, Redis $redis, RegistryInterface $doctrine, LoggerInterface $logger, array $jobWorkers)
    {
        $this->logResetter = $logResetter;
        $this->redis = $redis;
        $this->logger = $logger;
        $this->doctrine = $doctrine;
        $this->jobWorkers = $jobWorkers;
    }

    /**
     * Runs the worker loop until the internal counter reaches $count or a signal is received.
     *
     * On startup markTimedOutJobs() is called once on the Job repository and the scheduled
     * jobs are processed. Afterwards scheduled jobs are re-checked whenever the next check
     * time has passed, and job ids are taken from the "jobs" redis list using a blocking pop
     * with a 10 second timeout.
     *
     * Note: the counter is incremented on every loop iteration, including iterations where
     * the blocking pop returned nothing, and once more for every scheduled job that was run.
     *
     * @param int $count value the internal counter must reach before the loop stops
     */
    public function processMessages(int $count)
    {
        $signal = SignalHandler::create(null, $this->logger);

        $this->logger->info('Waiting for new messages');

        $this->doctrine->getEntityManager()->getRepository(Job::class)->markTimedOutJobs();
        $nextScheduledJobCheck = $this->checkForScheduledJobs($signal);

        while ($this->processedJobs++ < $count) {
            if ($signal->isTriggered()) {
                $this->logger->debug('Signal received, aborting');
                break;
            }

            if ($nextScheduledJobCheck <= time()) {
                $nextScheduledJobCheck = $this->checkForScheduledJobs($signal);
            }

            $result = $this->redis->brpop('jobs', 10);
            if (!$result) {
                $this->logger->debug('No message in queue');
                continue;
            }

            // brpop returns the list name followed by the popped value
            $jobId = $result[1];
            $this->process($jobId, $signal);
        }
    }

    /**
     * Processes every job id returned by the repository's getScheduledJobIds().
     *
     * Each job that was actually run by this worker increments the processed counter.
     * No further jobs are started once the signal handler has been triggered.
     *
     * @return int unix timestamp at which the next scheduled-jobs check is due
     */
    private function checkForScheduledJobs(SignalHandler $signal): int
    {
        $repo = $this->doctrine->getEntityManager()->getRepository(Job::class);

        foreach ($repo->getScheduledJobIds() as $jobId) {
            if ($signal->isTriggered()) {
                // do not start new work after a signal; processMessages() checks the signal at the top of its loop
                break;
            }

            if ($this->process($jobId, $signal)) {
                $this->processedJobs++;
            }
        }

        // check for scheduled jobs every 30 sec at least
        return time() + 30;
    }

    /**
     * Runs a single job through the processor registered for its type and stores the outcome.
     *
     * The processor returns a result array. A STATUS_RESCHEDULE result reschedules the job
     * using the "after" key; any other result must contain "status" and "message" and is
     * saved on the job and published to redis under "job-<id>" for 600 seconds. Anything
     * thrown by the processor (or a missing processor for the job type) is converted into
     * a STATUS_ERRORED result.
     *
     * @return bool false if the repository refused to start the job (another worker got it
     *              first), true once the job was run and its outcome was stored
     *
     * @throws \LogicException if the job cannot be loaded or the processor returned a malformed result
     */
    private function process(string $jobId, SignalHandler $signal): bool
    {
        $em = $this->doctrine->getEntityManager();
        $repo = $em->getRepository(Job::class);

        if (!$repo->start($jobId)) {
            // race condition, some other worker caught the job first, aborting
            return false;
        }

        $job = $repo->findOneById($jobId);
        if (null === $job) {
            throw new \LogicException('Job '.$jobId.' was started but could not be loaded');
        }

        // tag every log record emitted while this job runs with the job id
        $this->logger->pushProcessor(function ($record) use ($job) {
            $record['extra']['job-id'] = $job->getId();

            return $record;
        });

        try {
            // clears/resets all fingers-crossed handlers to avoid dumping info messages that happened between two job executions
            $this->logResetter->reset();

            $this->logger->debug('Processing ' . $job->getType() . ' job', ['job' => $job->getPayload()]);

            try {
                $type = $job->getType();
                // looked up inside the try block so an unknown type marks the job as errored instead of aborting the worker
                if (!isset($this->jobWorkers[$type])) {
                    throw new \LogicException('No job worker is configured for job type '.$type);
                }

                $result = $this->jobWorkers[$type]->process($job, $signal);
            } catch (\Throwable $e) {
                $result = [
                    'status' => Job::STATUS_ERRORED,
                    'message' => 'An unexpected failure occurred',
                    'exception' => $e,
                ];
            }

            // the status must be known before it can be compared below
            if (!isset($result['status'])) {
                throw new \LogicException('$result must be an array with at least status and message keys');
            }

            if ($result['status'] === Job::STATUS_RESCHEDULE) {
                $job->reschedule($result['after']);
                $em->flush($job);

                // reset logger
                $this->logResetter->reset();

                return true;
            }

            if (!isset($result['message'])) {
                throw new \LogicException('$result must be an array with at least status and message keys');
            }

            $finalStatuses = [Job::STATUS_COMPLETED, Job::STATUS_FAILED, Job::STATUS_ERRORED];
            if (!in_array($result['status'], $finalStatuses, true)) {
                $validStatuses = array_merge($finalStatuses, [Job::STATUS_RESCHEDULE]);

                throw new \LogicException('$result[\'status\'] must be one of '.implode(', ', $validStatuses).', '.$result['status'].' given');
            }

            if (isset($result['exception'])) {
                $result['exceptionMsg'] = $result['exception']->getMessage();
                $result['exceptionClass'] = get_class($result['exception']);
            }

            // If an exception is thrown during a transaction the EntityManager is closed
            // and we won't be able to update the job or handle future jobs
            if (!$this->doctrine->getEntityManager()->isOpen()) {
                $this->doctrine->resetManager();
                $em = $this->doctrine->getEntityManager();
                $repo = $em->getRepository(Job::class);
            }

            // reload the job, the entity manager may have been replaced above
            $job = $repo->findOneById($jobId);
            if (null === $job) {
                throw new \LogicException('Job '.$jobId.' could not be reloaded to store its result');
            }

            $job->complete($result);

            $this->redis->setex('job-'.$job->getId(), 600, json_encode($result));

            $em->flush($job);
            $em->clear();

            if ($result['status'] === Job::STATUS_FAILED) {
                $this->logger->warning('Job '.$job->getId().' failed', $result);
            } elseif ($result['status'] === Job::STATUS_ERRORED) {
                $this->logger->error('Job '.$job->getId().' errored', $result);
            }

            // clears/resets all fingers-crossed handlers so that if one triggers it doesn't dump the entire debug log for all processed
            $this->logResetter->reset();

            return true;
        } finally {
            // always remove the job-id processor, even when an exception propagates
            $this->logger->popProcessor();
        }
    }
}
|