FastTextProtocol.php 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. <?php
  2. namespace Predis\Protocols;
  3. use Predis\Utils;
  4. use Predis\ICommand;
  5. use Predis\MalformedServerResponse;
  6. use Predis\Network\IConnectionSingle;
  7. use Predis\Iterators\MultiBulkResponseSimple;
  /**
   * Lightweight implementation of the Redis text protocol.
   *
   * Commands are serialized in the multi-bulk request format and replies are
   * parsed directly from the stream resource exposed by the connection.
   */
  class FastTextProtocol implements IRedisProtocol {
      /** Maximum number of bytes requested from the socket per fread() call. */
      const BUFFER_SIZE = 8192;

      private $_mbiterable, $_throwErrors;

      /**
       * Starts with plain-array multi-bulk replies and with server errors
       * raised as exceptions.
       */
      public function __construct() {
          $this->_mbiterable = false;
          $this->_throwErrors = true;
      }

      /**
       * Serializes a command and writes it to the connection's stream.
       *
       * @param IConnectionSingle $connection Connection whose resource is written to.
       * @param ICommand          $command    Command providing the id and its arguments.
       */
      public function write(IConnectionSingle $connection, ICommand $command) {
          $commandId = $command->getCommandId();
          $arguments = $command->getArguments();

          $cmdlen = strlen($commandId);
          $reqlen = count($arguments) + 1;

          $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandId}\r\n";
          // foreach instead of numeric indexing, so argument arrays that are
          // not keyed 0..n-1 are still serialized completely.
          foreach ($arguments as $argument) {
              $argument = (string) $argument;
              $arglen = strlen($argument);
              $buffer .= "\${$arglen}\r\n{$argument}\r\n";
          }

          // fwrite() on a stream may accept only part of the buffer, so keep
          // writing until everything has been handed over.
          $socket = $connection->getResource();
          while ($buffer !== '') {
              $written = fwrite($socket, $buffer);
              if ($written === false || $written === 0) {
                  // Nothing could be written. As before, write() does not
                  // report the failure itself; it stops instead of retrying
                  // forever.
                  return;
              }
              $buffer = (string) substr($buffer, $written);
          }
      }

      /**
       * Reads one reply from the connection and converts it to a PHP value.
       *
       * Reply types handled:
       *  - "+" status: true for "OK", a ResponseQueued object for "QUEUED",
       *    otherwise the status string;
       *  - "$" bulk: the payload string, or null for a nil bulk;
       *  - "*" multi bulk: null for a nil reply, a MultiBulkResponseSimple when
       *    iterable multi-bulk is enabled, otherwise an array of elements in
       *    which nil elements are null;
       *  - ":" integer: the value cast to int;
       *  - "-" error: thrown as \Exception when throw-on-error is enabled,
       *    otherwise returned as a ResponseError object.
       *
       * @param IConnectionSingle $connection Connection whose resource is read from.
       * @return mixed
       * @throws \Exception When the server replies with an error and
       *                    throw-on-error is enabled.
       */
      public function read(IConnectionSingle $connection) {
          $socket = $connection->getResource();
          $header = $this->readLine($connection, $socket);
          $prefix = $header[0];
          $payload = (string) substr($header, 1);

          switch ($prefix) {
              case '+':    // inline
                  $status = substr($payload, 0, -2);
                  if ($status === 'OK') {
                      return true;
                  }
                  if ($status === 'QUEUED') {
                      return new \Predis\ResponseQueued();
                  }
                  return $status;

              case '$':    // bulk
                  $size = (int) $payload;
                  if ($size === -1) {
                      return null;
                  }
                  return $this->readBulk($connection, $socket, $size);

              case '*':    // multi bulk
                  $count = (int) $payload;
                  if ($count === -1) {
                      return null;
                  }
                  if ($this->_mbiterable == true) {
                      return new MultiBulkResponseSimple($connection, $count);
                  }

                  $multibulk = array();
                  for ($i = 0; $i < $count; $i++) {
                      // NOTE(review): every element is parsed as a bulk
                      // ("$<size>"); other element types are not handled here.
                      $size = (int) substr($this->readLine($connection, $socket), 1);
                      if ($size === -1) {
                          // A nil element must not end the reply: returning
                          // early would leave the remaining elements unread
                          // on the socket.
                          $multibulk[$i] = null;
                          continue;
                      }
                      $multibulk[$i] = $this->readBulk($connection, $socket, $size);
                  }
                  return $multibulk;

              case ':':    // integer
                  return (int) $payload;

              case '-':    // error
                  // Strip the trailing CRLF, as done for status replies.
                  $errorMessage = (string) substr($payload, 0, -2);
                  if ($this->_throwErrors) {
                      // Fully qualified: an unqualified name would resolve
                      // inside the Predis\Protocols namespace.
                      throw new \Exception($errorMessage);
                  }
                  return new \Predis\ResponseError($errorMessage);

              default:
                  $this->onProtocolError($connection, "Unknown prefix: '$prefix'");
          }
      }

      /**
       * Changes a protocol option; unrecognized option names are ignored.
       *
       * @param string $option 'iterable_multibulk', 'throw_errors' or 'throw_on_error'.
       * @param mixed  $value  Cast to bool before being stored.
       */
      public function setOption($option, $value) {
          switch ($option) {
              case 'iterable_multibulk':
                  $this->_mbiterable = (bool) $value;
                  break;
              case 'throw_errors':
              case 'throw_on_error':
                  $this->_throwErrors = (bool) $value;
                  break;
          }
      }

      /**
       * Reads one line from the socket and fails if none can be read.
       *
       * @param IConnectionSingle $connection Connection reported on failure.
       * @param resource          $socket     Stream to read from.
       * @return string The line, including its terminator.
       */
      private function readLine(IConnectionSingle $connection, $socket) {
          $line = fgets($socket);
          if ($line === false || $line === '') {
              $this->onProtocolError($connection, 'Error while reading line from the server');
          }
          return $line;
      }

      /**
       * Reads a bulk payload of $size bytes followed by its CRLF terminator.
       *
       * The terminator is fetched in the same loop as the payload, so a short
       * read can never leave a stray CR or LF in the stream.
       *
       * @param IConnectionSingle $connection Connection reported on failure.
       * @param resource          $socket     Stream to read from.
       * @param int               $size       Payload length in bytes (>= 0).
       * @return string The payload without the trailing CRLF.
       */
      private function readBulk(IConnectionSingle $connection, $socket, $size) {
          $bulkData = '';
          $bytesLeft = $size + 2;

          // $bytesLeft starts at 2 or more, so fread() is never asked for
          // zero bytes, even for an empty bulk.
          while ($bytesLeft > 0) {
              $chunk = fread($socket, min($bytesLeft, self::BUFFER_SIZE));
              if ($chunk === false || $chunk === '') {
                  // No progress was made; fail rather than loop forever.
                  $this->onProtocolError($connection, 'Error while reading bytes from the server');
              }
              $bulkData .= $chunk;
              $bytesLeft = $size + 2 - strlen($bulkData);
          }

          return (string) substr($bulkData, 0, $size);
      }

      /**
       * Reports a protocol or communication failure; never returns normally.
       *
       * @param IConnectionSingle $connection Connection the failure occurred on.
       * @param string            $message    Description of the failure.
       * @throws \RuntimeException Only if the communication handler returns.
       */
      private function onProtocolError(IConnectionSingle $connection, $message) {
          Utils::onCommunicationException(new MalformedServerResponse(
              $connection, $message
          ));
          // NOTE(review): the handler presumably throws - confirm. This
          // fallback guarantees callers never continue with a broken stream.
          throw new \RuntimeException($message);
      }
  }
  105. ?>