FastTextProtocol.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. <?php
  2. namespace Predis\Protocols;
  3. use Predis\Utils;
  4. use Predis\ICommand;
  5. use Predis\CommunicationException;
  6. use Predis\Network\IConnectionSingle;
  7. use Predis\Iterators\MultiBulkResponseSimple;
  8. class FastTextProtocol implements IRedisProtocol {
  9. const BUFFER_SIZE = 8192;
  10. private $_mbiterable, $_throwErrors;
  11. public function __construct() {
  12. $this->_mbiterable = false;
  13. $this->_throwErrors = true;
  14. }
  15. public function write(IConnectionSingle $connection, ICommand $command) {
  16. $commandId = $command->getCommandId();
  17. $arguments = $command->getArguments();
  18. $cmdlen = strlen($commandId);
  19. $reqlen = count($arguments) + 1;
  20. $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandId}\r\n";
  21. for ($i = 0; $i < $reqlen - 1; $i++) {
  22. $argument = $arguments[$i];
  23. $arglen = strlen($argument);
  24. $buffer .= "\${$arglen}\r\n{$argument}\r\n";
  25. }
  26. fwrite($connection->getResource(), $buffer);
  27. }
  28. public function read(IConnectionSingle $connection) {
  29. $socket = $connection->getResource();
  30. $chunk = fgets($socket);
  31. if ($chunk === false || $chunk === '') {
  32. throw new CommunicationException(
  33. $connection, 'Error while reading line from the server'
  34. );
  35. }
  36. $prefix = $chunk[0];
  37. $payload = substr($chunk, 1, -2);
  38. switch ($prefix) {
  39. case '+': // inline
  40. if ($payload === 'OK') {
  41. return true;
  42. }
  43. if ($payload === 'QUEUED') {
  44. return new \Predis\ResponseQueued();
  45. }
  46. return $payload;
  47. case '$': // bulk
  48. $size = (int) $payload;
  49. if ($size === -1) {
  50. return null;
  51. }
  52. $bulkData = '';
  53. $bytesLeft = ($size += 2);
  54. do {
  55. $chunk = fread($socket, min($bytesLeft, self::BUFFER_SIZE));
  56. if ($chunk === false || $chunk === '') {
  57. throw new CommunicationException(
  58. $connection, 'Error while reading bytes from the server'
  59. );
  60. }
  61. $bulkData .= $chunk;
  62. $bytesLeft = $size - strlen($bulkData);
  63. } while ($bytesLeft > 0);
  64. return substr($bulkData, 0, -2);
  65. case '*': // multi bulk
  66. $count = (int) $payload;
  67. if ($count === -1) {
  68. return null;
  69. }
  70. if ($this->_mbiterable == true) {
  71. return new MultiBulkResponseSimple($connection, $count);
  72. }
  73. $multibulk = array();
  74. for ($i = 0; $i < $count; $i++) {
  75. $multibulk[$i] = $this->read($connection);
  76. }
  77. return $multibulk;
  78. case ':': // integer
  79. return (int) $payload;
  80. case '-': // error
  81. $errorMessage = substr($payload, 4);
  82. if ($this->_throwErrors) {
  83. throw new \Predis\ServerException($errorMessage);
  84. }
  85. return new \Predis\ResponseError($errorMessage);
  86. default:
  87. throw new CommunicationException(
  88. $connection, "Unknown prefix: '$prefix'"
  89. );
  90. }
  91. }
  92. public function setOption($option, $value) {
  93. switch ($option) {
  94. case 'iterable_multibulk':
  95. $this->_mbiterable = (bool) $value;
  96. break;
  97. case 'throw_errors':
  98. case 'throw_on_error':
  99. $this->_throwErrors = (bool) $value;
  100. break;
  101. }
  102. }
  103. }
  104. ?>