Sfoglia il codice sorgente

Added support for partial reads/writes. Note that stream_get_contents is not used because it doesn't seem to respect the timeout.

Daniele Alessandri 15 anni fa
parent
commit
4cc43775b0
1 ha cambiato i file con 50 aggiunte e 14 eliminazioni
  1. 50 14
      lib/Predis.php

+ 50 - 14
lib/Predis.php

@@ -303,22 +303,20 @@ class ResponseErrorSilentHandler implements IResponseHandler {
 
 class ResponseBulkHandler implements IResponseHandler {
     public function handle(Connection $connection, $dataLength) {
-        $socket = $connection->getSocket();
-
         if (!is_numeric($dataLength)) {
             throw new ClientException("Cannot parse '$dataLength' as data length");
         }
 
         if ($dataLength > 0) {
-            $value = stream_get_contents($socket, $dataLength);
+            $value = $connection->readBytes($dataLength);
             if ($value === false) {
                 throw new ClientException('An error has occurred while reading from the network stream');
             }
-            fread($socket, 2);
+            $connection->readBytes(2);
             return $value;
         }
         else if ($dataLength == 0) {
-            fread($socket, 2);
+            $connection->readBytes(2);
             return '';
         }
 
@@ -442,13 +440,9 @@ class ResponseReader {
     }
 
     public function read(Connection $connection) {
-        $header  = fgets($connection->getSocket());
-        if ($header === false) {
-           throw new ClientException('An error has occurred while reading from the network stream');
-        }
-
+        $header  = $connection->readLine();
         $prefix  = $header[0];
-        $payload = substr($header, 1, -2);
+        $payload = strlen($header) > 1 ? substr($header, 1) : '';
 
         if (!isset($this->_prefixHandlers[$prefix])) {
             throw new MalformedServerResponse("Unknown prefix '$prefix'");
@@ -813,7 +807,7 @@ class Connection implements IConnection {
     }
 
     public function writeCommand(Command $command) {
-        $written = fwrite($this->getSocket(), $command());
+        $written = $this->writeBytes($command());
         if ($written === false){
            throw new ClientException(sprintf(
                'An error has occurred while writing command %s on the network stream',
@@ -837,8 +831,7 @@ class Connection implements IConnection {
     }
 
     public function rawCommand($rawCommandData, $closesConnection = false) {
-        $socket = $this->getSocket();
-        $written = fwrite($socket, $rawCommandData);
+        $written = $this->writeBytes($rawCommandData);
         if ($written === false){
            throw new ClientException('An error has occurred while writing a raw command on the network stream');
         }
@@ -848,6 +841,49 @@ class Connection implements IConnection {
         return $this->_reader->read($this);
     }
 
+    public function writeBytes($value) {
+        $socket = $this->getSocket();
+        while (($length = strlen($value)) > 0) {
+            $written = fwrite($socket, $value);
+            if ($length === $written) {
+                return true;
+            }
+            if ($written === false || $written === 0) {
+                return false;
+            }
+            $value = substr($value, $written);
+        }
+        return true;
+    }
+
+    public function readBytes($length) {
+        $socket = $this->getSocket();
+        $value  = '';
+        do {
+            $chunk = fread($socket, $length);
+            if ($chunk === false || $chunk === '') {
+                return false;
+            }
+            $value .= $chunk;
+        }
+        while (($length -= strlen($chunk)) > 0);
+        return $value;
+    }
+
+    public function readLine() {
+        $socket = $this->getSocket();
+        $value  = '';
+        do {
+            $chunk = fgets($socket);
+            if ($chunk === false || strlen($chunk) == 0) {
+                return false;
+            }
+            $value .= $chunk;
+        }
+        while (substr($value, -2) !== ResponseReader::NEWLINE);
+        return substr($value, 0, -2);
+    }
+
     public function getSocket() {
         if (!$this->isConnected()) {
             $this->connect();