|
@@ -208,11 +208,11 @@ class Client {
|
|
return $this->_serverProfile->createCommand($method, $arguments);
|
|
return $this->_serverProfile->createCommand($method, $arguments);
|
|
}
|
|
}
|
|
|
|
|
|
- public function executeCommand(Command $command) {
|
|
|
|
|
|
+ public function executeCommand(ICommand $command) {
|
|
return $this->_connection->executeCommand($command);
|
|
return $this->_connection->executeCommand($command);
|
|
}
|
|
}
|
|
|
|
|
|
- public function executeCommandOnShards(Command $command) {
|
|
|
|
|
|
+ public function executeCommandOnShards(ICommand $command) {
|
|
$replies = array();
|
|
$replies = array();
|
|
if (Shared\Utils::isCluster($this->_connection)) {
|
|
if (Shared\Utils::isCluster($this->_connection)) {
|
|
foreach($this->_connection as $connection) {
|
|
foreach($this->_connection as $connection) {
|
|
@@ -428,10 +428,19 @@ class Protocol {
|
|
const PREFIX_MULTI_BULK = '*';
|
|
const PREFIX_MULTI_BULK = '*';
|
|
}
|
|
}
|
|
|
|
|
|
-abstract class Command {
|
|
|
|
- private $_arguments, $_hash;
|
|
|
|
|
|
+interface ICommand {
|
|
|
|
+ public function getCommandId();
|
|
|
|
+ public function canBeHashed();
|
|
|
|
+ public function closesConnection();
|
|
|
|
+ public function getHash(Distribution\IDistributionStrategy $distributor);
|
|
|
|
+ public function setArgumentsArray(Array $arguments);
|
|
|
|
+ public function getArguments();
|
|
|
|
+ public function parseResponse($data);
|
|
|
|
+ public function serialize();
|
|
|
|
+}
|
|
|
|
|
|
- public abstract function getCommandId();
|
|
|
|
|
|
+abstract class Command implements ICommand {
|
|
|
|
+ private $_arguments, $_hash;
|
|
|
|
|
|
protected function serializeRequest($command, $arguments) {
|
|
protected function serializeRequest($command, $arguments) {
|
|
$newline = Protocol::NEWLINE;
|
|
$newline = Protocol::NEWLINE;
|
|
@@ -722,7 +731,7 @@ class CommandPipeline {
|
|
return $this;
|
|
return $this;
|
|
}
|
|
}
|
|
|
|
|
|
- private function recordCommand(Command $command) {
|
|
|
|
|
|
+ private function recordCommand(ICommand $command) {
|
|
$this->_pipelineBuffer[] = $command;
|
|
$this->_pipelineBuffer[] = $command;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1190,9 +1199,9 @@ interface IConnection {
|
|
public function connect();
|
|
public function connect();
|
|
public function disconnect();
|
|
public function disconnect();
|
|
public function isConnected();
|
|
public function isConnected();
|
|
- public function writeCommand(Command $command);
|
|
|
|
- public function readResponse(Command $command);
|
|
|
|
- public function executeCommand(Command $command);
|
|
|
|
|
|
+ public function writeCommand(ICommand $command);
|
|
|
|
+ public function readResponse(ICommand $command);
|
|
|
|
+ public function executeCommand(ICommand $command);
|
|
}
|
|
}
|
|
|
|
|
|
interface IConnectionSingle extends IConnection {
|
|
interface IConnectionSingle extends IConnection {
|
|
@@ -1202,7 +1211,7 @@ interface IConnectionSingle extends IConnection {
|
|
}
|
|
}
|
|
|
|
|
|
interface IConnectionCluster extends IConnection {
|
|
interface IConnectionCluster extends IConnection {
|
|
- public function getConnection(Command $command);
|
|
|
|
|
|
+ public function getConnection(ICommand $command);
|
|
public function getConnectionById($connectionId);
|
|
public function getConnectionById($connectionId);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1308,7 +1317,7 @@ class TcpConnection implements IConnectionSingle {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public function pushInitCommand(Command $command){
|
|
|
|
|
|
+ public function pushInitCommand(ICommand $command){
|
|
$this->_initCmds[] = $command;
|
|
$this->_initCmds[] = $command;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1327,17 +1336,17 @@ class TcpConnection implements IConnectionSingle {
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
- public function writeCommand(Command $command) {
|
|
|
|
|
|
+ public function writeCommand(ICommand $command) {
|
|
$this->writeBytes($command->serialize());
|
|
$this->writeBytes($command->serialize());
|
|
}
|
|
}
|
|
|
|
|
|
- public function readResponse(Command $command) {
|
|
|
|
|
|
+ public function readResponse(ICommand $command) {
|
|
$response = $this->_reader->read($this);
|
|
$response = $this->_reader->read($this);
|
|
$skipparse = isset($response->queued) || isset($response->error);
|
|
$skipparse = isset($response->queued) || isset($response->error);
|
|
return $skipparse ? $response : $command->parseResponse($response);
|
|
return $skipparse ? $response : $command->parseResponse($response);
|
|
}
|
|
}
|
|
|
|
|
|
- public function executeCommand(Command $command) {
|
|
|
|
|
|
+ public function executeCommand(ICommand $command) {
|
|
$this->writeCommand($command);
|
|
$this->writeCommand($command);
|
|
if ($command->closesConnection()) {
|
|
if ($command->closesConnection()) {
|
|
return $this->disconnect();
|
|
return $this->disconnect();
|
|
@@ -1460,7 +1469,7 @@ class ConnectionCluster implements IConnectionCluster, \IteratorAggregate {
|
|
$this->_distributor->add($connection, $parameters->weight);
|
|
$this->_distributor->add($connection, $parameters->weight);
|
|
}
|
|
}
|
|
|
|
|
|
- public function getConnection(Command $command) {
|
|
|
|
|
|
+ public function getConnection(ICommand $command) {
|
|
if ($command->canBeHashed() === false) {
|
|
if ($command->canBeHashed() === false) {
|
|
throw new ClientException(
|
|
throw new ClientException(
|
|
sprintf("Cannot send '%s' commands to a cluster of connections", $command->getCommandId())
|
|
sprintf("Cannot send '%s' commands to a cluster of connections", $command->getCommandId())
|
|
@@ -1478,15 +1487,15 @@ class ConnectionCluster implements IConnectionCluster, \IteratorAggregate {
|
|
return new \ArrayIterator($this->_pool);
|
|
return new \ArrayIterator($this->_pool);
|
|
}
|
|
}
|
|
|
|
|
|
- public function writeCommand(Command $command) {
|
|
|
|
|
|
+ public function writeCommand(ICommand $command) {
|
|
$this->getConnection($command)->writeCommand($command);
|
|
$this->getConnection($command)->writeCommand($command);
|
|
}
|
|
}
|
|
|
|
|
|
- public function readResponse(Command $command) {
|
|
|
|
|
|
+ public function readResponse(ICommand $command) {
|
|
return $this->getConnection($command)->readResponse($command);
|
|
return $this->getConnection($command)->readResponse($command);
|
|
}
|
|
}
|
|
|
|
|
|
- public function executeCommand(Command $command) {
|
|
|
|
|
|
+ public function executeCommand(ICommand $command) {
|
|
$connection = $this->getConnection($command);
|
|
$connection = $this->getConnection($command);
|
|
$connection->writeCommand($command);
|
|
$connection->writeCommand($command);
|
|
return $connection->readResponse($command);
|
|
return $connection->readResponse($command);
|