MultiExecContext.php 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. <?php
  2. namespace Predis\Transaction;
  3. use Predis\Client;
  4. use Predis\Helpers;
  5. use Predis\ResponseQueued;
  6. use Predis\ClientException;
  7. use Predis\ServerException;
  8. use Predis\CommunicationException;
  9. use Predis\Protocol\ProtocolException;
  /**
   * Client-side wrapper around a MULTI/EXEC transaction.
   *
   * Commands invoked on the context through __call() are sent to the server
   * right away; outside of CAS mode each one is expected to be acknowledged
   * with a QUEUED reply and is remembered so that the replies returned by EXEC
   * can later be parsed by the matching command instance.
   *
   * Recognized keys of the options array, as read by this class:
   *  - 'cas'      : when truthy the context starts in CAS mode, where commands
   *                 are forwarded directly to the client until multi() is called.
   *  - 'watch'    : key(s) passed to watch() when the context is initialized.
   *  - 'retry'    : number of times a transaction block is re-run after EXEC
   *                 returns null (aborted transaction).
   *  - 'on_retry' : callable invoked as ($context, $attemptsLeft) before a retry.
   */
  class MultiExecContext {
      // Bit flags describing the state of the context. They are combined
      // with the bitwise operators used by the *State() helpers below.
      const STATE_RESET = 0x00000;
      const STATE_INITIALIZED = 0x00001;
      const STATE_INSIDEBLOCK = 0x00010;
      const STATE_DISCARDED = 0x00100;
      const STATE_CAS = 0x01000;
      const STATE_WATCH = 0x10000;

      private $_client;
      private $_options;
      private $_state;
      private $_canWatch;
      private $_commands;

      /**
       * @param Client $client Client used to send the commands of the transaction.
       * @param array|null $options Options of the transaction (see the class description).
       * @throws ClientException When the client cannot be used for MULTI/EXEC.
       */
      public function __construct(Client $client, array $options = null) {
          $this->checkCapabilities($client);
          $this->_options = $options ?: array();
          $this->_client = $client;
          $this->reset();
      }

      /**
       * Replaces the whole state with the given flags.
       *
       * @param int $flags Combination of the STATE_* constants.
       */
      protected function setState($flags) {
          $this->_state = $flags;
      }

      /**
       * @return int Current combination of STATE_* flags.
       */
      protected function getState() {
          return $this->_state;
      }

      /**
       * Turns on the given flags, leaving the other ones untouched.
       *
       * @param int $flags Combination of the STATE_* constants.
       */
      protected function flagState($flags) {
          $this->_state |= $flags;
      }

      /**
       * Turns off the given flags, leaving the other ones untouched.
       *
       * @param int $flags Combination of the STATE_* constants.
       */
      protected function unflagState($flags) {
          $this->_state &= ~$flags;
      }

      /**
       * Tells whether ALL of the given flags are currently set.
       *
       * @param int $flags Combination of the STATE_* constants.
       * @return bool
       */
      protected function checkState($flags) {
          return ($this->_state & $flags) === $flags;
      }

      /**
       * Verifies that the client can run a transaction and records whether
       * its profile supports WATCH and UNWATCH.
       *
       * @param Client $client
       * @throws ClientException When the connection is a cluster or when the
       *                         profile lacks MULTI, EXEC or DISCARD.
       */
      private function checkCapabilities(Client $client) {
          if (Helpers::isCluster($client->getConnection())) {
              throw new ClientException(
                  'Cannot initialize a MULTI/EXEC context over a cluster of connections'
              );
          }

          $profile = $client->getProfile();
          if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
              throw new ClientException(
                  'The current profile does not support MULTI, EXEC and DISCARD commands'
              );
          }

          $this->_canWatch = $profile->supportsCommands(array('watch', 'unwatch'));
      }

      /**
       * Guard used by watch() and unwatch(): despite its name it returns
       * nothing and throws when WATCH/UNWATCH are not supported.
       *
       * @throws ClientException
       */
      private function isWatchSupported() {
          if ($this->_canWatch === false) {
              throw new ClientException(
                  'The current profile does not support WATCH and UNWATCH commands'
              );
          }
      }

      /**
       * Clears every state flag and forgets the commands recorded so far.
       */
      private function reset() {
          $this->setState(self::STATE_RESET);
          $this->_commands = array();
      }

      /**
       * Lazily prepares the context the first time it is used: applies the
       * 'cas' and 'watch' options and sends MULTI when appropriate.
       * Does nothing when the context is already initialized.
       */
      private function initialize() {
          if ($this->checkState(self::STATE_INITIALIZED)) {
              return;
          }

          $options = $this->_options;
          if (isset($options['cas']) && $options['cas']) {
              $this->flagState(self::STATE_CAS);
          }
          if (isset($options['watch'])) {
              $this->watch($options['watch']);
          }

          $cas = $this->checkState(self::STATE_CAS);
          $discarded = $this->checkState(self::STATE_DISCARDED);

          // MULTI is sent immediately unless the context is in CAS mode. A CAS
          // context that was previously discarded leaves CAS mode and enters
          // MULTI right away.
          if (!$cas || $discarded) {
              $this->_client->multi();
              if ($discarded) {
                  $this->unflagState(self::STATE_CAS);
              }
          }

          $this->unflagState(self::STATE_DISCARDED);
          $this->flagState(self::STATE_INITIALIZED);
      }

      /**
       * Handles the commands invoked on the context.
       *
       * In CAS mode the call is forwarded to the client and its result is
       * returned as is. Otherwise the command is executed, must be answered
       * with a ResponseQueued instance, and is recorded for execute().
       *
       * @param string $method Name of the command.
       * @param array $arguments Arguments of the command.
       * @return mixed The client's result in CAS mode, $this otherwise.
       */
      public function __call($method, $arguments) {
          $this->initialize();
          $client = $this->_client;

          if ($this->checkState(self::STATE_CAS)) {
              return call_user_func_array(array($client, $method), $arguments);
          }

          $command = $client->createCommand($method, $arguments);
          $response = $client->executeCommand($command);
          if (!$response instanceof ResponseQueued) {
              $this->onProtocolError('The server did not respond with a QUEUED status reply');
          }

          $this->_commands[] = $command;

          return $this;
      }

      /**
       * Sends WATCH for the given keys and flags the context as watching.
       *
       * @param mixed $keys Key(s) forwarded unchanged to the client's watch().
       * @return mixed Reply returned by the client's watch().
       * @throws ClientException When WATCH is unsupported, or when the context
       *                         is initialized and not in CAS mode.
       */
      public function watch($keys) {
          $this->isWatchSupported();

          if ($this->checkState(self::STATE_INITIALIZED) && !$this->checkState(self::STATE_CAS)) {
              throw new ClientException('WATCH inside MULTI is not allowed');
          }

          $watchReply = $this->_client->watch($keys);
          $this->flagState(self::STATE_WATCH);

          return $watchReply;
      }

      /**
       * Leaves CAS mode by sending MULTI when the context is initialized and
       * in CAS mode; otherwise makes sure the context is initialized.
       *
       * @return MultiExecContext $this
       */
      public function multi() {
          if ($this->checkState(self::STATE_INITIALIZED | self::STATE_CAS)) {
              $this->unflagState(self::STATE_CAS);
              $this->_client->multi();
          }
          else {
              $this->initialize();
          }

          return $this;
      }

      /**
       * Sends UNWATCH and clears the watching flag.
       *
       * @return MultiExecContext $this
       * @throws ClientException When UNWATCH is unsupported.
       */
      public function unwatch() {
          $this->isWatchSupported();
          $this->unflagState(self::STATE_WATCH);
          $this->_client->unwatch();

          return $this;
      }

      /**
       * Cancels an initialized transaction: sends UNWATCH in CAS mode or
       * DISCARD otherwise, then resets the context and flags it as discarded.
       * Does nothing when the context is not initialized.
       *
       * @return MultiExecContext $this
       */
      public function discard() {
          if ($this->checkState(self::STATE_INITIALIZED)) {
              $command = $this->checkState(self::STATE_CAS) ? 'unwatch' : 'discard';
              $this->_client->$command();
              $this->reset();
              $this->flagState(self::STATE_DISCARDED);
          }

          return $this;
      }

      /**
       * Alias of execute() without a transaction block.
       *
       * @return array|null
       */
      public function exec() {
          return $this->execute();
      }

      /**
       * Validates the preconditions of execute().
       *
       * @param mixed $block Value received by execute().
       * @throws ClientException When called from inside a transaction block, or
       *                         when a block is given after commands were already
       *                         queued through the fluent interface.
       * @throws \InvalidArgumentException When the block is not callable, or when
       *                                   the 'retry' option is set without a block.
       */
      private function checkBeforeExecution($block) {
          if ($this->checkState(self::STATE_INSIDEBLOCK)) {
              throw new ClientException(
                  "Cannot invoke 'execute' or 'exec' inside an active client transaction block"
              );
          }

          // Compare against null instead of relying on truthiness: execute()
          // runs the block whenever it is not null, so falsy values such as
          // false or '' must be rejected here rather than fail when invoked.
          if ($block !== null) {
              if (!is_callable($block)) {
                  throw new \InvalidArgumentException(
                      'Argument passed must be a callable object'
                  );
              }
              if (count($this->_commands) > 0) {
                  $this->discard();
                  throw new ClientException(
                      'Cannot execute a transaction block after using fluent interface'
                  );
              }
          }

          if (isset($this->_options['retry']) && $block === null) {
              $this->discard();
              throw new \InvalidArgumentException(
                  'Automatic retries can be used only when a transaction block is provided'
              );
          }
      }

      /**
       * Runs the optional transaction block, sends EXEC and parses its replies.
       *
       * When EXEC returns null the transaction is considered aborted: the block
       * is re-run while retry attempts are left, otherwise an exception is thrown.
       *
       * @param callable|null $block Invoked with the context as its only argument.
       * @return array|null Parsed replies, in the order the commands were queued,
       *                    or null when no command was queued.
       * @throws AbortedMultiExecException When the transaction is aborted and no
       *                                   retry attempts are left.
       */
      public function execute($block = null) {
          $this->checkBeforeExecution($block);

          $reply = null;
          $returnValues = array();

          // A negative value is treated as "no retries": it would otherwise let
          // the loop below end with a null reply instead of raising the abort.
          $attemptsLeft = isset($this->_options['retry']) ? max(0, (int) $this->_options['retry']) : 0;

          do {
              if ($block !== null) {
                  $this->executeTransactionBlock($block);
              }

              if (count($this->_commands) === 0) {
                  // Nothing to execute: only release the watched keys, if any.
                  if ($this->checkState(self::STATE_WATCH)) {
                      $this->discard();
                  }
                  return null;
              }

              $reply = $this->_client->exec();

              if ($reply === null) {
                  if ($attemptsLeft === 0) {
                      $message = 'The current transaction has been aborted by the server';
                      throw new AbortedMultiExecException($this, $message);
                  }

                  $this->reset();

                  if (isset($this->_options['on_retry']) && is_callable($this->_options['on_retry'])) {
                      call_user_func($this->_options['on_retry'], $this, $attemptsLeft);
                  }

                  // Jumps to the loop condition, which consumes one attempt.
                  continue;
              }

              break;
          } while ($attemptsLeft-- > 0);

          $execReply = $reply instanceof \Iterator ? iterator_to_array($reply) : $reply;
          $sizeofReplies = count($execReply);
          $commands = $this->_commands;

          if ($sizeofReplies !== count($commands)) {
              $this->onProtocolError('Unexpected number of responses for a MultiExecContext');
          }

          for ($i = 0; $i < $sizeofReplies; $i++) {
              $commandReply = $execReply[$i];
              if ($commandReply instanceof \Iterator) {
                  $commandReply = iterator_to_array($commandReply);
              }
              $returnValues[$i] = $commands[$i]->parseResponse($commandReply);
              unset($commands[$i]);
          }

          return $returnValues;
      }

      /**
       * Invokes the transaction block while the context is flagged as being
       * inside a block, then rethrows any exception raised by it.
       *
       * Communication and server exceptions are rethrown without further
       * commands being sent; for any other exception the transaction is
       * discarded first.
       *
       * @param callable $block
       * @throws \Exception Whatever the block raised.
       */
      private function executeTransactionBlock($block) {
          $blockException = null;
          $mustDiscard = false;

          $this->flagState(self::STATE_INSIDEBLOCK);

          try {
              // call_user_func() accepts every kind of value that passes the
              // is_callable() check performed before execution.
              call_user_func($block, $this);
          }
          catch (CommunicationException $exception) {
              $blockException = $exception;
          }
          catch (ServerException $exception) {
              $blockException = $exception;
          }
          catch (\Exception $exception) {
              $blockException = $exception;
              $mustDiscard = true;
          }

          // The flag is cleared before discarding so that a failure of
          // discard() cannot leave the context stuck "inside a block".
          $this->unflagState(self::STATE_INSIDEBLOCK);

          if ($mustDiscard) {
              $this->discard();
          }

          if ($blockException !== null) {
              throw $blockException;
          }
      }

      /**
       * Reports a protocol error through Helpers::onCommunicationException().
       *
       * @param string $message Description of the error.
       */
      private function onProtocolError($message) {
          // Since a MULTI/EXEC block cannot be initialized over a clustered
          // connection, we can safely assume that Predis\Client::getConnection()
          // will always return an instance of Predis\Network\IConnectionSingle.
          Helpers::onCommunicationException(new ProtocolException(
              $this->_client->getConnection(), $message
          ));
      }
  }