Consumer.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Monitor;
  11. use Iterator;
  12. use Predis\ClientInterface;
  13. use Predis\NotSupportedException;
  14. use Predis\Connection\AggregateConnectionInterface;
  15. /**
  16. * Redis MONITOR consumer.
  17. *
  18. * @author Daniele Alessandri <suppakilla@gmail.com>
  19. */
  20. class Consumer implements Iterator
  21. {
  22. private $client;
  23. private $valid;
  24. private $position;
  25. /**
  26. * @param ClientInterface $client Client instance used by the consumer.
  27. */
  28. public function __construct(ClientInterface $client)
  29. {
  30. $this->assertClient($client);
  31. $this->client = $client;
  32. $this->start();
  33. }
  34. /**
  35. * Automatically stops the consumer when the garbage collector kicks in.
  36. */
  37. public function __destruct()
  38. {
  39. $this->stop();
  40. }
  41. /**
  42. * Checks if the passed client instance satisfies the required conditions
  43. * needed to initialize a monitor consumer.
  44. *
  45. * @param ClientInterface $client Client instance used by the consumer.
  46. */
  47. private function assertClient(ClientInterface $client)
  48. {
  49. if ($client->getConnection() instanceof AggregateConnectionInterface) {
  50. throw new NotSupportedException(
  51. 'Cannot initialize a monitor consumer when using aggregate connections'
  52. );
  53. }
  54. if ($client->getProfile()->supportsCommand('monitor') === false) {
  55. throw new NotSupportedException(
  56. 'The current profile does not support the MONITOR command'
  57. );
  58. }
  59. }
  60. /**
  61. * Initializes the consumer and sends the MONITOR command to the server.
  62. */
  63. protected function start()
  64. {
  65. $this->valid = true;
  66. $monitor = $this->client->createCommand('monitor');
  67. $this->client->executeCommand($monitor);
  68. }
  69. /**
  70. * Stops the consumer. Internally this is done by disconnecting from server
  71. * since there is no way to terminate the stream initialized by MONITOR.
  72. */
  73. public function stop()
  74. {
  75. $this->client->disconnect();
  76. $this->valid = false;
  77. }
  78. /**
  79. * {@inheritdoc}
  80. */
  81. public function rewind()
  82. {
  83. // NOOP
  84. }
  85. /**
  86. * Returns the last message payload retrieved from the server.
  87. *
  88. * @return Object
  89. */
  90. public function current()
  91. {
  92. return $this->getValue();
  93. }
  94. /**
  95. * {@inheritdoc}
  96. */
  97. public function key()
  98. {
  99. return $this->position;
  100. }
  101. /**
  102. * {@inheritdoc}
  103. */
  104. public function next()
  105. {
  106. $this->position++;
  107. }
  108. /**
  109. * Checks if the the consumer is still in a valid state to continue.
  110. *
  111. * @return Boolean
  112. */
  113. public function valid()
  114. {
  115. return $this->valid;
  116. }
  117. /**
  118. * Waits for a new message from the server generated by MONITOR and returns
  119. * it when available.
  120. *
  121. * @return Object
  122. */
  123. private function getValue()
  124. {
  125. $database = 0;
  126. $client = null;
  127. $event = $this->client->getConnection()->read();
  128. $callback = function ($matches) use (&$database, &$client) {
  129. if (2 === $count = count($matches)) {
  130. // Redis <= 2.4
  131. $database = (int) $matches[1];
  132. }
  133. if (4 === $count) {
  134. // Redis >= 2.6
  135. $database = (int) $matches[2];
  136. $client = $matches[3];
  137. }
  138. return ' ';
  139. };
  140. $event = preg_replace_callback('/ \(db (\d+)\) | \[(\d+) (.*?)\] /', $callback, $event, 1);
  141. @list($timestamp, $command, $arguments) = explode(' ', $event, 3);
  142. return (object) array(
  143. 'timestamp' => (float) $timestamp,
  144. 'database' => $database,
  145. 'client' => $client,
  146. 'command' => substr($command, 1, -1),
  147. 'arguments' => $arguments,
  148. );
  149. }
  150. }