فهرست منبع

Merge branch 'hashring_rework' into hyves_contribs_port

Daniele Alessandri 15 سال پیش
والد
کامیت
5842c57b28
1فایلهای تغییر یافته به همراه77 افزوده شده و 22 حذف شده
  1. 77 22
      lib/Predis.php

+ 77 - 22
lib/Predis.php

@@ -179,7 +179,7 @@ abstract class Command {
         return true;
     }
 
-    public function getHash() {
+    public function getHash(Utilities\IRing $ring) {
         if (isset($this->_hash)) {
             return $this->_hash;
         }
@@ -193,7 +193,7 @@ abstract class Command {
                     $key = substr($key, ++$start, $end - $start);
                 }
 
-                $this->_hash = crc32($key);
+                $this->_hash = $ring->generateKey($key);
                 return $this->_hash;
             }
         }
@@ -210,10 +210,12 @@ abstract class Command {
 
     public function setArguments(/* arguments */) {
         $this->_arguments = $this->filterArguments(func_get_args());
+        $this->_hash = null;
     }
 
     public function setArgumentsArray(Array $arguments) {
         $this->_arguments = $this->filterArguments($arguments);
+        $this->_hash = null;
     }
 
     protected function getArguments() {
@@ -773,7 +775,7 @@ class ConnectionParameters {
             'connection_timeout' => self::getParamOrDefault($parameters, 'connection_timeout', self::DEFAULT_TIMEOUT), 
             'read_write_timeout' => self::getParamOrDefault($parameters, 'read_write_timeout'), 
             'alias'  => self::getParamOrDefault($parameters, 'alias'), 
-            'weight' => self::getParamOrDefault($parameters, 'weight', Utilities\HashRing::DEFAULT_WEIGHT), 
+            'weight' => self::getParamOrDefault($parameters, 'weight'), 
         );
     }
 
@@ -966,9 +968,9 @@ class Connection implements IConnection {
 class ConnectionCluster implements IConnection, \IteratorAggregate {
     private $_pool, $_ring;
 
-    public function __construct() {
+    public function __construct(Utilities\IRing $ring = null) {
         $this->_pool = array();
-        $this->_ring = new Utilities\HashRing();
+        $this->_ring = $ring ?: new Utilities\HashRing();
     }
 
     public function isConnected() {
@@ -1009,7 +1011,7 @@ class ConnectionCluster implements IConnection, \IteratorAggregate {
                 sprintf("Cannot send '%s' commands to a cluster of connections.", $command->getCommandId())
             );
         }
-        return $this->_ring->get($command->getHash());
+        return $this->_ring->get($command->getHash($this->_ring));
     }
 
     public function getConnectionById($id = null) {
@@ -1374,20 +1376,27 @@ class Shared {
     }
 }
 
-class HashRing {
+interface IRing {
+    public function add($node, $weight = null);
+    public function remove($node);
+    public function get($key);
+    public function generateKey($value);
+}
+
+class HashRing implements IRing {
     const DEFAULT_REPLICAS = 128;
     const DEFAULT_WEIGHT   = 100;
-    private $_nodes, $_ring, $_ringKeys, $_replicas;
+    private $_nodes, $_ring, $_ringKeys, $_ringKeysCount, $_replicas;
 
     public function __construct($replicas = self::DEFAULT_REPLICAS) {
         $this->_replicas = $replicas;
         $this->_nodes    = array();
     }
 
-    public function add($node, $weight = self::DEFAULT_WEIGHT) {
+    public function add($node, $weight = null) {
         // NOTE: in case of collisions in the hashes of the nodes, the node added
         //       last wins, thus the order in which nodes are added is significant.
-        $this->_nodes[] = array('object' => $node, 'weight' => (int) $weight);
+        $this->_nodes[] = array('object' => $node, 'weight' => (int) $weight ?: $this::DEFAULT_WEIGHT);
         $this->reset();
     }
 
@@ -1408,6 +1417,7 @@ class HashRing {
     private function reset() {
         unset($this->_ring);
         unset($this->_ringKeys);
+        unset($this->_ringKeysCount);
     }
 
     private function isInitialized() {
@@ -1435,17 +1445,26 @@ class HashRing {
         $totalWeight = $this->computeTotalWeight();
         $nodesCount  = count($this->_nodes);
         foreach ($this->_nodes as $node) {
-            $nodeObject  = $node['object'];
-            $nodeHash    = (string) $nodeObject;
             $weightRatio = $node['weight'] / $totalWeight;
-            $replicas    = (int) round($weightRatio * $nodesCount * $this->_replicas);
-            for ($i = 0; $i < $replicas; $i++) {
-                $key = crc32($nodeHash . ':' . $i);
-                $this->_ring[$key] = $nodeObject;
-            }
+            $this->addNodeToRing($this->_ring, $node, $nodesCount, $this->_replicas, $weightRatio);
         }
         ksort($this->_ring, SORT_NUMERIC);
         $this->_ringKeys = array_keys($this->_ring);
+        $this->_ringKeysCount = count($this->_ringKeys);
+    }
+
+    protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) {
+        $nodeObject = $node['object'];
+        $nodeHash = (string) $nodeObject;
+        $replicas = (int) round($weightRatio * $totalNodes * $replicas);
+        for ($i = 0; $i < $replicas; $i++) {
+            $key = crc32("$nodeHash:$i");
+            $ring[$key] = $nodeObject;
+        }
+    }
+
+    public function generateKey($value) {
+        return crc32($value);
     }
 
     public function get($key) {
@@ -1453,12 +1472,9 @@ class HashRing {
     }
 
     private function getNodeKey($key) {
-        // NOTE: binary search for the last item in _ringkeys with a value
-        //       less or equal to the key. If no such item exists, return the 
-        //       last item.
         $this->initialize();
         $ringKeys = $this->_ringKeys;
-        $upper = count($ringKeys) - 1;
+        $upper = $this->_ringKeysCount - 1;
         $lower = 0;
 
         while ($lower <= $upper) {
@@ -1474,7 +1490,46 @@ class HashRing {
                 return $item;
             }
         }
-        return $ringKeys[$upper >= 0 ? $upper : count($ringKeys) - 1];
+        return $ringKeys[$this->wrapAroundStrategy($upper, $lower, $this->_ringKeysCount)];
+    }
+
+    protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) {
+        // NOTE: binary search for the last item in _ringkeys with a value 
+        //       less or equal to the key. If no such item exists, return the 
+        //       last item.
+        return $upper >= 0 ? $upper : $ringKeysCount - 1;
+    }
+}
+
+class KetamaPureRing extends HashRing {
+    const DEFAULT_REPLICAS = 160;
+
+    public function __construct() {
+        parent::__construct($this::DEFAULT_REPLICAS);
+    }
+
+    protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) {
+        $nodeObject = $node['object'];
+        $nodeHash = (string) $nodeObject;
+        $replicas = (int) floor($weightRatio * $totalNodes * ($replicas / 4));
+        for ($i = 0; $i < $replicas; $i++) {
+            $unpackedDigest = unpack('V4', md5("$nodeHash-$i", true));
+            foreach ($unpackedDigest as $key) {
+                $ring[$key] = $nodeObject;
+            }
+        }
+    }
+
+    public function generateKey($value) {
+        $hash = unpack('V', md5($value, true));
+        return $hash[1];
+    }
+
+    protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) {
+        // NOTE: binary search for the first item in _ringkeys with a value 
+        //       greater or equal to the key. If no such item exists, return the 
+        //       first item.
+        return $lower < $ringKeysCount ? $lower : 0;
     }
 }