MultiExecContext.php 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Transaction;
  11. use Predis\Client;
  12. use Predis\Helpers;
  13. use Predis\ResponseQueued;
  14. use Predis\ClientException;
  15. use Predis\ServerException;
  16. use Predis\CommunicationException;
  17. use Predis\Protocol\ProtocolException;
  /**
   * Client-side context that drives a MULTI/EXEC transaction through a client.
   *
   * Commands invoked on the context (via __call) are sent through the wrapped
   * client and, unless the context is in CAS mode, are expected to be answered
   * with a QUEUED reply; they are remembered so that their replies can be
   * parsed once EXEC returns.
   *
   * Recognized options (all optional), as read by this class:
   *  - 'cas':      when truthy the context starts in CAS mode, where commands
   *                are executed immediately until multi() is called.
   *  - 'watch':    value passed to watch() when the context is initialized.
   *  - 'retry':    number of times a transaction block is re-run when EXEC
   *                returns null. Requires a block to be passed to execute().
   *  - 'on_retry': callable invoked as ($context, $attemptsLeft) before each
   *                retry.
   */
  class MultiExecContext
  {
      // State flags. The values are hexadecimal literals; each one sets a
      // distinct bit so they can be combined with bitwise operators.
      const STATE_RESET = 0x00000;
      const STATE_INITIALIZED = 0x00001;
      const STATE_INSIDEBLOCK = 0x00010;
      const STATE_DISCARDED = 0x00100;
      const STATE_CAS = 0x01000;
      const STATE_WATCH = 0x10000;

      /** @var int Bitmask built from the STATE_* constants. */
      private $_state;

      /** @var bool Whether the client profile supports WATCH and UNWATCH. */
      private $_canWatch;

      /** @var Client Client used to send every command. */
      protected $_client;

      /** @var array Options passed to the constructor. */
      protected $_options;

      /** @var array Commands sent since the last reset, in order. */
      protected $_commands;

      /**
       * @param Client     $client  Client used to send commands.
       * @param array|null $options Context options (see the class description).
       *
       * @throws ClientException When the client is connected to a cluster or
       *                         its profile lacks MULTI, EXEC or DISCARD.
       */
      public function __construct(Client $client, array $options = null)
      {
          // Validate first so that an unusable client never gets stored.
          $this->checkCapabilities($client);
          $this->_options = $options ?: array();
          $this->_client = $client;
          $this->reset();
      }

      /**
       * Replaces the whole state bitmask.
       *
       * @param int $flags Combination of STATE_* constants.
       */
      protected function setState($flags)
      {
          $this->_state = $flags;
      }

      /**
       * Returns the current state bitmask.
       *
       * @return int
       */
      protected function getState()
      {
          return $this->_state;
      }

      /**
       * Turns on the given flags, leaving the others untouched.
       *
       * @param int $flags Combination of STATE_* constants.
       */
      protected function flagState($flags)
      {
          $this->_state |= $flags;
      }

      /**
       * Turns off the given flags, leaving the others untouched.
       *
       * @param int $flags Combination of STATE_* constants.
       */
      protected function unflagState($flags)
      {
          $this->_state &= ~$flags;
      }

      /**
       * Tells whether ALL of the given flags are currently set.
       *
       * @param int $flags Combination of STATE_* constants.
       *
       * @return bool
       */
      protected function checkState($flags)
      {
          return ($this->_state & $flags) === $flags;
      }

      /**
       * Verifies that the client can be used for transactions and records
       * whether WATCH/UNWATCH are available.
       *
       * @param Client $client Client to inspect.
       *
       * @throws ClientException When the connection is a cluster or the
       *                         profile lacks MULTI, EXEC or DISCARD.
       */
      private function checkCapabilities(Client $client)
      {
          if (Helpers::isCluster($client->getConnection())) {
              throw new ClientException(
                  'Cannot initialize a MULTI/EXEC context over a cluster of connections'
              );
          }

          $profile = $client->getProfile();

          if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
              throw new ClientException(
                  'The current profile does not support MULTI, EXEC and DISCARD'
              );
          }

          $this->_canWatch = $profile->supportsCommands(array('watch', 'unwatch'));
      }

      /**
       * Guard used before issuing WATCH or UNWATCH. Despite its name it does
       * not return a value: it throws when the commands are unsupported.
       *
       * @throws ClientException When the profile lacks WATCH and UNWATCH.
       */
      private function isWatchSupported()
      {
          if ($this->_canWatch === false) {
              throw new ClientException(
                  'The current profile does not support WATCH and UNWATCH'
              );
          }
      }

      /**
       * Clears every state flag and forgets the commands sent so far.
       */
      protected function reset()
      {
          $this->setState(self::STATE_RESET);
          $this->_commands = array();
      }

      /**
       * Prepares the context on first use: applies the 'cas' and 'watch'
       * options and sends MULTI unless the context is in CAS mode.
       *
       * Does nothing when the context is already initialized.
       */
      protected function initialize()
      {
          if ($this->checkState(self::STATE_INITIALIZED)) {
              return;
          }

          $options = $this->_options;

          if (isset($options['cas']) && $options['cas']) {
              $this->flagState(self::STATE_CAS);
          }

          if (isset($options['watch'])) {
              $this->watch($options['watch']);
          }

          $cas = $this->checkState(self::STATE_CAS);
          $discarded = $this->checkState(self::STATE_DISCARDED);

          // MULTI is sent right away unless we are in CAS mode; a context that
          // was discarded leaves CAS mode and sends MULTI as well.
          if (!$cas || $discarded) {
              $this->_client->multi();

              if ($discarded) {
                  $this->unflagState(self::STATE_CAS);
              }
          }

          $this->unflagState(self::STATE_DISCARDED);
          $this->flagState(self::STATE_INITIALIZED);
      }

      /**
       * Sends a command through the client.
       *
       * In CAS mode the call is forwarded to the client and its result is
       * returned. Otherwise the command must be answered with a QUEUED reply
       * and is remembered for execute().
       *
       * @param string $method    Command name.
       * @param array  $arguments Command arguments.
       *
       * @return mixed The client's return value in CAS mode, otherwise $this.
       */
      public function __call($method, $arguments)
      {
          $this->initialize();
          $client = $this->_client;

          if ($this->checkState(self::STATE_CAS)) {
              return call_user_func_array(array($client, $method), $arguments);
          }

          $command = $client->createCommand($method, $arguments);
          $response = $client->executeCommand($command);

          if (!$response instanceof ResponseQueued) {
              $this->onProtocolError('The server did not respond with a QUEUED status reply');
          }

          $this->_commands[] = $command;

          return $this;
      }

      /**
       * Sends WATCH for the given keys and flags the context as watching.
       *
       * @param mixed $keys Passed as-is to the client's watch() method.
       *
       * @return mixed Whatever the client's watch() returns.
       *
       * @throws ClientException When WATCH is unsupported, or when the context
       *                         is initialized and not in CAS mode.
       */
      public function watch($keys)
      {
          $this->isWatchSupported();

          if ($this->checkState(self::STATE_INITIALIZED) && !$this->checkState(self::STATE_CAS)) {
              throw new ClientException('WATCH after MULTI is not allowed');
          }

          $watchReply = $this->_client->watch($keys);
          $this->flagState(self::STATE_WATCH);

          return $watchReply;
      }

      /**
       * Leaves CAS mode by sending MULTI, or initializes the context when it
       * is not both initialized and in CAS mode.
       *
       * @return MultiExecContext $this
       */
      public function multi()
      {
          if ($this->checkState(self::STATE_INITIALIZED | self::STATE_CAS)) {
              $this->unflagState(self::STATE_CAS);
              $this->_client->multi();
          } else {
              $this->initialize();
          }

          return $this;
      }

      /**
       * Sends UNWATCH and clears the watching flag.
       *
       * @return MultiExecContext $this
       *
       * @throws ClientException When the profile lacks WATCH and UNWATCH.
       */
      public function unwatch()
      {
          $this->isWatchSupported();
          $this->unflagState(self::STATE_WATCH);
          $this->_client->unwatch();

          return $this;
      }

      /**
       * Abandons an initialized transaction and resets the context.
       *
       * Sends UNWATCH when in CAS mode and DISCARD otherwise, then flags the
       * context as discarded. Does nothing when not initialized.
       *
       * @return MultiExecContext $this
       */
      public function discard()
      {
          if ($this->checkState(self::STATE_INITIALIZED)) {
              $command = $this->checkState(self::STATE_CAS) ? 'unwatch' : 'discard';
              $this->_client->$command();
              $this->reset();
              $this->flagState(self::STATE_DISCARDED);
          }

          return $this;
      }

      /**
       * Alias of execute() without a transaction block.
       *
       * @return array|null
       */
      public function exec()
      {
          return $this->execute();
      }

      /**
       * Validates the context and the arguments before a transaction runs.
       *
       * @param mixed $block Transaction block, or null when none is given.
       *
       * @throws ClientException           When called from inside a running
       *                                   block, or when a block is given after
       *                                   commands were already sent.
       * @throws \InvalidArgumentException When $block is not null and not
       *                                   callable, or when the 'retry' option
       *                                   is set without a block.
       */
      private function checkBeforeExecution($block)
      {
          if ($this->checkState(self::STATE_INSIDEBLOCK)) {
              throw new ClientException(
                  "Cannot invoke 'execute' or 'exec' inside an active client transaction block"
              );
          }

          // Compare against null (exactly like execute() does) so that falsy
          // non-callable values such as false or 0 are rejected here instead
          // of being invoked later on.
          if ($block !== null) {
              if (!is_callable($block)) {
                  throw new \InvalidArgumentException(
                      'Argument passed must be a callable object'
                  );
              }

              if (count($this->_commands) > 0) {
                  $this->discard();

                  throw new ClientException(
                      'Cannot execute a transaction block after using fluent interface'
                  );
              }
          }

          if (isset($this->_options['retry']) && !isset($block)) {
              $this->discard();

              throw new \InvalidArgumentException(
                  'Automatic retries can be used only when a transaction block is provided'
              );
          }
      }

      /**
       * Runs the optional transaction block, sends EXEC and parses the replies.
       *
       * When EXEC returns null the block is re-run up to 'retry' more times,
       * calling the 'on_retry' callable (if any) before each new attempt.
       *
       * @param callable|null $block Transaction block receiving this context.
       *
       * @return array|null Parsed replies in the order the commands were sent,
       *                    or null when no command was sent.
       *
       * @throws AbortedMultiExecException When EXEC returns null and no retry
       *                                   attempts are left.
       * @throws ClientException           See checkBeforeExecution().
       * @throws \InvalidArgumentException See checkBeforeExecution().
       */
      public function execute($block = null)
      {
          $this->checkBeforeExecution($block);

          $reply = null;

          // A negative retry count would let the loop below end with a null
          // reply and hide the abort, so it is treated as "no retries".
          $retry = isset($this->_options['retry']) ? (int) $this->_options['retry'] : 0;
          $attemptsLeft = max(0, $retry);

          do {
              if ($block !== null) {
                  $this->executeTransactionBlock($block);
              }

              if (count($this->_commands) === 0) {
                  // Nothing to execute: drop the context if keys are watched.
                  if ($this->checkState(self::STATE_WATCH)) {
                      $this->discard();
                  }

                  return;
              }

              $reply = $this->_client->exec();

              if ($reply === null) {
                  if ($attemptsLeft === 0) {
                      $message = 'The current transaction has been aborted by the server';

                      throw new AbortedMultiExecException($this, $message);
                  }

                  $this->reset();

                  if (isset($this->_options['on_retry']) && is_callable($this->_options['on_retry'])) {
                      call_user_func($this->_options['on_retry'], $this, $attemptsLeft);
                  }

                  continue;
              }

              break;
          } while ($attemptsLeft-- > 0);

          $execReply = $reply instanceof \Iterator ? iterator_to_array($reply) : $reply;
          $sizeofReplies = count($execReply);
          $commands = $this->_commands;

          if ($sizeofReplies !== count($commands)) {
              $this->onProtocolError("EXEC returned an unexpected number of replies");
          }

          $returnValues = array();

          for ($i = 0; $i < $sizeofReplies; $i++) {
              $commandReply = $execReply[$i];

              if ($commandReply instanceof \Iterator) {
                  $commandReply = iterator_to_array($commandReply);
              }

              // Each reply is parsed by the command that produced it.
              $returnValues[$i] = $commands[$i]->parseResponse($commandReply);
          }

          return $returnValues;
      }

      /**
       * Invokes the transaction block, flagging the context as "inside block"
       * for the duration of the call.
       *
       * Any exception is re-thrown after the flag is cleared. The transaction
       * is discarded first, except for CommunicationException and
       * ServerException, which are re-thrown without calling discard().
       *
       * @param callable $block Transaction block receiving this context.
       *
       * @throws \Exception Whatever the block throws.
       */
      protected function executeTransactionBlock($block)
      {
          $blockException = null;
          $this->flagState(self::STATE_INSIDEBLOCK);

          try {
              // call_user_func() handles every kind of callable accepted by
              // is_callable(), including array and 'Class::method' callables.
              call_user_func($block, $this);
          } catch (CommunicationException $exception) {
              $blockException = $exception;
          } catch (ServerException $exception) {
              $blockException = $exception;
          } catch (\Exception $exception) {
              $blockException = $exception;
              $this->discard();
          }

          $this->unflagState(self::STATE_INSIDEBLOCK);

          if ($blockException !== null) {
              throw $blockException;
          }
      }

      /**
       * Reports a protocol error on the client's connection.
       *
       * @param string $message Description of the error.
       */
      private function onProtocolError($message)
      {
          // Since a MULTI/EXEC block cannot be initialized over a clustered
          // connection, we can safely assume that Predis\Client::getConnection()
          // will always return an instance of Predis\Network\IConnectionSingle.
          Helpers::onCommunicationException(new ProtocolException(
              $this->_client->getConnection(), $message
          ));
      }
  }