TextProtocol.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. <?php
  2. namespace Predis\Protocols;
  3. use Predis\ResponseError;
  4. use Predis\ResponseQueued;
  5. use Predis\ServerException;
  6. use Predis\CommunicationException;
  7. use Predis\Commands\ICommand;
  8. use Predis\Network\IConnectionComposable;
  9. use Predis\Iterators\MultiBulkResponseSimple;
  /**
   * Text-based protocol processor.
   *
   * Writes serialized commands to a connection and parses the replies read
   * back from it. Replies are line oriented: the first byte of the reply line
   * selects the reply type (see the PREFIX_* constants) and the remainder of
   * the line is the payload.
   */
  class TextProtocol implements IProtocolProcessor {
      const NEWLINE = "\r\n";
      const OK      = 'OK';
      const ERROR   = 'ERR';
      const QUEUED  = 'QUEUED';
      const NULL    = 'nil';

      const PREFIX_STATUS     = '+';
      const PREFIX_ERROR      = '-';
      const PREFIX_INTEGER    = ':';
      const PREFIX_BULK       = '$';
      const PREFIX_MULTI_BULK = '*';

      const BUFFER_SIZE = 4096;

      /** @var bool When true, multi bulk replies are returned as a lazy iterator. */
      private $_mbiterable;

      /** @var bool When true, error replies are thrown instead of returned. */
      private $_throwErrors;

      /** @var TextCommandSerializer Turns commands into the bytes sent on the wire. */
      private $_serializer;

      /**
       * Sets the defaults: multi bulk replies are returned as arrays and
       * error replies are thrown as exceptions.
       */
      public function __construct() {
          $this->_mbiterable  = false;
          $this->_throwErrors = true;
          $this->_serializer  = new TextCommandSerializer();
      }

      /**
       * Serializes a command and writes it to the connection.
       *
       * @param IConnectionComposable $connection Connection to write to.
       * @param ICommand              $command    Command to serialize.
       */
      public function write(IConnectionComposable $connection, ICommand $command) {
          $connection->writeBytes($this->_serializer->serialize($command));
      }

      /**
       * Reads and parses one reply from the connection.
       *
       * @param IConnectionComposable $connection Connection to read from.
       *
       * @return mixed true for an "OK" status, a ResponseQueued for a "QUEUED"
       *               status, the raw payload string for any other status, a
       *               string or null for a bulk reply, an array,
       *               MultiBulkResponseSimple or null for a multi bulk reply,
       *               an int for an integer reply, or a ResponseError for an
       *               error reply when throwing errors is disabled.
       *
       * @throws ServerException        On an error reply when throwing errors
       *                                is enabled.
       * @throws CommunicationException When the reply line is empty or starts
       *                                with an unknown prefix.
       */
      public function read(IConnectionComposable $connection) {
          $chunk = $connection->readLine();

          // Without this guard, $chunk[0] on an empty (or non-string) line
          // raises an "uninitialized string offset" notice and ends up being
          // reported as an unknown '' prefix.
          if (!is_string($chunk) || $chunk === '') {
              throw new CommunicationException(
                  $connection, 'Unexpected empty reply line'
              );
          }

          $prefix  = $chunk[0];
          $payload = substr($chunk, 1);

          switch ($prefix) {
              case self::PREFIX_STATUS:
                  switch ($payload) {
                      case self::OK:
                          return true;

                      case self::QUEUED:
                          return new ResponseQueued();

                      default:
                          return $payload;
                  }

              case self::PREFIX_BULK:
                  $size = (int) $payload;
                  if ($size === -1) {
                      return null;
                  }

                  // The body is followed by a line terminator that must be
                  // consumed from the connection but is not part of the value.
                  $trailer = strlen(self::NEWLINE);

                  return substr($connection->readBytes($size + $trailer), 0, -$trailer);

              case self::PREFIX_MULTI_BULK:
                  $count = (int) $payload;
                  if ($count === -1) {
                      return null;
                  }

                  if ($this->_mbiterable) {
                      // The iterator is handed the connection and the item
                      // count; no item is read here.
                      return new MultiBulkResponseSimple($connection, $count);
                  }

                  // Each item is itself a complete reply, so parse recursively.
                  $multibulk = array();
                  for ($i = 0; $i < $count; $i++) {
                      $multibulk[$i] = $this->read($connection);
                  }

                  return $multibulk;

              case self::PREFIX_INTEGER:
                  return (int) $payload;

              case self::PREFIX_ERROR:
                  // Strip the generic "ERR " marker only when it is actually
                  // there. Error replies can start with a different word
                  // (e.g. "WRONGTYPE ..."), and blindly dropping four bytes
                  // would corrupt those messages.
                  $marker       = self::ERROR . ' ';
                  $errorMessage = strpos($payload, $marker) === 0
                      ? substr($payload, strlen($marker))
                      : $payload;

                  if ($this->_throwErrors) {
                      throw new ServerException($errorMessage);
                  }

                  return new ResponseError($errorMessage);

              default:
                  throw new CommunicationException(
                      $connection, "Unknown prefix: '$prefix'"
                  );
          }
      }

      /**
       * Sets a protocol option. Unrecognized option names are ignored.
       *
       * Supported options:
       *  - 'iterable_multibulk': return multi bulk replies as a
       *    MultiBulkResponseSimple instead of an array.
       *  - 'throw_errors': throw ServerException on error replies instead of
       *    returning a ResponseError.
       *
       * @param string $option Option name.
       * @param mixed  $value  Option value; cast to bool.
       */
      public function setOption($option, $value) {
          switch ($option) {
              case 'iterable_multibulk':
                  $this->_mbiterable = (bool) $value;
                  break;

              case 'throw_errors':
                  $this->_throwErrors = (bool) $value;
                  break;
          }
      }
  }