WebdisConnection.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. <?php
  2. /*
  3. This class implements a Predis connection that actually talks with Webdis
  4. (http://github.com/nicolasff/webdis) instead of connecting directly to Redis.
  5. It relies on the http PECL extension to communicate with the web server and the
  6. phpiredis extension to parse the protocol of the replies returned in the http
  7. response bodies.
  8. Since this connection class is highly experimental, some features have not been
  9. implemented yet (or they simply cannot be implemented at all). Here is a list:
  10. - Pipelining commands.
  11. - Publish / Subscribe.
  12. - MULTI / EXEC transactions (not yet supported by Webdis).
  13. */
  14. namespace Predis\Network;
  15. use HttpRequest;
  16. use Predis\IConnectionParameters;
  17. use Predis\ResponseError;
  18. use Predis\ClientException;
  19. use Predis\ServerException;
  20. use Predis\CommunicationException;
  21. use Predis\Commands\ICommand;
  // sprintf() template for the ClientException message raised when a required PHP extension is missing; %s receives the extension name.
  22. const ERR_MSG_EXTENSION = 'The %s extension must be loaded in order to be able to use this connection class';
  /**
   * Predis connection that sends each command to a Webdis endpoint as an HTTP
   * POST request and parses the raw Redis-protocol reply found in the response
   * body with a phpiredis reader.
   *
   * Only executeCommand() performs real work. writeCommand(), readResponse(),
   * getResource(), pushInitCommand() and read() always throw a
   * \RuntimeException, and connect() / disconnect() are no-ops.
   */
  class WebdisConnection implements IConnectionSingle {
      /** Connection parameters received by the constructor. */
      private $_parameters;

      /** Base URL of the Webdis server, built as "scheme://host:port". */
      private $_webdisUrl;

      /** phpiredis reader used to parse reply bodies. */
      private $_reader;

      /**
       * Signals that a method required by the interface is not supported here.
       *
       * @param string $class    Name of the class owning the method.
       * @param string $function Name of the unsupported method.
       * @throws \RuntimeException Always.
       */
      private static function throwNotImplementedException($class, $function) {
          throw new \RuntimeException("The method $class::$function() is not implemented");
      }

      /**
       * Verifies the required extensions, validates the scheme and prepares
       * the Webdis URL and the protocol reader.
       *
       * @param IConnectionParameters $parameters Connection parameters; scheme,
       *                                          host, port and throw_errors are read.
       * @throws ClientException           When a required extension is missing.
       * @throws \InvalidArgumentException When the scheme is not exactly 'http'.
       */
      public function __construct(IConnectionParameters $parameters) {
          $this->checkExtensions();

          if ($parameters->scheme !== 'http') {
              throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
          }

          $this->_parameters = $parameters;
          $this->_webdisUrl = "{$parameters->scheme}://{$parameters->host}:{$parameters->port}";
          $this->_reader = $this->initializeReader($parameters);
      }

      /**
       * Destroys the phpiredis reader held by this connection.
       */
      public function __destruct() {
          phpiredis_reader_destroy($this->_reader);
      }

      /**
       * Ensures that the HttpRequest class and the phpiredis reader functions
       * are available.
       *
       * @throws ClientException Naming the missing extension ('http' or 'phpiredis').
       */
      private function checkExtensions() {
          if (!class_exists("HttpRequest")) {
              throw new ClientException(sprintf(ERR_MSG_EXTENSION, 'http'));
          }

          if (!function_exists('phpiredis_reader_create')) {
              throw new ClientException(sprintf(ERR_MSG_EXTENSION, 'phpiredis'));
          }
      }

      /**
       * Creates a phpiredis reader and installs the status and error handlers.
       *
       * @param IConnectionParameters $parameters Its throw_errors value selects
       *                                          the error handler flavour.
       * @return mixed The reader returned by phpiredis_reader_create().
       */
      private function initializeReader(IConnectionParameters $parameters) {
          $throwErrors = $parameters->throw_errors;

          $reader = phpiredis_reader_create();
          phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
          phpiredis_reader_set_error_handler($reader, $this->getErrorHandler($throwErrors));

          return $reader;
      }

      /**
       * Builds the callback applied to status replies.
       *
       * @return \Closure Maps the payload 'OK' to true and returns any other
       *                  payload unchanged.
       */
      private function getStatusHandler() {
          return function($payload) {
              return $payload === 'OK' ? true : $payload;
          };
      }

      /**
       * Builds the callback applied to error replies.
       *
       * @param mixed $throwErrors When truthy the callback throws a
       *                           ServerException; otherwise it returns a
       *                           ResponseError wrapping the message.
       * @return \Closure
       */
      private function getErrorHandler($throwErrors) {
          if ($throwErrors) {
              return function($errorMessage) {
                  throw new ServerException($errorMessage);
              };
          }

          return function($errorMessage) {
              return new ResponseError($errorMessage);
          };
      }

      /**
       * No-op: this class keeps no persistent connection to open.
       */
      public function connect() {
          // NOOP
      }

      /**
       * No-op: this class keeps no persistent connection to close.
       */
      public function disconnect() {
          // NOOP
      }

      /**
       * Always reports the connection as established.
       *
       * @return bool Always true.
       */
      public function isConnected() {
          return true;
      }

      /**
       * Not supported by this connection class.
       *
       * @throws \RuntimeException Always.
       */
      public function writeCommand(ICommand $command) {
          self::throwNotImplementedException(__CLASS__, __FUNCTION__);
      }

      /**
       * Not supported by this connection class.
       *
       * @throws \RuntimeException Always.
       */
      public function readResponse(ICommand $command) {
          self::throwNotImplementedException(__CLASS__, __FUNCTION__);
      }

      /**
       * Returns the identifier of the command, rejecting the commands that
       * this connection class disables.
       *
       * @param ICommand $command Command about to be executed.
       * @return mixed The value returned by $command->getId().
       * @throws \InvalidArgumentException For AUTH, SELECT, MULTI, EXEC, WATCH,
       *                                   UNWATCH and DISCARD.
       */
      protected function getCommandId(ICommand $command) {
          switch (($commandId = $command->getId())) {
              case 'AUTH':
              case 'SELECT':
              case 'MULTI':
              case 'EXEC':
              case 'WATCH':
              case 'UNWATCH':
              case 'DISCARD':
                  throw new \InvalidArgumentException("Disabled command: {$commandId}");

              default:
                  return $commandId;
          }
      }

      /**
       * Executes a command by POSTing it to the Webdis URL and parsing the raw
       * reply contained in the response body.
       *
       * The request body is "ID/arg1/arg2.raw" with every argument passed
       * through urlencode(), or just "ID.raw" when the command carries no
       * arguments.
       *
       * @param ICommand $command Command to execute.
       * @return mixed A ResponseError (or other reply object) as produced by the
       *               reader, otherwise the result of $command->parseResponse().
       * @throws \InvalidArgumentException When the command is disabled.
       * @throws ServerException           From the error handler when the
       *                                   throw_errors parameter was truthy.
       */
      public function executeCommand(ICommand $command) {
          $commandId = $this->getCommandId($command);
          $arguments = $command->getArguments();

          // Without arguments there must be no separator before the extension,
          // otherwise the body would read "ID/.raw" and carry an empty segment.
          if ($arguments) {
              $encoded = implode('/', array_map('urlencode', $arguments));
              $body = "$commandId/$encoded.raw";
          } else {
              $body = "$commandId.raw";
          }

          $request = new HttpRequest($this->_webdisUrl, HttpRequest::METH_POST);
          $request->setBody($body);
          $request->send();

          phpiredis_reader_feed($this->_reader, $request->getResponseBody());
          $reply = phpiredis_reader_get_reply($this->_reader);

          // IReplyObject has no matching `use` statement among this file's
          // visible imports, so the name resolves inside Predis\Network here.
          // ResponseError, which the non-throwing error handler returns, is
          // therefore checked explicitly so that error replies are handed back
          // untouched instead of being passed to parseResponse().
          if ($reply instanceof IReplyObject || $reply instanceof ResponseError) {
              return $reply;
          }

          return $command->parseResponse($reply);
      }

      /**
       * Not supported by this connection class.
       *
       * @throws \RuntimeException Always.
       */
      public function getResource() {
          self::throwNotImplementedException(__CLASS__, __FUNCTION__);
      }

      /**
       * Returns the parameters this connection was constructed with.
       *
       * @return IConnectionParameters
       */
      public function getParameters() {
          return $this->_parameters;
      }

      /**
       * Not supported by this connection class.
       *
       * @throws \RuntimeException Always.
       */
      public function pushInitCommand(ICommand $command) {
          self::throwNotImplementedException(__CLASS__, __FUNCTION__);
      }

      /**
       * Not supported by this connection class.
       *
       * @throws \RuntimeException Always.
       */
      public function read() {
          self::throwNotImplementedException(__CLASS__, __FUNCTION__);
      }

      /**
       * Returns the "host:port" representation of this connection.
       *
       * @return string
       */
      public function __toString() {
          return "{$this->_parameters->host}:{$this->_parameters->port}";
      }
  }