|
@@ -113,7 +113,9 @@ class Client {
|
|
|
}
|
|
|
|
|
|
private function createConnection($parameters) {
|
|
|
- $params = new ConnectionParameters($parameters);
|
|
|
+ $params = $parameters instanceof ConnectionParameters
|
|
|
+ ? $parameters
|
|
|
+ : new ConnectionParameters($parameters);
|
|
|
$connection = Connection::create($params, $this->_responseReader);
|
|
|
|
|
|
if ($params->password !== null) {
|
|
@@ -155,7 +157,7 @@ class Client {
|
|
|
}
|
|
|
|
|
|
public function getClientFor($connectionAlias) {
|
|
|
- if (!($this->_connection instanceof IConnectionCluster)) {
|
|
|
+ if (!Shared\Utils::isCluster($this->_connection)) {
|
|
|
throw new ClientException(
|
|
|
'This method is supported only when the client is connected to a cluster of connections'
|
|
|
);
|
|
@@ -191,7 +193,7 @@ class Client {
|
|
|
return $this->_connection;
|
|
|
}
|
|
|
else {
|
|
|
- return $this->_connection instanceof IConnectionCluster
|
|
|
+ return Shared\Utils::isCluster($this->_connection)
|
|
|
? $this->_connection->getConnectionById($id)
|
|
|
: $this->_connection;
|
|
|
}
|
|
@@ -212,7 +214,7 @@ class Client {
|
|
|
|
|
|
public function executeCommandOnShards(Command $command) {
|
|
|
$replies = array();
|
|
|
- if ($this->_connection instanceof \Predis\IConnectionCluster) {
|
|
|
+ if (Shared\Utils::isCluster($this->_connection)) {
|
|
|
foreach($this->_connection as $connection) {
|
|
|
$replies[] = $connection->executeCommand($command);
|
|
|
}
|
|
@@ -224,22 +226,50 @@ class Client {
|
|
|
}
|
|
|
|
|
|
public function rawCommand($rawCommandData, $closesConnection = false) {
|
|
|
- if ($this->_connection instanceof \Predis\IConnectionCluster) {
|
|
|
+ if (Shared\Utils::isCluster($this->_connection)) {
|
|
|
throw new ClientException('Cannot send raw commands when connected to a cluster of Redis servers');
|
|
|
}
|
|
|
return $this->_connection->rawCommand($rawCommandData, $closesConnection);
|
|
|
}
|
|
|
|
|
|
- public function pipeline($pipelineBlock = null) {
|
|
|
- return $this->pipelineExecute(new CommandPipeline($this), $pipelineBlock);
|
|
|
+ public function pipeline(/* arguments */) {
|
|
|
+ $argv = func_get_args();
|
|
|
+ $argc = func_num_args();
|
|
|
+
|
|
|
+ if ($argc === 0) {
|
|
|
+ return $this->initPipeline();
|
|
|
+ }
|
|
|
+ else if ($argc === 1) {
|
|
|
+ list($arg0) = $argv;
|
|
|
+ return is_array($arg0) ? $this->initPipeline($arg0) : $this->initPipeline(null, $arg0);
|
|
|
+ }
|
|
|
+ else if ($argc === 2) {
|
|
|
+ list($arg0, $arg1) = $argv;
|
|
|
+ return $this->initPipeline($arg0, $arg1);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public function pipelineSafe($pipelineBlock = null) {
|
|
|
- $connection = $this->getConnection();
|
|
|
- $pipeline = new CommandPipeline($this, $connection instanceof IConnectionSingle
|
|
|
- ? new Pipeline\SafeExecutor($connection)
|
|
|
- : new Pipeline\SafeClusterExecutor($connection)
|
|
|
- );
|
|
|
+ return $this->initPipeline(array('safe' => true), $pipelineBlock);
|
|
|
+ }
|
|
|
+
|
|
|
+ private function initPipeline(Array $options = null, $pipelineBlock = null) {
|
|
|
+ $pipeline = null;
|
|
|
+ if (isset($options)) {
|
|
|
+ if (isset($options['safe']) && $options['safe'] == true) {
|
|
|
+ $connection = $this->getConnection();
|
|
|
+ $pipeline = new CommandPipeline($this, $connection instanceof Connection
|
|
|
+ ? new Pipeline\SafeExecutor($connection)
|
|
|
+ : new Pipeline\SafeClusterExecutor($connection)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ $pipeline = new CommandPipeline($this);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ $pipeline = new CommandPipeline($this);
|
|
|
+ }
|
|
|
return $this->pipelineExecute($pipeline, $pipelineBlock);
|
|
|
}
|
|
|
|
|
@@ -802,6 +832,11 @@ class MultiExecBlock {
|
|
|
}
|
|
|
|
|
|
private function checkCapabilities(Client $redisClient) {
|
|
|
+ if (Shared\Utils::isCluster($redisClient->getConnection())) {
|
|
|
+ throw new \Predis\ClientException(
|
|
|
+ 'Cannot initialize a MULTI/EXEC context over a cluster of connections'
|
|
|
+ );
|
|
|
+ }
|
|
|
$profile = $redisClient->getProfile();
|
|
|
if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
|
|
|
throw new \Predis\ClientException(
|
|
@@ -962,13 +997,16 @@ class PubSubContext implements \Iterator {
|
|
|
const MESSAGE = 'message';
|
|
|
const PMESSAGE = 'pmessage';
|
|
|
|
|
|
+ const STATUS_VALID = 0x0001;
|
|
|
+ const STATUS_SUBSCRIBED = 0x0010;
|
|
|
+ const STATUS_PSUBSCRIBED = 0x0100;
|
|
|
+
|
|
|
private $_redisClient, $_subscriptions, $_isStillValid, $_position;
|
|
|
|
|
|
public function __construct(Client $redisClient) {
|
|
|
$this->checkCapabilities($redisClient);
|
|
|
- $this->_redisClient = $redisClient;
|
|
|
- $this->_isStillValid = true;
|
|
|
- $this->_subscriptions = false;
|
|
|
+ $this->_redisClient = $redisClient;
|
|
|
+ $this->_statusFlags = self::STATUS_VALID;
|
|
|
}
|
|
|
|
|
|
public function __destruct() {
|
|
@@ -979,6 +1017,11 @@ class PubSubContext implements \Iterator {
|
|
|
}
|
|
|
|
|
|
private function checkCapabilities(Client $redisClient) {
|
|
|
+ if (Shared\Utils::isCluster($redisClient->getConnection())) {
|
|
|
+ throw new \Predis\ClientException(
|
|
|
+ 'Cannot initialize a PUB/SUB context over a cluster of connections'
|
|
|
+ );
|
|
|
+ }
|
|
|
$profile = $redisClient->getProfile();
|
|
|
$commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
|
|
|
if ($profile->supportsCommands($commands) === false) {
|
|
@@ -988,9 +1031,13 @@ class PubSubContext implements \Iterator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private function isFlagSet($value) {
|
|
|
+ return ($this->_statusFlags & $value) === $value;
|
|
|
+ }
|
|
|
+
|
|
|
public function subscribe(/* arguments */) {
|
|
|
$this->writeCommand(self::SUBSCRIBE, func_get_args());
|
|
|
- $this->_subscriptions = true;
|
|
|
+ $this->_statusFlags |= self::STATUS_SUBSCRIBED;
|
|
|
}
|
|
|
|
|
|
public function unsubscribe(/* arguments */) {
|
|
@@ -999,7 +1046,7 @@ class PubSubContext implements \Iterator {
|
|
|
|
|
|
public function psubscribe(/* arguments */) {
|
|
|
$this->writeCommand(self::PSUBSCRIBE, func_get_args());
|
|
|
- $this->_subscriptions = true;
|
|
|
+ $this->_statusFlags |= self::STATUS_PSUBSCRIBED;
|
|
|
}
|
|
|
|
|
|
public function punsubscribe(/* arguments */) {
|
|
@@ -1008,10 +1055,12 @@ class PubSubContext implements \Iterator {
|
|
|
|
|
|
public function closeContext() {
|
|
|
if ($this->valid()) {
|
|
|
- // TODO: as an optimization, we should not send both
|
|
|
- // commands if one of them has not been issued.
|
|
|
- $this->unsubscribe();
|
|
|
- $this->punsubscribe();
|
|
|
+ if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
|
|
|
+ $this->unsubscribe();
|
|
|
+ }
|
|
|
+ if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
|
|
|
+ $this->punsubscribe();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1036,19 +1085,20 @@ class PubSubContext implements \Iterator {
|
|
|
}
|
|
|
|
|
|
public function next() {
|
|
|
- if ($this->_isStillValid) {
|
|
|
+ if ($this->isFlagSet(self::STATUS_VALID)) {
|
|
|
$this->_position++;
|
|
|
}
|
|
|
return $this->_position;
|
|
|
}
|
|
|
|
|
|
public function valid() {
|
|
|
- return $this->_subscriptions && $this->_isStillValid;
|
|
|
+ $subscriptions = self::STATUS_SUBSCRIBED + self::STATUS_PSUBSCRIBED;
|
|
|
+ return $this->isFlagSet(self::STATUS_VALID)
|
|
|
+ && ($this->_statusFlags & $subscriptions) > 0;
|
|
|
}
|
|
|
|
|
|
private function invalidate() {
|
|
|
- $this->_isStillValid = false;
|
|
|
- $this->_subscriptions = false;
|
|
|
+ $this->_statusFlags = 0x0000;
|
|
|
}
|
|
|
|
|
|
private function getValue() {
|
|
@@ -2131,6 +2181,10 @@ class KetamaPureRing extends HashRing {
|
|
|
namespace Predis\Shared;
|
|
|
|
|
|
class Utils {
|
|
|
+ public static function isCluster(\Predis\IConnection $connection) {
|
|
|
+ return $connection instanceof \Predis\ConnectionCluster;
|
|
|
+ }
|
|
|
+
|
|
|
public static function onCommunicationException(\Predis\CommunicationException $exception) {
|
|
|
if ($exception->shouldResetConnection()) {
|
|
|
$connection = $exception->getConnection();
|