Browse Source

Avoid starting all jobs immediately

Jordi Boggiano 6 years ago
2 changed files with 92 additions and 54 deletions
  1. 14 22
  2. 78 32

+ 14 - 22

@@ -95,12 +95,17 @@ class CurlDownloader
     public function download($resolve, $reject, $origin, $url, $options, $copyTo = null)
-        return $this->initDownload($resolve, $reject, $origin, $url, $options, $copyTo);
+        $attributes = array();
+        if (isset($options['retry-auth-failure'])) {
+            $attributes['retryAuthFailure'] = $options['retry-auth-failure'];
+            unset($options['retry-auth-failure']);
+        }
+        return $this->initDownload($resolve, $reject, $origin, $url, $options, $copyTo, $attributes);
     private function initDownload($resolve, $reject, $origin, $url, $options, $copyTo = null, array $attributes = array())
-        // TODO allow setting attributes somehow
         $attributes = array_merge(array(
             'retryAuthFailure' => true,
             'redirects' => 1,
@@ -193,12 +198,12 @@ class CurlDownloader
         $this->io->writeError('Downloading ' . $url . $usingProxy . $ifModified, true, IOInterface::DEBUG);
         $this->checkCurlResult(curl_multi_add_handle($this->multiHandle, $curlHandle));
+// TODO progress
         //$params['notification'](STREAM_NOTIFY_RESOLVE, STREAM_NOTIFY_SEVERITY_INFO, '', 0, 0, 0, false);
     public function tick()
-        // TODO check we have active handles before doing this
         if (!$this->jobs) {
@@ -229,6 +234,7 @@ class CurlDownloader
                 $statusCode = null;
                 $response = null;
                 try {
+// TODO progress
                     //$this->onProgress($curlHandle, $job['callback'], $progress, $job['progress']);
                     if (CURLE_OK !== $errno) {
                         throw new TransportException($error);
@@ -263,7 +269,6 @@ class CurlDownloader
                     // handle 3xx redirects, 304 Not Modified is excluded
                     if ($statusCode >= 300 && $statusCode <= 399 && $statusCode !== 304 && $job['redirects'] < $this->maxRedirects) {
-                        // TODO
                         $location = $this->handleRedirect($job, $response);
                         if ($location) {
                             $this->restartJob($job, $location, array('redirects' => $job['attributes']['redirects'] + 1));
@@ -274,6 +279,7 @@ class CurlDownloader
                     // fail 4xx and 5xx responses and capture the response
                     if ($statusCode >= 400 && $statusCode <= 599) {
                         throw $this->failResponse($job, $response, $response->getStatusMessage());
+// TODO progress
 //                        $this->io->overwriteError("Downloading (<error>failed</error>)", false);
@@ -320,24 +326,13 @@ class CurlDownloader
                 if ($this->jobs[$i]['progress'] !== $progress) {
                     $previousProgress = $this->jobs[$i]['progress'];
                     $this->jobs[$i]['progress'] = $progress;
-                    try {
-                        //$this->onProgress($curlHandle, $this->jobs[$i]['callback'], $progress, $previousProgress);
-                    } catch (TransportException $e) {
-                        var_dump('Caught '.$e->getMessage());die;
-                        unset($this->jobs[$i]);
-                        curl_multi_remove_handle($this->multiHandle, $curlHandle);
-                        curl_close($curlHandle);
-                        fclose($job['headerHandle']);
-                        fclose($job['bodyHandle']);
-                        if ($job['filename']) {
-                            @unlink($job['filename'].'~');
-                        }
-                        call_user_func($job['reject'], $e);
-                    }
+                    // TODO
+                    //$this->onProgress($curlHandle, $this->jobs[$i]['callback'], $progress, $previousProgress);
         } catch (\Exception $e) {
+            // TODO
             var_dump('Caught2', get_class($e), $e->getMessage(), $e);die;
@@ -444,13 +439,10 @@ class CurlDownloader
     private function onProgress($curlHandle, callable $notify, array $progress, array $previousProgress)
+        // TODO add support for progress
         if (300 <= $progress['http_code'] && $progress['http_code'] < 400) {
-        if (!$previousProgress['http_code'] && $progress['http_code'] && $progress['http_code'] < 200 || 400 <= $progress['http_code']) {
-            $code = 403 === $progress['http_code'] ? STREAM_NOTIFY_AUTH_RESULT : STREAM_NOTIFY_FAILURE;
-            $notify($code, STREAM_NOTIFY_SEVERITY_ERR, curl_error($curlHandle), $progress['http_code'], 0, 0, false);
-        }
         if ($previousProgress['download_content_length'] < $progress['download_content_length']) {
             $notify(STREAM_NOTIFY_FILE_SIZE_IS, STREAM_NOTIFY_SEVERITY_INFO, '', 0, 0, (int) $progress['download_content_length'], false);

+ 78 - 32

@@ -33,8 +33,8 @@ class HttpDownloader
     private $config;
     private $jobs = array();
     private $options = array();
-    private $index;
-    private $progress;
+    private $runningJobs = 0;
+    private $maxJobs = 10;
     private $lastProgress;
     private $disableTls = false;
     private $curl;
@@ -42,8 +42,6 @@ class HttpDownloader
     private $idGen = 0;
-     * Constructor.
-     *
      * @param IOInterface $io         The IO instance
      * @param Config      $config     The config
      * @param array       $options    The options
@@ -131,35 +129,24 @@ class HttpDownloader
             'status' => self::STATUS_QUEUED,
             'request' => $request,
             'sync' => $sync,
+            'origin' => Url::getOrigin($this->config, $request['url']),
-        $origin = Url::getOrigin($this->config, $job['request']['url']);
         // capture username/password from URL if there is one
         if (preg_match('{^https?://([^:/]+):([^@/]+)@([^/]+)}i', $request['url'], $match)) {
-            $this->io->setAuthentication($origin, rawurldecode($match[1]), rawurldecode($match[2]));
+            $this->io->setAuthentication($job['origin'], rawurldecode($match[1]), rawurldecode($match[2]));
-        $curl = $this->curl;
         $rfs = $this->rfs;
-        $io = $this->io;
-        if ($curl && preg_match('{^https?://}i', $job['request']['url'])) {
-            $resolver = function ($resolve, $reject) use (&$job, $curl, $origin) {
-                // start job
-                $url = $job['request']['url'];
-                $options = $job['request']['options'];
-                $job['status'] = HttpDownloader::STATUS_STARTED;
-                if ($job['request']['copyTo']) {
-                    $curl->download($resolve, $reject, $origin, $url, $options, $job['request']['copyTo']);
-                } else {
-                    $curl->download($resolve, $reject, $origin, $url, $options);
-                }
+        if ($this->curl && preg_match('{^https?://}i', $job['request']['url'])) {
+            $resolver = function ($resolve, $reject) use (&$job) {
+                $job['status'] = HttpDownloader::STATUS_QUEUED;
+                $job['resolve'] = $resolve;
+                $job['reject'] = $reject;
         } else {
-            $resolver = function ($resolve, $reject) use (&$job, $rfs, $curl, $origin) {
+            $resolver = function ($resolve, $reject) use (&$job, $rfs) {
                 // start job
                 $url = $job['request']['url'];
                 $options = $job['request']['options'];
@@ -167,11 +154,11 @@ class HttpDownloader
                 $job['status'] = HttpDownloader::STATUS_STARTED;
                 if ($job['request']['copyTo']) {
-                    $result = $rfs->copy($origin, $url, $job['request']['copyTo'], false /* TODO progress */, $options);
+                    $result = $rfs->copy($job['origin'], $url, $job['request']['copyTo'], false /* TODO progress */, $options);
                 } else {
-                    $body = $rfs->getContents($origin, $url, false /* TODO progress */, $options);
+                    $body = $rfs->getContents($job['origin'], $url, false /* TODO progress */, $options);
                     $headers = $rfs->getLastHeaders();
                     $response = new Http\Response($job['request'], $rfs->findStatusCode($headers), $headers, $body);
@@ -180,26 +167,85 @@ class HttpDownloader
+        $downloader = $this;
+        $io = $this->io;
         $canceler = function () {};
         $promise = new Promise($resolver, $canceler);
-        $promise->then(function ($response) use (&$job) {
+        $promise->then(function ($response) use (&$job, $downloader) {
             $job['status'] = HttpDownloader::STATUS_COMPLETED;
             $job['response'] = $response;
-            // TODO look for more jobs to start once we throttle to max X jobs
-        }, function ($e) use ($io, &$job) {
-            // var_dump(__CLASS__ . __LINE__);
-            // var_dump(get_class($e));
-            // var_dump($e->getMessage());
-            // die;
+            // TODO 3.0 this should be done directly on $this when PHP 5.3 is dropped
+            $downloader->markJobDone();
+            $downloader->scheduleNextJob();
+            return $response;
+        }, function ($e) use ($io, &$job, $downloader) {
             $job['status'] = HttpDownloader::STATUS_FAILED;
             $job['exception'] = $e;
+            $downloader->markJobDone();
+            throw $e;
         $this->jobs[$job['id']] =& $job;
+        if ($this->runningJobs < $this->maxJobs) {
+            $this->startJob($job['id']);
+        }
         return array($job, $promise);
+    private function startJob($id)
+    {
+        $job =& $this->jobs[$id];
+        if ($job['status'] !== self::STATUS_QUEUED) {
+            return;
+        }
+        // start job
+        $job['status'] = self::STATUS_STARTED;
+        $this->runningJobs++;
+        $resolve = $job['resolve'];
+        $reject = $job['reject'];
+        $url = $job['request']['url'];
+        $options = $job['request']['options'];
+        $origin = $job['origin'];
+        if ($job['request']['copyTo']) {
+            $this->curl->download($resolve, $reject, $origin, $url, $options, $job['request']['copyTo']);
+        } else {
+            $this->curl->download($resolve, $reject, $origin, $url, $options);
+        }
+    }
+    /**
+     * @private
+     */
+    public function markJobDone()
+    {
+        $this->runningJobs--;
+    }
+    /**
+     * @private
+     */
+    public function scheduleNextJob()
+    {
+        foreach ($this->jobs as $job) {
+            if ($job['status'] === self::STATUS_QUEUED) {
+                $this->startJob($job['id']);
+                if ($this->runningJobs >= $this->maxJobs) {
+                    return;
+                }
+            }
+        }
+    }
     public function wait($index = null, $progress = false)
         while (true) {