* * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Predis\Connection; use Predis\NotSupportedException; use Predis\Command\CommandInterface; use Predis\Response; /** * This class provides the implementation of a Predis connection that uses PHP's * streams for network communication and wraps the phpiredis C extension (PHP * bindings for hiredis) to parse and serialize the Redis protocol. Everything * is highly experimental (even the very same phpiredis since it is quite new), * so use it at your own risk. * * This class is mainly intended to provide an optional low-overhead alternative * for processing replies from Redis compared to the standard pure-PHP classes. * Differences in speed when dealing with short inline replies are practically * nonexistent, the actual speed boost is for long multibulk replies when this * protocol processor can parse and return replies very fast. * * For instructions on how to build and install the phpiredis extension, please * consult the repository of the project. * * The connection parameters supported by this class are: * * - scheme: it can be either 'tcp' or 'unix'. * - host: hostname or IP address of the server. * - port: TCP port of the server. * - timeout: timeout to perform the connection. * - read_write_timeout: timeout of read / write operations. * - async_connect: performs the connection asynchronously. * - tcp_nodelay: enables or disables Nagle's algorithm for coalescing. * - persistent: the connection is left intact after a GC collection. 
*
 * @link https://github.com/nrk/phpiredis
 * @author Daniele Alessandri
 */
class PhpiredisStreamConnection extends StreamConnection
{
    /**
     * Protocol reader handle returned by phpiredis_reader_create().
     */
    private $reader;

    /**
     * {@inheritdoc}
     */
    public function __construct(ConnectionParametersInterface $parameters)
    {
        // The extension check and the reader setup run before the parent
        // constructor is invoked.
        $this->checkExtensions();
        $this->initializeReader();

        parent::__construct($parameters);
    }

    /**
     * {@inheritdoc}
     */
    public function __destruct()
    {
        // Release the protocol reader first, then let the parent clean up.
        phpiredis_reader_destroy($this->reader);

        parent::__destruct();
    }

    /**
     * Verifies that the functions exposed by the phpiredis extension are
     * available, using phpiredis_reader_create() as the probe.
     *
     * @throws NotSupportedException When phpiredis_reader_create() is not defined.
     */
    protected function checkExtensions()
    {
        if (function_exists('phpiredis_reader_create')) {
            return;
        }

        throw new NotSupportedException(
            'The phpiredis extension must be loaded in order to be able to use this connection class'
        );
    }

    /**
     * Creates a new protocol reader, registers the status and error handlers
     * on it and stores it on the instance.
     */
    protected function initializeReader()
    {
        $protocolReader = phpiredis_reader_create();

        $onStatus = $this->getStatusHandler();
        phpiredis_reader_set_status_handler($protocolReader, $onStatus);

        $onError = $this->getErrorHandler();
        phpiredis_reader_set_error_handler($protocolReader, $onError);

        $this->reader = $protocolReader;
    }

    /**
     * Builds the callback registered on the protocol reader for status replies.
     *
     * The callback maps 'OK' to boolean true and 'QUEUED' to an instance of
     * Response\StatusQueued; any other payload is returned untouched.
     *
     * @return \Closure
     */
    protected function getStatusHandler()
    {
        return function ($status) {
            // Loose comparisons are intentional: they mirror the matching
            // rules of a switch statement.
            if ($status == 'OK') {
                return true;
            }

            if ($status == 'QUEUED') {
                return new Response\StatusQueued();
            }

            return $status;
        };
    }

    /**
     * Builds the callback registered on the protocol reader for error replies.
     *
     * The callback wraps the error message in a Response\Error instance and
     * returns it.
     *
     * @return \Closure
     */
    protected function getErrorHandler()
    {
        return function ($message) {
            return new Response\Error($message);
        };
    }

    /**
     * {@inheritdoc}
     */
    public function read()
    {
        $stream = $this->getResource();
        $protocolReader = $this->reader;

        // Keep feeding the reader with chunks of up to 4096 bytes for as long
        // as it reports an incomplete reply.
        $state = phpiredis_reader_get_state($protocolReader);

        while ($state === PHPIREDIS_READER_STATE_INCOMPLETE) {
            $chunk = fread($stream, 4096);

            if ($chunk === false || $chunk === '') {
                $this->onConnectionError('Error while reading bytes from the server');

                // Reached only if the error hook above hands control back.
                return;
            }

            phpiredis_reader_feed($protocolReader, $chunk);
            $state = phpiredis_reader_get_state($protocolReader);
        }

        if ($state !== PHPIREDIS_READER_STATE_COMPLETE) {
            // Any state other than "incomplete" or "complete" is reported as
            // a protocol error using the message supplied by the reader.
            $this->onProtocolError(phpiredis_reader_get_error($protocolReader));

            return;
        }

        return phpiredis_reader_get_reply($protocolReader);
    }

    /**
     * {@inheritdoc}
     */
    public function writeCommand(CommandInterface $command)
    {
        // The command identifier goes first, followed by its arguments.
        $arguments = $command->getArguments();
        $payload = array_merge(array($command->getId()), $arguments);

        $this->writeBytes(phpiredis_format_command($payload));
    }

    /**
     * {@inheritdoc}
     */
    public function __wakeup()
    {
        // Repeat the extension check and create a fresh protocol reader, the
        // same two steps the constructor performs.
        $this->checkExtensions();
        $this->initializeReader();
    }
}