|
@@ -17,22 +17,57 @@ class Client {
|
|
|
|
|
|
public function __construct($host = Connection::DEFAULT_HOST, $port = Connection::DEFAULT_PORT) {
|
|
public function __construct($host = Connection::DEFAULT_HOST, $port = Connection::DEFAULT_PORT) {
|
|
$this->_pipelining = false;
|
|
$this->_pipelining = false;
|
|
- $this->_connection = new Connection($host, $port);
|
|
|
|
$this->_registeredCommands = self::initializeDefaultCommands();
|
|
$this->_registeredCommands = self::initializeDefaultCommands();
|
|
|
|
+
|
|
|
|
+ $this->setConnection($this->createConnection(
|
|
|
|
+ func_num_args() === 1 && is_array($host) || @stripos('redis://') === 0
|
|
|
|
+ ? $host
|
|
|
|
+ : array('host' => $host, 'port' => $port)
|
|
|
|
+ ));
|
|
}
|
|
}
|
|
|
|
|
|
public function __destruct() {
|
|
public function __destruct() {
|
|
$this->_connection->disconnect();
|
|
$this->_connection->disconnect();
|
|
}
|
|
}
|
|
|
|
|
|
- public static function createCluster(/* arguments */) {
|
|
|
|
- $cluster = new ConnectionCluster();
|
|
|
|
- foreach (func_get_args() as $parameters) {
|
|
|
|
- $cluster->add(new Connection($parameters['host'], $parameters['port']));
|
|
|
|
|
|
+ public static function create(/* arguments */) {
|
|
|
|
+ $argv = func_get_args();
|
|
|
|
+ $argc = func_num_args();
|
|
|
|
+
|
|
|
|
+ if ($argc == 1) {
|
|
|
|
+ return new Client($argv[0]);
|
|
|
|
+ }
|
|
|
|
+ else if ($argc > 1) {
|
|
|
|
+ $client = new Client();
|
|
|
|
+ $cluster = new ConnectionCluster();
|
|
|
|
+ foreach ($argv as $parameters) {
|
|
|
|
+ // TODO: this is a bit dirty...
|
|
|
|
+ $cluster->add($client->createConnection($parameters));
|
|
|
|
+ }
|
|
|
|
+ $client->setConnection($cluster);
|
|
|
|
+ return $client;
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ return new Client();
|
|
}
|
|
}
|
|
- $client = new Client();
|
|
|
|
- $client->setConnection($cluster);
|
|
|
|
- return $client;
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private function createConnection($parameters) {
|
|
|
|
+ $params = new ConnectionParameters($parameters);
|
|
|
|
+ $connection = new Connection($params);
|
|
|
|
+
|
|
|
|
+ if ($params->password !== null) {
|
|
|
|
+ $connection->pushInitCommand($this->createCommandInstance(
|
|
|
|
+ 'auth', array($params->password)
|
|
|
|
+ ));
|
|
|
|
+ }
|
|
|
|
+ if ($params->database !== null) {
|
|
|
|
+ $connection->pushInitCommand($this->createCommandInstance(
|
|
|
|
+ 'select', array($params->database)
|
|
|
|
+ ));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return $connection;
|
|
}
|
|
}
|
|
|
|
|
|
private function setConnection(IConnection $connection) {
|
|
private function setConnection(IConnection $connection) {
|
|
@@ -517,6 +552,59 @@ class CommandPipeline {
|
|
|
|
|
|
/* ------------------------------------------------------------------------- */
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
|
|
+class ConnectionParameters {
|
|
|
|
+ private $_parameters;
|
|
|
|
+
|
|
|
|
+ public function __construct($parameters) {
|
|
|
|
+ $this->_parameters = is_array($parameters)
|
|
|
|
+ ? self::filterConnectionParams($parameters)
|
|
|
|
+ : self::parseURI($parameters);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static function parseURI($uri) {
|
|
|
|
+ $parsed = @parse_url($uri);
|
|
|
|
+
|
|
|
|
+ if ($parsed == false || $parsed['scheme'] != 'redis' || $parsed['host'] == null) {
|
|
|
|
+ throw new ClientException("Invalid URI: $uri");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (array_key_exists('query', $parsed)) {
|
|
|
|
+ $details = array();
|
|
|
|
+ foreach (explode('&', $parsed['query']) as $kv) {
|
|
|
|
+ list($k, $v) = explode('=', $kv);
|
|
|
|
+ switch ($k) {
|
|
|
|
+ case 'database':
|
|
|
|
+ $details['database'] = $v;
|
|
|
|
+ break;
|
|
|
|
+ case 'password':
|
|
|
|
+ $details['password'] = $v;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ $parsed = array_merge($parsed, $details);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return self::filterConnectionParams($parsed);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static function getParamOrDefault(Array $parameters, $param, $default = null) {
|
|
|
|
+ return array_key_exists($param, $parameters) ? $parameters[$param] : $default;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static function filterConnectionParams($parameters) {
|
|
|
|
+ return array(
|
|
|
|
+ 'host' => self::getParamOrDefault($parameters, 'host', Connection::DEFAULT_HOST),
|
|
|
|
+ 'port' => (int) self::getParamOrDefault($parameters, 'port', Connection::DEFAULT_PORT),
|
|
|
|
+ 'database' => self::getParamOrDefault($parameters, 'database'),
|
|
|
|
+ 'password' => self::getParamOrDefault($parameters, 'password')
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public function __get($parameter) {
|
|
|
|
+ return $this->_parameters[$parameter];
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
interface IConnection {
|
|
interface IConnection {
|
|
public function connect();
|
|
public function connect();
|
|
public function disconnect();
|
|
public function disconnect();
|
|
@@ -531,11 +619,11 @@ class Connection implements IConnection {
|
|
const CONNECTION_TIMEOUT = 2;
|
|
const CONNECTION_TIMEOUT = 2;
|
|
const READ_WRITE_TIMEOUT = 5;
|
|
const READ_WRITE_TIMEOUT = 5;
|
|
|
|
|
|
- private $_host, $_port, $_socket;
|
|
|
|
|
|
+ private $_params, $_socket, $_initCmds;
|
|
|
|
|
|
- public function __construct($host = self::DEFAULT_HOST, $port = self::DEFAULT_PORT) {
|
|
|
|
- $this->_host = $host;
|
|
|
|
- $this->_port = $port;
|
|
|
|
|
|
+ public function __construct(ConnectionParameters $parameters) {
|
|
|
|
+ $this->_params = $parameters;
|
|
|
|
+ $this->_initCmds = array();
|
|
}
|
|
}
|
|
|
|
|
|
public function __destruct() {
|
|
public function __destruct() {
|
|
@@ -550,12 +638,16 @@ class Connection implements IConnection {
|
|
if ($this->isConnected()) {
|
|
if ($this->isConnected()) {
|
|
throw new ClientException('Connection already estabilished');
|
|
throw new ClientException('Connection already estabilished');
|
|
}
|
|
}
|
|
- $uri = sprintf('tcp://%s:%d/', $this->_host, $this->_port);
|
|
|
|
|
|
+ $uri = sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
|
|
$this->_socket = @stream_socket_client($uri, $errno, $errstr, self::CONNECTION_TIMEOUT);
|
|
$this->_socket = @stream_socket_client($uri, $errno, $errstr, self::CONNECTION_TIMEOUT);
|
|
if (!$this->_socket) {
|
|
if (!$this->_socket) {
|
|
throw new ClientException(trim($errstr), $errno);
|
|
throw new ClientException(trim($errstr), $errno);
|
|
}
|
|
}
|
|
stream_set_timeout($this->_socket, self::READ_WRITE_TIMEOUT);
|
|
stream_set_timeout($this->_socket, self::READ_WRITE_TIMEOUT);
|
|
|
|
+
|
|
|
|
+ if (count($this->_initCmds) > 0){
|
|
|
|
+ $this->sendInitializationCommands();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public function disconnect() {
|
|
public function disconnect() {
|
|
@@ -564,6 +656,19 @@ class Connection implements IConnection {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public function pushInitCommand(Command $command){
|
|
|
|
+ $this->_initCmds[] = $command;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private function sendInitializationCommands() {
|
|
|
|
+ foreach ($this->_initCmds as $command) {
|
|
|
|
+ $this->writeCommand($command);
|
|
|
|
+ }
|
|
|
|
+ foreach ($this->_initCmds as $command) {
|
|
|
|
+ $this->readResponse($command);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public function writeCommand(Command $command) {
|
|
public function writeCommand(Command $command) {
|
|
fwrite($this->getSocket(), $command());
|
|
fwrite($this->getSocket(), $command());
|
|
}
|
|
}
|
|
@@ -593,7 +698,7 @@ class Connection implements IConnection {
|
|
}
|
|
}
|
|
|
|
|
|
public function __toString() {
|
|
public function __toString() {
|
|
- return sprintf('tcp://%s:%d/', $this->_host, $this->_port);
|
|
|
|
|
|
+ return sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|