PhpiredisConnection.php 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Network;
// This class provides the implementation of a Predis connection that internally
// uses the PHP socket extension for network communication and wraps the phpiredis
// C extension (PHP bindings for hiredis) to parse the Redis protocol. Everything
// is *highly experimental* (including phpiredis itself, which is still quite new),
// so use it at your own risk.
  16. // This class is mainly intended to provide an optional low-overhead alternative
  17. // for processing replies from Redis compared to the standard pure-PHP classes.
  18. // Differences in speed when dealing with short inline replies are practically
  19. // nonexistent, the actual speed boost is for long multibulk replies when this
  20. // protocol processor can parse and return replies very fast.
  21. // For instructions on how to build and install the phpiredis extension, please
  22. // consult the repository of the project at http://github.com/seppo0010/phpiredis
  23. use Predis\ResponseError;
  24. use Predis\ResponseQueued;
  25. use Predis\ClientException;
  26. use Predis\ServerException;
  27. use Predis\IConnectionParameters;
  28. use Predis\Commands\ICommand;
  29. class PhpiredisConnection extends ConnectionBase
  30. {
    /**
     * Handle of the phpiredis protocol reader. It is assigned by
     * initializeReader() and passed to the phpiredis_reader_* functions.
     */
    private $_reader;
  32. public function __construct(IConnectionParameters $parameters)
  33. {
  34. if (!function_exists('socket_create')) {
  35. throw new ClientException(
  36. 'The socket extension must be loaded in order to be able to ' .
  37. 'use this connection class'
  38. );
  39. }
  40. parent::__construct($parameters);
  41. }
  42. public function __destruct()
  43. {
  44. phpiredis_reader_destroy($this->_reader);
  45. parent::__destruct();
  46. }
  47. protected function checkParameters(IConnectionParameters $parameters)
  48. {
  49. if ($parameters->isSetByUser('iterable_multibulk')) {
  50. $this->onInvalidOption('iterable_multibulk', $parameters);
  51. }
  52. if ($parameters->isSetByUser('connection_persistent')) {
  53. $this->onInvalidOption('connection_persistent', $parameters);
  54. }
  55. return parent::checkParameters($parameters);
  56. }
  57. private function initializeReader($throw_errors = true)
  58. {
  59. if (!function_exists('phpiredis_reader_create')) {
  60. throw new ClientException(
  61. 'The phpiredis extension must be loaded in order to be able to ' .
  62. 'use this connection class'
  63. );
  64. }
  65. $reader = phpiredis_reader_create();
  66. phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
  67. phpiredis_reader_set_error_handler($reader, $this->getErrorHandler($throw_errors));
  68. $this->_reader = $reader;
  69. }
  70. protected function initializeProtocol(IConnectionParameters $parameters)
  71. {
  72. $this->initializeReader($parameters->throw_errors);
  73. }
  74. private function getStatusHandler()
  75. {
  76. return function($payload) {
  77. switch ($payload) {
  78. case 'OK':
  79. return true;
  80. case 'QUEUED':
  81. return new ResponseQueued();
  82. default:
  83. return $payload;
  84. }
  85. };
  86. }
  87. private function getErrorHandler($throwErrors = true)
  88. {
  89. if ($throwErrors) {
  90. return function($errorMessage) {
  91. throw new ServerException($errorMessage);
  92. };
  93. }
  94. return function($errorMessage) {
  95. return new ResponseError($errorMessage);
  96. };
  97. }
  98. private function emitSocketError()
  99. {
  100. $errno = socket_last_error();
  101. $errstr = socket_strerror($errno);
  102. $this->disconnect();
  103. $this->onConnectionError(trim($errstr), $errno);
  104. }
  105. protected function createResource()
  106. {
  107. $parameters = $this->_params;
  108. $initializer = array($this, "{$parameters->scheme}SocketInitializer");
  109. $socket = call_user_func($initializer, $parameters);
  110. $this->setSocketOptions($socket, $parameters);
  111. return $socket;
  112. }
  113. private function tcpSocketInitializer(IConnectionParameters $parameters)
  114. {
  115. $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
  116. if (!is_resource($socket)) {
  117. $this->emitSocketError();
  118. }
  119. return $socket;
  120. }
  121. private function unixSocketInitializer(IConnectionParameters $parameters)
  122. {
  123. $socket = @socket_create(AF_UNIX, SOCK_STREAM, 0);
  124. if (!is_resource($socket)) {
  125. $this->emitSocketError();
  126. }
  127. return $socket;
  128. }
  129. private function setSocketOptions($socket, IConnectionParameters $parameters)
  130. {
  131. if ($parameters->scheme !== 'tcp') {
  132. return;
  133. }
  134. if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
  135. $this->emitSocketError();
  136. }
  137. if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
  138. $this->emitSocketError();
  139. }
  140. if (isset($parameters->read_write_timeout)) {
  141. $rwtimeout = $parameters->read_write_timeout;
  142. $timeoutSec = floor($rwtimeout);
  143. $timeoutUsec = ($rwtimeout - $timeoutSec) * 1000000;
  144. $timeout = array(
  145. 'sec' => $timeoutSec,
  146. 'usec' => $timeoutUsec,
  147. );
  148. if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
  149. $this->emitSocketError();
  150. }
  151. if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
  152. $this->emitSocketError();
  153. }
  154. }
  155. }
  156. private function getAddress(IConnectionParameters $parameters)
  157. {
  158. if ($parameters->scheme === 'unix') {
  159. return $parameters->path;
  160. }
  161. $host = $parameters->host;
  162. if (ip2long($host) === false) {
  163. if (($address = gethostbyname($host)) === $host) {
  164. $this->onConnectionError("Cannot resolve the address of $host");
  165. }
  166. return $address;
  167. }
  168. return $host;
  169. }
  170. private function connectWithTimeout(IConnectionParameters $parameters) {
  171. $host = self::getAddress($parameters);
  172. $socket = $this->getResource();
  173. socket_set_nonblock($socket);
  174. if (@socket_connect($socket, $host, $parameters->port) === false) {
  175. $error = socket_last_error();
  176. if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
  177. $this->emitSocketError();
  178. }
  179. }
  180. socket_set_block($socket);
  181. $null = null;
  182. $selectable = array($socket);
  183. $timeout = $parameters->connection_timeout;
  184. $timeoutSecs = floor($timeout);
  185. $timeoutUSecs = ($timeout - $timeoutSecs) * 1000000;
  186. $selected = socket_select($selectable, $selectable, $null, $timeoutSecs, $timeoutUSecs);
  187. if ($selected === 2) {
  188. $this->onConnectionError('Connection refused', SOCKET_ECONNREFUSED);
  189. }
  190. if ($selected === 0) {
  191. $this->onConnectionError('Connection timed out', SOCKET_ETIMEDOUT);
  192. }
  193. if ($selected === false) {
  194. $this->emitSocketError();
  195. }
  196. }
  197. public function connect()
  198. {
  199. parent::connect();
  200. $this->connectWithTimeout($this->_params);
  201. if (count($this->_initCmds) > 0) {
  202. $this->sendInitializationCommands();
  203. }
  204. }
  205. public function disconnect()
  206. {
  207. if ($this->isConnected()) {
  208. socket_close($this->getResource());
  209. parent::disconnect();
  210. }
  211. }
  212. private function sendInitializationCommands()
  213. {
  214. foreach ($this->_initCmds as $command) {
  215. $this->writeCommand($command);
  216. }
  217. foreach ($this->_initCmds as $command) {
  218. $this->readResponse($command);
  219. }
  220. }
  221. private function write($buffer)
  222. {
  223. $socket = $this->getResource();
  224. while (($length = strlen($buffer)) > 0) {
  225. $written = socket_write($socket, $buffer, $length);
  226. if ($length === $written) {
  227. return;
  228. }
  229. if ($written === false) {
  230. $this->onConnectionError('Error while writing bytes to the server');
  231. }
  232. $buffer = substr($buffer, $written);
  233. }
  234. }
  235. public function read()
  236. {
  237. $socket = $this->getResource();
  238. $reader = $this->_reader;
  239. while (($state = phpiredis_reader_get_state($reader)) === PHPIREDIS_READER_STATE_INCOMPLETE) {
  240. if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '') {
  241. $this->emitSocketError();
  242. }
  243. phpiredis_reader_feed($reader, $buffer);
  244. }
  245. if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
  246. return phpiredis_reader_get_reply($reader);
  247. }
  248. else {
  249. $this->onProtocolError(phpiredis_reader_get_error($reader));
  250. }
  251. }
  252. public function writeCommand(ICommand $command)
  253. {
  254. $cmdargs = $command->getArguments();
  255. array_unshift($cmdargs, $command->getId());
  256. $this->write(phpiredis_format_command($cmdargs));
  257. }
  258. }