filterOptions($options);
        $this->_options = $options;
        $this->_profile = $options->profile;
        $this->_connectionFactory = $options->connections;
        $this->_connection = $this->initializeConnection($parameters);
    }

    /**
     * Normalizes the accepted forms of client options into a ClientOptions
     * instance.
     *
     * @param mixed $options Null, a ClientOptions instance, an array of
     *                       options, an IServerProfile instance, or a string
     *                       that is handed to ServerProfile::get().
     * @return ClientOptions
     * @throws \InvalidArgumentException When $options is of any other type.
     */
    private function filterOptions($options)
    {
        if ($options === null) {
            return new ClientOptions();
        }

        if ($options instanceof ClientOptions) {
            return $options;
        }

        if (is_array($options)) {
            return new ClientOptions($options);
        }

        if ($options instanceof IServerProfile) {
            return new ClientOptions(array('profile' => $options));
        }

        if (is_string($options)) {
            return new ClientOptions(array('profile' => ServerProfile::get($options)));
        }

        throw new \InvalidArgumentException("Invalid type for client options");
    }

    /**
     * Builds the connection used by the client from the given parameters.
     *
     * - null: a connection created from a default ConnectionParameters object.
     * - array with an element at index 0: treated as a list of nodes; each
     *   entry is added to the cluster taken from the client options, being
     *   created through createConnection() unless it already is an
     *   IConnectionSingle instance.
     * - any other array: passed to createConnection() as-is.
     * - IConnection instance: returned untouched (no init commands pushed).
     * - anything else: passed to createConnection().
     *
     * @param mixed $parameters Connection parameters in one of the forms above.
     * @return mixed The cluster, the given IConnection, or the created connection.
     */
    private function initializeConnection($parameters)
    {
        if ($parameters === null) {
            return $this->createConnection(new ConnectionParameters());
        }

        if (is_array($parameters)) {
            if (isset($parameters[0])) {
                $cluster = $this->_options->cluster;

                foreach ($parameters as $single) {
                    $cluster->add($single instanceof IConnectionSingle
                        ? $single
                        : $this->createConnection($single)
                    );
                }

                return $cluster;
            }

            return $this->createConnection($parameters);
        }

        if ($parameters instanceof IConnection) {
            return $parameters;
        }

        return $this->createConnection($parameters);
    }

    /**
     * Creates a connection through the connection factory and queues its
     * initialization commands.
     *
     * @param mixed $parameters Parameters forwarded to the factory's create().
     * @return IConnectionSingle
     */
    private function createConnection($parameters)
    {
        $connection = $this->_connectionFactory->create($parameters);
        $this->pushInitCommands($connection);

        return $connection;
    }

    /**
     * Queues on the connection the commands derived from its parameters:
     * 'auth' when a password is set, then 'select' when a database is set.
     *
     * @param IConnectionSingle $connection Connection to prepare.
     */
    private function pushInitCommands(IConnectionSingle $connection)
    {
        $params = $connection->getParameters();

        if (isset($params->password)) {
            $connection->pushInitCommand($this->createCommand(
                'auth', array($params->password)
            ));
        }

        if (isset($params->database)) {
            $connection->pushInitCommand($this->createCommand(
                'select', array($params->database)
            ));
        }
    }

    /**
     * Returns the server profile taken from the client options.
     *
     * @return mixed
     */
    public function getProfile()
    {
        return $this->_profile;
    }

    /**
     * Returns the client options instance in use.
     *
     * @return ClientOptions
     */
    public function getOptions()
    {
        return $this->_options;
    }

    /**
     * Returns the connection factory taken from the client options.
     *
     * @return mixed
     */
    public function getConnectionFactory()
    {
        return $this->_connectionFactory;
    }

    /**
     * Creates a new client bound to one connection of the cluster, sharing
     * this client's options.
     *
     * @param string $connectionAlias Identifier passed to getConnection().
     * @return Client
     * @throws \InvalidArgumentException When no connection matches the alias.
     * @throws ClientException Raised by getConnection() when an alias is given
     *                         and the underlying connection is not a cluster.
     */
    public function getClientFor($connectionAlias)
    {
        if (($connection = $this->getConnection($connectionAlias)) === null) {
            throw new \InvalidArgumentException(
                "Invalid connection alias: '$connectionAlias'"
            );
        }

        return new Client($connection, $this->_options);
    }

    /**
     * Delegates to connect() on the underlying connection.
     */
    public function connect()
    {
        $this->_connection->connect();
    }

    /**
     * Delegates to disconnect() on the underlying connection.
     */
    public function disconnect()
    {
        $this->_connection->disconnect();
    }

    /**
     * Alias of disconnect(); no command is sent by this method itself.
     */
    public function quit()
    {
        $this->disconnect();
    }

    /**
     * Returns the result of isConnected() on the underlying connection.
     *
     * @return mixed
     */
    public function isConnected()
    {
        return $this->_connection->isConnected();
    }

    /**
     * Returns the underlying connection, or one connection of the cluster
     * when an identifier is given.
     *
     * @param mixed $id Optional connection identifier.
     * @return mixed The whole connection when $id is not set, otherwise the
     *               result of getConnectionById($id) on the cluster.
     * @throws ClientException When $id is set and the connection is not a cluster.
     */
    public function getConnection($id = null)
    {
        if (isset($id)) {
            if (!Helpers::isCluster($this->_connection)) {
                throw new ClientException(
                    'Retrieving connections by alias is supported '.
                    'only with clustered connections'
                );
            }

            return $this->_connection->getConnectionById($id);
        }

        return $this->_connection;
    }

    /**
     * Turns calls to undefined methods into commands: the method name and
     * arguments are given to the profile to build a command, which is then
     * executed on the underlying connection.
     *
     * @param string $method Name of the invoked method.
     * @param array $arguments Arguments of the call.
     * @return mixed Whatever the connection's executeCommand() returns.
     */
    public function __call($method, $arguments)
    {
        $command = $this->_profile->createCommand($method, $arguments);

        return $this->_connection->executeCommand($command);
    }

    /**
     * Builds a command through the server profile without executing it.
     *
     * @param string $method Command identifier.
     * @param array $arguments Command arguments.
     * @return mixed The command created by the profile.
     */
    public function createCommand($method, $arguments = array())
    {
        return $this->_profile->createCommand($method, $arguments);
    }

    /**
     * Executes a command instance on the underlying connection.
     *
     * @param ICommand $command Command to execute.
     * @return mixed Whatever the connection's executeCommand() returns.
     */
    public function executeCommand(ICommand $command)
    {
        return $this->_connection->executeCommand($command);
    }

    /**
     * Executes the same command on every connection of a cluster, or on the
     * single underlying connection when it is not a cluster.
     *
     * @param ICommand $command Command to execute.
     * @return array One reply per connection; a one-element array when the
     *               connection is not a cluster.
     */
    public function executeCommandOnShards(ICommand $command)
    {
        if (Helpers::isCluster($this->_connection)) {
            $replies = array();

            foreach ($this->_connection as $connection) {
                $replies[] = $connection->executeCommand($command);
            }

            return $replies;
        }

        return array($this->_connection->executeCommand($command));
    }

    /**
     * Dispatches a variable argument list to one of the context initializers
     * taking an optional array of options followed by an optional block.
     *
     * Accepted shapes of $argv: (), (array $options), ($block) and
     * ($options, $block).
     *
     * @param array $argv Arguments received by the public method.
     * @param string $initializer Name of the private initializer to invoke.
     * @return mixed Whatever the initializer returns.
     * @throws \InvalidArgumentException When more than two arguments are given.
     */
    private function sharedInitializer($argv, $initializer)
    {
        switch (count($argv)) {
            case 0:
                return $this->$initializer();

            case 1:
                list($arg0) = $argv;

                // A lone array is the options, anything else is the block.
                return is_array($arg0)
                    ? $this->$initializer($arg0)
                    : $this->$initializer(null, $arg0);

            case 2:
                list($arg0, $arg1) = $argv;

                return $this->$initializer($arg0, $arg1);

            default:
                // Forwarding ($this, $argv) here, as was done previously, could
                // never satisfy the initializers' array type hint on the first
                // parameter and only produced an obscure engine-level error.
                throw new \InvalidArgumentException(
                    'Too many arguments: at most two arguments are accepted'
                );
        }
    }

    /**
     * Creates a pipeline context, optionally executing a block with it.
     *
     * Arguments are interpreted by sharedInitializer(): an optional array of
     * options followed by an optional block.
     *
     * @return mixed See initPipeline().
     */
    public function pipeline(/* arguments */)
    {
        return $this->sharedInitializer(func_get_args(), 'initPipeline');
    }

    /**
     * Creates the pipeline context for pipeline().
     *
     * When $options['safe'] is set and loosely equal to true, the context is
     * given a Pipeline\SafeClusterExecutor for clustered connections or a
     * Pipeline\SafeExecutor otherwise; in all other cases the context is
     * created without an explicit executor.
     *
     * @param array $options Optional pipeline options.
     * @param mixed $pipelineBlock Optional block handed to the context's execute().
     * @return mixed The context itself when no block is given, otherwise the
     *               result of executing the block.
     */
    private function initPipeline(Array $options = null, $pipelineBlock = null)
    {
        $pipeline = null;

        if (isset($options)) {
            if (isset($options['safe']) && $options['safe'] == true) {
                $connection = $this->_connection;
                $pipeline = new PipelineContext($this,
                    Helpers::isCluster($connection)
                        ? new Pipeline\SafeClusterExecutor()
                        : new Pipeline\SafeExecutor()
                );
            } else {
                $pipeline = new PipelineContext($this);
            }
        }

        return $this->pipelineExecute(
            $pipeline ?: new PipelineContext($this), $pipelineBlock
        );
    }

    /**
     * Executes the block on the pipeline when one is given, otherwise returns
     * the pipeline context itself.
     *
     * @param PipelineContext $pipeline Pipeline context.
     * @param mixed $block Optional block handed to execute().
     * @return mixed
     */
    private function pipelineExecute(PipelineContext $pipeline, $block)
    {
        return $block !== null ? $pipeline->execute($block) : $pipeline;
    }

    /**
     * Creates a MultiExecContext, optionally executing a block with it.
     *
     * Arguments are interpreted by sharedInitializer(): an optional array of
     * options followed by an optional block.
     *
     * @return mixed See initMultiExec().
     */
    public function multiExec(/* arguments */)
    {
        return $this->sharedInitializer(func_get_args(), 'initMultiExec');
    }

    /**
     * Creates the MultiExecContext for multiExec().
     *
     * @param array $options Optional options forwarded to the context.
     * @param mixed $transBlock Optional block handed to the context's execute().
     * @return mixed The context itself when no block is given, otherwise the
     *               result of executing the block.
     */
    private function initMultiExec(Array $options = null, $transBlock = null)
    {
        $multi = isset($options)
            ? new MultiExecContext($this, $options)
            : new MultiExecContext($this);

        return $transBlock !== null ? $multi->execute($transBlock) : $multi;
    }

    /**
     * Creates a PubSubContext bound to this client.
     *
     * @param array $options Optional options forwarded to the context.
     * @return PubSubContext
     */
    public function pubSubContext(Array $options = null)
    {
        return new PubSubContext($this, $options);
    }

    /**
     * Creates a MonitorContext bound to this client.
     *
     * @return MonitorContext
     */
    public function monitor()
    {
        return new MonitorContext($this);
    }
}