DispatcherLoop.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\ClientInterface;
  12. /**
  13. * Method-dispatcher loop built around the client-side abstraction of a Redis
  14. * Publish / Subscribe context.
  15. *
  16. * @author Daniele Alessandri <suppakilla@gmail.com>
  17. */
  18. class DispatcherLoop
  19. {
  20. private $client;
  21. private $pubSubContext;
  22. private $callbacks;
  23. private $defaultCallback;
  24. private $subscriptionCallback;
  25. /**
  26. * @param ClientInterface Client instance used by the context.
  27. */
  28. public function __construct(ClientInterface $client)
  29. {
  30. $this->callbacks = array();
  31. $this->client = $client;
  32. $this->pubSubContext = $client->pubSub();
  33. }
  34. /**
  35. * Checks if the passed argument is a valid callback.
  36. *
  37. * @param mixed A callback.
  38. */
  39. protected function validateCallback($callable)
  40. {
  41. if (!is_callable($callable)) {
  42. throw new \InvalidArgumentException("A valid callable object must be provided");
  43. }
  44. }
  45. /**
  46. * Returns the underlying Publish / Subscribe context.
  47. *
  48. * @return PubSubContext
  49. */
  50. public function getPubSubContext()
  51. {
  52. return $this->pubSubContext;
  53. }
  54. /**
  55. * Sets a callback that gets invoked upon new subscriptions.
  56. *
  57. * @param mixed $callable A callback.
  58. */
  59. public function subscriptionCallback($callable = null)
  60. {
  61. if (isset($callable)) {
  62. $this->validateCallback($callable);
  63. }
  64. $this->subscriptionCallback = $callable;
  65. }
  66. /**
  67. * Sets a callback that gets invoked when a message is received on a
  68. * channel that does not have an associated callback.
  69. *
  70. * @param mixed $callable A callback.
  71. */
  72. public function defaultCallback($callable = null)
  73. {
  74. if (isset($callable)) {
  75. $this->validateCallback($callable);
  76. }
  77. $this->subscriptionCallback = $callable;
  78. }
  79. /**
  80. * Binds a callback to a channel.
  81. *
  82. * @param string $channel Channel name.
  83. * @param Callable $callback A callback.
  84. */
  85. public function attachCallback($channel, $callback)
  86. {
  87. $this->validateCallback($callback);
  88. $this->callbacks[$channel] = $callback;
  89. $this->pubSubContext->subscribe($channel);
  90. }
  91. /**
  92. * Stops listening to a channel and removes the associated callback.
  93. *
  94. * @param string $channel Redis channel.
  95. */
  96. public function detachCallback($channel)
  97. {
  98. if (isset($this->callbacks[$channel])) {
  99. unset($this->callbacks[$channel]);
  100. $this->pubSubContext->unsubscribe($channel);
  101. }
  102. }
  103. /**
  104. * Starts the dispatcher loop.
  105. */
  106. public function run()
  107. {
  108. foreach ($this->pubSubContext as $message) {
  109. $kind = $message->kind;
  110. if ($kind !== PubSubContext::MESSAGE && $kind !== PubSubContext::PMESSAGE) {
  111. if (isset($this->subscriptionCallback)) {
  112. $callback = $this->subscriptionCallback;
  113. call_user_func($callback, $message);
  114. }
  115. continue;
  116. }
  117. if (isset($this->callbacks[$message->channel])) {
  118. $callback = $this->callbacks[$message->channel];
  119. call_user_func($callback, $message->payload);
  120. } else if (isset($this->defaultCallback)) {
  121. $callback = $this->defaultCallback;
  122. call_user_func($callback, $message);
  123. }
  124. }
  125. }
  126. /**
  127. * Terminates the dispatcher loop.
  128. */
  129. public function stop()
  130. {
  131. $this->pubSubContext->closeContext();
  132. }
  133. }