MultiExecContext.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. <?php
  2. namespace Predis\Transaction;
  3. use Predis\Client;
  4. use Predis\Helpers;
  5. use Predis\ResponseQueued;
  6. use Predis\ClientException;
  7. use Predis\ServerException;
  8. use Predis\CommunicationException;
  9. use Predis\Protocol\ProtocolException;
  10. class MultiExecContext {
  11. private $_client;
  12. private $_options;
  13. private $_commands;
  14. private $_supportsWatch;
  15. private $_initialized;
  16. private $_discarded;
  17. private $_insideBlock;
  18. private $_checkAndSet;
  19. private $_watchedKeys;
  20. public function __construct(Client $client, Array $options = null) {
  21. $this->checkCapabilities($client);
  22. $this->_options = $options ?: array();
  23. $this->_client = $client;
  24. $this->reset();
  25. }
  26. private function checkCapabilities(Client $client) {
  27. if (Helpers::isCluster($client->getConnection())) {
  28. throw new ClientException(
  29. 'Cannot initialize a MULTI/EXEC context over a cluster of connections'
  30. );
  31. }
  32. $profile = $client->getProfile();
  33. if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
  34. throw new ClientException(
  35. 'The current profile does not support MULTI, EXEC and DISCARD commands'
  36. );
  37. }
  38. $this->_supportsWatch = $profile->supportsCommands(array('watch', 'unwatch'));
  39. }
  40. private function isWatchSupported() {
  41. if ($this->_supportsWatch === false) {
  42. throw new ClientException(
  43. 'The current profile does not support WATCH and UNWATCH commands'
  44. );
  45. }
  46. }
  47. private function reset() {
  48. $this->_initialized = false;
  49. $this->_discarded = false;
  50. $this->_checkAndSet = false;
  51. $this->_insideBlock = false;
  52. $this->_watchedKeys = false;
  53. $this->_commands = array();
  54. }
  55. private function initialize() {
  56. if ($this->_initialized === true) {
  57. return;
  58. }
  59. $options = $this->_options;
  60. $this->_checkAndSet = isset($options['cas']) && $options['cas'];
  61. if (isset($options['watch'])) {
  62. $this->watch($options['watch']);
  63. }
  64. if (!$this->_checkAndSet || ($this->_discarded && $this->_checkAndSet)) {
  65. $this->_client->multi();
  66. if ($this->_discarded) {
  67. $this->_checkAndSet = false;
  68. }
  69. }
  70. $this->_initialized = true;
  71. $this->_discarded = false;
  72. }
  73. public function __call($method, $arguments) {
  74. $this->initialize();
  75. $client = $this->_client;
  76. if ($this->_checkAndSet) {
  77. return call_user_func_array(array($client, $method), $arguments);
  78. }
  79. $command = $client->createCommand($method, $arguments);
  80. $response = $client->executeCommand($command);
  81. if (!$response instanceof ResponseQueued) {
  82. $this->onProtocolError('The server did not respond with a QUEUED status reply');
  83. }
  84. $this->_commands[] = $command;
  85. return $this;
  86. }
  87. public function watch($keys) {
  88. $this->isWatchSupported();
  89. $this->_watchedKeys = true;
  90. if ($this->_initialized && !$this->_checkAndSet) {
  91. throw new ClientException('WATCH inside MULTI is not allowed');
  92. }
  93. return $this->_client->watch($keys);
  94. }
  95. public function multi() {
  96. if ($this->_initialized && $this->_checkAndSet) {
  97. $this->_checkAndSet = false;
  98. $this->_client->multi();
  99. return $this;
  100. }
  101. $this->initialize();
  102. return $this;
  103. }
  104. public function unwatch() {
  105. $this->isWatchSupported();
  106. $this->_watchedKeys = false;
  107. $this->_client->unwatch();
  108. return $this;
  109. }
  110. public function discard() {
  111. if ($this->_initialized === true) {
  112. $command = $this->_checkAndSet ? 'unwatch' : 'discard';
  113. $this->_client->$command();
  114. $this->reset();
  115. $this->_discarded = true;
  116. }
  117. return $this;
  118. }
  119. public function exec() {
  120. return $this->execute();
  121. }
  122. private function checkBeforeExecution($block) {
  123. if ($this->_insideBlock === true) {
  124. throw new ClientException(
  125. "Cannot invoke 'execute' or 'exec' inside an active client transaction block"
  126. );
  127. }
  128. if ($block) {
  129. if (!is_callable($block)) {
  130. throw new \InvalidArgumentException(
  131. 'Argument passed must be a callable object'
  132. );
  133. }
  134. if (count($this->_commands) > 0) {
  135. $this->discard();
  136. throw new ClientException(
  137. 'Cannot execute a transaction block after using fluent interface'
  138. );
  139. }
  140. }
  141. if (isset($this->_options['retry']) && !isset($block)) {
  142. $this->discard();
  143. throw new \InvalidArgumentException(
  144. 'Automatic retries can be used only when a transaction block is provided'
  145. );
  146. }
  147. }
  148. public function execute($block = null) {
  149. $this->checkBeforeExecution($block);
  150. $reply = null;
  151. $returnValues = array();
  152. $attemptsLeft = isset($this->_options['retry']) ? (int)$this->_options['retry'] : 0;
  153. do {
  154. $blockException = null;
  155. if ($block !== null) {
  156. $this->_insideBlock = true;
  157. try {
  158. $block($this);
  159. }
  160. catch (CommunicationException $exception) {
  161. $blockException = $exception;
  162. }
  163. catch (ServerException $exception) {
  164. $blockException = $exception;
  165. }
  166. catch (\Exception $exception) {
  167. $blockException = $exception;
  168. $this->discard();
  169. }
  170. $this->_insideBlock = false;
  171. if ($blockException !== null) {
  172. throw $blockException;
  173. }
  174. }
  175. if (count($this->_commands) === 0) {
  176. if ($this->_watchedKeys) {
  177. $this->discard();
  178. return;
  179. }
  180. return;
  181. }
  182. $reply = $this->_client->exec();
  183. if ($reply === null) {
  184. if ($attemptsLeft === 0) {
  185. $message = 'The current transaction has been aborted by the server';
  186. throw new AbortedMultiExecException($this, $message);
  187. }
  188. $this->reset();
  189. if (isset($this->_options['on_retry']) && is_callable($this->_options['on_retry'])) {
  190. call_user_func($this->_options['on_retry'], $this, $attemptsLeft);
  191. }
  192. continue;
  193. }
  194. break;
  195. } while ($attemptsLeft-- > 0);
  196. $execReply = $reply instanceof \Iterator ? iterator_to_array($reply) : $reply;
  197. $sizeofReplies = count($execReply);
  198. $commands = &$this->_commands;
  199. if ($sizeofReplies !== count($commands)) {
  200. $this->onProtocolError('Unexpected number of responses for a MultiExecContext');
  201. }
  202. for ($i = 0; $i < $sizeofReplies; $i++) {
  203. $returnValues[] = $commands[$i]->parseResponse($execReply[$i] instanceof \Iterator
  204. ? iterator_to_array($execReply[$i])
  205. : $execReply[$i]
  206. );
  207. unset($commands[$i]);
  208. }
  209. return $returnValues;
  210. }
  211. private function onProtocolError($message) {
  212. // Since a MULTI/EXEC block cannot be initialized over a clustered
  213. // connection, we can safely assume that Predis\Client::getConnection()
  214. // will always return an instance of Predis\Network\IConnectionSingle.
  215. Helpers::onCommunicationException(new ProtocolException(
  216. $this->_client->getConnection(), $message
  217. ));
  218. }
  219. }