|
@@ -0,0 +1,180 @@
|
|
|
|
+<?php
|
|
|
|
+
|
|
|
|
+/*
|
|
|
|
+ * This file is part of the Predis package.
|
|
|
|
+ *
|
|
|
|
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
|
|
|
|
+ *
|
|
|
|
+ * For the full copyright and license information, please view the LICENSE
|
|
|
|
+ * file that was distributed with this source code.
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+namespace Predis\Connection;
|
|
|
|
+
|
|
|
|
+use Predis\NotSupportedException;
|
|
|
|
+use Predis\ResponseError;
|
|
|
|
+use Predis\ResponseQueued;
|
|
|
|
+use Predis\Command\CommandInterface;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * This class provides the implementation of a Predis connection that uses PHP's
|
|
|
|
+ * streams for network communication and wraps the phpiredis C extension (PHP
|
|
|
|
+ * bindings for hiredis) to parse and serialize the Redis protocol. Everything
|
|
|
|
+ * is highly experimental (even the very same phpiredis since it is quite new),
|
|
|
|
+ * so use it at your own risk.
|
|
|
|
+ *
|
|
|
|
+ * This class is mainly intended to provide an optional low-overhead alternative
|
|
|
|
+ * for processing replies from Redis compared to the standard pure-PHP classes.
|
|
|
|
+ * Differences in speed when dealing with short inline replies are practically
|
|
|
|
+ * nonexistent, the actual speed boost is for long multibulk replies when this
|
|
|
|
+ * protocol processor can parse and return replies very fast.
|
|
|
|
+ *
|
|
|
|
+ * For instructions on how to build and install the phpiredis extension, please
|
|
|
|
+ * consult the repository of the project.
|
|
|
|
+ *
|
|
|
|
+ * The connection parameters supported by this class are:
|
|
|
|
+ *
|
|
|
|
+ * - scheme: it can be either 'tcp' or 'unix'.
|
|
|
|
+ * - host: hostname or IP address of the server.
|
|
|
|
+ * - port: TCP port of the server.
|
|
|
|
+ * - timeout: timeout to perform the connection.
|
|
|
|
+ * - read_write_timeout: timeout of read / write operations.
|
|
|
|
+ * - async_connect: performs the connection asynchronously.
|
|
|
|
+ * - persistent: the connection is left intact after a GC collection.
|
|
|
|
+ *
|
|
|
|
+ * @link https://github.com/nrk/phpiredis
|
|
|
|
+ * @author Daniele Alessandri <suppakilla@gmail.com>
|
|
|
|
+ */
|
|
|
|
+class PhpiredisStreamConnection extends StreamConnection
|
|
|
|
+{
|
|
|
|
+ private $reader;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * {@inheritdoc}
|
|
|
|
+ */
|
|
|
|
+ public function __construct(ConnectionParametersInterface $parameters)
|
|
|
|
+ {
|
|
|
|
+ $this->checkExtensions();
|
|
|
|
+ $this->initializeReader();
|
|
|
|
+
|
|
|
|
+ parent::__construct($parameters);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Checks if the phpiredis extension is loaded in PHP.
|
|
|
|
+ */
|
|
|
|
+ protected function checkExtensions()
|
|
|
|
+ {
|
|
|
|
+ if (!function_exists('phpiredis_reader_create')) {
|
|
|
|
+ throw new NotSupportedException(
|
|
|
|
+ 'The phpiredis extension must be loaded in order to be able to use this connection class'
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * {@inheritdoc}
|
|
|
|
+ */
|
|
|
|
+ protected function checkParameters(ConnectionParametersInterface $parameters)
|
|
|
|
+ {
|
|
|
|
+ if ($parameters->iterable_multibulk === true) {
|
|
|
|
+ $this->onInvalidOption('iterable_multibulk', $parameters);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return parent::checkParameters($parameters);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Initializes the protocol reader resource.
|
|
|
|
+ */
|
|
|
|
+ protected function initializeReader()
|
|
|
|
+ {
|
|
|
|
+ $reader = phpiredis_reader_create();
|
|
|
|
+
|
|
|
|
+ phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
|
|
|
|
+ phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());
|
|
|
|
+
|
|
|
|
+ $this->reader = $reader;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Gets the handler used by the protocol reader to handle status replies.
|
|
|
|
+ *
|
|
|
|
+ * @return \Closure
|
|
|
|
+ */
|
|
|
|
+ protected function getStatusHandler()
|
|
|
|
+ {
|
|
|
|
+ return function ($payload) {
|
|
|
|
+ switch ($payload) {
|
|
|
|
+ case 'OK':
|
|
|
|
+ return true;
|
|
|
|
+
|
|
|
|
+ case 'QUEUED':
|
|
|
|
+ return new ResponseQueued();
|
|
|
|
+
|
|
|
|
+ default:
|
|
|
|
+ return $payload;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Gets the handler used by the protocol reader to handle Redis errors.
|
|
|
|
+ *
|
|
|
|
+ * @param Boolean $throw_errors Specify if Redis errors throw exceptions.
|
|
|
|
+ * @return \Closure
|
|
|
|
+ */
|
|
|
|
+ protected function getErrorHandler()
|
|
|
|
+ {
|
|
|
|
+ return function ($errorMessage) {
|
|
|
|
+ return new ResponseError($errorMessage);
|
|
|
|
+ };
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * {@inheritdoc}
|
|
|
|
+ */
|
|
|
|
+ public function read()
|
|
|
|
+ {
|
|
|
|
+ $socket = $this->getResource();
|
|
|
|
+ $reader = $this->reader;
|
|
|
|
+
|
|
|
|
+ while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
|
|
|
|
+ $buffer = fread($socket, 4096);
|
|
|
|
+
|
|
|
|
+ if ($buffer === false || $buffer === '') {
|
|
|
|
+ $this->onConnectionError('Error while reading bytes from the server');
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ phpiredis_reader_feed($reader, $buffer);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
|
|
|
|
+ return phpiredis_reader_get_reply($reader);
|
|
|
|
+ } else {
|
|
|
|
+ $this->onProtocolError(phpiredis_reader_get_error($reader));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * {@inheritdoc}
|
|
|
|
+ */
|
|
|
|
+ public function writeCommand(CommandInterface $command)
|
|
|
|
+ {
|
|
|
|
+ $cmdargs = $command->getArguments();
|
|
|
|
+ array_unshift($cmdargs, $command->getId());
|
|
|
|
+ $this->writeBytes(phpiredis_format_command($cmdargs));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * {@inheritdoc}
|
|
|
|
+ */
|
|
|
|
+ public function __sleep()
|
|
|
|
+ {
|
|
|
|
+ $this->checkExtensions();
|
|
|
|
+ $this->initializeReader();
|
|
|
|
+
|
|
|
|
+ return array_diff(parent::__sleep(), array('mbiterable'));
|
|
|
|
+ }
|
|
|
|
+}
|