123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- <?php
- namespace Predis\Connection;
- use Predis\NotSupportedException;
- use Predis\Command\CommandInterface;
- use Predis\Response\Error as ErrorResponse;
- use Predis\Response\Status as StatusResponse;
- class PhpiredisStreamConnection extends StreamConnection
- {
- private $reader;
-
- public function __construct(ParametersInterface $parameters)
- {
- $this->assertExtensions();
- parent::__construct($parameters);
- $this->reader = $this->createReader();
- }
-
- public function __destruct()
- {
- phpiredis_reader_destroy($this->reader);
- parent::__destruct();
- }
-
- private function assertExtensions()
- {
- if (!extension_loaded('phpiredis')) {
- throw new NotSupportedException(
- 'The "phpiredis" extension is required by this connection backend.'
- );
- }
- }
-
- private function createReader()
- {
- $reader = phpiredis_reader_create();
- phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
- phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());
- return $reader;
- }
-
- protected function getReader()
- {
- return $this->reader;
- }
-
- protected function getStatusHandler()
- {
- return function ($payload) {
- return StatusResponse::get($payload);
- };
- }
-
- protected function getErrorHandler()
- {
- return function ($errorMessage) {
- return new ErrorResponse($errorMessage);
- };
- }
-
- public function read()
- {
- $socket = $this->getResource();
- $reader = $this->reader;
- while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
- $buffer = fread($socket, 4096);
- if ($buffer === false || $buffer === '') {
- $this->onConnectionError('Error while reading bytes from the server.');
- }
- phpiredis_reader_feed($reader, $buffer);
- }
- if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
- return phpiredis_reader_get_reply($reader);
- } else {
- $this->onProtocolError(phpiredis_reader_get_error($reader));
- }
- }
-
- public function writeRequest(CommandInterface $command)
- {
- $arguments = $command->getArguments();
- array_unshift($arguments, $command->getId());
- $this->write(phpiredis_format_command($arguments));
- }
-
- public function __wakeup()
- {
- $this->assertExtensions();
- $this->reader = $this->createReader();
- }
- }
|