TcpConnection.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. <?php
  2. namespace Predis\Network;
  3. use Predis\ICommand;
  4. use Predis\ConnectionParameters;
  5. use Predis\CommunicationException;
  6. use Predis\Protocols\IRedisProtocol;
  7. use Predis\Protocols\TextProtocol;
  /**
   * Single connection to a Redis server over a TCP stream socket.
   *
   * The socket is opened with stream_socket_client(); commands are serialized
   * and responses parsed by the protocol object handed to the parent class
   * (a TextProtocol instance when none is supplied).
   */
  class TcpConnection extends ConnectionBase implements IConnectionSingle {
      /**
       * Validates the connection parameters and initializes the connection.
       *
       * @param ConnectionParameters $parameters Parameters of the connection.
       * @param IRedisProtocol $protocol Protocol to use; a new TextProtocol
       *                                 is created when omitted.
       * @throws \InvalidArgumentException When the scheme is not supported.
       */
      public function __construct(ConnectionParameters $parameters, IRedisProtocol $protocol = null) {
          parent::__construct($this->checkParameters($parameters), $protocol ?: new TextProtocol());
      }

      /**
       * Disconnects on destruction, unless the connection is flagged as
       * persistent in its parameters.
       */
      public function __destruct() {
          if (!$this->_params->connection_persistent) {
              $this->disconnect();
          }
      }

      /**
       * Ensures that the scheme of the parameters is either "tcp" or "redis".
       *
       * @param ConnectionParameters $parameters Parameters to check.
       * @return ConnectionParameters The very same instance, when valid.
       * @throws \InvalidArgumentException When the scheme is anything else.
       */
      protected function checkParameters(ConnectionParameters $parameters) {
          $scheme = $parameters->scheme;
          if ($scheme != 'tcp' && $scheme != 'redis') {
              throw new \InvalidArgumentException("Invalid scheme: {$scheme}");
          }
          return $parameters;
      }

      /**
       * Opens the stream socket and stores it in $this->_socket.
       *
       * Honors the connection_async, connection_persistent, connection_timeout
       * and read_write_timeout parameters. A failure to open the socket is
       * reported through onCommunicationException().
       */
      protected function createResource() {
          $host = $this->_params->host;
          // stream_socket_client() requires numeric IPv6 addresses to be enclosed
          // in square brackets; IPv4 addresses, host names and hosts that are
          // already bracketed do not validate as IPv6 and are left untouched.
          if (filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) !== false) {
              $host = "[{$host}]";
          }
          $uri = sprintf('tcp://%s:%d/', $host, $this->_params->port);

          $connectFlags = STREAM_CLIENT_CONNECT;
          if ($this->_params->connection_async) {
              $connectFlags |= STREAM_CLIENT_ASYNC_CONNECT;
          }
          if ($this->_params->connection_persistent) {
              $connectFlags |= STREAM_CLIENT_PERSISTENT;
          }

          // Warnings are silenced on purpose: the failure is reported below
          // using the error string and code filled in by the call itself.
          $this->_socket = @stream_socket_client(
              $uri, $errno, $errstr, $this->_params->connection_timeout, $connectFlags
          );
          if (!$this->_socket) {
              // NOTE(review): onCommunicationException() is defined outside of this
              // file; the code below relies on it not returning -- confirm.
              $this->onCommunicationException(trim($errstr), $errno);
          }

          if (isset($this->_params->read_write_timeout)) {
              $timeout = $this->_params->read_write_timeout;
              // stream_set_timeout() takes integers: split the (possibly fractional)
              // timeout into whole seconds and microseconds explicitly instead of
              // relying on an implicit and lossy float-to-int conversion.
              $timeoutSeconds = (int) floor($timeout);
              $timeoutUSeconds = min(999999, (int) round(($timeout - $timeoutSeconds) * 1000000));
              stream_set_timeout($this->_socket, $timeoutSeconds, $timeoutUSeconds);
          }
      }

      /**
       * Sends the commands stored in $this->_initCmds to the server.
       *
       * All of the commands are written before any response is read, then
       * one response is read for each command, in the same order.
       */
      private function sendInitializationCommands() {
          foreach ($this->_initCmds as $command) {
              $this->writeCommand($command);
          }
          foreach ($this->_initCmds as $command) {
              $this->readResponse($command);
          }
      }

      /**
       * Connects through the parent implementation, then sends the
       * initialization commands when there are any.
       */
      public function connect() {
          parent::connect();
          if (count($this->_initCmds) > 0){
              $this->sendInitializationCommands();
          }
      }

      /**
       * Writes a command on the connection using the configured protocol.
       *
       * @param ICommand $command Command to send.
       */
      public function writeCommand(ICommand $command) {
          $this->_protocol->write($this, $command);
      }

      /**
       * Reads a response from the connection using the configured protocol.
       *
       * @param ICommand $command Command the response belongs to.
       * @return mixed The response as returned by the protocol when it has a
       *               skipParse property set, otherwise the result of the
       *               command's parseResponse() applied to it.
       */
      public function readResponse(ICommand $command) {
          $response = $this->_protocol->read($this);
          return isset($response->skipParse) ? $response : $command->parseResponse($response);
      }

      /**
       * Writes a whole buffer to the socket.
       *
       * @param string $value Bytes to write.
       * @return bool Always true once everything has been written.
       */
      public function writeBytes($value) {
          $socket = $this->getResource();
          // fwrite() may write only part of the buffer: keep sending the
          // remaining bytes until nothing is left.
          while (($length = strlen($value)) > 0) {
              $written = fwrite($socket, $value);
              if ($length === $written) {
                  return true;
              }
              if ($written === false || $written === 0) {
                  $this->onCommunicationException('Error while writing bytes to the server');
              }
              $value = substr($value, $written);
          }
          return true;
      }

      /**
       * Reads exactly $length bytes from the socket.
       *
       * @param int $length Number of bytes to read, must be greater than 0.
       * @return string The bytes read.
       * @throws \InvalidArgumentException When $length is not greater than 0.
       */
      public function readBytes($length) {
          if ($length <= 0) {
              throw new \InvalidArgumentException('Length parameter must be greater than 0');
          }
          $socket = $this->getResource();
          $value = '';
          // fread() may return fewer bytes than requested: keep reading until
          // the requested amount has been accumulated.
          do {
              $chunk = fread($socket, $length);
              if ($chunk === false || $chunk === '') {
                  $this->onCommunicationException('Error while reading bytes from the server');
              }
              $value .= $chunk;
          }
          while (($length -= strlen($chunk)) > 0);
          return $value;
      }

      /**
       * Reads from the socket until a CRLF-terminated line is complete.
       *
       * @return string The line, without the trailing CRLF.
       */
      public function readLine() {
          $socket = $this->getResource();
          $value = '';
          // A single fgets() stops at the first "\n", so keep appending chunks
          // until the accumulated buffer ends with "\r\n".
          do {
              $chunk = fgets($socket);
              if ($chunk === false || $chunk === '') {
                  $this->onCommunicationException('Error while reading line from the server');
              }
              $value .= $chunk;
          }
          while (substr($value, -2) !== "\r\n");
          return substr($value, 0, -2);
      }
  }