Client.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. <?php
  2. namespace Predis;
  3. use Predis\Commands\ICommand;
  4. use Predis\Network\IConnection;
  5. use Predis\Network\IConnectionSingle;
  6. use Predis\Profiles\IServerProfile;
  7. use Predis\Profiles\ServerProfile;
  8. use Predis\Pipeline\PipelineContext;
  9. use Predis\Transaction\MultiExecContext;
  10. class Client
  11. {
  12. const VERSION = '0.7.0-dev';
  13. private $_options;
  14. private $_profile;
  15. private $_connection;
  16. private $_connectionFactory;
  17. public function __construct($parameters = null, $options = null)
  18. {
  19. $options = $this->filterOptions($options);
  20. $profile = $options->profile;
  21. if (isset($options->prefix)) {
  22. $profile->setProcessor($options->prefix);
  23. }
  24. $this->_options = $options;
  25. $this->_profile = $profile;
  26. $this->_connectionFactory = $options->connections;
  27. $this->_connection = $this->initializeConnection($parameters);
  28. }
  29. private function filterOptions($options)
  30. {
  31. if ($options === null) {
  32. return new ClientOptions();
  33. }
  34. if (is_array($options)) {
  35. return new ClientOptions($options);
  36. }
  37. if ($options instanceof ClientOptions) {
  38. return $options;
  39. }
  40. if ($options instanceof IServerProfile) {
  41. return new ClientOptions(array('profile' => $options));
  42. }
  43. if (is_string($options)) {
  44. return new ClientOptions(array('profile' => ServerProfile::get($options)));
  45. }
  46. throw new \InvalidArgumentException("Invalid type for client options");
  47. }
  48. private function initializeConnection($parameters)
  49. {
  50. if ($parameters === null) {
  51. return $this->createConnection(new ConnectionParameters());
  52. }
  53. if (is_array($parameters)) {
  54. if (isset($parameters[0])) {
  55. $cluster = $this->_options->cluster;
  56. foreach ($parameters as $node) {
  57. $connection = $node instanceof IConnectionSingle ? $node : $this->createConnection($node);
  58. $cluster->add($connection);
  59. }
  60. return $cluster;
  61. }
  62. return $this->createConnection($parameters);
  63. }
  64. if ($parameters instanceof IConnection) {
  65. return $parameters;
  66. }
  67. return $this->createConnection($parameters);
  68. }
  69. protected function createConnection($parameters)
  70. {
  71. $connection = $this->_connectionFactory->create($parameters);
  72. $parameters = $connection->getParameters();
  73. if (isset($parameters->password)) {
  74. $command = $this->createCommand('auth', array($parameters->password));
  75. $connection->pushInitCommand($command);
  76. }
  77. if (isset($parameters->database)) {
  78. $command = $this->createCommand('select', array($parameters->database));
  79. $connection->pushInitCommand($command);
  80. }
  81. return $connection;
  82. }
  83. public function getProfile()
  84. {
  85. return $this->_profile;
  86. }
  87. public function getOptions()
  88. {
  89. return $this->_options;
  90. }
  91. public function getConnectionFactory()
  92. {
  93. return $this->_connectionFactory;
  94. }
  95. public function getClientFor($connectionAlias)
  96. {
  97. if (($connection = $this->getConnection($connectionAlias)) === null) {
  98. throw new \InvalidArgumentException("Invalid connection alias: '$connectionAlias'");
  99. }
  100. return new Client($connection, $this->_options);
  101. }
  102. public function connect()
  103. {
  104. $this->_connection->connect();
  105. }
  106. public function disconnect()
  107. {
  108. $this->_connection->disconnect();
  109. }
  110. public function quit()
  111. {
  112. $this->disconnect();
  113. }
  114. public function isConnected()
  115. {
  116. return $this->_connection->isConnected();
  117. }
  118. public function getConnection($id = null)
  119. {
  120. if (isset($id)) {
  121. if (!Helpers::isCluster($this->_connection)) {
  122. throw new ClientException(
  123. 'Retrieving connections by alias is supported only with clustered connections'
  124. );
  125. }
  126. return $this->_connection->getConnectionById($id);
  127. }
  128. return $this->_connection;
  129. }
  130. public function __call($method, $arguments)
  131. {
  132. $command = $this->_profile->createCommand($method, $arguments);
  133. return $this->_connection->executeCommand($command);
  134. }
  135. public function createCommand($method, $arguments = array())
  136. {
  137. return $this->_profile->createCommand($method, $arguments);
  138. }
  139. public function executeCommand(ICommand $command)
  140. {
  141. return $this->_connection->executeCommand($command);
  142. }
  143. public function executeCommandOnShards(ICommand $command)
  144. {
  145. if (Helpers::isCluster($this->_connection)) {
  146. $replies = array();
  147. foreach ($this->_connection as $connection) {
  148. $replies[] = $connection->executeCommand($command);
  149. }
  150. return $replies;
  151. }
  152. return array($this->_connection->executeCommand($command));
  153. }
  154. private function sharedInitializer($argv, $initializer)
  155. {
  156. switch (count($argv)) {
  157. case 0:
  158. return $this->$initializer();
  159. case 1:
  160. list($arg0) = $argv;
  161. return is_array($arg0) ? $this->$initializer($arg0) : $this->$initializer(null, $arg0);
  162. case 2:
  163. list($arg0, $arg1) = $argv;
  164. return $this->$initializer($arg0, $arg1);
  165. default:
  166. return $this->$initializer($this, $argv);
  167. }
  168. }
  169. public function pipeline(/* arguments */)
  170. {
  171. return $this->sharedInitializer(func_get_args(), 'initPipeline');
  172. }
  173. protected function initPipeline(Array $options = null, $pipelineBlock = null)
  174. {
  175. $pipeline = new PipelineContext($this, $options);
  176. return $this->pipelineExecute($pipeline, $pipelineBlock);
  177. }
  178. private function pipelineExecute(PipelineContext $pipeline, $block)
  179. {
  180. return $block !== null ? $pipeline->execute($block) : $pipeline;
  181. }
  182. public function multiExec(/* arguments */)
  183. {
  184. return $this->sharedInitializer(func_get_args(), 'initMultiExec');
  185. }
  186. protected function initMultiExec(Array $options = null, $block = null)
  187. {
  188. $transaction = new MultiExecContext($this, $options ?: array());
  189. return isset($block) ? $transaction->execute($block) : $transaction;
  190. }
  191. public function pubSub(/* arguments */)
  192. {
  193. return $this->sharedInitializer(func_get_args(), 'initPubSub');
  194. }
  195. protected function initPubSub(Array $options = null, $block = null)
  196. {
  197. $pubsub = new PubSubContext($this, $options);
  198. if (!isset($block)) {
  199. return $pubsub;
  200. }
  201. foreach ($pubsub as $message) {
  202. if ($block($pubsub, $message) === false) {
  203. $pubsub->closeContext();
  204. }
  205. }
  206. }
  207. public function monitor()
  208. {
  209. return new MonitorContext($this);
  210. }
  211. }