MultiExecContext.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. <?php
  2. namespace Predis;
  3. class MultiExecContext {
  4. private $_initialized, $_discarded, $_insideBlock, $_checkAndSet, $_watchedKeys;
  5. private $_client, $_options, $_commands, $_supportsWatch;
  6. public function __construct(Client $client, Array $options = null) {
  7. $this->checkCapabilities($client);
  8. $this->_options = $options ?: array();
  9. $this->_client = $client;
  10. $this->reset();
  11. }
  12. private function checkCapabilities(Client $client) {
  13. if (Utils::isCluster($client->getConnection())) {
  14. throw new ClientException(
  15. 'Cannot initialize a MULTI/EXEC context over a cluster of connections'
  16. );
  17. }
  18. $profile = $client->getProfile();
  19. if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
  20. throw new ClientException(
  21. 'The current profile does not support MULTI, EXEC and DISCARD commands'
  22. );
  23. }
  24. $this->_supportsWatch = $profile->supportsCommands(array('watch', 'unwatch'));
  25. }
  26. private function isWatchSupported() {
  27. if ($this->_supportsWatch === false) {
  28. throw new ClientException(
  29. 'The current profile does not support WATCH and UNWATCH commands'
  30. );
  31. }
  32. }
  33. private function reset() {
  34. $this->_initialized = false;
  35. $this->_discarded = false;
  36. $this->_checkAndSet = false;
  37. $this->_insideBlock = false;
  38. $this->_watchedKeys = false;
  39. $this->_commands = array();
  40. }
  41. private function initialize() {
  42. if ($this->_initialized === true) {
  43. return;
  44. }
  45. $options = $this->_options;
  46. $this->_checkAndSet = isset($options['cas']) && $options['cas'];
  47. if (isset($options['watch'])) {
  48. $this->watch($options['watch']);
  49. }
  50. if (!$this->_checkAndSet || ($this->_discarded && $this->_checkAndSet)) {
  51. $this->_client->multi();
  52. if ($this->_discarded) {
  53. $this->_checkAndSet = false;
  54. }
  55. }
  56. $this->_initialized = true;
  57. $this->_discarded = false;
  58. }
  59. public function __call($method, $arguments) {
  60. $this->initialize();
  61. $client = $this->_client;
  62. if ($this->_checkAndSet) {
  63. return call_user_func_array(array($client, $method), $arguments);
  64. }
  65. $command = $client->createCommand($method, $arguments);
  66. $response = $client->executeCommand($command);
  67. if (!$response instanceof ResponseQueued) {
  68. $this->malformedServerResponse(
  69. 'The server did not respond with a QUEUED status reply'
  70. );
  71. }
  72. $this->_commands[] = $command;
  73. return $this;
  74. }
  75. public function watch($keys) {
  76. $this->isWatchSupported();
  77. $this->_watchedKeys = true;
  78. if ($this->_initialized && !$this->_checkAndSet) {
  79. throw new ClientException('WATCH inside MULTI is not allowed');
  80. }
  81. return $this->_client->watch($keys);
  82. }
  83. public function multi() {
  84. if ($this->_initialized && $this->_checkAndSet) {
  85. $this->_checkAndSet = false;
  86. $this->_client->multi();
  87. return $this;
  88. }
  89. $this->initialize();
  90. return $this;
  91. }
  92. public function unwatch() {
  93. $this->isWatchSupported();
  94. $this->_watchedKeys = false;
  95. $this->_client->unwatch();
  96. return $this;
  97. }
  98. public function discard() {
  99. if ($this->_initialized === true) {
  100. $command = $this->_checkAndSet ? 'unwatch' : 'discard';
  101. $this->_client->$command();
  102. $this->reset();
  103. $this->_discarded = true;
  104. }
  105. return $this;
  106. }
  107. public function exec() {
  108. return $this->execute();
  109. }
  110. private function checkBeforeExecution($block) {
  111. if ($this->_insideBlock === true) {
  112. throw new ClientException(
  113. "Cannot invoke 'execute' or 'exec' inside an active client transaction block"
  114. );
  115. }
  116. if ($block) {
  117. if (!is_callable($block)) {
  118. throw new \InvalidArgumentException(
  119. 'Argument passed must be a callable object'
  120. );
  121. }
  122. if (count($this->_commands) > 0) {
  123. $this->discard();
  124. throw new ClientException(
  125. 'Cannot execute a transaction block after using fluent interface'
  126. );
  127. }
  128. }
  129. if (isset($this->_options['retry']) && !isset($block)) {
  130. $this->discard();
  131. throw new \InvalidArgumentException(
  132. 'Automatic retries can be used only when a transaction block is provided'
  133. );
  134. }
  135. }
  136. public function execute($block = null) {
  137. $this->checkBeforeExecution($block);
  138. $reply = null;
  139. $returnValues = array();
  140. $attemptsLeft = isset($this->_options['retry']) ? (int)$this->_options['retry'] : 0;
  141. do {
  142. $blockException = null;
  143. if ($block !== null) {
  144. $this->_insideBlock = true;
  145. try {
  146. $block($this);
  147. }
  148. catch (CommunicationException $exception) {
  149. $blockException = $exception;
  150. }
  151. catch (ServerException $exception) {
  152. $blockException = $exception;
  153. }
  154. catch (\Exception $exception) {
  155. $blockException = $exception;
  156. $this->discard();
  157. }
  158. $this->_insideBlock = false;
  159. if ($blockException !== null) {
  160. throw $blockException;
  161. }
  162. }
  163. if (count($this->_commands) === 0) {
  164. if ($this->_watchedKeys) {
  165. $this->discard();
  166. return;
  167. }
  168. return;
  169. }
  170. $reply = $this->_client->exec();
  171. if ($reply === null) {
  172. if ($attemptsLeft === 0) {
  173. throw new AbortedMultiExec(
  174. 'The current transaction has been aborted by the server'
  175. );
  176. }
  177. $this->reset();
  178. if (isset($this->_options['on_retry']) && is_callable($this->_options['on_retry'])) {
  179. call_user_func($this->_options['on_retry'], $this, $attemptsLeft);
  180. }
  181. continue;
  182. }
  183. break;
  184. } while ($attemptsLeft-- > 0);
  185. $execReply = $reply instanceof \Iterator ? iterator_to_array($reply) : $reply;
  186. $sizeofReplies = count($execReply);
  187. $commands = &$this->_commands;
  188. if ($sizeofReplies !== count($commands)) {
  189. $this->malformedServerResponse(
  190. 'Unexpected number of responses for a MultiExecContext'
  191. );
  192. }
  193. for ($i = 0; $i < $sizeofReplies; $i++) {
  194. $returnValues[] = $commands[$i]->parseResponse($execReply[$i] instanceof \Iterator
  195. ? iterator_to_array($execReply[$i])
  196. : $execReply[$i]
  197. );
  198. unset($commands[$i]);
  199. }
  200. return $returnValues;
  201. }
  202. private function malformedServerResponse($message) {
  203. // Since a MULTI/EXEC block cannot be initialized over a clustered
  204. // connection, we can safely assume that Predis\Client::getConnection()
  205. // will always return an instance of Predis\Network\IConnectionSingle.
  206. Utils::onCommunicationException(new MalformedServerResponse(
  207. $this->_client->getConnection(), $message
  208. ));
  209. }
  210. }