WaitContext.php 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Replication;
  11. use Predis\ClientContextInterface;
  12. use Predis\ClientException;
  13. use Predis\ClientInterface;
  14. use Predis\Command\CommandInterface;
  15. use Predis\Command\RawCommand;
  16. use Predis\Connection\ConnectionInterface;
  17. use Predis\Connection\Cluster\ClusterInterface;
  18. use Predis\Connection\Replication\ReplicationInterface;
  19. /**
  20. * Abstraction for the WAIT command.
  21. *
  22. * This can be used with a client connected to a single node or configured with
  23. * both cluster and replication backends.
  24. *
  25. * @see http://redis.io/commands/wait
  26. *
  27. * @author Daniele Alessandri <suppakilla@gmail.com>
  28. */
  29. class WaitContext
  30. {
  31. /**
  32. * @var ClientInterface
  33. */
  34. private $client;
  35. /**
  36. * @var CommandInterface[]
  37. */
  38. private $commands = array();
  39. /**
  40. * @param ClientInterface $client Client instance.
  41. */
  42. public function __construct(ClientInterface $client)
  43. {
  44. $this->client = $client;
  45. }
  46. /**
  47. * {@inheritdoc}
  48. */
  49. public function __call($commandID, $arguments)
  50. {
  51. $command = $this->client->createCommand($commandID, $arguments);
  52. $response = $this->executeCommand($command);
  53. return $response;
  54. }
  55. /**
  56. * {@inheritdoc}
  57. */
  58. public function executeCommand(CommandInterface $command)
  59. {
  60. $response = $this->client->executeCommand($command);
  61. $this->commands[] = $command;
  62. return $response;
  63. }
  64. /**
  65. * Executes the WAIT command against a connection.
  66. *
  67. * @param ConnectionInterface $connection [description]
  68. * @param int $numslaves Minimum number of slaves for acknowledgment.
  69. * @param int $timeout Timeout in milliseconds for acknowledgment.
  70. *
  71. * @return int
  72. */
  73. private function executeWaitCommand(ConnectionInterface $connection, $numslaves, $timeout)
  74. {
  75. $response = $connection->executeCommand(
  76. RawCommand::create('WAIT', $numslaves, $timeout)
  77. );
  78. return $response;
  79. }
  80. /**
  81. * Returns the connection for WAIT from a replication backend.
  82. *
  83. * @param ReplicationInterface $replication Replication connection backend.
  84. *
  85. * @return ConnectionInterface
  86. */
  87. private function getConnectionFromReplication(ReplicationInterface $replication)
  88. {
  89. return $replication->getCurrent();
  90. }
  91. /**
  92. * Returns the connection for WAIT from a cluster backend.
  93. *
  94. * @param ClusterInterface $cluster Cluster connection backend.
  95. *
  96. * @return ConnectionInterface
  97. */
  98. private function getConnectionFromCluster(ClusterInterface $cluster)
  99. {
  100. $command = reset($this->commands);
  101. $slot = $command->getSlot();
  102. foreach ($this->commands as $command) {
  103. if ($slot !== $command->getSlot()) {
  104. throw new ClientException('Cross-slot operations are not allowed');
  105. }
  106. }
  107. // TODO: we should actually fetch the connection by slot but this is not
  108. // currently supported by all our cluster backends.
  109. $connection = $cluster->getConnection($command);
  110. return $connection;
  111. }
  112. /**
  113. * Executes the WAIT command and returns the status of acknowledgment.
  114. *
  115. * When the client is operating in replication mode WAIT is executed against
  116. * the connection currently in use by the underlying backend. On the other
  117. * hand when it is operating in cluster mode WAIT is executed against only
  118. * one connection as cross-slot operations are not allowed.
  119. *
  120. * @param int $numslaves Minimum number of slaves for acknowledgment.
  121. * @param int $timeout Timeout in milliseconds for acknowledgment.
  122. * @param int &$slaves Set with the number of slaves that acknowledged the writes.
  123. *
  124. * @return bool
  125. */
  126. public function wait($numslaves, $timeout, &$slaves = 0)
  127. {
  128. if (!$this->commands) {
  129. return false;
  130. }
  131. $connection = $this->client->getConnection();
  132. if ($connection instanceof ReplicationInterface) {
  133. $connection = $this->getConnectionFromReplication($connection);
  134. } elseif ($connection instanceof ClusterInterface) {
  135. $connection = $this->getConnectionFromCluster($connection);
  136. }
  137. $this->commands = array();
  138. $slaves = $this->executeWaitCommand($connection, $numslaves, $timeout);
  139. $acknowledged = $slaves >= $numslaves;
  140. return $acknowledged;
  141. }
  142. /**
  143. * Returns the underlying client instance.
  144. *
  145. * @return ClientInterface
  146. */
  147. public function getClient()
  148. {
  149. return $this->client;
  150. }
  151. }