Scheduler.php 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. <?php declare(strict_types=1);
  2. namespace Packagist\WebBundle\Service;
  3. use Doctrine\Common\Persistence\ManagerRegistry;
  4. use Predis\Client as RedisClient;
  5. use Packagist\WebBundle\Entity\Package;
  6. use Packagist\WebBundle\Entity\Job;
  7. class Scheduler
  8. {
  9. /** @var ManagerRegistry */
  10. private $doctrine;
  11. private $redis;
  12. public function __construct(RedisClient $redis, ManagerRegistry $doctrine)
  13. {
  14. $this->doctrine = $doctrine;
  15. $this->redis = $redis;
  16. }
  17. public function scheduleUpdate($packageOrId, $updateEqualRefs = false, $deleteBefore = false, $executeAfter = null): Job
  18. {
  19. $updateEqualRefs = (bool) $updateEqualRefs;
  20. $deleteBefore = (bool) $deleteBefore;
  21. if ($packageOrId instanceof Package) {
  22. $packageOrId = $packageOrId->getId();
  23. } elseif (!is_int($packageOrId)) {
  24. throw new \UnexpectedValueException('Expected Package instance or int package id');
  25. }
  26. $pendingJobId = $this->getPendingUpdateJob($packageOrId, $updateEqualRefs, $deleteBefore);
  27. if ($pendingJobId) {
  28. $pendingJob = $this->doctrine->getManager()->getRepository(Job::class)->findOneBy(['id' => $pendingJobId]);
  29. // pending job will execute before the one we are trying to schedule so skip scheduling
  30. if (
  31. (!$pendingJob->getExecuteAfter() && $executeAfter)
  32. || ($pendingJob->getExecuteAfter() && $executeAfter && $pendingJob->getExecuteAfter() < $executeAfter)
  33. ) {
  34. return $pendingJob;
  35. }
  36. // neither job has executeAfter, so the pending one is equivalent to the one we are trying to schedule and we can skip scheduling
  37. if (!$pendingJob->getExecuteAfter() && !$executeAfter) {
  38. return $pendingJob;
  39. }
  40. // pending job will somehow execute after the one we are scheduling so we mark it complete and schedule a new job to run immediately
  41. $pendingJob->start();
  42. $pendingJob->complete(['status' => Job::STATUS_COMPLETED, 'message' => 'Another job is attempting to schedule immediately for this package, aborting scheduled-for-later update']);
  43. $this->doctrine->getManager()->flush($pendingJob);
  44. }
  45. return $this->createJob('package:updates', ['id' => $packageOrId, 'update_equal_refs' => $updateEqualRefs, 'delete_before' => $deleteBefore], $packageOrId, $executeAfter);
  46. }
  47. public function scheduleUserScopeMigration(int $userId, string $oldScope, string $newScope): Job
  48. {
  49. return $this->createJob('githubuser:migrate', ['id' => $userId, 'old_scope' => $oldScope, 'new_scope' => $newScope], $userId);
  50. }
  51. private function getPendingUpdateJob(int $packageId, $updateEqualRefs = false, $deleteBefore = false)
  52. {
  53. $result = $this->doctrine->getManager()->getConnection()->fetchAssoc(
  54. 'SELECT id, payload FROM job WHERE packageId = :package AND status = :status AND type = :type LIMIT 1',
  55. [
  56. 'package' => $packageId,
  57. 'type' => 'package:updates',
  58. 'status' => Job::STATUS_QUEUED,
  59. ]
  60. );
  61. if ($result) {
  62. $payload = json_decode($result['payload'], true);
  63. if ($payload['update_equal_refs'] === $updateEqualRefs && $payload['delete_before'] === $deleteBefore) {
  64. return $result['id'];
  65. }
  66. }
  67. }
  68. /**
  69. * @return array [status => x, message => y]
  70. */
  71. public function getJobStatus(string $jobId): array
  72. {
  73. $data = $this->redis->get('job-'.$jobId);
  74. if ($data) {
  75. return json_decode($data, true);
  76. }
  77. return ['status' => 'running', 'message' => ''];
  78. }
  79. /**
  80. * @param Job[] $jobs
  81. * @return array[]
  82. */
  83. public function getJobsStatus(array $jobs): array
  84. {
  85. $results = [];
  86. foreach ($jobs as $job) {
  87. $jobId = $job->getId();
  88. $data = $this->redis->get('job-'.$jobId);
  89. if ($data) {
  90. $results[$jobId] = json_decode($data, true);
  91. } else {
  92. $results[$jobId] = ['status' => $job->getStatus()];
  93. }
  94. }
  95. return $results;
  96. }
  97. private function createJob(string $type, array $payload, $packageId = null, $executeAfter = null): Job
  98. {
  99. $jobId = bin2hex(random_bytes(20));
  100. $job = new Job();
  101. $job->setId($jobId);
  102. $job->setType($type);
  103. $job->setPayload($payload);
  104. $job->setCreatedAt(new \DateTime());
  105. if ($packageId) {
  106. $job->setPackageId($packageId);
  107. }
  108. if ($executeAfter instanceof \DateTimeInterface) {
  109. $job->setExecuteAfter($executeAfter);
  110. }
  111. $em = $this->doctrine->getManager();
  112. $em->persist($job);
  113. $em->flush();
  114. // trigger immediately if not scheduled for later
  115. if (!$job->getExecuteAfter()) {
  116. $this->redis->lpush('jobs', $job->getId());
  117. }
  118. return $job;
  119. }
  120. }