PhpiredisConnection.php 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. <?php
  2. /*
  3. This class provides the implementation of a Predis connection that internally
  4. uses the PHP socket extension for network communication and wraps the phpiredis
  5. C extension (PHP bindings for hiredis) to parse the Redis protocol. Everything
  6. is *highly experimental* (even the very same phpiredis since it is quite new),
  7. so use it at your own risk.
  8. This class is mainly intended to provide an optional low-overhead alternative
  9. for processing replies from Redis compared to the standard pure-PHP classes.
  10. Differences in speed when dealing with short inline replies are practically
  11. nonexistent, the actual speed boost is for long multibulk replies when this
  12. protocol processor can parse and return replies very fast.
  13. For instructions on how to build and install the phpiredis extension, please
  14. consult the repository of the project at http://github.com/seppo0010/phpiredis
  15. */
  16. namespace Predis\Network;
  17. use Predis\ResponseError;
  18. use Predis\ResponseQueued;
  19. use Predis\ClientException;
  20. use Predis\ServerException;
  21. use Predis\IConnectionParameters;
  22. use Predis\Commands\ICommand;
  23. class PhpiredisConnection extends ConnectionBase {
  24. private $_reader;
  25. public function __construct(IConnectionParameters $parameters) {
  26. if (!function_exists('socket_create')) {
  27. throw new ClientException(
  28. 'The socket extension must be loaded in order to be able to ' .
  29. 'use this connection class'
  30. );
  31. }
  32. parent::__construct($parameters);
  33. }
  34. public function __destruct() {
  35. phpiredis_reader_destroy($this->_reader);
  36. parent::__destruct();
  37. }
  38. protected function checkParameters(IConnectionParameters $parameters) {
  39. if ($parameters->isSetByUser('iterable_multibulk')) {
  40. $this->onInvalidOption('iterable_multibulk', $parameters);
  41. }
  42. if ($parameters->isSetByUser('connection_persistent')) {
  43. $this->onInvalidOption('connection_persistent', $parameters);
  44. }
  45. return parent::checkParameters($parameters);
  46. }
  47. private function initializeReader($throw_errors = true) {
  48. if (!function_exists('phpiredis_reader_create')) {
  49. throw new ClientException(
  50. 'The phpiredis extension must be loaded in order to be able to ' .
  51. 'use this connection class'
  52. );
  53. }
  54. $reader = phpiredis_reader_create();
  55. phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
  56. phpiredis_reader_set_error_handler($reader, $this->getErrorHandler($throw_errors));
  57. $this->_reader = $reader;
  58. }
  59. protected function initializeProtocol(IConnectionParameters $parameters) {
  60. $this->initializeReader($parameters->throw_errors);
  61. }
  62. private function getStatusHandler() {
  63. return function($payload) {
  64. switch ($payload) {
  65. case 'OK':
  66. return true;
  67. case 'QUEUED':
  68. return new ResponseQueued();
  69. default:
  70. return $payload;
  71. }
  72. };
  73. }
  74. private function getErrorHandler($throwErrors = true) {
  75. if ($throwErrors) {
  76. return function($errorMessage) {
  77. throw new ServerException($errorMessage);
  78. };
  79. }
  80. return function($errorMessage) {
  81. return new ResponseError($errorMessage);
  82. };
  83. }
  84. private function emitSocketError() {
  85. $errno = socket_last_error();
  86. $errstr = socket_strerror($errno);
  87. $this->disconnect();
  88. $this->onConnectionError(trim($errstr), $errno);
  89. }
  90. protected function createResource() {
  91. $parameters = $this->_params;
  92. $initializer = array($this, "{$parameters->scheme}SocketInitializer");
  93. $socket = call_user_func($initializer, $parameters);
  94. $this->setSocketOptions($socket, $parameters);
  95. return $socket;
  96. }
  97. private function tcpSocketInitializer(IConnectionParameters $parameters) {
  98. $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
  99. if (!is_resource($socket)) {
  100. $this->emitSocketError();
  101. }
  102. return $socket;
  103. }
  104. private function unixSocketInitializer(IConnectionParameters $parameters) {
  105. $socket = @socket_create(AF_UNIX, SOCK_STREAM, 0);
  106. if (!is_resource($socket)) {
  107. $this->emitSocketError();
  108. }
  109. return $socket;
  110. }
  111. private function setSocketOptions($socket, IConnectionParameters $parameters) {
  112. if ($parameters->scheme !== 'tcp') {
  113. return;
  114. }
  115. if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
  116. $this->emitSocketError();
  117. }
  118. if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
  119. $this->emitSocketError();
  120. }
  121. if (isset($parameters->read_write_timeout)) {
  122. $rwtimeout = $parameters->read_write_timeout;
  123. $timeoutSec = floor($rwtimeout);
  124. $timeoutUsec = ($rwtimeout - $timeoutSec) * 1000000;
  125. $timeout = array('sec' => $timeoutSec, 'usec' => $timeoutUsec);
  126. if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
  127. $this->emitSocketError();
  128. }
  129. if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
  130. $this->emitSocketError();
  131. }
  132. }
  133. }
  134. private function getAddress(IConnectionParameters $parameters) {
  135. if ($parameters->scheme === 'unix') {
  136. return $parameters->path;
  137. }
  138. $host = $parameters->host;
  139. if (ip2long($host) === false) {
  140. if (($address = gethostbyname($host)) === $host) {
  141. $this->onConnectionError("Cannot resolve the address of $host");
  142. }
  143. return $address;
  144. }
  145. return $host;
  146. }
  147. private function connectWithTimeout(IConnectionParameters $parameters) {
  148. $host = self::getAddress($parameters);
  149. $socket = $this->getResource();
  150. socket_set_nonblock($socket);
  151. if (@socket_connect($socket, $host, $parameters->port) === false) {
  152. $error = socket_last_error();
  153. if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
  154. $this->emitSocketError();
  155. }
  156. }
  157. socket_set_block($socket);
  158. $null = null;
  159. $selectable = array($socket);
  160. $timeout = $parameters->connection_timeout;
  161. $timeoutSecs = floor($timeout);
  162. $timeoutUSecs = ($timeout - $timeoutSecs) * 1000000;
  163. $selected = socket_select($selectable, $selectable, $null, $timeoutSecs, $timeoutUSecs);
  164. if ($selected === 2) {
  165. $this->onConnectionError('Connection refused', SOCKET_ECONNREFUSED);
  166. }
  167. if ($selected === 0) {
  168. $this->onConnectionError('Connection timed out', SOCKET_ETIMEDOUT);
  169. }
  170. if ($selected === false) {
  171. $this->emitSocketError();
  172. }
  173. }
  174. public function connect() {
  175. parent::connect();
  176. $this->connectWithTimeout($this->_params);
  177. if (count($this->_initCmds) > 0) {
  178. $this->sendInitializationCommands();
  179. }
  180. }
  181. public function disconnect() {
  182. if ($this->isConnected()) {
  183. socket_close($this->getResource());
  184. parent::disconnect();
  185. }
  186. }
  187. private function sendInitializationCommands() {
  188. foreach ($this->_initCmds as $command) {
  189. $this->writeCommand($command);
  190. }
  191. foreach ($this->_initCmds as $command) {
  192. $this->readResponse($command);
  193. }
  194. }
  195. private function write($buffer) {
  196. $socket = $this->getResource();
  197. while (($length = strlen($buffer)) > 0) {
  198. $written = socket_write($socket, $buffer, $length);
  199. if ($length === $written) {
  200. return;
  201. }
  202. if ($written === false) {
  203. $this->onConnectionError('Error while writing bytes to the server');
  204. }
  205. $buffer = substr($buffer, $written);
  206. }
  207. }
  208. public function read() {
  209. $socket = $this->getResource();
  210. $reader = $this->_reader;
  211. while (($state = phpiredis_reader_get_state($reader)) === PHPIREDIS_READER_STATE_INCOMPLETE) {
  212. if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '') {
  213. $this->emitSocketError();
  214. }
  215. phpiredis_reader_feed($reader, $buffer);
  216. }
  217. if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
  218. return phpiredis_reader_get_reply($reader);
  219. }
  220. else {
  221. $this->onProtocolError(phpiredis_reader_get_error($reader));
  222. }
  223. }
  224. public function writeCommand(ICommand $command) {
  225. $cmdargs = $command->getArguments();
  226. array_unshift($cmdargs, $command->getId());
  227. $this->write(phpiredis_format_command($cmdargs));
  228. }
  229. }