DispatcherLoop.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\Client;
  12. use Predis\ClientException;
  13. /**
  14. * Method-dispatcher loop built around the client-side abstraction of a Redis
  15. * Publish / Subscribe context.
  16. *
  17. * @author Daniele Alessandri <suppakilla@gmail.com>
  18. */
  19. class DispatcherLoop
  20. {
  21. private $client;
  22. private $pubSubContext;
  23. private $callbacks;
  24. private $defaultCallback;
  25. private $subscriptionCallback;
  26. /**
  27. * @param Client Client instance used by the context.
  28. */
  29. public function __construct(Client $client)
  30. {
  31. $this->callbacks = array();
  32. $this->client = $client;
  33. $this->pubSubContext = $client->pubSub();
  34. }
  35. /**
  36. * Checks if the passed argument is a valid callback.
  37. *
  38. * @param mixed A callback.
  39. */
  40. protected function validateCallback($callable)
  41. {
  42. if (!is_callable($callable)) {
  43. throw new ClientException("A valid callable object must be provided");
  44. }
  45. }
  46. /**
  47. * Returns the underlying Publish / Subscribe context.
  48. *
  49. * @return PubSubContext
  50. */
  51. public function getPubSubContext()
  52. {
  53. return $this->pubSubContext;
  54. }
  55. /**
  56. * Sets a callback that gets invoked upon new subscriptions.
  57. *
  58. * @param mixed $callable A callback.
  59. */
  60. public function subscriptionCallback($callable = null)
  61. {
  62. if (isset($callable)) {
  63. $this->validateCallback($callable);
  64. }
  65. $this->subscriptionCallback = $callable;
  66. }
  67. /**
  68. * Sets a callback that gets invoked when a message is received on a
  69. * channel that does not have an associated callback.
  70. *
  71. * @param mixed $callable A callback.
  72. */
  73. public function defaultCallback($callable = null)
  74. {
  75. if (isset($callable)) {
  76. $this->validateCallback($callable);
  77. }
  78. $this->subscriptionCallback = $callable;
  79. }
  80. /**
  81. * Binds a callback to a channel.
  82. *
  83. * @param string $channel Channel name.
  84. * @param Callable $callback A callback.
  85. */
  86. public function attachCallback($channel, $callback)
  87. {
  88. $this->validateCallback($callback);
  89. $this->callbacks[$channel] = $callback;
  90. $this->pubSubContext->subscribe($channel);
  91. }
  92. /**
  93. * Stops listening to a channel and removes the associated callback.
  94. *
  95. * @param string $channel Redis channel.
  96. */
  97. public function detachCallback($channel)
  98. {
  99. if (isset($this->callbacks[$channel])) {
  100. unset($this->callbacks[$channel]);
  101. $this->pubSubContext->unsubscribe($channel);
  102. }
  103. }
  104. /**
  105. * Starts the dispatcher loop.
  106. */
  107. public function run()
  108. {
  109. foreach ($this->pubSubContext as $message) {
  110. $kind = $message->kind;
  111. if ($kind !== PubSubContext::MESSAGE && $kind !== PubSubContext::PMESSAGE) {
  112. if (isset($this->subscriptionCallback)) {
  113. $callback = $this->subscriptionCallback;
  114. $callback($message);
  115. }
  116. continue;
  117. }
  118. if (isset($this->callbacks[$message->channel])) {
  119. $callback = $this->callbacks[$message->channel];
  120. $callback($message->payload);
  121. }
  122. else if (isset($this->defaultCallback)) {
  123. $callback = $this->defaultCallback;
  124. $callback($message);
  125. }
  126. }
  127. }
  128. /**
  129. * Terminates the dispatcher loop.
  130. */
  131. public function stop()
  132. {
  133. $this->pubSubContext->closeContext();
  134. }
  135. }