TextProtocol.php 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. <?php
  2. namespace Predis\Protocol\Text;
  3. use Predis\Helpers;
  4. use Predis\ResponseError;
  5. use Predis\ResponseQueued;
  6. use Predis\ServerException;
  7. use Predis\Commands\ICommand;
  8. use Predis\Protocol\IProtocolProcessor;
  9. use Predis\Protocol\ProtocolException;
  10. use Predis\Network\IConnectionComposable;
  11. use Predis\Iterators\MultiBulkResponseSimple;
  /**
   * Text protocol processor: serializes commands onto a connection and
   * parses the replies read back from it.
   *
   * Behaviour can be tuned through setOption():
   *  - 'iterable_multibulk': return multi bulk replies as a
   *    MultiBulkResponseSimple instead of a fully materialized array.
   *  - 'throw_errors': throw a ServerException on error replies instead of
   *    returning a ResponseError instance.
   */
  class TextProtocol implements IProtocolProcessor
  {
      const NEWLINE = "\r\n";
      const OK = 'OK';
      const ERROR = 'ERR';
      const QUEUED = 'QUEUED';
      const NULL = 'nil';

      // First byte of a reply line, identifying the kind of reply.
      const PREFIX_STATUS = '+';
      const PREFIX_ERROR = '-';
      const PREFIX_INTEGER = ':';
      const PREFIX_BULK = '$';
      const PREFIX_MULTI_BULK = '*';

      const BUFFER_SIZE = 4096;

      /** @var bool Whether multi bulk replies are returned as iterators. */
      private $_mbiterable;

      /** @var bool Whether error replies are thrown as exceptions. */
      private $_throwErrors;

      /** @var TextCommandSerializer Serializer used by write(). */
      private $_serializer;

      /**
       * Sets the defaults: multi bulk replies are returned as arrays and
       * error replies are thrown as exceptions.
       */
      public function __construct()
      {
          $this->_mbiterable = false;
          $this->_throwErrors = true;
          $this->_serializer = new TextCommandSerializer();
      }

      /**
       * Serializes a command and writes the resulting bytes to the connection.
       *
       * @param IConnectionComposable $connection Connection to write to.
       * @param ICommand $command Command to serialize and send.
       */
      public function write(IConnectionComposable $connection, ICommand $command)
      {
          $connection->writeBytes($this->_serializer->serialize($command));
      }

      /**
       * Reads one reply from the connection and converts it to a PHP value.
       *
       * The first byte of the line read from the connection selects the kind
       * of reply; the remainder of the line is its payload.
       *
       * @param IConnectionComposable $connection Connection to read from.
       * @return mixed true for an OK status, a ResponseQueued for a QUEUED
       *               status, a string for any other status or a bulk reply,
       *               an int for integer replies, null for a bulk or multi
       *               bulk reply of length -1, an array or a
       *               MultiBulkResponseSimple for multi bulk replies, or a
       *               ResponseError when error throwing is disabled.
       * @throws ServerException On an error reply when 'throw_errors' is on.
       */
      public function read(IConnectionComposable $connection)
      {
          $chunk = $connection->readLine();

          // An empty line has no prefix byte. Guard the offset access so the
          // failure is reported by the unknown-prefix branch below instead of
          // raising an "uninitialized string offset" notice/warning first.
          $prefix = isset($chunk[0]) ? $chunk[0] : '';
          $payload = substr($chunk, 1);

          switch ($prefix) {
              case self::PREFIX_STATUS: // inline
                  switch ($payload) {
                      case self::OK:
                          return true;

                      case self::QUEUED:
                          return new ResponseQueued();

                      default:
                          return $payload;
                  }

              case self::PREFIX_BULK: // bulk
                  $size = (int) $payload;
                  if ($size === -1) {
                      return null;
                  }

                  // The body is followed by two extra bytes (presumably the
                  // trailing line terminator) that are read and then dropped.
                  return substr($connection->readBytes($size + 2), 0, -2);

              case self::PREFIX_MULTI_BULK: // multi bulk
                  $count = (int) $payload;
                  if ($count === -1) {
                      return null;
                  }

                  if ($this->_mbiterable) {
                      return new MultiBulkResponseSimple($connection, $count);
                  }

                  // Each element is itself a complete reply, so it is read
                  // recursively.
                  $multibulk = array();
                  for ($i = 0; $i < $count; $i++) {
                      $multibulk[$i] = $this->read($connection);
                  }

                  return $multibulk;

              case self::PREFIX_INTEGER: // integer
                  return (int) $payload;

              case self::PREFIX_ERROR: // error
                  if ($this->_throwErrors) {
                      throw new ServerException($payload);
                  }

                  return new ResponseError($payload);

              default:
                  Helpers::onCommunicationException(new ProtocolException(
                      $connection, "Unknown prefix: '$prefix'"
                  ));
          }
      }

      /**
       * Changes one of the protocol options; unrecognized option names are
       * silently ignored.
       *
       * @param string $option Either 'iterable_multibulk' or 'throw_errors'.
       * @param mixed $value New value, cast to boolean.
       */
      public function setOption($option, $value)
      {
          switch ($option) {
              case 'iterable_multibulk':
                  $this->_mbiterable = (bool) $value;
                  break;

              case 'throw_errors':
                  $this->_throwErrors = (bool) $value;
                  break;
          }
      }
  }