MultiExecContext.php 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. <?php
  2. namespace Predis\Transaction;
  3. use Predis\Client;
  4. use Predis\Helpers;
  5. use Predis\ResponseQueued;
  6. use Predis\ClientException;
  7. use Predis\ServerException;
  8. use Predis\CommunicationException;
  9. use Predis\Protocol\ProtocolException;
  10. class MultiExecContext
  11. {
  12. const STATE_RESET = 0x00000;
  13. const STATE_INITIALIZED = 0x00001;
  14. const STATE_INSIDEBLOCK = 0x00010;
  15. const STATE_DISCARDED = 0x00100;
  16. const STATE_CAS = 0x01000;
  17. const STATE_WATCH = 0x10000;
  18. private $_state;
  19. private $_canWatch;
  20. protected $_client;
  21. protected $_options;
  22. protected $_commands;
  23. public function __construct(Client $client, Array $options = null)
  24. {
  25. $this->checkCapabilities($client);
  26. $this->_options = $options ?: array();
  27. $this->_client = $client;
  28. $this->reset();
  29. }
  30. protected function setState($flags)
  31. {
  32. $this->_state = $flags;
  33. }
  34. protected function getState()
  35. {
  36. return $this->_state;
  37. }
  38. protected function flagState($flags)
  39. {
  40. $this->_state |= $flags;
  41. }
  42. protected function unflagState($flags)
  43. {
  44. $this->_state &= ~$flags;
  45. }
  46. protected function checkState($flags)
  47. {
  48. return ($this->_state & $flags) === $flags;
  49. }
  50. private function checkCapabilities(Client $client)
  51. {
  52. if (Helpers::isCluster($client->getConnection())) {
  53. throw new ClientException(
  54. 'Cannot initialize a MULTI/EXEC context over a cluster of connections'
  55. );
  56. }
  57. $profile = $client->getProfile();
  58. if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
  59. throw new ClientException(
  60. 'The current profile does not support MULTI, EXEC and DISCARD'
  61. );
  62. }
  63. $this->_canWatch = $profile->supportsCommands(array('watch', 'unwatch'));
  64. }
  65. private function isWatchSupported()
  66. {
  67. if ($this->_canWatch === false) {
  68. throw new ClientException(
  69. 'The current profile does not support WATCH and UNWATCH'
  70. );
  71. }
  72. }
  73. protected function reset()
  74. {
  75. $this->setState(self::STATE_RESET);
  76. $this->_commands = array();
  77. }
  78. protected function initialize()
  79. {
  80. if ($this->checkState(self::STATE_INITIALIZED)) {
  81. return;
  82. }
  83. $options = $this->_options;
  84. if (isset($options['cas']) && $options['cas']) {
  85. $this->flagState(self::STATE_CAS);
  86. }
  87. if (isset($options['watch'])) {
  88. $this->watch($options['watch']);
  89. }
  90. $cas = $this->checkState(self::STATE_CAS);
  91. $discarded = $this->checkState(self::STATE_DISCARDED);
  92. if (!$cas || ($cas && $discarded)) {
  93. $this->_client->multi();
  94. if ($discarded) {
  95. $this->unflagState(self::STATE_CAS);
  96. }
  97. }
  98. $this->unflagState(self::STATE_DISCARDED);
  99. $this->flagState(self::STATE_INITIALIZED);
  100. }
  101. public function __call($method, $arguments)
  102. {
  103. $this->initialize();
  104. $client = $this->_client;
  105. if ($this->checkState(self::STATE_CAS)) {
  106. return call_user_func_array(array($client, $method), $arguments);
  107. }
  108. $command = $client->createCommand($method, $arguments);
  109. $response = $client->executeCommand($command);
  110. if (!$response instanceof ResponseQueued) {
  111. $this->onProtocolError('The server did not respond with a QUEUED status reply');
  112. }
  113. $this->_commands[] = $command;
  114. return $this;
  115. }
  116. public function watch($keys)
  117. {
  118. $this->isWatchSupported();
  119. if ($this->checkState(self::STATE_INITIALIZED) && !$this->checkState(self::STATE_CAS)) {
  120. throw new ClientException('WATCH after MULTI is not allowed');
  121. }
  122. $watchReply = $this->_client->watch($keys);
  123. $this->flagState(self::STATE_WATCH);
  124. return $watchReply;
  125. }
  126. public function multi()
  127. {
  128. if ($this->checkState(self::STATE_INITIALIZED | self::STATE_CAS)) {
  129. $this->unflagState(self::STATE_CAS);
  130. $this->_client->multi();
  131. }
  132. else {
  133. $this->initialize();
  134. }
  135. return $this;
  136. }
  137. public function unwatch()
  138. {
  139. $this->isWatchSupported();
  140. $this->unflagState(self::STATE_WATCH);
  141. $this->_client->unwatch();
  142. return $this;
  143. }
  144. public function discard()
  145. {
  146. if ($this->checkState(self::STATE_INITIALIZED)) {
  147. $command = $this->checkState(self::STATE_CAS) ? 'unwatch' : 'discard';
  148. $this->_client->$command();
  149. $this->reset();
  150. $this->flagState(self::STATE_DISCARDED);
  151. }
  152. return $this;
  153. }
  154. public function exec()
  155. {
  156. return $this->execute();
  157. }
  158. private function checkBeforeExecution($block)
  159. {
  160. if ($this->checkState(self::STATE_INSIDEBLOCK)) {
  161. throw new ClientException(
  162. "Cannot invoke 'execute' or 'exec' inside an active client transaction block"
  163. );
  164. }
  165. if ($block) {
  166. if (!is_callable($block)) {
  167. throw new \InvalidArgumentException(
  168. 'Argument passed must be a callable object'
  169. );
  170. }
  171. if (count($this->_commands) > 0) {
  172. $this->discard();
  173. throw new ClientException(
  174. 'Cannot execute a transaction block after using fluent interface'
  175. );
  176. }
  177. }
  178. if (isset($this->_options['retry']) && !isset($block)) {
  179. $this->discard();
  180. throw new \InvalidArgumentException(
  181. 'Automatic retries can be used only when a transaction block is provided'
  182. );
  183. }
  184. }
  185. public function execute($block = null)
  186. {
  187. $this->checkBeforeExecution($block);
  188. $reply = null;
  189. $returnValues = array();
  190. $attemptsLeft = isset($this->_options['retry']) ? (int)$this->_options['retry'] : 0;
  191. do {
  192. if ($block !== null) {
  193. $this->executeTransactionBlock($block);
  194. }
  195. if (count($this->_commands) === 0) {
  196. if ($this->checkState(self::STATE_WATCH)) {
  197. $this->discard();
  198. }
  199. return;
  200. }
  201. $reply = $this->_client->exec();
  202. if ($reply === null) {
  203. if ($attemptsLeft === 0) {
  204. $message = 'The current transaction has been aborted by the server';
  205. throw new AbortedMultiExecException($this, $message);
  206. }
  207. $this->reset();
  208. if (isset($this->_options['on_retry']) && is_callable($this->_options['on_retry'])) {
  209. call_user_func($this->_options['on_retry'], $this, $attemptsLeft);
  210. }
  211. continue;
  212. }
  213. break;
  214. } while ($attemptsLeft-- > 0);
  215. $execReply = $reply instanceof \Iterator ? iterator_to_array($reply) : $reply;
  216. $sizeofReplies = count($execReply);
  217. $commands = $this->_commands;
  218. if ($sizeofReplies !== count($commands)) {
  219. $this->onProtocolError("EXEC returned an unexpected number of replies");
  220. }
  221. for ($i = 0; $i < $sizeofReplies; $i++) {
  222. $commandReply = $execReply[$i];
  223. if ($commandReply instanceof \Iterator) {
  224. $commandReply = iterator_to_array($commandReply);
  225. }
  226. $returnValues[$i] = $commands[$i]->parseResponse($commandReply);
  227. unset($commands[$i]);
  228. }
  229. return $returnValues;
  230. }
  231. protected function executeTransactionBlock($block)
  232. {
  233. $blockException = null;
  234. $this->flagState(self::STATE_INSIDEBLOCK);
  235. try {
  236. $block($this);
  237. }
  238. catch (CommunicationException $exception) {
  239. $blockException = $exception;
  240. }
  241. catch (ServerException $exception) {
  242. $blockException = $exception;
  243. }
  244. catch (\Exception $exception) {
  245. $blockException = $exception;
  246. $this->discard();
  247. }
  248. $this->unflagState(self::STATE_INSIDEBLOCK);
  249. if ($blockException !== null) {
  250. throw $blockException;
  251. }
  252. }
  253. private function onProtocolError($message)
  254. {
  255. // Since a MULTI/EXEC block cannot be initialized over a clustered
  256. // connection, we can safely assume that Predis\Client::getConnection()
  257. // will always return an instance of Predis\Network\IConnectionSingle.
  258. Helpers::onCommunicationException(new ProtocolException(
  259. $this->_client->getConnection(), $message
  260. ));
  261. }
  262. }