WebdisConnection.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Network;
  11. // This class implements a Predis connection that actually talks with Webdis
  12. // (http://github.com/nicolasff/webdis) instead of connecting directly to Redis.
  13. // It relies on the cURL extension to communicate with the web server and the
  14. // phpiredis extension to parse the protocol of the replies returned in the http
  15. // response bodies.
  16. //
  17. // Some features are not yet available or they simply cannot be implemented:
  18. // - Pipelining commands.
  19. // - Publish / Subscribe.
  20. // - MULTI / EXEC transactions (not yet supported by Webdis).
  21. use Predis\IConnectionParameters;
  22. use Predis\ResponseError;
  23. use Predis\ClientException;
  24. use Predis\ServerException;
  25. use Predis\Commands\ICommand;
  26. use Predis\Protocol\ProtocolException;
  27. const ERR_MSG_EXTENSION = 'The %s extension must be loaded in order to be able to use this connection class';
  28. class WebdisConnection implements IConnectionSingle
  29. {
  30. private $_parameters;
  31. private $_resource;
  32. private $_reader;
  33. public function __construct(IConnectionParameters $parameters)
  34. {
  35. $this->_parameters = $parameters;
  36. if ($parameters->scheme !== 'http') {
  37. throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
  38. }
  39. $this->checkExtensions();
  40. $this->_resource = $this->initializeCurl($parameters);
  41. $this->_reader = $this->initializeReader($parameters);
  42. }
  43. public function __destruct()
  44. {
  45. curl_close($this->_resource);
  46. phpiredis_reader_destroy($this->_reader);
  47. }
  48. private function throwNotSupportedException($function)
  49. {
  50. $class = __CLASS__;
  51. throw new \RuntimeException("The method $class::$function() is not supported");
  52. }
  53. private function checkExtensions()
  54. {
  55. if (!function_exists('curl_init')) {
  56. throw new ClientException(sprintf(ERR_MSG_EXTENSION, 'curl'));
  57. }
  58. if (!function_exists('phpiredis_reader_create')) {
  59. throw new ClientException(sprintf(ERR_MSG_EXTENSION, 'phpiredis'));
  60. }
  61. }
  62. private function initializeCurl(IConnectionParameters $parameters)
  63. {
  64. $options = array(
  65. CURLOPT_FAILONERROR => true,
  66. CURLOPT_CONNECTTIMEOUT_MS => $parameters->connection_timeout * 1000,
  67. CURLOPT_URL => "{$parameters->scheme}://{$parameters->host}:{$parameters->port}",
  68. CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
  69. CURLOPT_POST => true,
  70. CURLOPT_WRITEFUNCTION => array($this, 'feedReader'),
  71. );
  72. if (isset($parameters->user, $parameters->pass)) {
  73. $options[CURLOPT_USERPWD] = "{$parameters->user}:{$parameters->pass}";
  74. }
  75. $resource = curl_init();
  76. curl_setopt_array($resource, $options);
  77. return $resource;
  78. }
  79. private function initializeReader(IConnectionParameters $parameters)
  80. {
  81. $reader = phpiredis_reader_create();
  82. phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
  83. phpiredis_reader_set_error_handler($reader, $this->getErrorHandler($parameters->throw_errors));
  84. return $reader;
  85. }
  86. private function getStatusHandler()
  87. {
  88. return function($payload) {
  89. return $payload === 'OK' ? true : $payload;
  90. };
  91. }
  92. private function getErrorHandler($throwErrors)
  93. {
  94. if ($throwErrors) {
  95. return function($errorMessage) {
  96. throw new ServerException($errorMessage);
  97. };
  98. }
  99. return function($errorMessage) {
  100. return new ResponseError($errorMessage);
  101. };
  102. }
  103. protected function feedReader($resource, $buffer)
  104. {
  105. phpiredis_reader_feed($this->_reader, $buffer);
  106. return strlen($buffer);
  107. }
  108. public function connect()
  109. {
  110. // NOOP
  111. }
  112. public function disconnect()
  113. {
  114. // NOOP
  115. }
  116. public function isConnected()
  117. {
  118. return true;
  119. }
  120. protected function getCommandId(ICommand $command)
  121. {
  122. switch (($commandId = $command->getId())) {
  123. case 'AUTH':
  124. case 'SELECT':
  125. case 'MULTI':
  126. case 'EXEC':
  127. case 'WATCH':
  128. case 'UNWATCH':
  129. case 'DISCARD':
  130. throw new \InvalidArgumentException("Disabled command: {$command->getId()}");
  131. default:
  132. return $commandId;
  133. }
  134. }
  135. public function writeCommand(ICommand $command)
  136. {
  137. $this->throwNotSupportedException(__FUNCTION__);
  138. }
  139. public function readResponse(ICommand $command)
  140. {
  141. $this->throwNotSupportedException(__FUNCTION__);
  142. }
  143. public function executeCommand(ICommand $command)
  144. {
  145. $resource = $this->_resource;
  146. $commandId = $this->getCommandId($command);
  147. if ($arguments = $command->getArguments()) {
  148. $arguments = implode('/', array_map('urlencode', $arguments));
  149. $serializedCommand = "$commandId/$arguments.raw";
  150. }
  151. else {
  152. $serializedCommand = "$commandId.raw";
  153. }
  154. curl_setopt($resource, CURLOPT_POSTFIELDS, $serializedCommand);
  155. if (curl_exec($resource) === false) {
  156. $error = curl_error($resource);
  157. $errno = curl_errno($resource);
  158. throw new ConnectionException($this, trim($error), $errno);
  159. }
  160. $readerState = phpiredis_reader_get_state($this->_reader);
  161. if ($readerState === PHPIREDIS_READER_STATE_COMPLETE) {
  162. $reply = phpiredis_reader_get_reply($this->_reader);
  163. if ($reply instanceof IReplyObject) {
  164. return $reply;
  165. }
  166. return $command->parseResponse($reply);
  167. }
  168. else {
  169. $error = phpiredis_reader_get_error($this->_reader);
  170. throw new ProtocolException($this, $error);
  171. }
  172. }
  173. public function getResource()
  174. {
  175. return $this->_resource;
  176. }
  177. public function getParameters()
  178. {
  179. return $this->_parameters;
  180. }
  181. public function pushInitCommand(ICommand $command)
  182. {
  183. $this->throwNotSupportedException(__FUNCTION__);
  184. }
  185. public function read()
  186. {
  187. $this->throwNotSupportedException(__FUNCTION__);
  188. }
  189. public function __toString()
  190. {
  191. return "{$this->_parameters->host}:{$this->_parameters->port}";
  192. }
  193. }