Browse Source

Add option "aggregate" to customize multiple connections aggregation.

This option must return a callable object that is used to override how
the client aggregates connections when passing an array of parameters
to its constructor.

When specified, this option overrides both "cluster" and "replication"
as it allows to make use of your own code to aggregate multiple nodes.

This is, for example, how you can mimic the standard initialization of
a cluster that relies on client-side sharding:

  $parameters = ['tcp://127.0.0.1:6380', 'tcp://127.0.0.1:6381'];

  $options = [
    'aggregate' => function () {
      return function ($parameters, $options) {
          $connection = new Predis\Connection\PredisCluster();
          $options->connections->aggregate($connection, $parameters);

          return $connection;
      };
    },
  ];

  $client = new Predis\Client($parameters, $options);

When invoked by the client, the specified callable must always return
a Predis\Connection\ConnectionInterface instance or the client will
throw an UnexpectedValueException.
Daniele Alessandri 11 years ago
parent
commit
a1c7889584
2 changed files with 92 additions and 17 deletions
  1. 37 14
      lib/Predis/Client.php
  2. 55 3
      tests/Predis/ClientTest.php

+ 37 - 14
lib/Predis/Client.php

@@ -12,13 +12,13 @@
 namespace Predis;
 
 use InvalidArgumentException;
+use UnexpectedValueException;
 use Predis\Command\CommandInterface;
 use Predis\Command\ScriptedCommand;
 use Predis\Configuration\Options;
 use Predis\Configuration\OptionsInterface;
 use Predis\Connection\AggregatedConnectionInterface;
 use Predis\Connection\ConnectionInterface;
-use Predis\Connection\ConnectionFactoryInterface;
 use Predis\Connection\ConnectionParametersInterface;
 use Predis\Monitor\MonitorContext;
 use Predis\Pipeline\PipelineContext;
@@ -102,28 +102,31 @@ class Client implements ClientInterface
         }
 
         if (is_array($parameters)) {
+            if (!isset($parameters[0])) {
+                return $this->options->connections->create($parameters);
+            }
+
             $options = $this->options;
 
-            if (isset($parameters[0])) {
-                $replication = isset($options->replication) && $options->replication;
-                $connection = $options->{$replication ? 'replication' : 'cluster'};
+            if ($options->defined('aggregate')) {
+                $initializer = $this->getConnectionInitializerWrapper($options->aggregate);
+                $connection = $initializer($parameters, $options);
+            } else {
+                if ($options->defined('replication') && $replication = $options->replication) {
+                    $connection = $replication;
+                } else {
+                    $connection = $options->cluster;
+                }
 
                 $options->connections->aggregate($connection, $parameters);
-
-                return $connection;
             }
 
-            return $options->connections->create($parameters);
+            return $connection;
         }
 
         if (is_callable($parameters)) {
-            $connection = call_user_func($parameters, $this->options);
-
-            if (!$connection instanceof ConnectionInterface) {
-                throw new InvalidArgumentException(
-                    'Callable parameters must return instances of Predis\Connection\ConnectionInterface'
-                );
-            }
+            $initializer = $this->getConnectionInitializerWrapper($parameters);
+            $connection = $initializer($this->options);
 
             return $connection;
         }
@@ -131,6 +134,26 @@ class Client implements ClientInterface
         throw new InvalidArgumentException('Invalid type for connection parameters');
     }
 
+    /**
+     * Wraps a callable to make sure that its returned value represents a valid
+     * connection type.
+     *
+     * @param mixed $callable
+     * @return mixed
+     */
+    protected function getConnectionInitializerWrapper($callable)
+    {
+        return function () use ($callable) {
+            $connection = call_user_func_array($callable, func_get_args());
+
+            if (!$connection instanceof ConnectionInterface) {
+                throw new UnexpectedValueException('The callable connection initializer returned an invalid type');
+            }
+
+            return $connection;
+        };
+    }
+
     /**
      * {@inheritdoc}
      */

+ 55 - 3
tests/Predis/ClientTest.php

@@ -217,10 +217,10 @@ class ClientTest extends StandardTestCase
 
     /**
      * @group disconnected
-     * @expectedException InvalidArgumentException
-     * @expectedExceptionMessage Callable parameters must return instances of Predis\Connection\ConnectionInterface
+     * @expectedException UnexpectedValueException
+     * @expectedExceptionMessage The callable connection initializer returned an invalid type
      */
-    public function testConstructorWithCallableArgumentButInvalidReturnType()
+    public function testConstructorWithCallableConnectionInitializerThrowsExceptionOnInvalidReturnType()
     {
         $wrongType = $this->getMock('stdClass');
 
@@ -263,6 +263,58 @@ class ClientTest extends StandardTestCase
         $this->assertSame('host2', $connection->getConnectionById('slave')->getParameters()->host);
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testConstructorWithArrayAndOptionAggregateArgument()
+    {
+        $arg1 = array('tcp://host1', 'tcp://host2');
+
+        $connection = $this->getMock('Predis\Connection\ConnectionInterface');
+
+        $fnaggregate = $this->getMock('stdClass', array('__invoke'));
+        $fnaggregate->expects($this->once())
+                    ->method('__invoke')
+                    ->with($arg1)
+                    ->will($this->returnValue($connection));
+
+        $fncluster = $this->getMock('stdClass', array('__invoke'));
+        $fncluster->expects($this->never())->method('__invoke');
+
+        $fnreplication = $this->getMock('stdClass', array('__invoke'));
+        $fnreplication->expects($this->never())->method('__invoke');
+
+        $arg2 = array(
+            'aggregate'   => function () use ($fnaggregate) { return $fnaggregate; },
+            'cluster'     => function () use ($fncluster) { return $fncluster; },
+            'replication' => function () use ($fnreplication) { return $fncluster; },
+        );
+
+        $client = new Client($arg1, $arg2);
+
+        $this->assertSame($connection, $client->getConnection());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException UnexpectedValueException
+     * @expectedExceptionMessage The callable connection initializer returned an invalid type
+     */
+    public function testConstructorWithArrayAndOptionAggregateArgumentThrowExceptionOnInvalidReturnType()
+    {
+        $arg1 = array('tcp://host1', 'tcp://host2');
+
+        $fnaggregate = $this->getMock('stdClass', array('__invoke'));
+        $fnaggregate->expects($this->once())
+                    ->method('__invoke')
+                    ->with($arg1)
+                    ->will($this->returnValue(false));
+
+        $arg2 = array('aggregate' => function () use ($fnaggregate) { return $fnaggregate; });
+
+        $client = new Client($arg1, $arg2);
+    }
+
     /**
      * @group disconnected
      */