PhpiredisStreamConnection.php 5.0 KB

  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\NotSupportedException;
  12. use Predis\Command\CommandInterface;
  13. use Predis\Response;
  14. /**
  15. * This class provides the implementation of a Predis connection that uses PHP's
  16. * streams for network communication and wraps the phpiredis C extension (PHP
  17. * bindings for hiredis) to parse and serialize the Redis protocol. Everything
  18. * is highly experimental (even the very same phpiredis since it is quite new),
  19. * so use it at your own risk.
  20. *
  21. * This class is mainly intended to provide an optional low-overhead alternative
  22. * for processing replies from Redis compared to the standard pure-PHP classes.
  23. * Differences in speed when dealing with short inline replies are practically
  24. * nonexistent, the actual speed boost is for long multibulk replies when this
  25. * protocol processor can parse and return replies very fast.
  26. *
  27. * For instructions on how to build and install the phpiredis extension, please
  28. * consult the repository of the project.
  29. *
  30. * The connection parameters supported by this class are:
  31. *
  32. * - scheme: it can be either 'tcp' or 'unix'.
  33. * - host: hostname or IP address of the server.
  34. * - port: TCP port of the server.
  35. * - timeout: timeout to perform the connection.
  36. * - read_write_timeout: timeout of read / write operations.
  37. * - async_connect: performs the connection asynchronously.
  38. * - tcp_nodelay: enables or disables Nagle's algorithm for coalescing.
  39. * - persistent: the connection is left intact after a GC collection.
  40. *
  41. * @link
  42. * @author Daniele Alessandri <>
  43. */
  class PhpiredisStreamConnection extends StreamConnection
  {
      /**
       * Protocol reader created by phpiredis_reader_create().
       */
      private $reader;

      /**
       * Verifies that phpiredis is available and sets up the protocol reader
       * before handing the connection parameters over to the parent class.
       *
       * @param ConnectionParametersInterface $parameters Parameters of the connection.
       */
      public function __construct(ConnectionParametersInterface $parameters)
      {
          $this->checkExtensions();
          $this->initializeReader();

          parent::__construct($parameters);
      }

      /**
       * Releases the protocol reader, then runs the destructor of the parent
       * class.
       */
      public function __destruct()
      {
          phpiredis_reader_destroy($this->reader);

          parent::__destruct();
      }

      /**
       * Ensures that the functions of the phpiredis extension can be called.
       *
       * @throws NotSupportedException When phpiredis_reader_create() is not defined.
       */
      protected function checkExtensions()
      {
          if (function_exists('phpiredis_reader_create')) {
              return;
          }

          throw new NotSupportedException(
              'The phpiredis extension must be loaded in order to be able to use this connection class'
          );
      }

      /**
       * Creates a new protocol reader, registers the status and error handlers
       * on it and stores it in this instance.
       */
      protected function initializeReader()
      {
          $resource = phpiredis_reader_create();

          // Each handler is requested right before being registered and the
          // reader is stored only once it is fully configured.
          $onStatus = $this->getStatusHandler();
          phpiredis_reader_set_status_handler($resource, $onStatus);

          $onError = $this->getErrorHandler();
          phpiredis_reader_set_error_handler($resource, $onError);

          $this->reader = $resource;
      }

      /**
       * Builds the callback registered on the protocol reader for status replies.
       *
       * The callback turns 'OK' into TRUE and 'QUEUED' into an instance of
       * Response\StatusQueued, any other payload is returned untouched.
       *
       * @return \Closure
       */
      protected function getStatusHandler()
      {
          $handler = function ($status) {
              // Loose comparisons are intentional here: they reproduce the
              // matching rules of a switch statement on the payload.
              if ($status == 'OK') {
                  return true;
              }

              if ($status == 'QUEUED') {
                  return new Response\StatusQueued();
              }

              return $status;
          };

          return $handler;
      }

      /**
       * Builds the callback registered on the protocol reader for error replies.
       *
       * The callback wraps the error message in a Response\Error instance and
       * returns it.
       *
       * @return \Closure
       */
      protected function getErrorHandler()
      {
          $handler = function ($message) {
              return new Response\Error($message);
          };

          return $handler;
      }

      /**
       * Reads a reply from the underlying stream.
       *
       * Chunks of up to 4096 bytes are read from the stream and fed to the
       * protocol reader for as long as the reader reports an incomplete state.
       * A failed or empty read is reported through onConnectionError(), a final
       * state other than "complete" is reported through onProtocolError().
       *
       * @return mixed Reply produced by the protocol reader, or NULL when one of
       *               the error callbacks returns instead of interrupting the flow.
       */
      public function read()
      {
          $stream = $this->getResource();
          $parser = $this->reader;

          $state = phpiredis_reader_get_state($parser);

          while ($state === PHPIREDIS_READER_STATE_INCOMPLETE) {
              $chunk = fread($stream, 4096);

              if ($chunk === false || $chunk === '') {
                  $this->onConnectionError('Error while reading bytes from the server');

                  return;
              }

              phpiredis_reader_feed($parser, $chunk);
              $state = phpiredis_reader_get_state($parser);
          }

          if ($state !== PHPIREDIS_READER_STATE_COMPLETE) {
              $this->onProtocolError(phpiredis_reader_get_error($parser));

              return;
          }

          return phpiredis_reader_get_reply($parser);
      }

      /**
       * Serializes a command with phpiredis and writes the resulting bytes.
       *
       * @param CommandInterface $command Command to send; its ID is placed in
       *                                  front of its arguments before formatting.
       */
      public function writeCommand(CommandInterface $command)
      {
          $arguments = $command->getArguments();
          $commandId = $command->getId();

          array_unshift($arguments, $commandId);

          $serialized = phpiredis_format_command($arguments);
          $this->writeBytes($serialized);
      }

      /**
       * Checks again for the phpiredis extension and creates a new protocol
       * reader when the instance is unserialized.
       */
      public function __wakeup()
      {
          $this->checkExtensions();
          $this->initializeReader();
      }
  }