12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- <?php
- namespace Predis\Network;
- use Predis\IConnectionParameters;
- use Predis\CommunicationException;
- use Predis\Commands\ICommand;
- use Predis\Protocols\IProtocolProcessor;
- use Predis\Protocols\TextProtocol;
- class ComposableStreamConnection extends StreamConnection implements IConnectionComposable {
- private $_protocol;
- public function __construct(IConnectionParameters $parameters, IProtocolProcessor $protocol = null) {
- $this->setProtocol($protocol ?: new TextProtocol());
- parent::__construct($parameters);
- }
- protected function initializeProtocol(IConnectionParameters $parameters) {
- $this->_protocol->setOption('throw_errors', $parameters->throw_errors);
- $this->_protocol->setOption('iterable_multibulk', $parameters->iterable_multibulk);
- }
- public function setProtocol(IProtocolProcessor $protocol) {
- if ($protocol === null) {
- throw new \InvalidArgumentException("The protocol instance cannot be a null value");
- }
- $this->_protocol = $protocol;
- }
- public function getProtocol() {
- return $this->_protocol;
- }
- public function writeBytes($value) {
- $socket = $this->getResource();
- while (($length = strlen($value)) > 0) {
- $written = fwrite($socket, $value);
- if ($length === $written) {
- return true;
- }
- if ($written === false || $written === 0) {
- $this->onCommunicationException('Error while writing bytes to the server');
- }
- $value = substr($value, $written);
- }
- return true;
- }
- public function readBytes($length) {
- if ($length <= 0) {
- throw new \InvalidArgumentException('Length parameter must be greater than 0');
- }
- $socket = $this->getResource();
- $value = '';
- do {
- $chunk = fread($socket, $length);
- if ($chunk === false || $chunk === '') {
- $this->onCommunicationException('Error while reading bytes from the server');
- }
- $value .= $chunk;
- }
- while (($length -= strlen($chunk)) > 0);
- return $value;
- }
- public function readLine() {
- $socket = $this->getResource();
- $value = '';
- do {
- $chunk = fgets($socket);
- if ($chunk === false || $chunk === '') {
- $this->onCommunicationException('Error while reading line from the server');
- }
- $value .= $chunk;
- }
- while (substr($value, -2) !== "\r\n");
- return substr($value, 0, -2);
- }
- public function writeCommand(ICommand $command) {
- $this->_protocol->write($this, $command);
- }
- public function read() {
- return $this->_protocol->read($this);
- }
- }
|