FastTextProtocol.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. <?php
  2. namespace Predis\Protocols;
  3. use Predis\Utils;
  4. use Predis\ICommand;
  5. use Predis\CommunicationException;
  6. use Predis\Network\IConnectionSingle;
  7. use Predis\Iterators\MultiBulkResponseSimple;
  /**
   * Minimal implementation of the Redis text protocol.
   *
   * Requests are serialized as multi-bulk commands and written to the stream
   * resource exposed by the connection; replies are parsed inline according to
   * their one-byte type prefix ('+', '-', ':', '$' or '*').
   *
   * Supported options (see setOption()):
   *  - iterable_multibulk: return multi-bulk replies as a lazy
   *    MultiBulkResponseSimple instead of a fully populated array.
   *  - throw_errors / throw_on_error: throw on '-' replies instead of
   *    returning a ResponseError object.
   */
  class FastTextProtocol implements IRedisProtocol {
      /** Maximum number of bytes requested from the socket by a single fread(). */
      const BUFFER_SIZE = 8192;

      private $_mbiterable, $_throwErrors;

      /**
       * Initializes the protocol with its defaults: multi-bulk replies are
       * returned as arrays and error replies are thrown as exceptions.
       */
      public function __construct() {
          $this->_mbiterable = false;
          $this->_throwErrors = true;
      }

      /**
       * Serializes a command as a multi-bulk request and writes it to the
       * connection's stream.
       *
       * @param IConnectionSingle $connection Connection providing the stream resource.
       * @param ICommand $command Command to send.
       * @throws CommunicationException When the request cannot be written completely.
       */
      public function write(IConnectionSingle $connection, ICommand $command) {
          $commandId = $command->getCommandId();
          $arguments = $command->getArguments();

          $cmdlen = strlen($commandId);
          $reqlen = count($arguments) + 1;

          $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandId}\r\n";
          // foreach does not rely on the arguments being keyed 0..n-1.
          foreach ($arguments as $argument) {
              $arglen = strlen($argument);
              $buffer .= "\${$arglen}\r\n{$argument}\r\n";
          }

          // fwrite() on a stream may write fewer bytes than requested, so keep
          // writing until the whole request has been flushed.
          $socket = $connection->getResource();
          while (($length = strlen($buffer)) > 0) {
              $written = fwrite($socket, $buffer);
              if ($written === false || $written === 0) {
                  throw new CommunicationException(
                      $connection, 'Error while writing bytes to the server'
                  );
              }
              if ($written === $length) {
                  return;
              }
              $buffer = substr($buffer, $written);
          }
      }

      /**
       * Reads and parses one reply from the connection's stream.
       *
       * @param IConnectionSingle $connection Connection providing the stream resource.
       * @return mixed true for "+OK", a ResponseQueued for "+QUEUED", the status
       *               string for any other '+' reply, a string or null for bulk
       *               replies, an array / MultiBulkResponseSimple / null for
       *               multi-bulk replies, an int for ':' replies, or a
       *               ResponseError when error throwing is disabled.
       * @throws CommunicationException On read failures or unknown reply prefixes.
       * @throws \Exception When the server replies with an error and error
       *                    throwing is enabled.
       */
      public function read(IConnectionSingle $connection) {
          $socket = $connection->getResource();

          $chunk = fgets($socket);
          if ($chunk === false || $chunk === '') {
              throw new CommunicationException(
                  $connection, 'Error while reading line from the server'
              );
          }

          $prefix = $chunk[0];
          $payload = substr($chunk, 1);

          switch ($prefix) {
              case '+':    // inline
                  $status = substr($payload, 0, -2);
                  if ($status === 'OK') {
                      return true;
                  }
                  if ($status === 'QUEUED') {
                      return new \Predis\ResponseQueued();
                  }
                  return $status;

              case '$':    // bulk
                  return $this->readBulk($connection, $socket, (int) $payload);

              case '*':    // multi bulk
                  $count = (int) $payload;
                  if ($count === -1) {
                      return null;
                  }
                  if ($this->_mbiterable) {
                      return new MultiBulkResponseSimple($connection, $count);
                  }

                  $multibulk = array();
                  for ($i = 0; $i < $count; $i++) {
                      $chunk = fgets($socket);
                      if ($chunk === false || $chunk === '') {
                          throw new CommunicationException(
                              $connection, 'Error while reading line from the server'
                          );
                      }
                      // Every item is parsed as a bulk header. A nil item
                      // ("$-1") is stored as null so that the remaining items
                      // are still consumed and the stream stays in sync.
                      $multibulk[$i] = $this->readBulk(
                          $connection, $socket, (int) substr($chunk, 1)
                      );
                  }
                  return $multibulk;

              case ':':    // integer
                  return (int) $payload;

              case '-':    // error
                  // Drop the trailing CRLF, as done for status replies.
                  $message = substr($payload, 0, -2);
                  if ($this->_throwErrors) {
                      // Fully qualified: a bare "Exception" would resolve
                      // relative to the current namespace.
                      throw new \Exception($message);
                  }
                  return new \Predis\ResponseError($message);

              default:
                  throw new CommunicationException(
                      $connection, "Unknown prefix: '$prefix'"
                  );
          }
      }

      /**
       * Reads the body of a bulk reply whose header line was already consumed.
       *
       * The payload and its trailing CRLF are read in a single loop, so that
       * zero-length payloads and short reads of the terminator are both handled.
       *
       * @param IConnectionSingle $connection Connection used for error reporting.
       * @param resource $socket Stream to read from.
       * @param int $size Payload length announced by the header; -1 means nil.
       * @return string|null The payload, or null for a nil bulk reply.
       * @throws CommunicationException On read failures or an invalid length.
       */
      private function readBulk(IConnectionSingle $connection, $socket, $size) {
          if ($size === -1) {
              return null;
          }
          if ($size < 0) {
              throw new CommunicationException(
                  $connection, 'Error while reading bytes from the server'
              );
          }

          $bulkData = '';
          $bytesLeft = $size + 2;    // payload + CRLF
          do {
              $chunk = fread($socket, min($bytesLeft, self::BUFFER_SIZE));
              if ($chunk === false || $chunk === '') {
                  throw new CommunicationException(
                      $connection, 'Error while reading bytes from the server'
                  );
              }
              $bulkData .= $chunk;
              $bytesLeft -= strlen($chunk);
          } while ($bytesLeft > 0);

          // The cast guards against substr() returning false for an empty
          // result on PHP versions before 8.0.
          return (string) substr($bulkData, 0, $size);
      }

      /**
       * Sets a protocol option; unknown option names are silently ignored.
       *
       * @param string $option One of 'iterable_multibulk', 'throw_errors' or
       *                       'throw_on_error'.
       * @param mixed $value Value of the option, cast to boolean.
       */
      public function setOption($option, $value) {
          switch ($option) {
              case 'iterable_multibulk':
                  $this->_mbiterable = (bool) $value;
                  break;
              case 'throw_errors':
              case 'throw_on_error':
                  $this->_throwErrors = (bool) $value;
                  break;
          }
      }
  }
  128. ?>