Client.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. <?php
  2. namespace Predis;
  3. use Predis\Commands\ICommand;
  4. use Predis\Network\IConnection;
  5. use Predis\Network\IConnectionSingle;
  6. use Predis\Profiles\IServerProfile;
  7. use Predis\Profiles\ServerProfile;
  8. use Predis\Pipeline\PipelineContext;
  9. use Predis\Transaction\MultiExecContext;
  /**
   * Main client class.
   *
   * Holds the client options, the server profile and the underlying connection
   * (a single connection or a cluster of connections), and dispatches commands
   * created by the profile to that connection. It also acts as a factory for
   * pipeline, MULTI/EXEC, publish/subscribe and monitor contexts.
   */
  class Client
  {
      const VERSION = '0.7.0-dev';

      /** @var ClientOptions Options resolved by filterOptions(). */
      private $_options;

      /** @var IServerProfile Profile used to create command instances. */
      private $_profile;

      /** @var IConnection Single connection or cluster used to execute commands. */
      private $_connection;

      /** @var mixed Factory taken from the "connections" option; used to create connections. */
      private $_connectionFactory;

      /**
       * Initializes the client from connection parameters and client options.
       *
       * @param mixed $parameters Connection parameters: null (defaults), an array
       *                          (a list of nodes when index 0 is set, otherwise
       *                          parameters for one connection), an IConnection
       *                          instance, or any value accepted by the connection
       *                          factory.
       * @param mixed $options    Null, an array of options, a ClientOptions
       *                          instance, an IServerProfile instance or a profile
       *                          name.
       *
       * @throws \InvalidArgumentException When $options has an unsupported type.
       */
      public function __construct($parameters = null, $options = null)
      {
          $options = $this->filterOptions($options);
          $profile = $options->profile;

          if (isset($options->prefix)) {
              $profile->setProcessor($options->prefix);
          }

          // The profile and the connection factory must be in place before the
          // connection is initialized: createConnection() relies on both.
          $this->_options = $options;
          $this->_profile = $profile;
          $this->_connectionFactory = $options->connections;
          $this->_connection = $this->initializeConnection($parameters);
      }

      /**
       * Normalizes the supported forms of client options into a ClientOptions
       * instance.
       *
       * @param mixed $options Null, array, ClientOptions, IServerProfile or a
       *                       profile name resolved through ServerProfile::get().
       *
       * @return ClientOptions
       *
       * @throws \InvalidArgumentException When $options is none of the above.
       */
      private function filterOptions($options)
      {
          if ($options === null) {
              return new ClientOptions();
          }

          if (is_array($options)) {
              return new ClientOptions($options);
          }

          if ($options instanceof ClientOptions) {
              return $options;
          }

          if ($options instanceof IServerProfile) {
              return new ClientOptions(array('profile' => $options));
          }

          if (is_string($options)) {
              return new ClientOptions(array('profile' => ServerProfile::get($options)));
          }

          throw new \InvalidArgumentException("Invalid type for client options");
      }

      /**
       * Builds the connection (or cluster of connections) used by the client.
       *
       * @param mixed $parameters See the constructor for the accepted forms.
       *
       * @return IConnection
       */
      private function initializeConnection($parameters)
      {
          if ($parameters === null) {
              return $this->createConnection(new ConnectionParameters());
          }

          if (is_array($parameters)) {
              // An array with an element at index 0 is treated as a list of
              // nodes that are all added to the cluster taken from the options.
              if (isset($parameters[0])) {
                  $cluster = $this->_options->cluster;

                  foreach ($parameters as $node) {
                      $connection = $node instanceof IConnectionSingle
                          ? $node
                          : $this->createConnection($node);
                      $cluster->add($connection);
                  }

                  return $cluster;
              }

              return $this->createConnection($parameters);
          }

          if ($parameters instanceof IConnection) {
              return $parameters;
          }

          return $this->createConnection($parameters);
      }

      /**
       * Creates a single connection through the connection factory and queues
       * its initialization commands.
       *
       * An "auth" command is pushed when the connection parameters define a
       * password and a "select" command is pushed when they define a database,
       * in that order.
       *
       * @param mixed $parameters Value handed to the connection factory.
       *
       * @return mixed The connection returned by the factory.
       */
      protected function createConnection($parameters)
      {
          $connection = $this->_connectionFactory->create($parameters);
          // Read the parameters back from the connection instead of trusting
          // the raw input, which may not be a parameters object.
          $parameters = $connection->getParameters();

          if (isset($parameters->password)) {
              $command = $this->createCommand('auth', array($parameters->password));
              $connection->pushInitCommand($command);
          }

          if (isset($parameters->database)) {
              $command = $this->createCommand('select', array($parameters->database));
              $connection->pushInitCommand($command);
          }

          return $connection;
      }

      /**
       * Returns the server profile used by the client.
       *
       * @return IServerProfile
       */
      public function getProfile()
      {
          return $this->_profile;
      }

      /**
       * Returns the options the client was configured with.
       *
       * @return ClientOptions
       */
      public function getOptions()
      {
          return $this->_options;
      }

      /**
       * Returns the factory used by the client to create connections.
       *
       * @return mixed
       */
      public function getConnectionFactory()
      {
          return $this->_connectionFactory;
      }

      /**
       * Returns a new client bound to one connection of the cluster, sharing
       * the options of the current client.
       *
       * @param string $connectionAlias Identifier of the connection.
       *
       * @return Client
       *
       * @throws \InvalidArgumentException When no connection matches the alias.
       * @throws ClientException           When the client is not using a cluster.
       */
      public function getClientFor($connectionAlias)
      {
          if (($connection = $this->getConnection($connectionAlias)) === null) {
              throw new \InvalidArgumentException("Invalid connection alias: '$connectionAlias'");
          }

          return new Client($connection, $this->_options);
      }

      /**
       * Opens the underlying connection.
       */
      public function connect()
      {
          $this->_connection->connect();
      }

      /**
       * Closes the underlying connection.
       */
      public function disconnect()
      {
          $this->_connection->disconnect();
      }

      /**
       * Closes the underlying connection by delegating to disconnect().
       */
      public function quit()
      {
          $this->disconnect();
      }

      /**
       * Reports whether the underlying connection is open.
       *
       * @return mixed Whatever the connection reports from isConnected().
       */
      public function isConnected()
      {
          return $this->_connection->isConnected();
      }

      /**
       * Returns the underlying connection or, when an identifier is given, the
       * matching connection of the cluster.
       *
       * @param mixed $id Optional connection identifier.
       *
       * @return mixed The whole connection when $id is null, otherwise the value
       *               returned by the cluster for that identifier.
       *
       * @throws ClientException When $id is given but the connection is not a
       *                         cluster.
       */
      public function getConnection($id = null)
      {
          if (isset($id)) {
              if (!Helpers::isCluster($this->_connection)) {
                  throw new ClientException(
                      'Retrieving connections by alias is supported only with clustered connections'
                  );
              }

              return $this->_connection->getConnectionById($id);
          }

          return $this->_connection;
      }

      /**
       * Dispatches calls to undefined methods as commands: the profile creates a
       * command named after the method and the connection executes it.
       *
       * @param string $method    Command identifier.
       * @param array  $arguments Command arguments.
       *
       * @return mixed Reply returned by the connection.
       */
      public function __call($method, $arguments)
      {
          $command = $this->_profile->createCommand($method, $arguments);

          return $this->_connection->executeCommand($command);
      }

      /**
       * Creates a command instance through the server profile.
       *
       * @param string $method    Command identifier.
       * @param array  $arguments Command arguments.
       *
       * @return mixed Command created by the profile.
       */
      public function createCommand($method, $arguments = array())
      {
          return $this->_profile->createCommand($method, $arguments);
      }

      /**
       * Executes a command on the underlying connection.
       *
       * @param ICommand $command Command to execute.
       *
       * @return mixed Reply returned by the connection.
       */
      public function executeCommand(ICommand $command)
      {
          return $this->_connection->executeCommand($command);
      }

      /**
       * Executes a command on every connection of a cluster, or on the single
       * connection when the client is not clustered.
       *
       * @param ICommand $command Command to execute.
       *
       * @return array List of replies, one per connection; a one-element list
       *               for a non-clustered connection.
       */
      public function executeCommandOnShards(ICommand $command)
      {
          if (Helpers::isCluster($this->_connection)) {
              $replies = array();

              foreach ($this->_connection as $connection) {
                  $replies[] = $connection->executeCommand($command);
              }

              return $replies;
          }

          return array($this->_connection->executeCommand($command));
      }

      /**
       * Maps the variadic arguments of pipeline(), multiExec() and pubSub() onto
       * the (options, block) signature shared by their initializers.
       *
       * - no argument:   initializer()
       * - one argument:  initializer($options) when it is an array, otherwise
       *                  initializer(null, $block)
       * - two arguments: initializer($options, $block)
       *
       * @param array  $argv        Arguments received by the public method.
       * @param string $initializer Name of the initializer method to invoke.
       *
       * @return mixed Value returned by the initializer.
       *
       * @throws \InvalidArgumentException When more than two arguments are given.
       */
      private function sharedInitializer($argv, $initializer)
      {
          switch (count($argv)) {
              case 0:
                  return $this->$initializer();

              case 1:
                  list($arg0) = $argv;

                  return is_array($arg0)
                      ? $this->$initializer($arg0)
                      : $this->$initializer(null, $arg0);

              case 2:
                  list($arg0, $arg1) = $argv;

                  return $this->$initializer($arg0, $arg1);

              default:
                  // The initializers accept at most (options, block); a longer
                  // argument list cannot be mapped onto them, so fail with an
                  // explicit error instead of tripping their array type hint.
                  throw new \InvalidArgumentException(
                      'Too many arguments: expected at most an options array and a block'
                  );
          }
      }

      /**
       * Creates a pipeline context, optionally executing a block inside it.
       *
       * Accepts no arguments, an options array, a block, or an options array
       * followed by a block.
       *
       * @return mixed The PipelineContext when no block is given, otherwise the
       *               value returned by executing the block in the pipeline.
       *
       * @throws \InvalidArgumentException When more than two arguments are given.
       */
      public function pipeline(/* arguments */)
      {
          return $this->sharedInitializer(func_get_args(), 'initPipeline');
      }

      /**
       * Builds the pipeline context for pipeline().
       *
       * @param array|null $options       Options passed to the pipeline context.
       * @param mixed      $pipelineBlock Optional block executed in the pipeline.
       *
       * @return mixed See pipeline().
       */
      protected function initPipeline(array $options = null, $pipelineBlock = null)
      {
          $pipeline = new PipelineContext($this, $options);

          return $this->pipelineExecute($pipeline, $pipelineBlock);
      }

      /**
       * Executes the block in the pipeline, or returns the pipeline itself when
       * there is no block.
       *
       * @param PipelineContext $pipeline Pipeline context.
       * @param mixed           $block    Optional block to execute.
       *
       * @return mixed
       */
      private function pipelineExecute(PipelineContext $pipeline, $block)
      {
          if ($block === null) {
              return $pipeline;
          }

          return $pipeline->execute($block);
      }

      /**
       * Creates a MULTI/EXEC transaction context, optionally executing a block
       * inside it.
       *
       * Accepts the same argument forms as pipeline().
       *
       * @return mixed The MultiExecContext when no block is given, otherwise the
       *               value returned by executing the block in the transaction.
       *
       * @throws \InvalidArgumentException When more than two arguments are given.
       */
      public function multiExec(/* arguments */)
      {
          return $this->sharedInitializer(func_get_args(), 'initMultiExec');
      }

      /**
       * Builds the transaction context for multiExec().
       *
       * @param array|null $options Options for the transaction; an empty array is
       *                            used when none (or an empty value) is given.
       * @param mixed      $block   Optional block executed in the transaction.
       *
       * @return mixed See multiExec().
       */
      protected function initMultiExec(array $options = null, $block = null)
      {
          $transaction = new MultiExecContext($this, $options ?: array());

          return isset($block) ? $transaction->execute($block) : $transaction;
      }

      /**
       * Creates a publish/subscribe context, optionally consuming its messages
       * with a block.
       *
       * Accepts the same argument forms as pipeline().
       *
       * @return mixed The PubSubContext when no block is given; nothing is
       *               returned once a block has consumed the messages.
       *
       * @throws \InvalidArgumentException When more than two arguments are given.
       */
      public function pubSub(/* arguments */)
      {
          return $this->sharedInitializer(func_get_args(), 'initPubSub');
      }

      /**
       * Builds the publish/subscribe context for pubSub().
       *
       * When a block is given it is invoked as $block($pubsub, $message) for
       * every message yielded by the context; the context is closed as soon as
       * the block returns exactly false.
       *
       * @param array|null $options Options passed to the context.
       * @param mixed      $block   Optional callable receiving each message.
       *
       * @return mixed See pubSub().
       */
      protected function initPubSub(array $options = null, $block = null)
      {
          $pubsub = new PubSubContext($this, $options);

          if (!isset($block)) {
              return $pubsub;
          }

          foreach ($pubsub as $message) {
              if ($block($pubsub, $message) === false) {
                  $pubsub->closeContext();
              }
          }
      }

      /**
       * Creates a monitor context bound to this client.
       *
       * @return MonitorContext
       */
      public function monitor()
      {
          return new MonitorContext($this);
      }
  }