WebdisConnection.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. <?php
  2. /*
  3. This class implements a Predis connection that actually talks with Webdis
  4. (http://github.com/nicolasff/webdis) instead of connecting directly to Redis.
  5. It relies on the http PECL extension to communicate with the web server and the
  6. phpiredis extension to parse the protocol of the replies returned in the http
  7. response bodies.
  8. Since this connection class is highly experimental, some features have not been
  9. implemented yet (or they simply cannot be implemented at all). Here is a list:
  10. - Pipelining commands.
  11. - Publish / Subscribe.
  12. - MULTI / EXEC transactions (not yet supported by Webdis).
  13. */
  14. namespace Predis\Network;
  15. use HttpRequest;
  16. use Predis\IConnectionParameters;
  17. use Predis\ResponseError;
  18. use Predis\ClientException;
  19. use Predis\ServerException;
  20. use Predis\CommunicationException;
  21. use Predis\Commands\ICommand;
  22. const ERR_MSG_EXTENSION = 'The %s extension must be loaded in order to be able to use this connection class';
  23. class WebdisConnection implements IConnectionSingle {
  24. private $_parameters;
  25. private $_webdisUrl;
  26. private $_reader;
  27. private static function throwNotImplementedException($class, $function) {
  28. throw new \RuntimeException("The method $class::$function() is not implemented");
  29. }
  30. public function __construct(IConnectionParameters $parameters) {
  31. $this->checkExtensions();
  32. if ($parameters->scheme !== 'http') {
  33. throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
  34. }
  35. $this->_parameters = $parameters;
  36. $this->_webdisUrl = "{$parameters->scheme}://{$parameters->host}:{$parameters->port}";
  37. $this->_reader = $this->initializeReader($parameters);
  38. }
  39. public function __destruct() {
  40. phpiredis_reader_destroy($this->_reader);
  41. }
  42. private function checkExtensions() {
  43. if (!class_exists("HttpRequest")) {
  44. throw new ClientException(sprintf(ERR_MSG_EXTENSION, 'http'));
  45. }
  46. if (!function_exists('phpiredis_reader_create')) {
  47. throw new ClientException(sprintf(ERR_MSG_EXTENSION, 'phpiredis'));
  48. }
  49. }
  50. private function initializeReader(IConnectionParameters $parameters) {
  51. $throwErrors = $parameters->throw_errors;
  52. $reader = phpiredis_reader_create();
  53. phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
  54. phpiredis_reader_set_error_handler($reader, $this->getErrorHandler($throwErrors));
  55. return $reader;
  56. }
  57. private function getStatusHandler() {
  58. return function($payload) {
  59. return $payload === 'OK' ? true : $payload;
  60. };
  61. }
  62. private function getErrorHandler($throwErrors) {
  63. if ($throwErrors) {
  64. return function($errorMessage) {
  65. throw new ServerException($errorMessage);
  66. };
  67. }
  68. return function($errorMessage) {
  69. return new ResponseError($errorMessage);
  70. };
  71. }
  72. private static function argumentsSerializer($str, $arg) {
  73. $str .= '/' . urlencode($arg);
  74. return $str;
  75. }
  76. public function connect() {
  77. // NOOP
  78. }
  79. public function disconnect() {
  80. // NOOP
  81. }
  82. public function isConnected() {
  83. return true;
  84. }
  85. public function writeCommand(ICommand $command) {
  86. self::throwNotImplementedException(__CLASS__, __FUNCTION__);
  87. }
  88. public function readResponse(ICommand $command) {
  89. self::throwNotImplementedException(__CLASS__, __FUNCTION__);
  90. }
  91. public function executeCommand(ICommand $command) {
  92. $params = $this->_parameters;
  93. $arguments = array_reduce($command->getArguments(), 'self::argumentsSerializer');
  94. $request = new HttpRequest($this->_webdisUrl, HttpRequest::METH_POST);
  95. $request->setBody(sprintf('%s%s.raw', $command->getId(), $arguments));
  96. $request->send();
  97. phpiredis_reader_feed($this->_reader, $request->getResponseBody());
  98. $reply = phpiredis_reader_get_reply($this->_reader);
  99. return isset($reply->skipParse) ? $reply : $command->parseResponse($reply);
  100. }
  101. public function getResource() {
  102. self::throwNotImplementedException(__CLASS__, __FUNCTION__);
  103. }
  104. public function getParameters() {
  105. return $this->_parameters;
  106. }
  107. public function pushInitCommand(ICommand $command) {
  108. self::throwNotImplementedException(__CLASS__, __FUNCTION__);
  109. }
  110. public function read() {
  111. self::throwNotImplementedException(__CLASS__, __FUNCTION__);
  112. }
  113. public function __toString() {
  114. return "{$this->_parameters->host}:{$this->_parameters->port}";
  115. }
  116. }