|
@@ -7,7 +7,7 @@ class Predis_ClientException extends PredisException { }
|
|
|
// Server-side errors
|
|
|
class Predis_ServerException extends PredisException {
|
|
|
public function toResponseError() {
|
|
|
- return new ResponseError($this->getMessage());
|
|
|
+ return new Predis_ResponseError($this->getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -154,6 +154,26 @@ class Predis_Client {
|
|
|
return $this->_responseReader;
|
|
|
}
|
|
|
|
|
|
+ public function getClientFor($connectionAlias) {
|
|
|
+ if (!($this->_connection instanceof Predis_ConnectionCluster)) {
|
|
|
+ throw new Predis_ClientException(
|
|
|
+ 'This method is supported only when the client is connected to a cluster of connections.'
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ $connection = $this->_connection->getConnectionById($connectionAlias);
|
|
|
+ if ($connection === null) {
|
|
|
+ throw new InvalidArgumentException(
|
|
|
+ "Invalid connection alias: '$connectionAlias'"
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ $newClient = new Predis_Client();
|
|
|
+ $newClient->setupClient($this->_options);
|
|
|
+ $newClient->setConnection($this->getConnection($connectionAlias));
|
|
|
+ return $newClient;
|
|
|
+ }
|
|
|
+
|
|
|
public function connect() {
|
|
|
$this->_connection->connect();
|
|
|
}
|
|
@@ -186,11 +206,11 @@ class Predis_Client {
|
|
|
return $this->_serverProfile->createCommand($method, $arguments);
|
|
|
}
|
|
|
|
|
|
- public function executeCommand(Command $command) {
|
|
|
+ public function executeCommand(Predis_Command $command) {
|
|
|
return $this->_connection->executeCommand($command);
|
|
|
}
|
|
|
|
|
|
- public function executeCommandOnShards(Command $command) {
|
|
|
+ public function executeCommandOnShards(Predis_Command $command) {
|
|
|
$replies = array();
|
|
|
if ($this->_connection instanceof Predis_ConnectionCluster) {
|
|
|
foreach($this->_connection as $connection) {
|
|
@@ -211,8 +231,20 @@ class Predis_Client {
|
|
|
}
|
|
|
|
|
|
public function pipeline($pipelineBlock = null) {
|
|
|
- $pipeline = new Predis_CommandPipeline($this);
|
|
|
- return $pipelineBlock !== null ? $pipeline->execute($pipelineBlock) : $pipeline;
|
|
|
+ return $this->pipelineExecute(new Predis_CommandPipeline($this), $pipelineBlock);
|
|
|
+ }
|
|
|
+
|
|
|
+ public function pipelineSafe($pipelineBlock = null) {
|
|
|
+ $connection = $this->getConnection();
|
|
|
+ $pipeline = new Predis_CommandPipeline($this, $connection instanceof Predis_Connection
|
|
|
+ ? new Predis_Pipeline_SafeExecutor($connection)
|
|
|
+ : new Predis_Pipeline_SafeClusterExecutor($connection)
|
|
|
+ );
|
|
|
+ return $this->pipelineExecute($pipeline, $pipelineBlock);
|
|
|
+ }
|
|
|
+
|
|
|
+ private function pipelineExecute(Predis_CommandPipeline $pipeline, $block) {
|
|
|
+ return $block !== null ? $pipeline->execute($block) : $pipeline;
|
|
|
}
|
|
|
|
|
|
public function multiExec($multiExecBlock = null) {
|
|
@@ -250,12 +282,12 @@ class Predis_ClientOptionsProfile implements Predis_IClientOptionsHandler {
|
|
|
|
|
|
class Predis_ClientOptionsKeyDistribution implements Predis_IClientOptionsHandler {
|
|
|
public function validate($option, $value) {
|
|
|
- if ($value instanceof Predis_Distribution_IDistributionAlgorithm) {
|
|
|
+ if ($value instanceof Predis_Distribution_IDistributionStrategy) {
|
|
|
return $value;
|
|
|
}
|
|
|
if (is_string($value)) {
|
|
|
$valueReflection = new ReflectionClass($value);
|
|
|
- if ($valueReflection->isSubclassOf('Predis_Distribution_IDistributionAlgorithm')) {
|
|
|
+ if ($valueReflection->isSubclassOf('Predis_Distribution_IDistributionStrategy')) {
|
|
|
return new $value;
|
|
|
}
|
|
|
}
|
|
@@ -360,12 +392,14 @@ abstract class Predis_Command {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- public function getHash(Predis_Distribution_IDistributionAlgorithm $distributor) {
|
|
|
+ public function getHash(Predis_Distribution_IDistributionStrategy $distributor) {
|
|
|
if (isset($this->_hash)) {
|
|
|
return $this->_hash;
|
|
|
}
|
|
|
else {
|
|
|
if (isset($this->_arguments[0])) {
|
|
|
+ // TODO: should we throw an exception if the command does
|
|
|
+ // not support sharding?
|
|
|
$key = $this->_arguments[0];
|
|
|
|
|
|
$start = strpos($key, '{');
|
|
@@ -422,7 +456,10 @@ abstract class Predis_InlineCommand extends Predis_Command {
|
|
|
if (isset($arguments[0]) && is_array($arguments[0])) {
|
|
|
$arguments[0] = implode($arguments[0], ' ');
|
|
|
}
|
|
|
- return $command . ' ' . implode($arguments, ' ') . Predis_Protocol::NEWLINE;
|
|
|
+ return $command . (count($arguments) > 0
|
|
|
+ ? ' ' . implode($arguments, ' ') . Predis_Protocol::NEWLINE
|
|
|
+ : Predis_Protocol::NEWLINE
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -661,10 +698,11 @@ class Predis_ResponseQueued {
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
class Predis_CommandPipeline {
|
|
|
- private $_redisClient, $_pipelineBuffer, $_returnValues, $_running;
|
|
|
+ private $_redisClient, $_pipelineBuffer, $_returnValues, $_running, $_executor;
|
|
|
|
|
|
- public function __construct(Predis_Client $redisClient) {
|
|
|
+ public function __construct(Predis_Client $redisClient, Predis_Pipeline_IPipelineExecutor $executor = null) {
|
|
|
$this->_redisClient = $redisClient;
|
|
|
+ $this->_executor = $executor !== null ? $executor : new Predis_Pipeline_StandardExecutor();
|
|
|
$this->_pipelineBuffer = array();
|
|
|
$this->_returnValues = array();
|
|
|
}
|
|
@@ -684,27 +722,14 @@ class Predis_CommandPipeline {
|
|
|
}
|
|
|
|
|
|
public function flushPipeline() {
|
|
|
- $sizeofPipe = count($this->_pipelineBuffer);
|
|
|
- if ($sizeofPipe === 0) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- $connection = $this->_redisClient->getConnection();
|
|
|
- $commands = &$this->_pipelineBuffer;
|
|
|
-
|
|
|
- foreach ($commands as $command) {
|
|
|
- $connection->writeCommand($command);
|
|
|
- }
|
|
|
- for ($i = 0; $i < $sizeofPipe; $i++) {
|
|
|
- $response = $connection->readResponse($commands[$i]);
|
|
|
- $this->_returnValues[] = ($response instanceof Iterator
|
|
|
- ? iterator_to_array($response)
|
|
|
- : $response
|
|
|
+ if (count($this->_pipelineBuffer) > 0) {
|
|
|
+ $connection = $this->_redisClient->getConnection();
|
|
|
+ $this->_returnValues = array_merge(
|
|
|
+ $this->_returnValues,
|
|
|
+ $this->_executor->execute($connection, $this->_pipelineBuffer)
|
|
|
);
|
|
|
- unset($commands[$i]);
|
|
|
+ $this->_pipelineBuffer = array();
|
|
|
}
|
|
|
- $this->_pipelineBuffer = array();
|
|
|
-
|
|
|
return $this;
|
|
|
}
|
|
|
|
|
@@ -712,7 +737,6 @@ class Predis_CommandPipeline {
|
|
|
if ($bool == true && $this->_running == true) {
|
|
|
throw new Predis_ClientException("This pipeline is already opened");
|
|
|
}
|
|
|
-
|
|
|
$this->_running = $bool;
|
|
|
}
|
|
|
|
|
@@ -732,7 +756,6 @@ class Predis_CommandPipeline {
|
|
|
$this->flushPipeline();
|
|
|
}
|
|
|
catch (Exception $exception) {
|
|
|
- // TODO: client/server desync on ServerException
|
|
|
$pipelineBlockException = $exception;
|
|
|
}
|
|
|
|
|
@@ -1223,7 +1246,7 @@ class Predis_Connection implements Predis_IConnection {
|
|
|
class Predis_ConnectionCluster implements Predis_IConnection, IteratorAggregate {
|
|
|
private $_pool, $_distributor;
|
|
|
|
|
|
- public function __construct(Predis_Distribution_IDistributionAlgorithm $distributor = null) {
|
|
|
+ public function __construct(Predis_Distribution_IDistributionStrategy $distributor = null) {
|
|
|
$this->_pool = array();
|
|
|
$this->_distributor = $distributor !== null ? $distributor : new Predis_Distribution_HashRing();
|
|
|
}
|
|
@@ -1270,7 +1293,8 @@ class Predis_ConnectionCluster implements Predis_IConnection, IteratorAggregate
|
|
|
}
|
|
|
|
|
|
public function getConnectionById($id = null) {
|
|
|
- return $this->_pool[$id === null ? 0 : $id];
|
|
|
+ $alias = $id !== null ? $id : 0;
|
|
|
+ return isset($this->_pool[$alias]) ? $this->_pool[$alias] : null;
|
|
|
}
|
|
|
|
|
|
public function getIterator() {
|
|
@@ -1644,7 +1668,133 @@ class Predis_RedisServer_vNext extends Predis_RedisServer_v2_0 {
|
|
|
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
-interface Predis_Distribution_IDistributionAlgorithm {
|
|
|
+interface Predis_Pipeline_IPipelineExecutor {
|
|
|
+ public function execute(Predis_IConnection $connection, &$commands);
|
|
|
+}
|
|
|
+
|
|
|
+class Predis_Pipeline_StandardExecutor implements Predis_Pipeline_IPipelineExecutor {
|
|
|
+ public function execute(Predis_IConnection $connection, &$commands) {
|
|
|
+ $sizeofPipe = count($commands);
|
|
|
+ $values = array();
|
|
|
+
|
|
|
+ foreach ($commands as $command) {
|
|
|
+ $connection->writeCommand($command);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ for ($i = 0; $i < $sizeofPipe; $i++) {
|
|
|
+ $response = $connection->readResponse($commands[$i]);
|
|
|
+ $values[] = $response instanceof Iterator
|
|
|
+ ? iterator_to_array($response)
|
|
|
+ : $response;
|
|
|
+ unset($commands[$i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Predis_ServerException $exception) {
|
|
|
+ // force disconnection to prevent protocol desynchronization
|
|
|
+ $connection->disconnect();
|
|
|
+ throw $exception;
|
|
|
+ }
|
|
|
+
|
|
|
+ return $values;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+class Predis_Pipeline_SafeExecutor implements Predis_Pipeline_IPipelineExecutor {
|
|
|
+ public function execute(Predis_IConnection $connection, &$commands) {
|
|
|
+ $firstServerException = null;
|
|
|
+ $sizeofPipe = count($commands);
|
|
|
+ $values = array();
|
|
|
+
|
|
|
+ foreach ($commands as $command) {
|
|
|
+ try {
|
|
|
+ $connection->writeCommand($command);
|
|
|
+ }
|
|
|
+ catch (Predis_CommunicationException $exception) {
|
|
|
+ return array_fill(0, $sizeofPipe, $exception);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for ($i = 0; $i < $sizeofPipe; $i++) {
|
|
|
+ $command = $commands[$i];
|
|
|
+ unset($commands[$i]);
|
|
|
+ try {
|
|
|
+ $response = $connection->readResponse($command);
|
|
|
+ $values[] = ($response instanceof Iterator
|
|
|
+ ? iterator_to_array($response)
|
|
|
+ : $response
|
|
|
+ );
|
|
|
+ }
|
|
|
+ catch (Predis_ServerException $exception) {
|
|
|
+ $values[] = $exception->toResponseError();
|
|
|
+ }
|
|
|
+ catch (Predis_CommunicationException $exception) {
|
|
|
+ if ($throwExceptions) {
|
|
|
+ throw $exception;
|
|
|
+ }
|
|
|
+ $toAdd = count($commands) - count($values);
|
|
|
+ $values = array_merge($values, array_fill(0, $toAdd, $exception));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return $values;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+class Predis_Pipeline_SafeClusterExecutor implements Predis_Pipeline_IPipelineExecutor {
|
|
|
+ public function execute(Predis_IConnection $connection, &$commands) {
|
|
|
+ $connectionExceptions = array();
|
|
|
+ $sizeofPipe = count($commands);
|
|
|
+ $values = array();
|
|
|
+
|
|
|
+ foreach ($commands as $command) {
|
|
|
+ $cmdConnection = $connection->getConnection($command);
|
|
|
+ if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ $cmdConnection->writeCommand($command);
|
|
|
+ }
|
|
|
+ catch (Predis_CommunicationException $exception) {
|
|
|
+ $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for ($i = 0; $i < $sizeofPipe; $i++) {
|
|
|
+ $command = $commands[$i];
|
|
|
+ unset($commands[$i]);
|
|
|
+
|
|
|
+ $cmdConnection = $connection->getConnection($command);
|
|
|
+ $connectionObjectHash = spl_object_hash($cmdConnection);
|
|
|
+
|
|
|
+ if (isset($connectionExceptions[$connectionObjectHash])) {
|
|
|
+ $values[] = $connectionExceptions[$connectionObjectHash];
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ $response = $cmdConnection->readResponse($command);
|
|
|
+ $values[] = ($response instanceof Iterator
|
|
|
+ ? iterator_to_array($response)
|
|
|
+ : $response
|
|
|
+ );
|
|
|
+ }
|
|
|
+ catch (Predis_ServerException $exception) {
|
|
|
+ $values[] = $exception->toResponseError();
|
|
|
+ }
|
|
|
+ catch (Predis_CommunicationException $exception) {
|
|
|
+ $values[] = $exception;
|
|
|
+ $connectionExceptions[$connectionObjectHash] = $exception;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return $values;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/* ------------------------------------------------------------------------- */
|
|
|
+
|
|
|
+interface Predis_Distribution_IDistributionStrategy {
|
|
|
public function add($node, $weight = null);
|
|
|
public function remove($node);
|
|
|
public function get($key);
|
|
@@ -1653,7 +1803,7 @@ interface Predis_Distribution_IDistributionAlgorithm {
|
|
|
|
|
|
class Predis_Distribution_EmptyRingException extends Exception { }
|
|
|
|
|
|
-class Predis_Distribution_HashRing implements Predis_Distribution_IDistributionAlgorithm {
|
|
|
+class Predis_Distribution_HashRing implements Predis_Distribution_IDistributionStrategy {
|
|
|
const DEFAULT_REPLICAS = 128;
|
|
|
const DEFAULT_WEIGHT = 100;
|
|
|
private $_nodes, $_ring, $_ringKeys, $_ringKeysCount, $_replicas;
|