Client.php 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. <?php
  2. namespace Predis;
  3. use Predis\Commands\ICommand;
  4. use Predis\Network\IConnection;
  5. use Predis\Network\IConnectionSingle;
  6. use Predis\Network\ConnectionCluster;
  7. use Predis\Profiles\ServerProfile;
  8. use Predis\Profiles\IServerProfile;
  9. class Client {
  10. const VERSION = '0.7.0-dev';
  11. private $_options, $_connectionFactory, $_profile, $_connection;
  12. public function __construct($parameters = null, $options = null) {
  13. $options = $this->filterOptions($options);
  14. $this->_options = $options;
  15. $this->_profile = $options->profile;
  16. $this->_connectionFactory = $options->connections;
  17. $this->_connection = $this->initializeConnection($parameters);
  18. }
  19. private function filterOptions($options) {
  20. if ($options === null) {
  21. return new ClientOptions();
  22. }
  23. if ($options instanceof ClientOptions) {
  24. return $options;
  25. }
  26. if (is_array($options)) {
  27. return new ClientOptions($options);
  28. }
  29. if ($options instanceof IServerProfile) {
  30. return new ClientOptions(array('profile' => $options));
  31. }
  32. if (is_string($options)) {
  33. return new ClientOptions(array('profile' => ServerProfile::get($options)));
  34. }
  35. throw new \InvalidArgumentException("Invalid type for client options");
  36. }
  37. private function initializeConnection($parameters) {
  38. if ($parameters === null) {
  39. return $this->createConnection(new ConnectionParameters());
  40. }
  41. if (is_array($parameters)) {
  42. if (isset($parameters[0])) {
  43. $clusterClass = $this->_options->cluster;
  44. $cluster = new $clusterClass($this->_options->key_distribution);
  45. foreach ($parameters as $single) {
  46. $cluster->add($single instanceof IConnectionSingle
  47. ? $single : $this->createConnection($single)
  48. );
  49. }
  50. return $cluster;
  51. }
  52. return $this->createConnection($parameters);
  53. }
  54. if ($parameters instanceof IConnection) {
  55. return $parameters;
  56. }
  57. return $this->createConnection($parameters);
  58. }
  59. private function createConnection($parameters) {
  60. $connection = $this->_connectionFactory->create($parameters);
  61. $this->pushInitCommands($connection);
  62. return $connection;
  63. }
  64. private function pushInitCommands(IConnectionSingle $connection) {
  65. $params = $connection->getParameters();
  66. if (isset($params->password)) {
  67. $connection->pushInitCommand($this->createCommand(
  68. 'auth', array($params->password)
  69. ));
  70. }
  71. if (isset($params->database)) {
  72. $connection->pushInitCommand($this->createCommand(
  73. 'select', array($params->database)
  74. ));
  75. }
  76. }
  77. public function getProfile() {
  78. return $this->_profile;
  79. }
  80. public function getOptions() {
  81. return $this->_options;
  82. }
  83. public function getConnectionFactory() {
  84. return $this->_connectionFactory;
  85. }
  86. public function getClientFor($connectionAlias) {
  87. if (($connection = $this->getConnection($connectionAlias)) === null) {
  88. throw new \InvalidArgumentException(
  89. "Invalid connection alias: '$connectionAlias'"
  90. );
  91. }
  92. return new Client($connection, $this->_options);
  93. }
  94. public function connect() {
  95. $this->_connection->connect();
  96. }
  97. public function disconnect() {
  98. $this->_connection->disconnect();
  99. }
  100. public function quit() {
  101. $this->disconnect();
  102. }
  103. public function isConnected() {
  104. return $this->_connection->isConnected();
  105. }
  106. public function getConnection($id = null) {
  107. if (isset($id)) {
  108. if (!Helpers::isCluster($this->_connection)) {
  109. throw new ClientException(
  110. 'Retrieving connections by alias is supported '.
  111. 'only with clustered connections'
  112. );
  113. }
  114. return $this->_connection->getConnectionById($id);
  115. }
  116. return $this->_connection;
  117. }
  118. public function __call($method, $arguments) {
  119. $command = $this->_profile->createCommand($method, $arguments);
  120. return $this->_connection->executeCommand($command);
  121. }
  122. public function createCommand($method, $arguments = array()) {
  123. return $this->_profile->createCommand($method, $arguments);
  124. }
  125. public function executeCommand(ICommand $command) {
  126. return $this->_connection->executeCommand($command);
  127. }
  128. public function executeCommandOnShards(ICommand $command) {
  129. if (Helpers::isCluster($this->_connection)) {
  130. $replies = array();
  131. foreach ($this->_connection as $connection) {
  132. $replies[] = $connection->executeCommand($command);
  133. }
  134. return $replies;
  135. }
  136. return array($this->_connection->executeCommand($command));
  137. }
  138. private function sharedInitializer($argv, $initializer) {
  139. switch (count($argv)) {
  140. case 0:
  141. return $this->$initializer();
  142. case 1:
  143. list($arg0) = $argv;
  144. return is_array($arg0)
  145. ? $this->$initializer($arg0)
  146. : $this->$initializer(null, $arg0);
  147. case 2:
  148. list($arg0, $arg1) = $argv;
  149. return $this->$initializer($arg0, $arg1);
  150. default:
  151. return $this->$initializer($this, $argv);
  152. }
  153. }
  154. public function pipeline(/* arguments */) {
  155. return $this->sharedInitializer(func_get_args(), 'initPipeline');
  156. }
  157. private function initPipeline(Array $options = null, $pipelineBlock = null) {
  158. $pipeline = null;
  159. if (isset($options)) {
  160. if (isset($options['safe']) && $options['safe'] == true) {
  161. $connection = $this->_connection;
  162. $pipeline = new PipelineContext($this,
  163. Helpers::isCluster($connection)
  164. ? new Pipeline\SafeClusterExecutor($connection)
  165. : new Pipeline\SafeExecutor($connection)
  166. );
  167. }
  168. else {
  169. $pipeline = new PipelineContext($this);
  170. }
  171. }
  172. return $this->pipelineExecute(
  173. $pipeline ?: new PipelineContext($this), $pipelineBlock
  174. );
  175. }
  176. private function pipelineExecute(PipelineContext $pipeline, $block) {
  177. return $block !== null ? $pipeline->execute($block) : $pipeline;
  178. }
  179. public function multiExec(/* arguments */) {
  180. return $this->sharedInitializer(func_get_args(), 'initMultiExec');
  181. }
  182. private function initMultiExec(Array $options = null, $transBlock = null) {
  183. $multi = isset($options) ? new MultiExecContext($this, $options) : new MultiExecContext($this);
  184. return $transBlock !== null ? $multi->execute($transBlock) : $multi;
  185. }
  186. public function pubSubContext(Array $options = null) {
  187. return new PubSubContext($this, $options);
  188. }
  189. }