DispatcherLoop.php 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use InvalidArgumentException;
  12. use Predis\ClientInterface;
  /**
   * Method-dispatcher loop built around the client-side abstraction of a Redis
   * PUB / SUB context.
   *
   * Callbacks can be bound to individual channels; optional fallback callbacks
   * handle messages on channels without a bound callback and non-message
   * notifications coming from the underlying consumer.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class DispatcherLoop
  {
      /** PUB / SUB consumer obtained from the client via pubSubLoop(). */
      private $pubsub;

      /** Map of (prefixed) channel name => callback. */
      protected $callbacks;

      /** Callback for messages on channels that have no bound callback, or null. */
      protected $defaultCallback;

      /** Callback for notifications that are not MESSAGE / PMESSAGE, or null. */
      protected $subscriptionCallback;

      /**
       * Creates the loop and obtains the PUB / SUB consumer from the client.
       *
       * @param ClientInterface $client Client instance used by the context.
       */
      public function __construct(ClientInterface $client)
      {
          $this->callbacks = array();
          $this->pubsub = $client->pubSubLoop();
      }

      /**
       * Checks if the passed argument is a valid callback.
       *
       * @param mixed $callable A callback.
       *
       * @throws InvalidArgumentException When the argument is not callable.
       */
      protected function assertCallback($callable)
      {
          if (!is_callable($callable)) {
              throw new InvalidArgumentException('The given argument must be a callable object.');
          }
      }

      /**
       * Returns the underlying PUB / SUB context.
       *
       * @return Consumer
       */
      public function getPubSubConsumer()
      {
          return $this->pubsub;
      }

      /**
       * Sets a callback that gets invoked upon new subscriptions.
       *
       * More precisely, run() invokes it for every consumer notification whose
       * kind is neither MESSAGE nor PMESSAGE, passing the whole message object.
       * Passing null (or omitting the argument) removes the callback.
       *
       * @param mixed $callable A callback, or null to unset it.
       *
       * @throws InvalidArgumentException When a non-null argument is not callable.
       */
      public function subscriptionCallback($callable = null)
      {
          if (isset($callable)) {
              $this->assertCallback($callable);
          }

          $this->subscriptionCallback = $callable;
      }

      /**
       * Sets a callback that gets invoked when a message is received on a
       * channel that does not have an associated callback.
       *
       * The callback receives the whole message object. Passing null (or
       * omitting the argument) removes the callback.
       *
       * @param mixed $callable A callback, or null to unset it.
       *
       * @throws InvalidArgumentException When a non-null argument is not callable.
       */
      public function defaultCallback($callable = null)
      {
          if (isset($callable)) {
              $this->assertCallback($callable);
          }

          // Store into the property that run() actually reads for unmatched
          // channels; writing to $subscriptionCallback here would both lose the
          // default callback and clobber the subscription callback.
          $this->defaultCallback = $callable;
      }

      /**
       * Binds a callback to a channel and subscribes to that channel.
       *
       * The callback is stored under the channel name preceded by the key
       * prefix returned by getPrefixKeys(), while the subscription itself is
       * issued with the unprefixed name.
       *
       * @param string   $channel  Channel name.
       * @param callable $callback A callback; run() passes it the message payload.
       *
       * @throws InvalidArgumentException When the callback is not callable.
       */
      public function attachCallback($channel, $callback)
      {
          $callbackName = $this->getPrefixKeys() . $channel;

          $this->assertCallback($callback);
          $this->callbacks[$callbackName] = $callback;
          $this->pubsub->subscribe($channel);
      }

      /**
       * Stops listening to a channel and removes the associated callback.
       *
       * Nothing happens (no unsubscribe is sent) when no callback is bound to
       * the channel.
       *
       * @param string $channel Redis channel.
       */
      public function detachCallback($channel)
      {
          $callbackName = $this->getPrefixKeys() . $channel;

          if (isset($this->callbacks[$callbackName])) {
              unset($this->callbacks[$callbackName]);
              $this->pubsub->unsubscribe($channel);
          }
      }

      /**
       * Starts the dispatcher loop.
       *
       * Iterates over the consumer and routes each item:
       *  - anything that is not a MESSAGE / PMESSAGE goes to the subscription
       *    callback (if set) with the whole message object;
       *  - a message on a channel with a bound callback invokes that callback
       *    with the message payload only;
       *  - any other message goes to the default callback (if set) with the
       *    whole message object.
       */
      public function run()
      {
          foreach ($this->pubsub as $message) {
              $kind = $message->kind;

              if ($kind !== Consumer::MESSAGE && $kind !== Consumer::PMESSAGE) {
                  if (isset($this->subscriptionCallback)) {
                      $callback = $this->subscriptionCallback;
                      call_user_func($callback, $message);
                  }

                  continue;
              }

              // NOTE(review): callbacks are keyed by prefix + channel, so this
              // lookup presumably relies on the consumer reporting the prefixed
              // channel name -- confirm against the consumer implementation.
              if (isset($this->callbacks[$message->channel])) {
                  $callback = $this->callbacks[$message->channel];
                  call_user_func($callback, $message->payload);
              } elseif (isset($this->defaultCallback)) {
                  $callback = $this->defaultCallback;
                  call_user_func($callback, $message);
              }
          }
      }

      /**
       * Terminates the dispatcher loop.
       */
      public function stop()
      {
          $this->pubsub->stop();
      }

      /**
       * Return the prefix used for keys
       *
       * Reads the "prefix" option from the options of the consumer's client;
       * yields an empty string when that option is not set.
       *
       * @return string
       */
      protected function getPrefixKeys()
      {
          $options = $this->pubsub->getClient()->getOptions();

          if (isset($options->prefix)) {
              return $options->prefix->getPrefix();
          }

          return '';
      }
  }