PhpiredisConnection.php 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. <?php
  2. namespace Predis\Network;
use Predis\ClientException;
use Predis\ConnectionParameters;
use Predis\ICommand;
use Predis\ResponseError;
use Predis\ResponseQueued;
use Predis\ServerException;
  8. class PhpiredisConnection extends ConnectionBase {
  9. private $_reader, $_throwErrors;
  10. public function __construct(ConnectionParameters $parameters) {
  11. parent::__construct($this->checkParameters($parameters));
  12. $this->_throwErrors = true;
  13. $this->initializeReader();
  14. }
  15. public function __destruct() {
  16. $this->disconnect();
  17. phpiredis_reader_destroy($this->_reader);
  18. }
  19. protected function checkParameters(ConnectionParameters $parameters) {
  20. switch ($parameters->scheme) {
  21. case 'unix':
  22. $pathToSocket = $parameters->path;
  23. if (!isset($pathToSocket)) {
  24. throw new \InvalidArgumentException('Missing UNIX domain socket path');
  25. }
  26. if (!file_exists($pathToSocket)) {
  27. throw new \InvalidArgumentException("Could not find $pathToSocket");
  28. }
  29. case 'tcp':
  30. return $parameters;
  31. default:
  32. throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
  33. }
  34. if ($parameters->connection_persistent == true) {
  35. throw new \InvalidArgumentException(
  36. 'Persistent connections are not supported by this connection class.'
  37. );
  38. }
  39. }
  40. private function initializeReader() {
  41. if (!function_exists('phpiredis_reader_create')) {
  42. throw new ClientException(
  43. 'The phpiredis extension must be loaded in order to ' .
  44. 'be able to use this protocol processor'
  45. );
  46. }
  47. $this->_reader = phpiredis_reader_create();
  48. phpiredis_reader_set_status_handler($this->_reader, $this->getStatusHandler());
  49. phpiredis_reader_set_error_handler($this->_reader, $this->getErrorHandler());
  50. }
  51. private function getStatusHandler() {
  52. return function($payload) {
  53. switch ($payload) {
  54. case 'OK':
  55. return true;
  56. case 'QUEUED':
  57. return new ResponseQueued();
  58. default:
  59. return $payload;
  60. }
  61. };
  62. }
  63. private function getErrorHandler() {
  64. if ($this->_throwErrors) {
  65. return function($errorMessage) {
  66. throw new ServerException(substr($errorMessage, 4));
  67. };
  68. }
  69. return function($errorMessage) {
  70. return new ResponseError(substr($errorMessage, 4));
  71. };
  72. }
  73. private function emitSocketError() {
  74. $errno = socket_last_error();
  75. $errstr = socket_strerror($errno);
  76. $this->disconnect();
  77. $this->onCommunicationException(trim($errstr), $errno);
  78. }
  79. protected function createResource() {
  80. $parameters = $this->_params;
  81. $initializer = array($this, "{$parameters->scheme}SocketInitializer");
  82. $socket = call_user_func($initializer, $parameters);
  83. $this->setSocketOptions($socket, $parameters);
  84. return $socket;
  85. }
  86. private function tcpSocketInitializer(ConnectionParameters $parameters) {
  87. $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
  88. if (!is_resource($socket)) {
  89. $this->emitSocketError();
  90. }
  91. $addressLong = ip2long($parameters->host);
  92. if ($addressLong == -1 || $addressLong === false) {
  93. $host = gethostbyname($parameters->host);
  94. }
  95. return $socket;
  96. }
  97. private function unixSocketInitializer(ConnectionParameters $parameters) {
  98. $socket = @socket_create(AF_UNIX, SOCK_STREAM, 0);
  99. if (!is_resource($socket)) {
  100. $this->emitSocketError();
  101. }
  102. return $socket;
  103. }
  104. private function setSocketOptions($socket, ConnectionParameters $parameters) {
  105. if ($parameters->scheme !== 'tcp') {
  106. return;
  107. }
  108. if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
  109. $this->emitSocketError();
  110. }
  111. if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
  112. $this->emitSocketError();
  113. }
  114. if (isset($parameters->read_write_timeout)) {
  115. $rwtimeout = $parameters->read_write_timeout;
  116. $timeoutSec = floor($rwtimeout);
  117. $timeoutUsec = ($rwtimeout - $timeoutSec) * 1000000;
  118. $timeout = array('sec' => $timeoutSec, 'usec' => $timeoutUsec);
  119. if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
  120. $this->emitSocketError();
  121. }
  122. if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
  123. $this->emitSocketError();
  124. }
  125. }
  126. }
  127. private function connectWithTimeout(ConnectionParameters $parameters) {
  128. $socket = $this->_resource;
  129. socket_set_nonblock($socket);
  130. $host = $parameters->scheme === 'unix' ? $parameters->path : $parameters->host;
  131. if (@socket_connect($socket, $host, $parameters->port) === false) {
  132. $error = socket_last_error();
  133. if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
  134. $this->emitSocketError();
  135. }
  136. }
  137. socket_set_block($socket);
  138. $null = null;
  139. $selectable = array($socket);
  140. $timeout = $parameters->connection_timeout;
  141. $timeoutSecs = floor($timeout);
  142. $timeoutUSecs = ($timeout - $timeoutSecs) * 1000000;
  143. $selected = socket_select($selectable, $selectable, $null, $timeoutSecs, $timeoutUSecs);
  144. if ($selected === 2) {
  145. $this->onCommunicationException('Connection refused', SOCKET_ECONNREFUSED);
  146. }
  147. if ($selected === 0) {
  148. $this->onCommunicationException('Connection timed out', SOCKET_ETIMEDOUT);
  149. }
  150. if ($selected === false) {
  151. $this->emitSocketError();
  152. }
  153. }
  154. public function connect() {
  155. parent::connect();
  156. $this->connectWithTimeout($this->_params);
  157. if (count($this->_initCmds) > 0) {
  158. $this->sendInitializationCommands();
  159. }
  160. }
  161. public function disconnect() {
  162. if ($this->isConnected()) {
  163. socket_close($this->_resource);
  164. $this->_resource = null;
  165. }
  166. }
  167. private function sendInitializationCommands() {
  168. foreach ($this->_initCmds as $command) {
  169. $this->writeCommand($command);
  170. }
  171. foreach ($this->_initCmds as $command) {
  172. $this->readResponse($command);
  173. }
  174. }
  175. private function write($buffer) {
  176. $socket = $this->getResource();
  177. while (($length = strlen($buffer)) > 0) {
  178. $written = socket_write($socket, $buffer, $length);
  179. if ($length === $written) {
  180. return true;
  181. }
  182. if ($written === false) {
  183. $this->onCommunicationException('Error while writing bytes to the server');
  184. }
  185. $buffer = substr($buffer, $written);
  186. }
  187. }
  188. public function read() {
  189. $socket = $this->getResource();
  190. $reader = $this->_reader;
  191. while (($state = phpiredis_reader_get_state($reader)) === PHPIREDIS_READER_STATE_INCOMPLETE) {
  192. if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '') {
  193. $this->emitSocketError();
  194. }
  195. phpiredis_reader_feed($reader, $buffer);
  196. }
  197. if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
  198. return phpiredis_reader_get_reply($reader);
  199. }
  200. else {
  201. $this->onCommunicationException(phpiredis_reader_get_error($reader));
  202. }
  203. }
  204. public function writeCommand(ICommand $command) {
  205. $cmdargs = $command->getArguments();
  206. array_unshift($cmdargs, $command->getCommandId());
  207. $this->write(phpiredis_format_command($cmdargs));
  208. }
  209. public function readResponse(ICommand $command) {
  210. $reply = $this->read();
  211. return isset($reply->skipParse) ? $reply : $command->parseResponse($reply);
  212. }
  213. public function setProtocolOption($option, $value) {
  214. switch ($option) {
  215. case 'iterable_multibulk':
  216. // TODO: iterable multibulk replies cannot be supported
  217. break;
  218. case 'throw_errors':
  219. $this->_throwErrors = (bool) $value;
  220. phpiredis_reader_set_error_handler($this->_reader, $this->getErrorHandler());
  221. break;
  222. }
  223. }
  224. }