WebdisConnection.php 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. <?php
  2. /*
  3. This class implements a Predis connection that actually talks with Webdis
  4. (http://github.com/nicolasff/webdis) instead of connecting directly to Redis.
  5. It relies on the curl extension to communicate with the web server and the
  6. phpiredis extension to parse the protocol of the replies returned in the http
  7. response bodies.
  8. Some features are not yet available or they simply cannot be implemented:
  9. - Pipelining commands.
  10. - Publish / Subscribe.
  11. - MULTI / EXEC transactions (not yet supported by Webdis).
  12. */
  13. namespace Predis\Network;
  14. use Predis\IConnectionParameters;
  15. use Predis\ResponseError;
  16. use Predis\ClientException;
  17. use Predis\ServerException;
  18. use Predis\Commands\ICommand;
  19. use Predis\Protocol\ProtocolException;
  20. const ERR_MSG_EXTENSION = 'The %s extension must be loaded in order to be able to use this connection class';
  21. class WebdisConnection implements IConnectionSingle {
  22. private $_parameters;
  23. private $_resource;
  24. private $_reader;
  25. public function __construct(IConnectionParameters $parameters) {
  26. $this->_parameters = $parameters;
  27. if ($parameters->scheme !== 'http') {
  28. throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
  29. }
  30. $this->checkExtensions();
  31. $this->_resource = $this->initializeCurl($parameters);
  32. $this->_reader = $this->initializeReader($parameters);
  33. }
  34. public function __destruct() {
  35. curl_close($this->_resource);
  36. phpiredis_reader_destroy($this->_reader);
  37. }
  38. private function throwNotSupportedException($function) {
  39. $class = __CLASS__;
  40. throw new \RuntimeException("The method $class::$function() is not supported");
  41. }
  42. private function checkExtensions() {
  43. if (!function_exists('curl_init')) {
  44. throw new ClientException(sprintf(ERR_MSG_EXTENSION, 'curl'));
  45. }
  46. if (!function_exists('phpiredis_reader_create')) {
  47. throw new ClientException(sprintf(ERR_MSG_EXTENSION, 'phpiredis'));
  48. }
  49. }
  50. private function initializeCurl(IConnectionParameters $parameters) {
  51. $options = array(
  52. CURLOPT_FAILONERROR => true,
  53. CURLOPT_CONNECTTIMEOUT_MS => $parameters->connection_timeout * 1000,
  54. CURLOPT_URL => "{$parameters->scheme}://{$parameters->host}:{$parameters->port}",
  55. CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
  56. CURLOPT_POST => true,
  57. CURLOPT_WRITEFUNCTION => array($this, 'feedReader'),
  58. );
  59. if (isset($parameters->user, $parameters->pass)) {
  60. $options[CURLOPT_USERPWD] = "{$parameters->user}:{$parameters->pass}";
  61. }
  62. $resource = curl_init();
  63. curl_setopt_array($resource, $options);
  64. return $resource;
  65. }
  66. private function initializeReader(IConnectionParameters $parameters) {
  67. $reader = phpiredis_reader_create();
  68. phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
  69. phpiredis_reader_set_error_handler($reader, $this->getErrorHandler($parameters->throw_errors));
  70. return $reader;
  71. }
  72. private function getStatusHandler() {
  73. return function($payload) {
  74. return $payload === 'OK' ? true : $payload;
  75. };
  76. }
  77. private function getErrorHandler($throwErrors) {
  78. if ($throwErrors) {
  79. return function($errorMessage) {
  80. throw new ServerException($errorMessage);
  81. };
  82. }
  83. return function($errorMessage) {
  84. return new ResponseError($errorMessage);
  85. };
  86. }
  87. protected function feedReader($resource, $buffer) {
  88. phpiredis_reader_feed($this->_reader, $buffer);
  89. return strlen($buffer);
  90. }
  91. public function connect() {
  92. // NOOP
  93. }
  94. public function disconnect() {
  95. // NOOP
  96. }
  97. public function isConnected() {
  98. return true;
  99. }
  100. protected function getCommandId(ICommand $command) {
  101. switch (($commandId = $command->getId())) {
  102. case 'AUTH':
  103. case 'SELECT':
  104. case 'MULTI':
  105. case 'EXEC':
  106. case 'WATCH':
  107. case 'UNWATCH':
  108. case 'DISCARD':
  109. throw new \InvalidArgumentException("Disabled command: {$command->getId()}");
  110. default:
  111. return $commandId;
  112. }
  113. }
  114. public function writeCommand(ICommand $command) {
  115. $this->throwNotSupportedException(__FUNCTION__);
  116. }
  117. public function readResponse(ICommand $command) {
  118. $this->throwNotSupportedException(__FUNCTION__);
  119. }
  120. public function executeCommand(ICommand $command) {
  121. $resource = $this->_resource;
  122. $commandId = $this->getCommandId($command);
  123. if ($arguments = $command->getArguments()) {
  124. $arguments = implode('/', array_map('urlencode', $arguments));
  125. $serializedCommand = "$commandId/$arguments.raw";
  126. }
  127. else {
  128. $serializedCommand = "$commandId.raw";
  129. }
  130. curl_setopt($resource, CURLOPT_POSTFIELDS, $serializedCommand);
  131. if (curl_exec($resource) === false) {
  132. $error = curl_error($resource);
  133. $errno = curl_errno($resource);
  134. throw new ConnectionException($this, trim($error), $errno);
  135. }
  136. $readerState = phpiredis_reader_get_state($this->_reader);
  137. if ($readerState === PHPIREDIS_READER_STATE_COMPLETE) {
  138. $reply = phpiredis_reader_get_reply($this->_reader);
  139. if ($reply instanceof IReplyObject) {
  140. return $reply;
  141. }
  142. return $command->parseResponse($reply);
  143. }
  144. else {
  145. $error = phpiredis_reader_get_error($this->_reader);
  146. throw new ProtocolException($this, $error);
  147. }
  148. }
  149. public function getResource() {
  150. return $this->_resource;
  151. }
  152. public function getParameters() {
  153. return $this->_parameters;
  154. }
  155. public function pushInitCommand(ICommand $command) {
  156. $this->throwNotSupportedException(__FUNCTION__);
  157. }
  158. public function read() {
  159. $this->throwNotSupportedException(__FUNCTION__);
  160. }
  161. public function __toString() {
  162. return "{$this->_parameters->host}:{$this->_parameters->port}";
  163. }
  164. }