Browse Source

Merge branch 'multi_exec'

Daniele Alessandri 15 years ago
parent
commit
77f885c233
3 changed files with 118 additions and 4 deletions
  1. 96 3
      lib/Predis.php
  2. 3 1
      test/PredisShared.php
  3. 19 0
      test/RedisCommandsTest.php

+ 96 - 3
lib/Predis.php

@@ -152,6 +152,11 @@ class Client {
         return $pipelineBlock !== null ? $pipeline->execute($pipelineBlock) : $pipeline;
     }
 
+    public function multiExec(\Closure $multiExecBlock = null) {
+        $multiExec = new MultiExecBlock($this);
+        return $multiExecBlock !== null ? $multiExec->execute($multiExecBlock) : $multiExec;
+    }
+
     public function registerCommands(Array $commands) {
         $this->_serverProfile->registerCommands($commands);
     }
@@ -280,6 +285,7 @@ class Response {
     const NEWLINE = "\r\n";
     const OK      = 'OK';
     const ERROR   = 'ERR';
+    const QUEUED  = 'QUEUED';
     const NULL    = 'nil';
 
     private static $_prefixHandlers;
@@ -289,7 +295,13 @@ class Response {
             // status
             '+' => function($socket) {
                 $status = rtrim(fgets($socket), Response::NEWLINE);
-                return $status === Response::OK ? true : $status;
+                if ($status === Response::OK) {
+                    return true;
+                }
+                else if ($status === Response::QUEUED) {
+                    return new ResponseQueued();
+                }
+                return $status;
             }, 
 
             // error
@@ -372,6 +384,14 @@ class Response {
     }
 }
 
+class ResponseQueued {
+    public $queued = true;
+
+    public function __toString() {
+        return Response::QUEUED;
+    }
+}
+
 class CommandPipeline {
     private $_redisClient, $_pipelineBuffer, $_returnValues, $_running;
 
@@ -445,6 +465,59 @@ class CommandPipeline {
     }
 }
 
+class MultiExecBlock {
+    private $_redisClient, $_commands, $_initialized;
+
+    public function __construct(Client $redisClient) {
+        $this->_initialized = false;
+        $this->_redisClient = $redisClient;
+        $this->_commands    = array();
+    }
+
+    private function initialize() {
+        if ($this->_initialized === false) {
+            $this->_redisClient->multi();
+            $this->_initialized = true;
+        }
+    }
+
+    public function __call($method, $arguments) {
+        $this->initialize();
+        $command = $this->_redisClient->createCommand($method, $arguments);
+        if (isset($this->_redisClient->executeCommand($command)->queued)) {
+            $this->_commands[] = $command;
+        }
+        else {
+            // TODO: ...
+            throw new ClientException('Unexpected condition');
+        }
+    }
+
+    public function execute(\Closure $block = null) {
+        $blockException = null;
+        $returnValues   = array();
+
+        try {
+            if ($block !== null) {
+                $block($this);
+            }
+            $execReply = $this->_redisClient->exec();
+            for ($i = 0; $i < count($execReply); $i++) {
+                $returnValues[] = $this->_commands[$i]->parseResponse($execReply[$i]);
+            }
+        }
+        catch (\Exception $exception) {
+            $blockException = $exception;
+        }
+
+        if ($blockException !== null) {
+            throw $blockException;
+        }
+
+        return $returnValues;
+    }
+}
+
 /* ------------------------------------------------------------------------- */
 
 class ConnectionParameters {
@@ -587,8 +660,8 @@ class Connection implements IConnection {
     public function readResponse(Command $command) {
         $socket   = $this->getSocket();
         $handler  = Response::getPrefixHandler(fgetc($socket));
-        $response = $command->parseResponse($handler($socket));
-        return $response;
+        $response = $handler($socket);
+        return isset($response->queued) ? $response : $command->parseResponse($response);
     }
 
     public function rawCommand($rawCommandData, $closesConnection = false) {
@@ -905,6 +978,16 @@ class RedisServer__V1_2 extends RedisServer__V1_0 {
     }
 }
 
+class RedisServer__Futures extends RedisServer__V1_2 {
+    public function getVersion() { return 0; }
+    public function getSupportedCommands() {
+        return array_merge(parent::getSupportedCommands(), array(
+            'multi'     => '\Predis\Commands\Multi',
+            'exec'      => '\Predis\Commands\Exec'
+        ));
+    }
+}
+
 /* ------------------------------------------------------------------------- */
 
 namespace Predis\Utilities;
@@ -1381,4 +1464,14 @@ class SlaveOf extends \Predis\InlineCommand {
         return count($arguments) === 0 ? array('NO ONE') : $arguments;
     }
 }
+
+class Multi extends \Predis\InlineCommand {
+    public function canBeHashed()  { return false; }
+    public function getCommandId() { return 'MULTI'; }
+}
+
+class Exec extends \Predis\InlineCommand {
+    public function canBeHashed()  { return false; }
+    public function getCommandId() { return 'EXEC'; }
+}
 ?>

+ 3 - 1
test/PredisShared.php

@@ -21,11 +21,13 @@ class RC {
     const EXCEPTION_NO_SUCH_KEY    = 'no such key';
     const EXCEPTION_OUT_OF_RANGE   = 'index out of range';
     const EXCEPTION_INVALID_DB_IDX = 'invalid DB index';
+    const EXCEPTION_EXEC_NO_MULTI  = 'EXEC without MULTI';
 
     private static $_connection;
 
     private static function createConnection() {
-        $connection = new Predis\Client(array('host' => RC::SERVER_HOST, 'port' => RC::SERVER_PORT));
+        $serverProfile = new Predis\RedisServer__Futures();
+        $connection = new Predis\Client(array('host' => RC::SERVER_HOST, 'port' => RC::SERVER_PORT), $serverProfile);
         $connection->connect();
         $connection->selectDatabase(RC::DEFAULT_DATABASE);
         return $connection;

+ 19 - 0
test/RedisCommandsTest.php

@@ -45,6 +45,25 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
         $this->assertFalse($this->redis->isConnected());
     }
 
+    function testMultiExec() {
+        // NOTE: due to a limitation in the current implementation of Predis\Client, 
+        //       the replies returned by Predis\Command\Exec are not parsed by their 
+        //       respective Predis\Command::parseResponse methods. If you need that 
+        //       kind of behaviour, you should use an instance of Predis\MultiExecBlock.
+        $this->assertTrue($this->redis->multi());
+        $this->assertType('Predis\ResponseQueued', $this->redis->ping());
+        $this->assertType('Predis\ResponseQueued', $this->redis->echo('hello'));
+        $this->assertType('Predis\ResponseQueued', $this->redis->echo('redis'));
+        $this->assertEquals(array('PONG', 'hello', 'redis'), $this->redis->exec());
+
+        $this->assertTrue($this->redis->multi());
+        $this->assertEquals(array(), $this->redis->exec());
+
+        // should throw an exception when trying to EXEC without having previously issued MULTI
+        RC::testForServerException($this, RC::EXCEPTION_EXEC_NO_MULTI, function($test) {
+            $test->redis->exec();
+        });
+    }
 
     /* commands operating on string values */