|
@@ -76,22 +76,16 @@ class Client {
|
|
}
|
|
}
|
|
|
|
|
|
private function setupClient($options) {
|
|
private function setupClient($options) {
|
|
- $this->_responseReader = new ResponseReader();
|
|
|
|
- $this->_options = self::filterClientOptions($options);
|
|
|
|
-
|
|
|
|
- $this->setProfile($this->_options->profile);
|
|
|
|
- if ($this->_options->iterable_multibulk === true) {
|
|
|
|
- $this->_responseReader->setHandler(
|
|
|
|
- Protocol::PREFIX_MULTI_BULK,
|
|
|
|
- new ResponseMultiBulkStreamHandler()
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
- if ($this->_options->throw_on_error === false) {
|
|
|
|
- $this->_responseReader->setHandler(
|
|
|
|
- Protocol::PREFIX_ERROR,
|
|
|
|
- new ResponseErrorSilentHandler()
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
|
|
+ $options = self::filterClientOptions($options);
|
|
|
|
+
|
|
|
|
+ $this->setProfile($options->profile);
|
|
|
|
+
|
|
|
|
+ $reader = $options->reader;
|
|
|
|
+ $reader->setOption('iterable_multibulk', $options->iterable_multibulk);
|
|
|
|
+ $reader->setOption('throw_on_error', $options->throw_on_error);
|
|
|
|
+
|
|
|
|
+ $this->_options = $options;
|
|
|
|
+ $this->_responseReader = $reader;
|
|
}
|
|
}
|
|
|
|
|
|
private function setupConnection($parameters) {
|
|
private function setupConnection($parameters) {
|
|
@@ -355,6 +349,28 @@ class ClientOptionsThrowOnError implements IClientOptionsHandler {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+class ClientOptionsReader implements IClientOptionsHandler {
|
|
|
|
+ public function validate($option, $value) {
|
|
|
|
+ if ($value instanceof \Predis\IResponseReader) {
|
|
|
|
+ return $value;
|
|
|
|
+ }
|
|
|
|
+ if (is_string($value)) {
|
|
|
|
+ if ($value === 'fast') {
|
|
|
|
+ return new \Predis\FastResponseReader();
|
|
|
|
+ }
|
|
|
|
+ $valueReflection = new \ReflectionClass($value);
|
|
|
|
+ if ($valueReflection->isSubclassOf('\Predis\Distribution\IDistributionStrategy')) {
|
|
|
|
+ return new $value;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ throw new \InvalidArgumentException("Invalid value for option $option");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public function getDefault() {
|
|
|
|
+ return new \Predis\ResponseReader();
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
class ClientOptions {
|
|
class ClientOptions {
|
|
private static $_optionsHandlers;
|
|
private static $_optionsHandlers;
|
|
private $_options;
|
|
private $_options;
|
|
@@ -376,6 +392,7 @@ class ClientOptions {
|
|
'key_distribution' => new \Predis\ClientOptionsKeyDistribution(),
|
|
'key_distribution' => new \Predis\ClientOptionsKeyDistribution(),
|
|
'iterable_multibulk' => new \Predis\ClientOptionsIterableMultiBulk(),
|
|
'iterable_multibulk' => new \Predis\ClientOptionsIterableMultiBulk(),
|
|
'throw_on_error' => new \Predis\ClientOptionsThrowOnError(),
|
|
'throw_on_error' => new \Predis\ClientOptionsThrowOnError(),
|
|
|
|
+ 'reader' => new \Predis\ClientOptionsReader(),
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -661,7 +678,95 @@ class ResponseIntegerHandler implements IResponseHandler {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-class ResponseReader {
|
|
|
|
|
|
+interface IResponseReader {
|
|
|
|
+ public function read(Connection $connection);
|
|
|
|
+ public function setOption($option, $value);
|
|
|
|
+ public function getOption($option);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+class FastResponseReader implements IResponseReader {
|
|
|
|
+ private $_iterableMultibulk, $_throwErrors;
|
|
|
|
+
|
|
|
|
+ public function __construct() {
|
|
|
|
+ $this->_iterableMultibulk = false;
|
|
|
|
+ $this->_throwErrors = true;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public function read(Connection $connection) {
|
|
|
|
+ $chunk = $connection->readLine();
|
|
|
|
+ $prefix = $chunk[0];
|
|
|
|
+ $payload = substr($chunk, 1);
|
|
|
|
+ switch ($prefix) {
|
|
|
|
+ case '+': // inline
|
|
|
|
+ switch ($payload) {
|
|
|
|
+ case 'OK':
|
|
|
|
+ return true;
|
|
|
|
+ case 'QUEUED':
|
|
|
|
+ return new ResponseQueued();
|
|
|
|
+ default:
|
|
|
|
+ return $payload;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ case '$': // bulk
|
|
|
|
+ $size = (int) $payload;
|
|
|
|
+ if ($size === -1) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ return substr($connection->readBytes($size + 2), 0, -2);
|
|
|
|
+
|
|
|
|
+ case '*': // multi bulk
|
|
|
|
+ $count = (int) $payload;
|
|
|
|
+ if ($count === -1) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ if ($this->_iterableMultibulk) {
|
|
|
|
+ return new MultiBulkResponseSimple($connection, $count);
|
|
|
|
+ }
|
|
|
|
+ $multibulk = array();
|
|
|
|
+ for ($i = 0; $i < $count; $i++) {
|
|
|
|
+ $multibulk[$i] = $this->read($connection);
|
|
|
|
+ }
|
|
|
|
+ return $multibulk;
|
|
|
|
+
|
|
|
|
+ case ':': // integer
|
|
|
|
+ return (int) $payload;
|
|
|
|
+
|
|
|
|
+ case '-': // error
|
|
|
|
+ $errorMessage = substr($payload, 4);
|
|
|
|
+ if ($this->_throwErrors) {
|
|
|
|
+ throw new ServerException($errorMessage);
|
|
|
|
+ }
|
|
|
|
+ return new ResponseError($errorMessage);
|
|
|
|
+
|
|
|
|
+ default:
|
|
|
|
+ throw new CommunicationException(
|
|
|
|
+ $connection, "Unknown prefix: '$prefix'"
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public function setOption($option, $value) {
|
|
|
|
+ switch ($option) {
|
|
|
|
+ case 'iterable_multibulk':
|
|
|
|
+ $this->_iterableMultibulk = (bool) $value;
|
|
|
|
+ break;
|
|
|
|
+ case 'throw_on_error':
|
|
|
|
+ $this->_throwErrors = (bool) $value;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public function getOption($option) {
|
|
|
|
+ switch ($option) {
|
|
|
|
+ case 'iterable_multibulk':
|
|
|
|
+ return $this->_iterableMultibulk;
|
|
|
|
+ case 'throw_on_error':
|
|
|
|
+ return $this->_throwErrors;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+class ResponseReader implements IResponseReader {
|
|
private $_prefixHandlers;
|
|
private $_prefixHandlers;
|
|
|
|
|
|
public function __construct() {
|
|
public function __construct() {
|
|
@@ -707,6 +812,28 @@ class ResponseReader {
|
|
$connection, $message
|
|
$connection, $message
|
|
));
|
|
));
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ public function setOption($option, $value) {
|
|
|
|
+ switch ($option) {
|
|
|
|
+ case 'iterable_multibulk':
|
|
|
|
+ $handler = $value ? 'Predis\ResponseMultiBulkStreamHandler' : 'Predis\ResponseMultiBulkHandler';
|
|
|
|
+ $this->_prefixHandlers[Protocol::PREFIX_MULTI_BULK] = new $handler();
|
|
|
|
+ break;
|
|
|
|
+ case 'throw_on_error':
|
|
|
|
+ $handler = $value ? 'Predis\ResponseErrorHandler' : 'Predis\ResponseErrorSilentHandler';
|
|
|
|
+ $this->_prefixHandlers[Protocol::PREFIX_ERROR] = new $handler();
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public function getOption($option) {
|
|
|
|
+ switch ($option) {
|
|
|
|
+ case 'iterable_multibulk':
|
|
|
|
+ return $this->_prefixHandlers[Protocol::PREFIX_MULTI_BULK] instanceof ResponseMultiBulkStreamHandler;
|
|
|
|
+ case 'throw_on_error':
|
|
|
|
+ return $this->_prefixHandlers[Protocol::PREFIX_ERROR] instanceof ResponseErrorHandler;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseError {
|
|
class ResponseError {
|
|
@@ -1317,7 +1444,7 @@ interface IConnection {
|
|
class Connection implements IConnection {
|
|
class Connection implements IConnection {
|
|
private $_params, $_socket, $_initCmds, $_reader, $_initializer;
|
|
private $_params, $_socket, $_initCmds, $_reader, $_initializer;
|
|
|
|
|
|
- public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
|
|
|
|
|
|
+ public function __construct(ConnectionParameters $parameters, IResponseReader $reader = null) {
|
|
$scheme = $parameters->scheme;
|
|
$scheme = $parameters->scheme;
|
|
if ($scheme !== 'redis' && $scheme !== 'tcp' && $scheme !== 'unix') {
|
|
if ($scheme !== 'redis' && $scheme !== 'tcp' && $scheme !== 'unix') {
|
|
throw new \InvalidArgumentException("Invalid scheme: $scheme");
|
|
throw new \InvalidArgumentException("Invalid scheme: $scheme");
|