StreamConnection.php 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\ResponseError;
  12. use Predis\ResponseQueued;
  13. use Predis\ServerException;
  14. use Predis\NotSupportedException;
  15. use Predis\ConnectionParametersInterface;
  16. use Predis\Command\CommandInterface;
  17. use Predis\Iterator\MultiBulkResponseSimple;
  18. /**
  19. * Standard connection to Redis servers implemented on top of PHP's streams.
  20. * The connection parameters supported by this class are:
  21. *
  22. * - scheme: it can be either 'tcp' or 'unix'.
  23. * - host: hostname or IP address of the server.
  24. * - port: TCP port of the server.
  25. * - timeout: timeout to perform the connection.
  26. * - read_write_timeout: timeout of read / write operations.
  27. * - async_connect: performs the connection asynchronously.
  28. * - persistent: the connection is left intact after a GC collection.
  29. * - throw_errors: -ERR replies treated as exceptions.
  30. * - iterable_multibulk: multibulk replies treated as iterable objects.
  31. *
  32. * @author Daniele Alessandri <suppakilla@gmail.com>
  33. */
  34. class StreamConnection extends AbstractConnection
  35. {
  36. private $mbiterable;
  37. private $throwErrors;
  38. /**
  39. * Disconnects from the server and destroys the underlying resource when
  40. * PHP's garbage collector kicks in only if the connection has not been
  41. * marked as persistent.
  42. */
  43. public function __destruct()
  44. {
  45. if (!$this->parameters->persistent) {
  46. $this->disconnect();
  47. }
  48. }
  49. /**
  50. * {@inheritdoc}
  51. */
  52. protected function initializeProtocol(ConnectionParametersInterface $parameters)
  53. {
  54. $this->throwErrors = (bool) $parameters->throw_errors;
  55. $this->mbiterable = (bool) $parameters->iterable_multibulk;
  56. }
  57. /**
  58. * {@inheritdoc}
  59. */
  60. protected function createResource()
  61. {
  62. $parameters = $this->parameters;
  63. $initializer = "{$parameters->scheme}StreamInitializer";
  64. return $this->$initializer($parameters);
  65. }
  66. /**
  67. * Initializes a TCP stream resource.
  68. *
  69. * @param ConnectionParametersInterface $parameters Parameters used to initialize the connection.
  70. * @return resource
  71. */
  72. private function tcpStreamInitializer(ConnectionParametersInterface $parameters)
  73. {
  74. $uri = "tcp://{$parameters->host}:{$parameters->port}/";
  75. $flags = STREAM_CLIENT_CONNECT;
  76. if (isset($parameters->async_connect) && $parameters->async_connect === true) {
  77. $flags |= STREAM_CLIENT_ASYNC_CONNECT;
  78. }
  79. if (isset($parameters->persistent) && $parameters->persistent === true) {
  80. $flags |= STREAM_CLIENT_PERSISTENT;
  81. }
  82. $resource = @stream_socket_client(
  83. $uri, $errno, $errstr, $parameters->timeout, $flags
  84. );
  85. if (!$resource) {
  86. $this->onConnectionError(trim($errstr), $errno);
  87. }
  88. if (isset($parameters->read_write_timeout)) {
  89. $rwtimeout = $parameters->read_write_timeout;
  90. $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
  91. $timeoutSeconds = floor($rwtimeout);
  92. $timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
  93. stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
  94. }
  95. return $resource;
  96. }
  97. /**
  98. * Initializes a UNIX stream resource.
  99. *
  100. * @param ConnectionParametersInterface $parameters Parameters used to initialize the connection.
  101. * @return resource
  102. */
  103. private function unixStreamInitializer(ConnectionParametersInterface $parameters)
  104. {
  105. $uri = "unix://{$parameters->path}";
  106. $flags = STREAM_CLIENT_CONNECT;
  107. if ($parameters->persistent === true) {
  108. $flags |= STREAM_CLIENT_PERSISTENT;
  109. }
  110. $resource = @stream_socket_client(
  111. $uri, $errno, $errstr, $parameters->timeout, $flags
  112. );
  113. if (!$resource) {
  114. $this->onConnectionError(trim($errstr), $errno);
  115. }
  116. return $resource;
  117. }
  118. /**
  119. * {@inheritdoc}
  120. */
  121. public function connect()
  122. {
  123. parent::connect();
  124. if (count($this->initCmds) > 0){
  125. $this->sendInitializationCommands();
  126. }
  127. }
  128. /**
  129. * {@inheritdoc}
  130. */
  131. public function disconnect()
  132. {
  133. if ($this->isConnected()) {
  134. fclose($this->getResource());
  135. parent::disconnect();
  136. }
  137. }
  138. /**
  139. * Sends the initialization commands to Redis when the connection is opened.
  140. */
  141. private function sendInitializationCommands()
  142. {
  143. foreach ($this->initCmds as $command) {
  144. $this->writeCommand($command);
  145. }
  146. foreach ($this->initCmds as $command) {
  147. $this->readResponse($command);
  148. }
  149. }
  150. /**
  151. * Performs a write operation on the stream of the buffer containing a
  152. * command serialized with the Redis wire protocol.
  153. *
  154. * @param string $buffer Redis wire protocol representation of a command.
  155. */
  156. protected function writeBytes($buffer)
  157. {
  158. $socket = $this->getResource();
  159. while (($length = strlen($buffer)) > 0) {
  160. $written = fwrite($socket, $buffer);
  161. if ($length === $written) {
  162. return;
  163. }
  164. if ($written === false || $written === 0) {
  165. $this->onConnectionError('Error while writing bytes to the server');
  166. }
  167. $buffer = substr($buffer, $written);
  168. }
  169. }
  170. /**
  171. * {@inheritdoc}
  172. */
  173. public function read() {
  174. $socket = $this->getResource();
  175. $chunk = fgets($socket);
  176. if ($chunk === false || $chunk === '') {
  177. $this->onConnectionError('Error while reading line from the server');
  178. }
  179. $prefix = $chunk[0];
  180. $payload = substr($chunk, 1, -2);
  181. switch ($prefix) {
  182. case '+': // inline
  183. switch ($payload) {
  184. case 'OK':
  185. return true;
  186. case 'QUEUED':
  187. return new ResponseQueued();
  188. default:
  189. return $payload;
  190. }
  191. case '$': // bulk
  192. $size = (int) $payload;
  193. if ($size === -1) {
  194. return null;
  195. }
  196. $bulkData = '';
  197. $bytesLeft = ($size += 2);
  198. do {
  199. $chunk = fread($socket, min($bytesLeft, 4096));
  200. if ($chunk === false || $chunk === '') {
  201. $this->onConnectionError(
  202. 'Error while reading bytes from the server'
  203. );
  204. }
  205. $bulkData .= $chunk;
  206. $bytesLeft = $size - strlen($bulkData);
  207. } while ($bytesLeft > 0);
  208. return substr($bulkData, 0, -2);
  209. case '*': // multi bulk
  210. $count = (int) $payload;
  211. if ($count === -1) {
  212. return null;
  213. }
  214. if ($this->mbiterable === true) {
  215. return new MultiBulkResponseSimple($this, $count);
  216. }
  217. $multibulk = array();
  218. for ($i = 0; $i < $count; $i++) {
  219. $multibulk[$i] = $this->read();
  220. }
  221. return $multibulk;
  222. case ':': // integer
  223. return (int) $payload;
  224. case '-': // error
  225. if ($this->throwErrors) {
  226. throw new ServerException($payload);
  227. }
  228. return new ResponseError($payload);
  229. default:
  230. $this->onProtocolError("Unknown prefix: '$prefix'");
  231. }
  232. }
  233. /**
  234. * {@inheritdoc}
  235. */
  236. public function writeCommand(CommandInterface $command)
  237. {
  238. $commandId = $command->getId();
  239. $arguments = $command->getArguments();
  240. $cmdlen = strlen($commandId);
  241. $reqlen = count($arguments) + 1;
  242. $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandId}\r\n";
  243. for ($i = 0; $i < $reqlen - 1; $i++) {
  244. $argument = $arguments[$i];
  245. $arglen = strlen($argument);
  246. $buffer .= "\${$arglen}\r\n{$argument}\r\n";
  247. }
  248. $this->writeBytes($buffer);
  249. }
  250. /**
  251. * {@inheritdoc}
  252. */
  253. public function __sleep()
  254. {
  255. return array_merge(parent::__sleep(), array('mbiterable', 'throwErrors'));
  256. }
  257. }