PhpiredisConnection.php 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. <?php
  2. /*
  3. This class provides the implementation of a Predis connection that internally
  4. uses the PHP socket extension for network communication and wraps the phpiredis
  5. C extension (PHP bindings for hiredis) to parse the Redis protocol. Everything
  6. is *highly experimental* (even the very same phpiredis since it is quite new),
  7. so use it at your own risk.
  8. This class is mainly intended to provide an optional low-overhead alternative
  9. for processing replies from Redis compared to the standard pure-PHP classes.
  10. Differences in speed when dealing with short inline replies are practically
  11. nonexistent, the actual speed boost is for long multibulk replies when this
  12. protocol processor can parse and return replies very fast.
  13. For instructions on how to build and install the phpiredis extension, please
  14. consult the repository of the project at http://github.com/seppo0010/phpiredis
  15. */
  16. namespace Predis\Network;
  17. use Predis\ResponseError;
  18. use Predis\ResponseQueued;
  19. use Predis\ClientException;
  20. use Predis\ServerException;
  21. use Predis\ConnectionParameters;
  22. use Predis\Commands\ICommand;
  23. class PhpiredisConnection extends ConnectionBase {
  24. private $_reader, $_throwErrors;
  25. public function __construct(ConnectionParameters $parameters) {
  26. if (!function_exists('socket_create')) {
  27. throw new ClientException(
  28. 'The socket extension must be loaded in order to be able to ' .
  29. 'use this connection class'
  30. );
  31. }
  32. parent::__construct($this->checkParameters($parameters));
  33. $this->_throwErrors = true;
  34. $this->initializeReader();
  35. }
  36. public function __destruct() {
  37. $this->disconnect();
  38. phpiredis_reader_destroy($this->_reader);
  39. }
  40. protected function checkParameters(ConnectionParameters $parameters) {
  41. switch ($parameters->scheme) {
  42. case 'unix':
  43. $pathToSocket = $parameters->path;
  44. if (!isset($pathToSocket)) {
  45. throw new \InvalidArgumentException('Missing UNIX domain socket path');
  46. }
  47. if (!file_exists($pathToSocket)) {
  48. throw new \InvalidArgumentException("Could not find $pathToSocket");
  49. }
  50. case 'tcp':
  51. return $parameters;
  52. default:
  53. throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
  54. }
  55. if ($parameters->connection_persistent == true) {
  56. throw new \InvalidArgumentException(
  57. 'Persistent connections are not supported by this connection class.'
  58. );
  59. }
  60. }
  61. private function initializeReader() {
  62. if (!function_exists('phpiredis_reader_create')) {
  63. throw new ClientException(
  64. 'The phpiredis extension must be loaded in order to be able to ' .
  65. 'use this connection class'
  66. );
  67. }
  68. $this->_reader = phpiredis_reader_create();
  69. phpiredis_reader_set_status_handler($this->_reader, $this->getStatusHandler());
  70. phpiredis_reader_set_error_handler($this->_reader, $this->getErrorHandler());
  71. }
  72. private function getStatusHandler() {
  73. return function($payload) {
  74. switch ($payload) {
  75. case 'OK':
  76. return true;
  77. case 'QUEUED':
  78. return new ResponseQueued();
  79. default:
  80. return $payload;
  81. }
  82. };
  83. }
  84. private function getErrorHandler() {
  85. if ($this->_throwErrors) {
  86. return function($errorMessage) {
  87. throw new ServerException(substr($errorMessage, 4));
  88. };
  89. }
  90. return function($errorMessage) {
  91. return new ResponseError(substr($errorMessage, 4));
  92. };
  93. }
  94. private function emitSocketError() {
  95. $errno = socket_last_error();
  96. $errstr = socket_strerror($errno);
  97. $this->disconnect();
  98. $this->onCommunicationException(trim($errstr), $errno);
  99. }
  100. protected function createResource() {
  101. $parameters = $this->_params;
  102. $initializer = array($this, "{$parameters->scheme}SocketInitializer");
  103. $socket = call_user_func($initializer, $parameters);
  104. $this->setSocketOptions($socket, $parameters);
  105. return $socket;
  106. }
  107. private function tcpSocketInitializer(ConnectionParameters $parameters) {
  108. $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
  109. if (!is_resource($socket)) {
  110. $this->emitSocketError();
  111. }
  112. $addressLong = ip2long($parameters->host);
  113. if ($addressLong == -1 || $addressLong === false) {
  114. $host = gethostbyname($parameters->host);
  115. }
  116. return $socket;
  117. }
  118. private function unixSocketInitializer(ConnectionParameters $parameters) {
  119. $socket = @socket_create(AF_UNIX, SOCK_STREAM, 0);
  120. if (!is_resource($socket)) {
  121. $this->emitSocketError();
  122. }
  123. return $socket;
  124. }
  125. private function setSocketOptions($socket, ConnectionParameters $parameters) {
  126. if ($parameters->scheme !== 'tcp') {
  127. return;
  128. }
  129. if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
  130. $this->emitSocketError();
  131. }
  132. if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
  133. $this->emitSocketError();
  134. }
  135. if (isset($parameters->read_write_timeout)) {
  136. $rwtimeout = $parameters->read_write_timeout;
  137. $timeoutSec = floor($rwtimeout);
  138. $timeoutUsec = ($rwtimeout - $timeoutSec) * 1000000;
  139. $timeout = array('sec' => $timeoutSec, 'usec' => $timeoutUsec);
  140. if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
  141. $this->emitSocketError();
  142. }
  143. if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
  144. $this->emitSocketError();
  145. }
  146. }
  147. }
  148. private function connectWithTimeout(ConnectionParameters $parameters) {
  149. $socket = $this->_resource;
  150. socket_set_nonblock($socket);
  151. $host = $parameters->scheme === 'unix' ? $parameters->path : $parameters->host;
  152. if (@socket_connect($socket, $host, $parameters->port) === false) {
  153. $error = socket_last_error();
  154. if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
  155. $this->emitSocketError();
  156. }
  157. }
  158. socket_set_block($socket);
  159. $null = null;
  160. $selectable = array($socket);
  161. $timeout = $parameters->connection_timeout;
  162. $timeoutSecs = floor($timeout);
  163. $timeoutUSecs = ($timeout - $timeoutSecs) * 1000000;
  164. $selected = socket_select($selectable, $selectable, $null, $timeoutSecs, $timeoutUSecs);
  165. if ($selected === 2) {
  166. $this->onCommunicationException('Connection refused', SOCKET_ECONNREFUSED);
  167. }
  168. if ($selected === 0) {
  169. $this->onCommunicationException('Connection timed out', SOCKET_ETIMEDOUT);
  170. }
  171. if ($selected === false) {
  172. $this->emitSocketError();
  173. }
  174. }
  175. public function connect() {
  176. parent::connect();
  177. $this->connectWithTimeout($this->_params);
  178. if (count($this->_initCmds) > 0) {
  179. $this->sendInitializationCommands();
  180. }
  181. }
  182. public function disconnect() {
  183. if ($this->isConnected()) {
  184. socket_close($this->_resource);
  185. $this->_resource = null;
  186. }
  187. }
  188. private function sendInitializationCommands() {
  189. foreach ($this->_initCmds as $command) {
  190. $this->writeCommand($command);
  191. }
  192. foreach ($this->_initCmds as $command) {
  193. $this->readResponse($command);
  194. }
  195. }
  196. private function write($buffer) {
  197. $socket = $this->getResource();
  198. while (($length = strlen($buffer)) > 0) {
  199. $written = socket_write($socket, $buffer, $length);
  200. if ($length === $written) {
  201. return true;
  202. }
  203. if ($written === false) {
  204. $this->onCommunicationException('Error while writing bytes to the server');
  205. }
  206. $buffer = substr($buffer, $written);
  207. }
  208. }
  209. public function read() {
  210. $socket = $this->getResource();
  211. $reader = $this->_reader;
  212. while (($state = phpiredis_reader_get_state($reader)) === PHPIREDIS_READER_STATE_INCOMPLETE) {
  213. if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '') {
  214. $this->emitSocketError();
  215. }
  216. phpiredis_reader_feed($reader, $buffer);
  217. }
  218. if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
  219. return phpiredis_reader_get_reply($reader);
  220. }
  221. else {
  222. $this->onCommunicationException(phpiredis_reader_get_error($reader));
  223. }
  224. }
  225. public function writeCommand(ICommand $command) {
  226. $cmdargs = $command->getArguments();
  227. array_unshift($cmdargs, $command->getId());
  228. $this->write(phpiredis_format_command($cmdargs));
  229. }
  230. public function readResponse(ICommand $command) {
  231. $reply = $this->read();
  232. return isset($reply->skipParse) ? $reply : $command->parseResponse($reply);
  233. }
  234. public function setProtocolOption($option, $value) {
  235. switch ($option) {
  236. case 'iterable_multibulk':
  237. // TODO: iterable multibulk replies cannot be supported
  238. break;
  239. case 'throw_errors':
  240. $this->_throwErrors = (bool) $value;
  241. phpiredis_reader_set_error_handler($this->_reader, $this->getErrorHandler());
  242. break;
  243. }
  244. }
  245. }