Scheduler.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. <?php declare(strict_types=1);
  2. namespace Packagist\WebBundle\Service;
  3. use Doctrine\Common\Persistence\ManagerRegistry;
  4. use Predis\Client as RedisClient;
  5. use Packagist\WebBundle\Entity\Package;
  6. use Packagist\WebBundle\Entity\Job;
  7. class Scheduler
  8. {
  9. /** @var ManagerRegistry */
  10. private $doctrine;
  11. private $redis;
  12. public function __construct(RedisClient $redis, ManagerRegistry $doctrine)
  13. {
  14. $this->doctrine = $doctrine;
  15. $this->redis = $redis;
  16. }
  17. public function scheduleUpdate($packageOrId, $updateEqualRefs = false, $deleteBefore = false, $executeAfter = null): Job
  18. {
  19. if ($packageOrId instanceof Package) {
  20. $packageOrId = $packageOrId->getId();
  21. } elseif (!is_int($packageOrId)) {
  22. throw new \UnexpectedValueException('Expected Package instance or int package id');
  23. }
  24. $pendingJobId = $this->getPendingUpdateJob($packageOrId, $updateEqualRefs, $deleteBefore);
  25. if ($pendingJobId) {
  26. $pendingJob = $this->doctrine->getManager()->getRepository(Job::class)->findOneBy(['id' => $pendingJobId]);
  27. // pending job will execute before the one we are trying to schedule so skip scheduling
  28. if (
  29. (!$pendingJob->getExecuteAfter() && $executeAfter)
  30. || ($pendingJob->getExecuteAfter() && $executeAfter && $pendingJob->getExecuteAfter() < $executeAfter)
  31. ) {
  32. return $pendingJob;
  33. }
  34. // neither job has executeAfter, so the pending one is equivalent to the one we are trying to schedule and we can skip scheduling
  35. if (!$pendingJob->getExecuteAfter() && !$executeAfter) {
  36. return $pendingJob;
  37. }
  38. // pending job will somehow execute after the one we are scheduling so we mark it complete and schedule a new job to run immediately
  39. $pendingJob->start();
  40. $pendingJob->complete(['status' => Job::STATUS_COMPLETED, 'message' => 'Another job is attempting to schedule immediately for this package, aborting scheduled-for-later update']);
  41. $this->doctrine->getManager()->flush($pendingJob);
  42. }
  43. return $this->createJob('package:updates', ['id' => $packageOrId, 'update_equal_refs' => $updateEqualRefs, 'delete_before' => $deleteBefore], $packageOrId, $executeAfter);
  44. }
  45. private function getPendingUpdateJob(int $packageId, $updateEqualRefs = false, $deleteBefore = false)
  46. {
  47. $result = $this->doctrine->getManager()->getConnection()->fetchAssoc(
  48. 'SELECT id, payload FROM job WHERE packageId = :package AND type = :type AND status = :status',
  49. [
  50. 'package' => $packageId,
  51. 'type' => 'package:updates',
  52. 'status' => Job::STATUS_QUEUED,
  53. ]
  54. );
  55. if ($result) {
  56. $payload = json_decode($result['payload'], true);
  57. if ($payload['update_equal_refs'] === $updateEqualRefs && $payload['delete_before'] === $deleteBefore) {
  58. return $result['id'];
  59. }
  60. }
  61. }
  62. /**
  63. * @return array [status => x, message => y]
  64. */
  65. public function getJobStatus(string $jobId): array
  66. {
  67. $data = $this->redis->get('job-'.$jobId);
  68. if ($data) {
  69. return json_decode($data, true);
  70. }
  71. return ['status' => 'running', 'message' => ''];
  72. }
  73. /**
  74. * @param Job[] $jobs
  75. * @return array[]
  76. */
  77. public function getJobsStatus(array $jobs): array
  78. {
  79. $results = [];
  80. foreach ($jobs as $job) {
  81. $jobId = $job->getId();
  82. $data = $this->redis->get('job-'.$jobId);
  83. if ($data) {
  84. $results[$jobId] = json_decode($data, true);
  85. } else {
  86. $results[$jobId] = ['status' => $job->getStatus()];
  87. }
  88. }
  89. return $results;
  90. }
  91. private function createJob(string $type, array $payload, $packageId = null, $executeAfter = null): Job
  92. {
  93. $jobId = bin2hex(random_bytes(20));
  94. $job = new Job();
  95. $job->setId($jobId);
  96. $job->setType($type);
  97. $job->setPayload($payload);
  98. $job->setCreatedAt(new \DateTime());
  99. if ($packageId) {
  100. $job->setPackageId($packageId);
  101. }
  102. if ($executeAfter instanceof \DateTimeInterface) {
  103. $job->setExecuteAfter($executeAfter);
  104. }
  105. $em = $this->doctrine->getManager();
  106. $em->persist($job);
  107. $em->flush();
  108. // trigger immediately if not scheduled for later
  109. if (!$job->getExecuteAfter()) {
  110. $this->redis->lpush('jobs', $job->getId());
  111. }
  112. return $job;
  113. }
  114. }