DispatcherLoop.php 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\ClientInterface;
  /**
   * Method-dispatcher loop built around the client-side abstraction of a Redis
   * Publish / Subscribe context.
   *
   * Callbacks are registered per channel with attachCallback(); run() then
   * iterates over the underlying context and forwards each incoming message to
   * the callback bound to its channel, to the default callback, or (for any
   * message kind other than MESSAGE / PMESSAGE) to the subscription callback.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class DispatcherLoop
  {
      private $pubSubContext;
      protected $callbacks;
      protected $defaultCallback;
      protected $subscriptionCallback;

      /**
       * @param ClientInterface $client Client instance used by the context.
       */
      public function __construct(ClientInterface $client)
      {
          $this->callbacks = array();
          $this->pubSubContext = $client->pubSubLoop();
      }

      /**
       * Checks if the passed argument is a valid callback.
       *
       * @param mixed $callable A callback.
       *
       * @throws \InvalidArgumentException When the argument is not callable.
       */
      protected function validateCallback($callable)
      {
          if (!is_callable($callable)) {
              throw new \InvalidArgumentException("A valid callable object must be provided");
          }
      }

      /**
       * Returns the underlying Publish / Subscribe context.
       *
       * @return PubSubContext
       */
      public function getPubSubContext()
      {
          return $this->pubSubContext;
      }

      /**
       * Sets a callback that gets invoked upon new subscriptions.
       *
       * Passing null (or omitting the argument) removes the callback.
       *
       * @param mixed $callable A callback, or null to unset it.
       *
       * @throws \InvalidArgumentException When a non-null argument is not callable.
       */
      public function subscriptionCallback($callable = null)
      {
          if (isset($callable)) {
              $this->validateCallback($callable);
          }

          $this->subscriptionCallback = $callable;
      }

      /**
       * Sets a callback that gets invoked when a message is received on a
       * channel that does not have an associated callback.
       *
       * Passing null (or omitting the argument) removes the callback.
       *
       * @param mixed $callable A callback, or null to unset it.
       *
       * @throws \InvalidArgumentException When a non-null argument is not callable.
       */
      public function defaultCallback($callable = null)
      {
          if (isset($callable)) {
              $this->validateCallback($callable);
          }

          // Store in the default-callback slot; writing to the subscription
          // slot here would leave run()'s fallback branch unreachable and
          // clobber any subscription callback already registered.
          $this->defaultCallback = $callable;
      }

      /**
       * Binds a callback to a channel and subscribes to that channel.
       *
       * The callback is stored under the key-prefixed channel name, while the
       * subscription is issued with the name exactly as passed in.
       *
       * @param string   $channel  Channel name.
       * @param Callable $callback A callback.
       *
       * @throws \InvalidArgumentException When the callback is not callable.
       */
      public function attachCallback($channel, $callback)
      {
          $callbackName = $this->getPrefixKeys() . $channel;

          $this->validateCallback($callback);
          $this->callbacks[$callbackName] = $callback;
          $this->pubSubContext->subscribe($channel);
      }

      /**
       * Stops listening to a channel and removes the associated callback.
       *
       * Does nothing when no callback is bound to the channel.
       *
       * @param string $channel Redis channel.
       */
      public function detachCallback($channel)
      {
          $callbackName = $this->getPrefixKeys() . $channel;

          if (isset($this->callbacks[$callbackName])) {
              unset($this->callbacks[$callbackName]);
              $this->pubSubContext->unsubscribe($channel);
          }
      }

      /**
       * Starts the dispatcher loop.
       *
       * Channel callbacks receive only the message payload; the default and
       * subscription callbacks receive the whole message object.
       */
      public function run()
      {
          foreach ($this->pubSubContext as $message) {
              $kind = $message->kind;

              // Anything that is not an actual published message is handed to
              // the subscription callback, if one is set.
              if ($kind !== PubSubContext::MESSAGE && $kind !== PubSubContext::PMESSAGE) {
                  if (isset($this->subscriptionCallback)) {
                      $callback = $this->subscriptionCallback;
                      call_user_func($callback, $message);
                  }

                  continue;
              }

              // NOTE(review): callbacks are keyed by prefix + channel, so this
              // lookup presumably relies on the context reporting channel
              // names with the prefix included -- confirm.
              if (isset($this->callbacks[$message->channel])) {
                  $callback = $this->callbacks[$message->channel];
                  call_user_func($callback, $message->payload);
              } elseif (isset($this->defaultCallback)) {
                  $callback = $this->defaultCallback;
                  call_user_func($callback, $message);
              }
          }
      }

      /**
       * Terminates the dispatcher loop by closing the underlying context.
       */
      public function stop()
      {
          $this->pubSubContext->closeContext();
      }

      /**
       * Returns the key prefix configured on the client, or an empty string
       * when the client options define no prefix.
       *
       * @return string
       */
      protected function getPrefixKeys()
      {
          $options = $this->pubSubContext->getClient()->getOptions();

          if (isset($options->prefix)) {
              return $options->prefix->getPrefix();
          }

          return '';
      }
  }