QueueWorker.php 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. <?php declare(strict_types=1);
  2. namespace Packagist\WebBundle\Service;
  3. use Predis\Client as Redis;
  4. use Psr\Log\LoggerInterface;
  5. use Symfony\Bridge\Doctrine\RegistryInterface;
  6. use Packagist\WebBundle\Entity\Job;
  7. use Seld\Signal\SignalHandler;
  8. use Packagist\WebBundle\Service\LogResetter;
  /**
   * Pops job ids from the Redis list "jobs", periodically picks up scheduled
   * jobs from the Job repository, and hands each job to the worker registered
   * for its type in $jobWorkers.
   */
  class QueueWorker
  {
      /** @var LogResetter */
      private $logResetter;

      /** @var Redis */
      private $redis;

      /** @var LoggerInterface */
      private $logger;

      /** @var RegistryInterface */
      private $doctrine;

      /** @var array job workers, indexed by the value returned from Job::getType() */
      private $jobWorkers;

      /** @var int counter compared against the budget passed to processMessages() */
      private $processedJobs = 0;

      public function __construct(LogResetter $logResetter, Redis $redis, RegistryInterface $doctrine, LoggerInterface $logger, array $jobWorkers)
      {
          $this->logResetter = $logResetter;
          $this->redis = $redis;
          $this->logger = $logger;
          $this->doctrine = $doctrine;
          $this->jobWorkers = $jobWorkers;
      }

      /**
       * Runs the worker loop until the budget is exhausted or a signal is received.
       *
       * The internal counter advances once per loop iteration (including
       * iterations where the queue poll returned nothing) and once more for
       * every scheduled job that was processed, so $count bounds the lifetime
       * of the loop rather than the exact number of jobs handled.
       *
       * @param int $count budget the internal counter is compared against
       */
      public function processMessages(int $count)
      {
          $signal = SignalHandler::create(null, $this->logger);

          $this->logger->info('Waiting for new messages');

          $this->doctrine->getEntityManager()->getRepository(Job::class)->markTimedOutJobs();

          $nextScheduledJobCheck = $this->checkForScheduledJobs($signal);

          while ($this->processedJobs++ < $count) {
              if ($signal->isTriggered()) {
                  $this->logger->debug('Signal received, aborting');
                  break;
              }

              if ($nextScheduledJobCheck <= time()) {
                  $nextScheduledJobCheck = $this->checkForScheduledJobs($signal);
              }

              // blocking pop on the "jobs" list with a timeout argument of 10;
              // a falsy result means nothing was popped
              $result = $this->redis->brpop('jobs', 10);
              if (!$result) {
                  $this->logger->debug('No message in queue');
                  continue;
              }

              // index 1 of the reply is used as the job id
              $jobId = $result[1];
              $this->process($jobId, $signal);
          }
      }

      /**
       * Processes every job id returned by the repository's getScheduledJobIds()
       * and counts each one that was actually processed.
       *
       * @return int unix timestamp at which the next scheduled-job check is due
       */
      private function checkForScheduledJobs(SignalHandler $signal): int
      {
          $em = $this->doctrine->getEntityManager();
          $repo = $em->getRepository(Job::class);

          foreach ($repo->getScheduledJobIds() as $jobId) {
              if ($this->process($jobId, $signal)) {
                  $this->processedJobs++;
              }
          }

          // check for scheduled jobs every 30 sec at least
          return time() + 30;
      }

      /**
       * Starts the job, runs the worker configured for its type and records the outcome.
       *
       * A result with status Job::STATUS_RESCHEDULE reschedules the job using
       * $result['after']; any other accepted status completes the job and
       * stores the JSON-encoded result in Redis under "job-<id>" with a TTL
       * argument of 600.
       *
       * @return bool false when the repository refused to start the job, true otherwise
       *
       * @throws \LogicException when the worker returns a result without the required keys
       *                         or with an unsupported status
       */
      private function process(string $jobId, SignalHandler $signal): bool
      {
          $em = $this->doctrine->getEntityManager();
          $repo = $em->getRepository(Job::class);
          if (!$repo->start($jobId)) {
              // race condition, some other worker caught the job first, aborting
              return false;
          }

          $job = $repo->findOneById($jobId);

          $this->logger->pushProcessor(function ($record) use ($job) {
              $record['extra']['job-id'] = $job->getId();

              return $record;
          });

          // The finally block guarantees the processor pushed above is removed
          // even when validation, the flush or Redis throws; otherwise later
          // log records would keep being tagged with this job's id.
          try {
              // clears/resets all fingers-crossed handlers to avoid dumping info messages that happened between two job executions
              $this->logResetter->reset();

              $this->logger->debug('Processing ' . $job->getType() . ' job', ['job' => $job->getPayload()]);

              $result = $this->runJobWorker($job, $signal);

              // status must be validated before it is read; message is only
              // required for non-reschedule results, so it is checked further down
              if (!isset($result['status'])) {
                  throw new \LogicException('$result must be an array with at least status and message keys');
              }

              if ($result['status'] === Job::STATUS_RESCHEDULE) {
                  $job->reschedule($result['after']);
                  $em->flush($job);

                  // reset logger
                  $this->logResetter->reset();

                  return true;
              }

              if (!isset($result['message'])) {
                  throw new \LogicException('$result must be an array with at least status and message keys');
              }

              if (!in_array($result['status'], [Job::STATUS_COMPLETED, Job::STATUS_FAILED, Job::STATUS_ERRORED], true)) {
                  throw new \LogicException('$result[\'status\'] must be one of '.Job::STATUS_COMPLETED.', '.Job::STATUS_FAILED.', '.Job::STATUS_ERRORED.' or '.Job::STATUS_RESCHEDULE.', '.$result['status'].' given');
              }

              if (isset($result['exception'])) {
                  $result['exceptionMsg'] = $result['exception']->getMessage();
                  $result['exceptionClass'] = get_class($result['exception']);
              }

              // If an exception is thrown during a transaction the EntityManager is closed
              // and we won't be able to update the job or handle future jobs
              if (!$this->doctrine->getEntityManager()->isOpen()) {
                  $this->doctrine->resetManager();
                  $em = $this->doctrine->getEntityManager();
                  $repo = $em->getRepository(Job::class);
              }

              // reload the job through the (possibly fresh) manager before updating it
              $job = $repo->findOneById($jobId);
              $job->complete($result);

              $this->redis->setex('job-'.$job->getId(), 600, json_encode($result));

              $em->flush($job);
              $em->clear();

              if ($result['status'] === Job::STATUS_FAILED) {
                  $this->logger->warning('Job '.$job->getId().' failed', $result);
              } elseif ($result['status'] === Job::STATUS_ERRORED) {
                  $this->logger->error('Job '.$job->getId().' errored', $result);
              }

              // clears/resets all fingers-crossed handlers so that if one triggers it doesn't dump the entire debug log for all processed
              $this->logResetter->reset();

              return true;
          } finally {
              $this->logger->popProcessor();
          }
      }

      /**
       * Invokes the worker registered for the job's type and returns its result.
       *
       * Anything thrown while resolving or running the worker (including a
       * missing registration for the job type) is converted into an
       * errored result that carries the exception.
       *
       * @return mixed the worker's result, expected to be an array with a status key
       */
      private function runJobWorker(Job $job, SignalHandler $signal)
      {
          try {
              $type = $job->getType();
              if (!isset($this->jobWorkers[$type])) {
                  throw new \LogicException('No job worker is configured for job type '.$type);
              }

              return $this->jobWorkers[$type]->process($job, $signal);
          } catch (\Throwable $e) {
              return [
                  'status' => Job::STATUS_ERRORED,
                  'message' => 'An unexpected failure occurred',
                  'exception' => $e,
              ];
          }
      }
  }