|
@@ -44,8 +44,6 @@ use Predis\Response;
|
|
|
*/
|
|
|
class PhpiredisConnection extends AbstractConnection
|
|
|
{
|
|
|
- const ERR_MSG_EXTENSION = 'The %s extension must be loaded in order to be able to use this connection class';
|
|
|
-
|
|
|
private $reader;
|
|
|
|
|
|
/**
|
|
@@ -53,10 +51,11 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
*/
|
|
|
public function __construct(ConnectionParametersInterface $parameters)
|
|
|
{
|
|
|
- $this->checkExtensions();
|
|
|
- $this->initializeReader();
|
|
|
+ $this->assertExtensions();
|
|
|
|
|
|
parent::__construct($parameters);
|
|
|
+
|
|
|
+ $this->reader = $this->createReader();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -73,43 +72,62 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
/**
|
|
|
* Checks if the socket and phpiredis extensions are loaded in PHP.
|
|
|
*/
|
|
|
- private function checkExtensions()
|
|
|
+ protected function assertExtensions()
|
|
|
{
|
|
|
- if (!function_exists('socket_create')) {
|
|
|
- throw new NotSupportedException(sprintf(self::ERR_MSG_EXTENSION, 'socket'));
|
|
|
+ if (!extension_loaded('sockets')) {
|
|
|
+ throw new NotSupportedException(
|
|
|
+ 'The "sockets" extension is required by this connection backend'
|
|
|
+ );
|
|
|
}
|
|
|
- if (!function_exists('phpiredis_reader_create')) {
|
|
|
- throw new NotSupportedException(sprintf(self::ERR_MSG_EXTENSION, 'phpiredis'));
|
|
|
+
|
|
|
+ if (!extension_loaded('phpiredis')) {
|
|
|
+ throw new NotSupportedException(
|
|
|
+ 'The "phpiredis" extension is required by this connection backend'
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@inheritdoc}
|
|
|
*/
|
|
|
- protected function checkParameters(ConnectionParametersInterface $parameters)
|
|
|
+ protected function assertParameters(ConnectionParametersInterface $parameters)
|
|
|
{
|
|
|
if (isset($parameters->persistent)) {
|
|
|
- $this->onInvalidOption('persistent', $parameters);
|
|
|
+ throw new NotSupportedException(
|
|
|
+ "Persistent connections are not supported by this connection backend"
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
- return parent::checkParameters($parameters);
|
|
|
+ return parent::assertParameters($parameters);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Initializes the protocol reader resource.
|
|
|
+ * Creates a new instance of the protocol reader resource.
|
|
|
+ *
|
|
|
+ * @return resource
|
|
|
*/
|
|
|
- private function initializeReader()
|
|
|
+ private function createReader()
|
|
|
{
|
|
|
$reader = phpiredis_reader_create();
|
|
|
|
|
|
phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
|
|
|
phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());
|
|
|
|
|
|
- $this->reader = $reader;
|
|
|
+ return $reader;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the underlying protocol reader resource.
|
|
|
+ *
|
|
|
+ * @return resource
|
|
|
+ */
|
|
|
+ protected function getReader()
|
|
|
+ {
|
|
|
+ return $this->reader;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Gets the handler used by the protocol reader to handle status replies.
|
|
|
+ * Returns the handler used by the protocol reader to handle status replies.
|
|
|
*
|
|
|
* @return \Closure
|
|
|
*/
|
|
@@ -130,15 +148,14 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Gets the handler used by the protocol reader to handle Redis errors.
|
|
|
+ * Returns the handler used by the protocol reader to handle Redis errors.
|
|
|
*
|
|
|
- * @param Boolean $throw_errors Specify if Redis errors throw exceptions.
|
|
|
* @return \Closure
|
|
|
*/
|
|
|
- private function getErrorHandler()
|
|
|
+ protected function getErrorHandler()
|
|
|
{
|
|
|
- return function ($errorMessage) {
|
|
|
- return new Response\Error($errorMessage);
|
|
|
+ return function ($payload) {
|
|
|
+ return new Response\Error($payload);
|
|
|
};
|
|
|
}
|
|
|
|
|
@@ -160,18 +177,17 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
*/
|
|
|
protected function createResource()
|
|
|
{
|
|
|
- $parameters = $this->parameters;
|
|
|
-
|
|
|
$isUnix = $this->parameters->scheme === 'unix';
|
|
|
$domain = $isUnix ? AF_UNIX : AF_INET;
|
|
|
$protocol = $isUnix ? 0 : SOL_TCP;
|
|
|
|
|
|
$socket = @call_user_func('socket_create', $domain, SOCK_STREAM, $protocol);
|
|
|
+
|
|
|
if (!is_resource($socket)) {
|
|
|
$this->emitSocketError();
|
|
|
}
|
|
|
|
|
|
- $this->setSocketOptions($socket, $parameters);
|
|
|
+ $this->setSocketOptions($socket, $this->parameters);
|
|
|
|
|
|
return $socket;
|
|
|
}
|
|
@@ -222,7 +238,7 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
* @param ConnectionParametersInterface $parameters Parameters used to initialize the connection.
|
|
|
* @return string
|
|
|
*/
|
|
|
- private function getAddress(ConnectionParametersInterface $parameters)
|
|
|
+ private static function getAddress(ConnectionParametersInterface $parameters)
|
|
|
{
|
|
|
if ($parameters->scheme === 'unix') {
|
|
|
return $parameters->path;
|
|
@@ -234,6 +250,7 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
if (($addresses = gethostbynamel($host)) === false) {
|
|
|
$this->onConnectionError("Cannot resolve the address of $host");
|
|
|
}
|
|
|
+
|
|
|
return $addresses[array_rand($addresses)];
|
|
|
}
|
|
|
|
|
@@ -255,6 +272,7 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
|
|
|
if (@socket_connect($socket, $host, $parameters->port) === false) {
|
|
|
$error = socket_last_error();
|
|
|
+
|
|
|
if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
|
|
|
$this->emitSocketError();
|
|
|
}
|
|
@@ -322,6 +340,7 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
if ($length === $written) {
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
if ($written === false) {
|
|
|
$this->onConnectionError('Error while writing bytes to the server');
|
|
|
}
|
|
@@ -338,7 +357,7 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
$socket = $this->getResource();
|
|
|
$reader = $this->reader;
|
|
|
|
|
|
- while (($state = phpiredis_reader_get_state($reader)) === PHPIREDIS_READER_STATE_INCOMPLETE) {
|
|
|
+ while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
|
|
|
if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '') {
|
|
|
$this->emitSocketError();
|
|
|
}
|
|
@@ -358,9 +377,10 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
*/
|
|
|
public function writeCommand(CommandInterface $command)
|
|
|
{
|
|
|
- $cmdargs = $command->getArguments();
|
|
|
- array_unshift($cmdargs, $command->getId());
|
|
|
- $this->write(phpiredis_format_command($cmdargs));
|
|
|
+ $arguments = $command->getArguments();
|
|
|
+ array_unshift($arguments, $command->getId());
|
|
|
+
|
|
|
+ $this->write(phpiredis_format_command($arguments));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -368,7 +388,7 @@ class PhpiredisConnection extends AbstractConnection
|
|
|
*/
|
|
|
public function __wakeup()
|
|
|
{
|
|
|
- $this->checkExtensions();
|
|
|
- $this->initializeReader();
|
|
|
+ $this->assertExtensions();
|
|
|
+ $this->reader = $this->createReader();
|
|
|
}
|
|
|
}
|