|
@@ -28,12 +28,35 @@ class Scheduler
|
|
throw new \UnexpectedValueException('Expected Package instance or int package id');
|
|
throw new \UnexpectedValueException('Expected Package instance or int package id');
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ $pendingJobId = $this->getPendingUpdateJob($packageOrId, $updateEqualRefs, $deleteBefore);
|
|
|
|
+ if ($pendingJobId) {
|
|
|
|
+ $pendingJob = $this->doctrine->getManager()->getRepository(Job::class)->findOneBy(['id' => $pendingJobId]);
|
|
|
|
+
|
|
|
|
+ // pending job will execute before the one we are trying to schedule so skip scheduling
|
|
|
|
+ if (
|
|
|
|
+ (!$pendingJob->getExecuteAfter() && $executeAfter)
|
|
|
|
+ || ($pendingJob->getExecuteAfter() && $executeAfter && $pendingJob->getExecuteAfter() < $executeAfter)
|
|
|
|
+ ) {
|
|
|
|
+ return $pendingJob;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // neither job has executeAfter, so the pending one is equivalent to the one we are trying to schedule and we can skip scheduling
|
|
|
|
+ if (!$pendingJob->getExecuteAfter() && !$executeAfter) {
|
|
|
|
+ return $pendingJob;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // pending job will somehow execute after the one we are scheduling so we mark it complete and schedule a new job to run immediately
|
|
|
|
+ $pendingJob->start();
|
|
|
|
+ $pendingJob->complete(['status' => Job::STATUS_COMPLETED, 'message' => 'Another job is attempting to schedule immediately for this package, aborting scheduled-for-later update']);
|
|
|
|
+ $this->doctrine->getManager()->flush($pendingJob);
|
|
|
|
+ }
|
|
|
|
+
|
|
return $this->createJob('package:updates', ['id' => $packageOrId, 'update_equal_refs' => $updateEqualRefs, 'delete_before' => $deleteBefore], $packageOrId, $executeAfter);
|
|
return $this->createJob('package:updates', ['id' => $packageOrId, 'update_equal_refs' => $updateEqualRefs, 'delete_before' => $deleteBefore], $packageOrId, $executeAfter);
|
|
}
|
|
}
|
|
|
|
|
|
- public function hasPendingUpdateJob(int $packageId, $updateEqualRefs = false, $deleteBefore = false): bool
|
|
|
|
|
|
+ private function getPendingUpdateJob(int $packageId, $updateEqualRefs = false, $deleteBefore = false)
|
|
{
|
|
{
|
|
- $result = $this->doctrine->getManager()->getConnection()->fetchAssoc('SELECT payload FROM job WHERE packageId = :package AND status = :status', [
|
|
|
|
|
|
+ $result = $this->doctrine->getManager()->getConnection()->fetchAssoc('SELECT id, payload FROM job WHERE packageId = :package AND status = :status', [
|
|
'package' => $packageId,
|
|
'package' => $packageId,
|
|
'status' => Job::STATUS_QUEUED,
|
|
'status' => Job::STATUS_QUEUED,
|
|
]);
|
|
]);
|
|
@@ -41,11 +64,9 @@ class Scheduler
|
|
if ($result) {
|
|
if ($result) {
|
|
$payload = json_decode($result['payload'], true);
|
|
$payload = json_decode($result['payload'], true);
|
|
if ($payload['update_equal_refs'] === $updateEqualRefs && $payload['delete_before'] === $deleteBefore) {
|
|
if ($payload['update_equal_refs'] === $updateEqualRefs && $payload['delete_before'] === $deleteBefore) {
|
|
- return true;
|
|
|
|
|
|
+ return $result['id'];
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- return false;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|