FastTextProtocol.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. <?php
  2. namespace Predis\Protocols;
  3. use Predis\Utils;
  4. use Predis\ICommand;
  5. use Predis\CommunicationException;
  6. use Predis\Network\IConnectionSingle;
  7. use Predis\Iterators\MultiBulkResponseSimple;
  8. class FastTextProtocol implements IRedisProtocol {
  9. private $_mbiterable, $_throwErrors;
  10. public function __construct() {
  11. $this->_mbiterable = false;
  12. $this->_throwErrors = true;
  13. }
  14. public function write(IConnectionSingle $connection, ICommand $command) {
  15. $commandId = $command->getCommandId();
  16. $arguments = $command->getArguments();
  17. $cmdlen = strlen($commandId);
  18. $reqlen = count($arguments) + 1;
  19. $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandId}\r\n";
  20. for ($i = 0; $i < $reqlen - 1; $i++) {
  21. $argument = $arguments[$i];
  22. $arglen = strlen($argument);
  23. $buffer .= "\${$arglen}\r\n{$argument}\r\n";
  24. }
  25. fwrite($connection->getResource(), $buffer);
  26. }
  27. public function read(IConnectionSingle $connection) {
  28. $bufferSize = 8192;
  29. $socket = $connection->getResource();
  30. $chunk = fgets($socket);
  31. if ($chunk === false || $chunk === '') {
  32. throw new CommunicationException(
  33. $connection, 'Error while reading line from the server'
  34. );
  35. }
  36. $prefix = $chunk[0];
  37. $payload = substr($chunk, 1);
  38. switch ($prefix) {
  39. case '+': // inline
  40. $status = substr($payload, 0, -2);
  41. if ($status === 'OK') {
  42. return true;
  43. }
  44. if ($status === 'QUEUED') {
  45. return new \Predis\ResponseQueued();
  46. }
  47. return $status;
  48. case '$': // bulk
  49. $size = (int) $payload;
  50. if ($size === -1) {
  51. return null;
  52. }
  53. $bulkData = '';
  54. $bytesLeft = ($size += 2);
  55. do {
  56. $chunk = fread($socket, min($bytesLeft, $bufferSize));
  57. if ($chunk === false || $chunk === '') {
  58. throw new CommunicationException(
  59. $connection, 'Error while reading bytes from the server'
  60. );
  61. }
  62. $bulkData .= $chunk;
  63. $bytesLeft = $size - strlen($bulkData);
  64. } while ($bytesLeft > 0);
  65. return substr($bulkData, 0, -2);
  66. case '*': // multi bulk
  67. $count = (int) $payload;
  68. if ($count === -1) {
  69. return null;
  70. }
  71. if ($this->_mbiterable == true) {
  72. return new MultiBulkResponseSimple($connection, $count);
  73. }
  74. $multibulk = array();
  75. for ($i = 0; $i < $count; $i++) {
  76. $chunk = fgets($socket);
  77. if ($chunk === false || $chunk === '') {
  78. throw new CommunicationException(
  79. $connection, 'Error while reading line from the server'
  80. );
  81. }
  82. $size = (int) substr($chunk, 1);
  83. if ($size === -1) {
  84. return $multibulk;
  85. }
  86. $bulkData = '';
  87. $bytesLeft = ($size += 2);
  88. do {
  89. $chunk = fread($socket, min($bytesLeft, $bufferSize));
  90. if ($chunk === false || $chunk === '') {
  91. throw new CommunicationException(
  92. $connection, 'Error while reading bytes from the server'
  93. );
  94. }
  95. $bulkData .= $chunk;
  96. $bytesLeft = $size - strlen($bulkData);
  97. } while ($bytesLeft > 0);
  98. $multibulk[$i] = substr($bulkData, 0, -2);
  99. }
  100. return $multibulk;
  101. case ':': // integer
  102. return (int) $payload;
  103. case '-': // error
  104. $errorMessage = substr($payload, 4);
  105. if ($this->_throwErrors) {
  106. throw new \Predis\ServerException($errorMessage);
  107. }
  108. return new \Predis\ResponseError($errorMessage);
  109. default:
  110. throw new CommunicationException(
  111. $connection, "Unknown prefix: '$prefix'"
  112. );
  113. }
  114. }
  115. public function setOption($option, $value) {
  116. switch ($option) {
  117. case 'iterable_multibulk':
  118. $this->_mbiterable = (bool) $value;
  119. break;
  120. case 'throw_errors':
  121. case 'throw_on_error':
  122. $this->_throwErrors = (bool) $value;
  123. break;
  124. }
  125. }
  126. }
  127. ?>