QueueWorker.php 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. <?php declare(strict_types=1);
  2. namespace Packagist\WebBundle\Service;
  3. use Predis\Client as Redis;
  4. use Psr\Log\LoggerInterface;
  5. use Symfony\Bridge\Doctrine\RegistryInterface;
  6. use Packagist\WebBundle\Entity\Job;
  7. use Seld\Signal\SignalHandler;
  8. use Graze\DogStatsD\Client as StatsDClient;
  9. class QueueWorker
  10. {
  11. private $redis;
  12. private $logger;
  13. /** @var RegistryInterface */
  14. private $doctrine;
  15. private $jobWorkers;
  16. private $processedJobs = 0;
  17. /** @var StatsDClient */
  18. private $statsd;
  19. public function __construct(Redis $redis, RegistryInterface $doctrine, LoggerInterface $logger, array $jobWorkers, StatsDClient $statsd)
  20. {
  21. $this->redis = $redis;
  22. $this->logger = $logger;
  23. $this->doctrine = $doctrine;
  24. $this->jobWorkers = $jobWorkers;
  25. $this->statsd = $statsd;
  26. }
  27. /**
  28. * @param string|int $minPriority
  29. */
  30. public function processMessages(int $count)
  31. {
  32. $signal = SignalHandler::create(null, $this->logger);
  33. $this->logger->info('Waiting for new messages');
  34. $nextTimedoutJobCheck = $this->checkForTimedoutJobs();
  35. $nextScheduledJobCheck = $this->checkForScheduledJobs($signal);
  36. while ($this->processedJobs++ < $count) {
  37. if ($signal->isTriggered()) {
  38. $this->logger->debug('Signal received, aborting');
  39. break;
  40. }
  41. $now = time();
  42. if ($nextTimedoutJobCheck <= $now) {
  43. $nextTimedoutJobCheck = $this->checkForTimedoutJobs();
  44. }
  45. if ($nextScheduledJobCheck <= $now) {
  46. $nextScheduledJobCheck = $this->checkForScheduledJobs($signal);
  47. }
  48. $result = $this->redis->brpop('jobs', 10);
  49. if (!$result) {
  50. continue;
  51. }
  52. $jobId = $result[1];
  53. $this->process($jobId, $signal);
  54. }
  55. }
  56. private function checkForTimedoutJobs(): int
  57. {
  58. $this->doctrine->getEntityManager()->getRepository(Job::class)->markTimedOutJobs();
  59. // check for timed out jobs every 20 min at least
  60. return time() + 1200;
  61. }
  62. private function checkForScheduledJobs(SignalHandler $signal): int
  63. {
  64. $em = $this->doctrine->getEntityManager();
  65. $repo = $em->getRepository(Job::class);
  66. foreach ($repo->getScheduledJobIds() as $jobId) {
  67. if ($this->process($jobId, $signal)) {
  68. $this->processedJobs++;
  69. }
  70. }
  71. // check for scheduled jobs every 5 minutes at least
  72. return time() + 300;
  73. }
  74. /**
  75. * Calls the configured processor with the job and a callback that must be called to mark the job as processed
  76. */
  77. private function process(string $jobId, SignalHandler $signal): bool
  78. {
  79. $em = $this->doctrine->getEntityManager();
  80. $repo = $em->getRepository(Job::class);
  81. if (!$repo->start($jobId)) {
  82. // race condition, some other worker caught the job first, aborting
  83. return false;
  84. }
  85. $job = $repo->findOneById($jobId);
  86. $this->logger->pushProcessor(function ($record) use ($job) {
  87. $record['extra']['job-id'] = $job->getId();
  88. return $record;
  89. });
  90. $expectedStart = $job->getExecuteAfter() ?: $job->getCreatedAt();
  91. $start = microtime(true);
  92. $this->statsd->timing('worker.queue.waittime', round(($start - $expectedStart->getTimestamp()) * 1000, 4), [
  93. 'jobType' => $job->getType(),
  94. ]);
  95. $processor = $this->jobWorkers[$job->getType()];
  96. $this->logger->reset();
  97. $this->logger->debug('Processing ' . $job->getType() . ' job', ['job' => $job->getPayload()]);
  98. try {
  99. $result = $processor->process($job, $signal);
  100. } catch (\Throwable $e) {
  101. $result = [
  102. 'status' => Job::STATUS_ERRORED,
  103. 'message' => 'An unexpected failure occurred',
  104. 'exception' => $e,
  105. ];
  106. }
  107. $this->statsd->increment('worker.queue.processed', 1, 1, [
  108. 'jobType' => $job->getType(),
  109. 'status' => $result['status'],
  110. ]);
  111. $this->statsd->timing('worker.queue.processtime', round((microtime(true) - $start) * 1000, 4), [
  112. 'jobType' => $job->getType(),
  113. ]);
  114. // If an exception is thrown during a transaction the EntityManager is closed
  115. // and we won't be able to update the job or handle future jobs
  116. if (!$this->doctrine->getEntityManager()->isOpen()) {
  117. $this->doctrine->resetManager();
  118. }
  119. // refetch objects in case the EM was reset during the job run
  120. $em = $this->doctrine->getEntityManager();
  121. $repo = $em->getRepository(Job::class);
  122. if ($result['status'] === Job::STATUS_RESCHEDULE) {
  123. $job->reschedule($result['after']);
  124. $em->flush($job);
  125. $this->logger->reset();
  126. $this->logger->popProcessor();
  127. return true;
  128. }
  129. if (!isset($result['message']) || !isset($result['status'])) {
  130. throw new \LogicException('$result must be an array with at least status and message keys');
  131. }
  132. if (!in_array($result['status'], [Job::STATUS_COMPLETED, Job::STATUS_FAILED, Job::STATUS_ERRORED, Job::STATUS_PACKAGE_GONE, Job::STATUS_PACKAGE_DELETED], true)) {
  133. throw new \LogicException('$result[\'status\'] must be one of '.Job::STATUS_COMPLETED.' or '.Job::STATUS_FAILED.', '.$result['status'].' given');
  134. }
  135. if (isset($result['exception'])) {
  136. $result['exceptionMsg'] = $result['exception']->getMessage();
  137. $result['exceptionClass'] = get_class($result['exception']);
  138. }
  139. $job = $repo->findOneById($jobId);
  140. $job->complete($result);
  141. $this->redis->setex('job-'.$job->getId(), 600, json_encode($result));
  142. $em->flush($job);
  143. $em->clear();
  144. if ($result['status'] === Job::STATUS_FAILED) {
  145. $this->logger->warning('Job '.$job->getId().' failed', $result);
  146. } elseif ($result['status'] === Job::STATUS_ERRORED) {
  147. $this->logger->error('Job '.$job->getId().' errored', $result);
  148. }
  149. $this->logger->reset();
  150. $this->logger->popProcessor();
  151. return true;
  152. }
  153. }