Browse Source

Improve check foro timed out/scheduled jobs

Jordi Boggiano 7 years ago
parent
commit
480eaac2b2

+ 16 - 4
src/Packagist/WebBundle/Service/QueueWorker.php

@@ -37,7 +37,7 @@ class QueueWorker
 
         $this->logger->info('Waiting for new messages');
 
-        $this->doctrine->getEntityManager()->getRepository(Job::class)->markTimedOutJobs();
+        $nextTimedoutJobCheck = $this->checkForTimedoutJobs();
         $nextScheduledJobCheck = $this->checkForScheduledJobs($signal);
 
         while ($this->processedJobs++ < $count) {
@@ -46,7 +46,11 @@ class QueueWorker
                 break;
             }
 
-            if ($nextScheduledJobCheck <= time()) {
+            $now = time();
+            if ($nextTimedoutJobCheck <= $now) {
+                $nextTimedoutJobCheck = $this->checkForTimedoutJobs();
+            }
+            if ($nextScheduledJobCheck <= $now) {
                 $nextScheduledJobCheck = $this->checkForScheduledJobs($signal);
             }
 
@@ -61,6 +65,14 @@ class QueueWorker
         }
     }
 
+    private function checkForTimedoutJobs(): int
+    {
+        $this->doctrine->getEntityManager()->getRepository(Job::class)->markTimedOutJobs();
+
+        // check for timed out jobs every 20 min at least
+        return time() + 1200;
+    }
+
     private function checkForScheduledJobs(SignalHandler $signal): int
     {
         $em = $this->doctrine->getEntityManager();
@@ -72,8 +84,8 @@ class QueueWorker
             }
         }
 
-        // check for scheduled jobs every 30 sec at least
-        return time() + 30;
+        // check for scheduled jobs every 5 minutes at least
+        return time() + 300;
     }
 
     /**

+ 8 - 4
src/Packagist/WebBundle/Service/Scheduler.php

@@ -56,10 +56,14 @@ class Scheduler
 
     private function getPendingUpdateJob(int $packageId, $updateEqualRefs = false, $deleteBefore = false)
     {
-        $result = $this->doctrine->getManager()->getConnection()->fetchAssoc('SELECT id, payload FROM job WHERE packageId = :package AND status = :status', [
-            'package' => $packageId,
-            'status' => Job::STATUS_QUEUED,
-        ]);
+        $result = $this->doctrine->getManager()->getConnection()->fetchAssoc(
+            'SELECT id, payload FROM job WHERE packageId = :package AND type = :type AND status = :status',
+            [
+                'package' => $packageId,
+                'type' => 'package:updates',
+                'status' => Job::STATUS_QUEUED,
+            ]
+        );
 
         if ($result) {
             $payload = json_decode($result['payload'], true);