Client.php 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. <?php
  2. namespace Predis;
  3. use Predis\Commands\ICommand;
  4. use Predis\Network\IConnection;
  5. use Predis\Network\IConnectionSingle;
  6. use Predis\Profiles\IServerProfile;
  7. use Predis\Profiles\ServerProfile;
  8. use Predis\Pipeline\PipelineContext;
  9. class Client {
  10. const VERSION = '0.7.0-dev';
  11. private $_options, $_connectionFactory, $_profile, $_connection;
  12. public function __construct($parameters = null, $options = null) {
  13. $options = $this->filterOptions($options);
  14. $this->_options = $options;
  15. $profile = $options->profile;
  16. if (isset($options->prefix)) {
  17. $profile->setProcessor($options->prefix);
  18. }
  19. $this->_profile = $profile;
  20. $this->_connectionFactory = $options->connections;
  21. $this->_connection = $this->initializeConnection($parameters);
  22. }
  23. private function filterOptions($options) {
  24. if ($options === null) {
  25. return new ClientOptions();
  26. }
  27. if (is_array($options)) {
  28. return new ClientOptions($options);
  29. }
  30. if ($options instanceof ClientOptions) {
  31. return $options;
  32. }
  33. if ($options instanceof IServerProfile) {
  34. return new ClientOptions(array('profile' => $options));
  35. }
  36. if (is_string($options)) {
  37. return new ClientOptions(array('profile' => ServerProfile::get($options)));
  38. }
  39. throw new \InvalidArgumentException("Invalid type for client options");
  40. }
  41. private function initializeConnection($parameters) {
  42. if ($parameters === null) {
  43. return $this->createConnection(new ConnectionParameters());
  44. }
  45. if (is_array($parameters)) {
  46. if (isset($parameters[0])) {
  47. $cluster = $this->_options->cluster;
  48. foreach ($parameters as $single) {
  49. $cluster->add($single instanceof IConnectionSingle
  50. ? $single : $this->createConnection($single)
  51. );
  52. }
  53. return $cluster;
  54. }
  55. return $this->createConnection($parameters);
  56. }
  57. if ($parameters instanceof IConnection) {
  58. return $parameters;
  59. }
  60. return $this->createConnection($parameters);
  61. }
  62. protected function createConnection($parameters) {
  63. $connection = $this->_connectionFactory->create($parameters);
  64. $parameters = $connection->getParameters();
  65. if (isset($parameters->password)) {
  66. $connection->pushInitCommand(
  67. $this->createCommand('auth', array($parameters->password))
  68. );
  69. }
  70. if (isset($parameters->database)) {
  71. $connection->pushInitCommand(
  72. $this->createCommand('select', array($parameters->database))
  73. );
  74. }
  75. return $connection;
  76. }
  77. public function getProfile() {
  78. return $this->_profile;
  79. }
  80. public function getOptions() {
  81. return $this->_options;
  82. }
  83. public function getConnectionFactory() {
  84. return $this->_connectionFactory;
  85. }
  86. public function getClientFor($connectionAlias) {
  87. if (($connection = $this->getConnection($connectionAlias)) === null) {
  88. throw new \InvalidArgumentException(
  89. "Invalid connection alias: '$connectionAlias'"
  90. );
  91. }
  92. return new Client($connection, $this->_options);
  93. }
  94. public function connect() {
  95. $this->_connection->connect();
  96. }
  97. public function disconnect() {
  98. $this->_connection->disconnect();
  99. }
  100. public function quit() {
  101. $this->disconnect();
  102. }
  103. public function isConnected() {
  104. return $this->_connection->isConnected();
  105. }
  106. public function getConnection($id = null) {
  107. if (isset($id)) {
  108. if (!Helpers::isCluster($this->_connection)) {
  109. throw new ClientException(
  110. 'Retrieving connections by alias is supported '.
  111. 'only with clustered connections'
  112. );
  113. }
  114. return $this->_connection->getConnectionById($id);
  115. }
  116. return $this->_connection;
  117. }
  118. public function __call($method, $arguments) {
  119. $command = $this->_profile->createCommand($method, $arguments);
  120. return $this->_connection->executeCommand($command);
  121. }
  122. public function createCommand($method, $arguments = array()) {
  123. return $this->_profile->createCommand($method, $arguments);
  124. }
  125. public function executeCommand(ICommand $command) {
  126. return $this->_connection->executeCommand($command);
  127. }
  128. public function executeCommandOnShards(ICommand $command) {
  129. if (Helpers::isCluster($this->_connection)) {
  130. $replies = array();
  131. foreach ($this->_connection as $connection) {
  132. $replies[] = $connection->executeCommand($command);
  133. }
  134. return $replies;
  135. }
  136. return array($this->_connection->executeCommand($command));
  137. }
  138. private function sharedInitializer($argv, $initializer) {
  139. switch (count($argv)) {
  140. case 0:
  141. return $this->$initializer();
  142. case 1:
  143. list($arg0) = $argv;
  144. return is_array($arg0)
  145. ? $this->$initializer($arg0)
  146. : $this->$initializer(null, $arg0);
  147. case 2:
  148. list($arg0, $arg1) = $argv;
  149. return $this->$initializer($arg0, $arg1);
  150. default:
  151. return $this->$initializer($this, $argv);
  152. }
  153. }
  154. public function pipeline(/* arguments */) {
  155. return $this->sharedInitializer(func_get_args(), 'initPipeline');
  156. }
  157. private function initPipeline(Array $options = null, $pipelineBlock = null) {
  158. $pipeline = null;
  159. if (isset($options)) {
  160. if (isset($options['safe']) && $options['safe'] == true) {
  161. $connection = $this->_connection;
  162. $pipeline = new PipelineContext($this,
  163. Helpers::isCluster($connection)
  164. ? new Pipeline\SafeClusterExecutor()
  165. : new Pipeline\SafeExecutor()
  166. );
  167. }
  168. else {
  169. $pipeline = new PipelineContext($this);
  170. }
  171. }
  172. return $this->pipelineExecute(
  173. $pipeline ?: new PipelineContext($this), $pipelineBlock
  174. );
  175. }
  176. private function pipelineExecute(PipelineContext $pipeline, $block) {
  177. return $block !== null ? $pipeline->execute($block) : $pipeline;
  178. }
  179. public function multiExec(/* arguments */) {
  180. return $this->sharedInitializer(func_get_args(), 'initMultiExec');
  181. }
  182. private function initMultiExec(Array $options = null, $transBlock = null) {
  183. $multi = isset($options) ? new MultiExecContext($this, $options) : new MultiExecContext($this);
  184. return $transBlock !== null ? $multi->execute($transBlock) : $multi;
  185. }
  186. public function pubSubContext(Array $options = null) {
  187. return new PubSubContext($this, $options);
  188. }
  189. public function monitor() {
  190. return new MonitorContext($this);
  191. }
  192. }