Consumer.php 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Monitor;
  11. use Iterator;
  12. use Predis\ClientInterface;
  13. use Predis\NotSupportedException;
  14. use Predis\Connection\AggregateConnectionInterface;
  15. /**
  16. * Redis MONITOR consumer.
  17. *
  18. * @author Daniele Alessandri <suppakilla@gmail.com>
  19. */
  20. class Consumer implements Iterator
  21. {
  22. private $client;
  23. private $valid;
  24. private $position;
  25. /**
  26. * @param ClientInterface $client Client instance used by the consumer.
  27. */
  28. public function __construct(ClientInterface $client)
  29. {
  30. $this->assertClient($client);
  31. $this->client = $client;
  32. $this->start();
  33. }
  34. /**
  35. * Automatically stops the consumer when the garbage collector kicks in.
  36. */
  37. public function __destruct()
  38. {
  39. $this->stop();
  40. }
  41. /**
  42. * Checks if the passed client instance satisfies the required conditions
  43. * needed to initialize a monitor consumer.
  44. *
  45. * @param ClientInterface $client Client instance used by the consumer.
  46. */
  47. private function assertClient(ClientInterface $client)
  48. {
  49. if ($client->getConnection() instanceof AggregateConnectionInterface) {
  50. throw new NotSupportedException(
  51. 'Cannot initialize a monitor consumer over aggregate connections.'
  52. );
  53. }
  54. if ($client->getProfile()->supportsCommand('monitor') === false) {
  55. throw new NotSupportedException("The current profile does not support 'MONITOR'.");
  56. }
  57. }
  58. /**
  59. * Initializes the consumer and sends the MONITOR command to the server.
  60. */
  61. protected function start()
  62. {
  63. $this->valid = true;
  64. $monitor = $this->client->createCommand('monitor');
  65. $this->client->executeCommand($monitor);
  66. }
  67. /**
  68. * Stops the consumer. Internally this is done by disconnecting from server
  69. * since there is no way to terminate the stream initialized by MONITOR.
  70. */
  71. public function stop()
  72. {
  73. $this->client->disconnect();
  74. $this->valid = false;
  75. }
  76. /**
  77. * {@inheritdoc}
  78. */
  79. public function rewind()
  80. {
  81. // NOOP
  82. }
  83. /**
  84. * Returns the last message payload retrieved from the server.
  85. *
  86. * @return Object
  87. */
  88. public function current()
  89. {
  90. return $this->getValue();
  91. }
  92. /**
  93. * {@inheritdoc}
  94. */
  95. public function key()
  96. {
  97. return $this->position;
  98. }
  99. /**
  100. * {@inheritdoc}
  101. */
  102. public function next()
  103. {
  104. $this->position++;
  105. }
  106. /**
  107. * Checks if the the consumer is still in a valid state to continue.
  108. *
  109. * @return Boolean
  110. */
  111. public function valid()
  112. {
  113. return $this->valid;
  114. }
  115. /**
  116. * Waits for a new message from the server generated by MONITOR and returns
  117. * it when available.
  118. *
  119. * @return Object
  120. */
  121. private function getValue()
  122. {
  123. $database = 0;
  124. $client = null;
  125. $event = $this->client->getConnection()->read();
  126. $callback = function ($matches) use (&$database, &$client) {
  127. if (2 === $count = count($matches)) {
  128. // Redis <= 2.4
  129. $database = (int) $matches[1];
  130. }
  131. if (4 === $count) {
  132. // Redis >= 2.6
  133. $database = (int) $matches[2];
  134. $client = $matches[3];
  135. }
  136. return ' ';
  137. };
  138. $event = preg_replace_callback('/ \(db (\d+)\) | \[(\d+) (.*?)\] /', $callback, $event, 1);
  139. @list($timestamp, $command, $arguments) = explode(' ', $event, 3);
  140. return (object) array(
  141. 'timestamp' => (float) $timestamp,
  142. 'database' => $database,
  143. 'client' => $client,
  144. 'command' => substr($command, 1, -1),
  145. 'arguments' => $arguments,
  146. );
  147. }
  148. }