TextProtocol.php 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. <?php
  2. namespace Predis\Protocols;
  3. use Predis\ICommand;
  4. use Predis\CommunicationException;
  5. use Predis\Network\IConnectionSingle;
  6. use Predis\Iterators\MultiBulkResponseSimple;
  /**
   * Text-based implementation of IRedisProtocol.
   *
   * Commands are serialized with TextCommandSerializer and written to the
   * stream resource exposed by the connection. Replies are parsed by looking
   * at the one-character prefix of the first line of each reply.
   */
  class TextProtocol implements IRedisProtocol {
      const NEWLINE = "\r\n";
      const OK = 'OK';
      const ERROR = 'ERR';
      const QUEUED = 'QUEUED';
      const NULL = 'nil';

      const PREFIX_STATUS = '+';
      const PREFIX_ERROR = '-';
      const PREFIX_INTEGER = ':';
      const PREFIX_BULK = '$';
      const PREFIX_MULTI_BULK = '*';

      /** Upper bound, in bytes, for a single fread() call while reading a bulk reply. */
      const BUFFER_SIZE = 8192;

      private $_mbiterable, $_throwErrors, $_serializer;

      /**
       * Sets the defaults: multi-bulk replies are returned as plain arrays,
       * error replies are thrown as exceptions, and commands are serialized
       * with a TextCommandSerializer.
       */
      public function __construct() {
          $this->_mbiterable = false;
          $this->_throwErrors = true;
          $this->_serializer = new TextCommandSerializer();
      }

      /**
       * Serializes a command and writes all of it to the connection's stream.
       *
       * fwrite() may accept only part of the buffer, so the method keeps
       * writing the unsent remainder until nothing is left.
       *
       * @param IConnectionSingle $connection Connection whose resource is written to.
       * @param ICommand          $command    Command to serialize and send.
       *
       * @throws CommunicationException When fwrite() reports a failure or writes zero bytes.
       */
      public function write(IConnectionSingle $connection, ICommand $command) {
          $buffer = $this->_serializer->serialize($command);
          $socket = $connection->getResource();

          while (($length = strlen($buffer)) > 0) {
              $written = fwrite($socket, $buffer);

              if ($length === $written) {
                  return;
              }

              if ($written === false || $written === 0) {
                  throw new CommunicationException(
                      $connection, 'Error while writing bytes to the server'
                  );
              }

              // Partial write: drop the bytes already sent so the next
              // iteration continues from where fwrite() stopped. (The original
              // assigned this to an unused variable, so the whole buffer was
              // re-sent from the start on every partial write.)
              $buffer = substr($buffer, $written);
          }
      }

      /**
       * Reads one complete reply from the connection's stream.
       *
       * The first line is split into a one-character prefix and a payload
       * (the rest of the line minus its last two bytes), and the prefix
       * selects how the reply is decoded.
       *
       * @param IConnectionSingle $connection Connection whose resource is read from.
       *
       * @return mixed true for an "OK" status, a \Predis\ResponseQueued for a
       *               "QUEUED" status, the payload string for any other status,
       *               a string or null for a bulk reply, an array, null or a
       *               MultiBulkResponseSimple for a multi-bulk reply, an int
       *               for an integer reply, or a \Predis\ResponseError when
       *               error throwing is disabled.
       *
       * @throws CommunicationException  When the stream cannot be read or the prefix is unknown.
       * @throws \Predis\ServerException When an error reply arrives and error throwing is enabled.
       */
      public function read(IConnectionSingle $connection) {
          $socket = $connection->getResource();
          $chunk = fgets($socket);

          if ($chunk === false || $chunk === '') {
              throw new CommunicationException(
                  $connection, 'Error while reading line from the server'
              );
          }

          $prefix = $chunk[0];
          // Strip the prefix character and the last two bytes of the line.
          $payload = substr($chunk, 1, -2);

          switch ($prefix) {
              case self::PREFIX_STATUS:
                  if ($payload === self::OK) {
                      return true;
                  }
                  if ($payload === self::QUEUED) {
                      return new \Predis\ResponseQueued();
                  }
                  return $payload;

              case self::PREFIX_BULK:
                  $size = (int) $payload;
                  if ($size === -1) {
                      return null;
                  }

                  // The body is followed by two extra bytes that are read
                  // along with it and removed again before returning.
                  $size += 2;
                  $bulkData = '';
                  $bytesLeft = $size;

                  do {
                      $chunk = fread($socket, min($bytesLeft, self::BUFFER_SIZE));
                      if ($chunk === false || $chunk === '') {
                          throw new CommunicationException(
                              $connection, 'Error while reading bytes from the server'
                          );
                      }
                      $bulkData .= $chunk;
                      $bytesLeft = $size - strlen($bulkData);
                  } while ($bytesLeft > 0);

                  return substr($bulkData, 0, -2);

              case self::PREFIX_MULTI_BULK:
                  $count = (int) $payload;
                  if ($count === -1) {
                      return null;
                  }

                  if ($this->_mbiterable) {
                      // Hand the connection and element count to the iterator
                      // instead of reading the elements here.
                      return new MultiBulkResponseSimple($connection, $count);
                  }

                  // Each element is itself a full reply, so read recursively.
                  $multibulk = array();
                  for ($i = 0; $i < $count; $i++) {
                      $multibulk[$i] = $this->read($connection);
                  }
                  return $multibulk;

              case self::PREFIX_INTEGER:
                  return (int) $payload;

              case self::PREFIX_ERROR:
                  // Drops the first four characters of the payload --
                  // presumably the "ERR " marker; confirm against the server's
                  // error format before relying on it for other error prefixes.
                  $errorMessage = substr($payload, 4);
                  if ($this->_throwErrors) {
                      throw new \Predis\ServerException($errorMessage);
                  }
                  return new \Predis\ResponseError($errorMessage);

              default:
                  throw new CommunicationException(
                      $connection, "Unknown prefix: '$prefix'"
                  );
          }
      }

      /**
       * Changes one of the protocol's behaviour flags.
       *
       * Recognized options are 'iterable_multibulk' (return multi-bulk replies
       * as a MultiBulkResponseSimple instead of an array) and 'throw_errors'
       * (throw error replies instead of returning them). Any other option name
       * is ignored.
       *
       * @param string $option Option name.
       * @param mixed  $value  New value; cast to bool.
       */
      public function setOption($option, $value) {
          switch ($option) {
              case 'iterable_multibulk':
                  $this->_mbiterable = (bool) $value;
                  break;

              case 'throw_errors':
                  $this->_throwErrors = (bool) $value;
                  break;
          }
      }
  }