DispatcherLoop.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis;
  11. /**
  12. * Method-dispatcher loop built around the client-side abstraction of a Redis
  13. * Publish / Subscribe context.
  14. *
  15. * @author Daniele Alessandri <suppakilla@gmail.com>
  16. */
  class DispatcherLoop
  {
      /** Client instance the Publish / Subscribe context was obtained from. */
      private $client;

      /** Publish / Subscribe context iterated by run(). */
      private $pubSubContext;

      /** Map of channel name => callback, filled by attachCallback(). */
      private $callbacks;

      /** Callback for messages on channels without a bound callback, or null. */
      private $defaultCallback;

      /** Callback for everything that is not a (p)message, or null. */
      private $subscriptionCallback;

      /**
       * Creates the dispatcher and obtains a Publish / Subscribe context from
       * the given client.
       *
       * @param Client $client Client instance used by the context.
       */
      public function __construct(Client $client)
      {
          $this->callbacks = array();
          $this->client = $client;
          $this->pubSubContext = $client->pubSub();
      }

      /**
       * Checks if the passed argument is a valid callback.
       *
       * @param mixed $callable A callback.
       *
       * @throws ClientException When the argument is not callable.
       */
      protected function validateCallback($callable)
      {
          if (!is_callable($callable)) {
              throw new ClientException("A valid callable object must be provided");
          }
      }

      /**
       * Returns the underlying Publish / Subscribe context.
       *
       * @return PubSubContext
       */
      public function getPubSubContext()
      {
          return $this->pubSubContext;
      }

      /**
       * Sets a callback that gets invoked upon new subscriptions.
       *
       * More precisely, run() invokes it with the whole message object for
       * every message whose kind is neither MESSAGE nor PMESSAGE. Passing null
       * (or no argument) removes the callback.
       *
       * @param mixed $callable A callback, or null to unset it.
       *
       * @throws ClientException When a non-null argument is not callable.
       */
      public function subscriptionCallback($callable = null)
      {
          if (isset($callable)) {
              $this->validateCallback($callable);
          }

          $this->subscriptionCallback = $callable;
      }

      /**
       * Sets a callback that gets invoked when a message is received on a
       * channel that does not have an associated callback.
       *
       * The callback receives the whole message object. Passing null (or no
       * argument) removes the callback.
       *
       * @param mixed $callable A callback, or null to unset it.
       *
       * @throws ClientException When a non-null argument is not callable.
       */
      public function defaultCallback($callable = null)
      {
          if (isset($callable)) {
              $this->validateCallback($callable);
          }

          // Must target the default slot: storing this in subscriptionCallback
          // would clobber the subscription handler and leave run() without a
          // fallback for unbound channels.
          $this->defaultCallback = $callable;
      }

      /**
       * Binds a callback to a channel and subscribes the context to it.
       *
       * A callback previously bound to the same channel is replaced. The
       * callback receives only the payload of each message.
       *
       * @param string $channel  Channel name.
       * @param mixed  $callback A callback.
       *
       * @throws ClientException When the callback is not callable.
       */
      public function attachCallback($channel, $callback)
      {
          $this->validateCallback($callback);

          $this->callbacks[$channel] = $callback;
          $this->pubSubContext->subscribe($channel);
      }

      /**
       * Stops listening to a channel and removes the associated callback.
       *
       * Nothing happens (no unsubscription is issued) when no callback is
       * bound to the channel.
       *
       * @param string $channel Redis channel.
       */
      public function detachCallback($channel)
      {
          if (isset($this->callbacks[$channel])) {
              unset($this->callbacks[$channel]);
              $this->pubSubContext->unsubscribe($channel);
          }
      }

      /**
       * Starts the dispatcher loop.
       *
       * Iterates over the Publish / Subscribe context and routes each message:
       * anything that is not a MESSAGE / PMESSAGE goes to the subscription
       * callback; regular messages go to the callback bound to their channel
       * (payload only) or, failing that, to the default callback (whole
       * message object). Messages with no matching callback are dropped.
       */
      public function run()
      {
          foreach ($this->pubSubContext as $message) {
              $kind = $message->kind;

              if ($kind !== PubSubContext::MESSAGE && $kind !== PubSubContext::PMESSAGE) {
                  if (isset($this->subscriptionCallback)) {
                      // call_user_func() handles every form accepted by
                      // is_callable(), including array and 'Class::method'
                      // callables that the $callback() syntax rejects on
                      // older PHP versions.
                      call_user_func($this->subscriptionCallback, $message);
                  }

                  continue;
              }

              if (isset($this->callbacks[$message->channel])) {
                  call_user_func($this->callbacks[$message->channel], $message->payload);
              } elseif (isset($this->defaultCallback)) {
                  call_user_func($this->defaultCallback, $message);
              }
          }
      }

      /**
       * Terminates the dispatcher loop by closing the underlying
       * Publish / Subscribe context.
       */
      public function stop()
      {
          $this->pubSubContext->closeContext();
      }
  }