Sfoglia il codice sorgente

Add a method to the pipeline to fetch the connection from the client.

In this method we can put the logic needed to prepare the connection
right before sending the queued commands to the server (e.g. switching
to the master server when connected in replication mode).
Daniele Alessandri 11 anni fa
parent
commit
2d39408c0f

+ 33 - 25
lib/Predis/Pipeline/Atomic.php

@@ -13,6 +13,7 @@ namespace Predis\Pipeline;
 
 use SplQueue;
 use Predis\ClientException;
+use Predis\ClientInterface;
 use Predis\Connection\ConnectionInterface;
 use Predis\Connection\SingleConnectionInterface;
 use Predis\Profile;
@@ -25,14 +26,44 @@ use Predis\Response;
  */
 class Atomic extends Pipeline
 {
+    /**
+     * {@inheritdoc}
+     */
+    public function __construct(ClientInterface $client)
+    {
+        if (!$client->getProfile()->supportsCommands(array('multi', 'exec', 'discard'))) {
+            throw new ClientException(
+                'The specified server profile must support MULTI, EXEC and DISCARD.'
+            );
+        }
+
+        parent::__construct($client);
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    protected function getConnection()
+    {
+        $connection = $this->getClient()->getConnection();
+
+        if (!$connection instanceof SingleConnectionInterface) {
+            $class = __CLASS__;
+
+            throw new ClientException(
+                "$class can be used only with connections to single nodes"
+            );
+        }
+
+        return $connection;
+    }
+
     /**
      * {@inheritdoc}
      */
     protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
     {
         $profile = $this->getClient()->getProfile();
-        $this->check($connection, $profile);
-
         $connection->executeCommand($profile->createCommand('multi'));
 
         foreach ($commands as $command) {
@@ -84,27 +115,4 @@ class Atomic extends Pipeline
 
         return $responses;
     }
-
-    /**
-     * Verifies all the needed preconditions before executing the pipeline.
-     *
-     * @param ConnectionInterface $connection Connection instance.
-     * @param Profile\ProfileInterface $profile Server profile.
-     */
-    protected function check(ConnectionInterface $connection, Profile\ProfileInterface $profile)
-    {
-        if (!$connection instanceof SingleConnectionInterface) {
-            $class = __CLASS__;
-
-            throw new ClientException(
-                "$class can be used only with connections to single nodes"
-            );
-        }
-
-        if (!$profile->supportsCommands(array('multi', 'exec', 'discard'))) {
-            throw new ClientException(
-                'The specified server profile must support MULTI, EXEC and DISCARD.'
-            );
-        }
-    }
 }

+ 12 - 4
lib/Predis/Pipeline/ConnectionErrorProof.php

@@ -27,15 +27,23 @@ use Predis\Connection\SingleConnectionInterface;
  */
 class ConnectionErrorProof extends Pipeline
 {
+    /**
+     * {@inheritdoc}
+     */
+    protected function getConnection()
+    {
+        return $this->getClient()->getConnection();
+    }
+
     /**
      * {@inheritdoc}
      */
     protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
     {
         if ($connection instanceof SingleConnectionInterface) {
-            return $this->executePipelineNode($connection, $commands);
+            return $this->executeSingleNode($connection, $commands);
         } else if ($connection instanceof ClusterConnectionInterface) {
-            return $this->executePipelineCluster($connection, $commands);
+            return $this->executeCluster($connection, $commands);
         } else {
             throw new NotSupportedException("Unsupported connection type");
         }
@@ -44,7 +52,7 @@ class ConnectionErrorProof extends Pipeline
     /**
      * {@inheritdoc}
      */
-    public function executePipelineNode(SingleConnectionInterface $connection, SplQueue $commands)
+    public function executeSingleNode(SingleConnectionInterface $connection, SplQueue $commands)
     {
         $responses  = array();
         $sizeOfPipe = count($commands);
@@ -76,7 +84,7 @@ class ConnectionErrorProof extends Pipeline
     /**
      * {@inheritdoc}
      */
-    public function executePipelineCluster(ClusterConnectionInterface $connection, SplQueue $commands)
+    public function executeCluster(ClusterConnectionInterface $connection, SplQueue $commands)
     {
         $responses  = array();
         $sizeOfPipe = count($commands);

+ 0 - 5
lib/Predis/Pipeline/FireAndForget.php

@@ -13,7 +13,6 @@ namespace Predis\Pipeline;
 
 use SplQueue;
 use Predis\Connection\ConnectionInterface;
-use Predis\Connection\ReplicationConnectionInterface;
 
 /**
  * Command pipeline that writes commands to the servers but discards responses.
@@ -27,10 +26,6 @@ class FireAndForget extends Pipeline
      */
     protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
     {
-        if ($connection instanceof ReplicationConnectionInterface) {
-            $connection->switchTo('master');
-        }
-
         while (!$commands->isEmpty()) {
             $connection->writeCommand($commands->dequeue());
         }

+ 17 - 7
lib/Predis/Pipeline/Pipeline.php

@@ -95,6 +95,22 @@ class Pipeline implements BasicClientInterface, ExecutableContextInterface
         throw new Response\ServerException($message);
     }
 
+    /**
+     * Returns the underlying connection to be used by the pipeline.
+     *
+     * @return ConnectionInterface
+     */
+    protected function getConnection()
+    {
+        $connection = $this->getClient()->getConnection();
+
+        if ($connection instanceof ReplicationConnectionInterface) {
+            $connection->switchTo('master');
+        }
+
+        return $connection;
+    }
+
     /**
      * Implements the logic to flush the queued commands and read the responses
      * from the current connection.
@@ -105,10 +121,6 @@ class Pipeline implements BasicClientInterface, ExecutableContextInterface
      */
     protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
     {
-        if ($connection instanceof ReplicationConnectionInterface) {
-            $connection->switchTo('master');
-        }
-
         foreach ($commands as $command) {
             $connection->writeCommand($command);
         }
@@ -141,9 +153,7 @@ class Pipeline implements BasicClientInterface, ExecutableContextInterface
     public function flushPipeline($send = true)
     {
         if ($send && !$this->pipeline->isEmpty()) {
-            $connection = $this->client->getConnection();
-            $responses = $this->executePipeline($connection, $this->pipeline);
-
+            $responses = $this->executePipeline($this->getConnection(), $this->pipeline);
             $this->responses = array_merge($this->responses, $responses);
         } else {
             $this->pipeline = new SplQueue();