TextProtocol.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. <?php
  2. namespace Predis\Protocols;
  3. use Predis\Helpers;
  4. use Predis\ResponseError;
  5. use Predis\ResponseQueued;
  6. use Predis\ServerException;
  7. use Predis\ProtocolException;
  8. use Predis\Commands\ICommand;
  9. use Predis\Network\IConnectionComposable;
  10. use Predis\Iterators\MultiBulkResponseSimple;
  /**
   * Protocol processor for the Redis text protocol.
   *
   * Serializes commands through a TextCommandSerializer and parses replies
   * read from a connection into plain PHP values (booleans, strings, integers,
   * arrays, null) or the response objects imported by this file.
   */
  class TextProtocol implements IProtocolProcessor {
      const NEWLINE = "\r\n";
      const OK = 'OK';
      const ERROR = 'ERR';
      const QUEUED = 'QUEUED';
      const NULL = 'nil';
      const PREFIX_STATUS = '+';
      const PREFIX_ERROR = '-';
      const PREFIX_INTEGER = ':';
      const PREFIX_BULK = '$';
      const PREFIX_MULTI_BULK = '*';
      const BUFFER_SIZE = 4096;

      private $_mbiterable, $_throwErrors, $_serializer;

      /**
       * Initializes the processor with its defaults: multi bulk replies are
       * returned as arrays and error replies are thrown as exceptions.
       */
      public function __construct() {
          $this->_mbiterable = false;
          $this->_throwErrors = true;
          $this->_serializer = new TextCommandSerializer();
      }

      /**
       * Serializes a command and writes the resulting bytes to the connection.
       *
       * @param IConnectionComposable $connection Connection to write to.
       * @param ICommand $command Command to serialize.
       */
      public function write(IConnectionComposable $connection, ICommand $command) {
          $connection->writeBytes($this->_serializer->serialize($command));
      }

      /**
       * Reads one reply from the connection and converts it to a PHP value.
       *
       * The first byte of the line read from the connection selects the reply
       * type; the rest of the line is the payload.
       *
       * @param IConnectionComposable $connection Connection to read from.
       * @return mixed true for an OK status, a ResponseQueued for a QUEUED
       *               status, the raw payload for any other status, a string
       *               or null for bulk replies, an array / a
       *               MultiBulkResponseSimple / null for multi bulk replies,
       *               an integer for integer replies, or a ResponseError when
       *               error throwing is disabled.
       * @throws ServerException On an error reply when throw_errors is enabled.
       */
      public function read(IConnectionComposable $connection) {
          $chunk = $connection->readLine();

          // An empty line has no prefix byte: fall through to the unknown-prefix
          // branch below instead of triggering an "uninitialized string offset".
          $prefix = isset($chunk[0]) ? $chunk[0] : '';
          $payload = substr($chunk, 1);

          switch ($prefix) {
              case self::PREFIX_STATUS:
                  if ($payload === self::OK) {
                      return true;
                  }
                  if ($payload === self::QUEUED) {
                      return new ResponseQueued();
                  }
                  return $payload;

              case self::PREFIX_BULK:
                  $size = (int) $payload;
                  if ($size === -1) {
                      return null;
                  }
                  // Two extra bytes are read and then dropped from the end of
                  // the result (presumably the trailing NEWLINE terminator).
                  return substr($connection->readBytes($size + 2), 0, -2);

              case self::PREFIX_MULTI_BULK:
                  $count = (int) $payload;
                  if ($count === -1) {
                      return null;
                  }
                  if ($this->_mbiterable) {
                      return new MultiBulkResponseSimple($connection, $count);
                  }
                  // Each element is itself a complete reply, so it is parsed
                  // by a recursive call.
                  $multibulk = array();
                  for ($i = 0; $i < $count; $i++) {
                      $multibulk[$i] = $this->read($connection);
                  }
                  return $multibulk;

              case self::PREFIX_INTEGER:
                  return (int) $payload;

              case self::PREFIX_ERROR:
                  // Only strip the generic "ERR " prefix when it is actually
                  // there; error replies that start with a different word
                  // keep their full text instead of losing four characters.
                  $errorMessage = (string) $payload;
                  if (strpos($errorMessage, self::ERROR . ' ') === 0) {
                      $errorMessage = substr($errorMessage, strlen(self::ERROR) + 1);
                  }
                  if ($this->_throwErrors) {
                      throw new ServerException($errorMessage);
                  }
                  return new ResponseError($errorMessage);

              default:
                  // NOTE(review): the helper is expected to raise the exception
                  // it is given; nothing is returned from this branch.
                  Helpers::onCommunicationException(new ProtocolException(
                      $connection, "Unknown prefix: '$prefix'"
                  ));
          }
      }

      /**
       * Changes a behavior of the processor. Unrecognized option names are
       * ignored.
       *
       * @param string $option Either 'iterable_multibulk' or 'throw_errors'.
       * @param mixed $value New value, cast to boolean.
       */
      public function setOption($option, $value) {
          switch ($option) {
              case 'iterable_multibulk':
                  $this->_mbiterable = (bool) $value;
                  break;
              case 'throw_errors':
                  $this->_throwErrors = (bool) $value;
                  break;
          }
      }
  }