|
@@ -13,7 +13,9 @@ class ServerException extends PredisException { // Server-si
|
|
class CommunicationException extends PredisException { // Communication errors
|
|
class CommunicationException extends PredisException { // Communication errors
|
|
private $_connection;
|
|
private $_connection;
|
|
|
|
|
|
- public function __construct(Connection $connection, $message = null, $code = null) {
|
|
|
|
|
|
+ public function __construct(IConnectionSingle $connection,
|
|
|
|
+ $message = null, $code = null) {
|
|
|
|
+
|
|
$this->_connection = $connection;
|
|
$this->_connection = $connection;
|
|
parent::__construct($message, $code);
|
|
parent::__construct($message, $code);
|
|
}
|
|
}
|
|
@@ -152,7 +154,7 @@ class Client {
|
|
}
|
|
}
|
|
|
|
|
|
public function getClientFor($connectionAlias) {
|
|
public function getClientFor($connectionAlias) {
|
|
- if (!($this->_connection instanceof ConnectionCluster)) {
|
|
|
|
|
|
+ if (!($this->_connection instanceof IConnectionCluster)) {
|
|
throw new ClientException(
|
|
throw new ClientException(
|
|
'This method is supported only when the client is connected to a cluster of connections'
|
|
'This method is supported only when the client is connected to a cluster of connections'
|
|
);
|
|
);
|
|
@@ -188,7 +190,7 @@ class Client {
|
|
return $this->_connection;
|
|
return $this->_connection;
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
- return $this->_connection instanceof ConnectionCluster
|
|
|
|
|
|
+ return $this->_connection instanceof IConnectionCluster
|
|
? $this->_connection->getConnectionById($id)
|
|
? $this->_connection->getConnectionById($id)
|
|
: $this->_connection;
|
|
: $this->_connection;
|
|
}
|
|
}
|
|
@@ -209,7 +211,7 @@ class Client {
|
|
|
|
|
|
public function executeCommandOnShards(Command $command) {
|
|
public function executeCommandOnShards(Command $command) {
|
|
$replies = array();
|
|
$replies = array();
|
|
- if ($this->_connection instanceof \Predis\ConnectionCluster) {
|
|
|
|
|
|
+ if ($this->_connection instanceof \Predis\IConnectionCluster) {
|
|
foreach($this->_connection as $connection) {
|
|
foreach($this->_connection as $connection) {
|
|
$replies[] = $connection->executeCommand($command);
|
|
$replies[] = $connection->executeCommand($command);
|
|
}
|
|
}
|
|
@@ -221,7 +223,7 @@ class Client {
|
|
}
|
|
}
|
|
|
|
|
|
public function rawCommand($rawCommandData, $closesConnection = false) {
|
|
public function rawCommand($rawCommandData, $closesConnection = false) {
|
|
- if ($this->_connection instanceof \Predis\ConnectionCluster) {
|
|
|
|
|
|
+ if ($this->_connection instanceof \Predis\IConnectionCluster) {
|
|
throw new ClientException('Cannot send raw commands when connected to a cluster of Redis servers');
|
|
throw new ClientException('Cannot send raw commands when connected to a cluster of Redis servers');
|
|
}
|
|
}
|
|
return $this->_connection->rawCommand($rawCommandData, $closesConnection);
|
|
return $this->_connection->rawCommand($rawCommandData, $closesConnection);
|
|
@@ -233,7 +235,7 @@ class Client {
|
|
|
|
|
|
public function pipelineSafe($pipelineBlock = null) {
|
|
public function pipelineSafe($pipelineBlock = null) {
|
|
$connection = $this->getConnection();
|
|
$connection = $this->getConnection();
|
|
- $pipeline = new CommandPipeline($this, $connection instanceof Connection
|
|
|
|
|
|
+ $pipeline = new CommandPipeline($this, $connection instanceof IConnectionSingle
|
|
? new Pipeline\SafeExecutor($connection)
|
|
? new Pipeline\SafeExecutor($connection)
|
|
: new Pipeline\SafeClusterExecutor($connection)
|
|
: new Pipeline\SafeClusterExecutor($connection)
|
|
);
|
|
);
|
|
@@ -501,11 +503,11 @@ abstract class MultiBulkCommand extends Command {
|
|
/* ------------------------------------------------------------------------- */
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
interface IResponseHandler {
|
|
interface IResponseHandler {
|
|
- function handle(Connection $connection, $payload);
|
|
|
|
|
|
+ function handle(IConnectionSingle $connection, $payload);
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseStatusHandler implements IResponseHandler {
|
|
class ResponseStatusHandler implements IResponseHandler {
|
|
- public function handle(Connection $connection, $status) {
|
|
|
|
|
|
+ public function handle(IConnectionSingle $connection, $status) {
|
|
if ($status === Protocol::OK) {
|
|
if ($status === Protocol::OK) {
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
@@ -517,19 +519,19 @@ class ResponseStatusHandler implements IResponseHandler {
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseErrorHandler implements IResponseHandler {
|
|
class ResponseErrorHandler implements IResponseHandler {
|
|
- public function handle(Connection $connection, $errorMessage) {
|
|
|
|
|
|
+ public function handle(IConnectionSingle $connection, $errorMessage) {
|
|
throw new ServerException(substr($errorMessage, 4));
|
|
throw new ServerException(substr($errorMessage, 4));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseErrorSilentHandler implements IResponseHandler {
|
|
class ResponseErrorSilentHandler implements IResponseHandler {
|
|
- public function handle(Connection $connection, $errorMessage) {
|
|
|
|
|
|
+ public function handle(IConnectionSingle $connection, $errorMessage) {
|
|
return new ResponseError(substr($errorMessage, 4));
|
|
return new ResponseError(substr($errorMessage, 4));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseBulkHandler implements IResponseHandler {
|
|
class ResponseBulkHandler implements IResponseHandler {
|
|
- public function handle(Connection $connection, $dataLength) {
|
|
|
|
|
|
+ public function handle(IConnectionSingle $connection, $dataLength) {
|
|
if (!is_numeric($dataLength)) {
|
|
if (!is_numeric($dataLength)) {
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
$connection, "Cannot parse '$dataLength' as data length"
|
|
$connection, "Cannot parse '$dataLength' as data length"
|
|
@@ -549,7 +551,7 @@ class ResponseBulkHandler implements IResponseHandler {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
- private static function discardNewLine(Connection $connection) {
|
|
|
|
|
|
+ private static function discardNewLine(IConnectionSingle $connection) {
|
|
if ($connection->readBytes(2) !== Protocol::NEWLINE) {
|
|
if ($connection->readBytes(2) !== Protocol::NEWLINE) {
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
$connection, 'Did not receive a new-line at the end of a bulk response'
|
|
$connection, 'Did not receive a new-line at the end of a bulk response'
|
|
@@ -559,7 +561,7 @@ class ResponseBulkHandler implements IResponseHandler {
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseMultiBulkHandler implements IResponseHandler {
|
|
class ResponseMultiBulkHandler implements IResponseHandler {
|
|
- public function handle(Connection $connection, $rawLength) {
|
|
|
|
|
|
+ public function handle(IConnectionSingle $connection, $rawLength) {
|
|
if (!is_numeric($rawLength)) {
|
|
if (!is_numeric($rawLength)) {
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
$connection, "Cannot parse '$rawLength' as data length"
|
|
$connection, "Cannot parse '$rawLength' as data length"
|
|
@@ -584,7 +586,7 @@ class ResponseMultiBulkHandler implements IResponseHandler {
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseMultiBulkStreamHandler implements IResponseHandler {
|
|
class ResponseMultiBulkStreamHandler implements IResponseHandler {
|
|
- public function handle(Connection $connection, $rawLength) {
|
|
|
|
|
|
+ public function handle(IConnectionSingle $connection, $rawLength) {
|
|
if (!is_numeric($rawLength)) {
|
|
if (!is_numeric($rawLength)) {
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
$connection, "Cannot parse '$rawLength' as data length"
|
|
$connection, "Cannot parse '$rawLength' as data length"
|
|
@@ -595,7 +597,7 @@ class ResponseMultiBulkStreamHandler implements IResponseHandler {
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseIntegerHandler implements IResponseHandler {
|
|
class ResponseIntegerHandler implements IResponseHandler {
|
|
- public function handle(Connection $connection, $number) {
|
|
|
|
|
|
+ public function handle(IConnectionSingle $connection, $number) {
|
|
if (is_numeric($number)) {
|
|
if (is_numeric($number)) {
|
|
return (int) $number;
|
|
return (int) $number;
|
|
}
|
|
}
|
|
@@ -637,7 +639,7 @@ class ResponseReader {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public function read(Connection $connection) {
|
|
|
|
|
|
+ public function read(IConnectionSingle $connection) {
|
|
$header = $connection->readLine();
|
|
$header = $connection->readLine();
|
|
if ($header === '') {
|
|
if ($header === '') {
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
Shared\Utils::onCommunicationException(new MalformedServerResponse(
|
|
@@ -1077,7 +1079,11 @@ interface IConnection {
|
|
public function executeCommand(Command $command);
|
|
public function executeCommand(Command $command);
|
|
}
|
|
}
|
|
|
|
|
|
-class Connection implements IConnection {
|
|
|
|
|
|
+interface IConnectionSingle extends IConnection { }
|
|
|
|
+
|
|
|
|
+interface IConnectionCluster extends IConnection { }
|
|
|
|
+
|
|
|
|
+class Connection implements IConnectionSingle {
|
|
private $_params, $_socket, $_initCmds, $_reader;
|
|
private $_params, $_socket, $_initCmds, $_reader;
|
|
|
|
|
|
public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
|
|
public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
|
|
@@ -1245,7 +1251,7 @@ class Connection implements IConnection {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-class ConnectionCluster implements IConnection, \IteratorAggregate {
|
|
|
|
|
|
+class ConnectionCluster implements IConnectionCluster, \IteratorAggregate {
|
|
private $_pool, $_distributor;
|
|
private $_pool, $_distributor;
|
|
|
|
|
|
public function __construct(Distribution\IDistributionStrategy $distributor = null) {
|
|
public function __construct(Distribution\IDistributionStrategy $distributor = null) {
|
|
@@ -1274,7 +1280,7 @@ class ConnectionCluster implements IConnection, \IteratorAggregate {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public function add(Connection $connection) {
|
|
|
|
|
|
+ public function add(IConnectionSingle $connection) {
|
|
$parameters = $connection->getParameters();
|
|
$parameters = $connection->getParameters();
|
|
if (isset($parameters->alias)) {
|
|
if (isset($parameters->alias)) {
|
|
$this->_pool[$parameters->alias] = $connection;
|
|
$this->_pool[$parameters->alias] = $connection;
|
|
@@ -2006,7 +2012,7 @@ abstract class MultiBulkResponseIteratorBase implements \Iterator, \Countable {
|
|
class MultiBulkResponseIterator extends MultiBulkResponseIteratorBase {
|
|
class MultiBulkResponseIterator extends MultiBulkResponseIteratorBase {
|
|
private $_connection;
|
|
private $_connection;
|
|
|
|
|
|
- public function __construct(\Predis\Connection $connection, $size) {
|
|
|
|
|
|
+ public function __construct(\Predis\IConnectionSingle $connection, $size) {
|
|
$this->_connection = $connection;
|
|
$this->_connection = $connection;
|
|
$this->_reader = $connection->getResponseReader();
|
|
$this->_reader = $connection->getResponseReader();
|
|
$this->_position = 0;
|
|
$this->_position = 0;
|