DispatcherLoop.php 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis;
  11. class DispatcherLoop
  12. {
  13. private $_client;
  14. private $_pubSubContext;
  15. private $_callbacks;
  16. private $_defaultCallback;
  17. private $_subscriptionCallback;
  18. public function __construct(Client $client)
  19. {
  20. $this->_callbacks = array();
  21. $this->_client = $client;
  22. $this->_pubSubContext = $client->pubSub();
  23. }
  24. protected function validateCallback($callback)
  25. {
  26. if (!is_callable($callback)) {
  27. throw new ClientException(
  28. "The callback parameter must be a valid callable object"
  29. );
  30. }
  31. }
  32. public function getPubSubContext()
  33. {
  34. return $this->_pubSubContext;
  35. }
  36. public function subscriptionCallback($callback = null)
  37. {
  38. if (isset($callback)) {
  39. $this->validateCallback($callback);
  40. }
  41. $this->_subscriptionCallback = $callback;
  42. }
  43. public function defaultCallback($callback = null)
  44. {
  45. if (isset($callback)) {
  46. $this->validateCallback($callback);
  47. }
  48. $this->_subscriptionCallback = $callback;
  49. }
  50. public function attachCallback($channel, $callback)
  51. {
  52. $this->validateCallback($callback);
  53. $this->_callbacks[$channel] = $callback;
  54. $this->_pubSubContext->subscribe($channel);
  55. }
  56. public function detachCallback($channel)
  57. {
  58. if (isset($this->_callbacks[$channel])) {
  59. unset($this->_callbacks[$channel]);
  60. $this->_pubSubContext->unsubscribe($channel);
  61. }
  62. }
  63. public function run()
  64. {
  65. foreach ($this->_pubSubContext as $message) {
  66. $kind = $message->kind;
  67. if ($kind !== PubSubContext::MESSAGE && $kind !== PubSubContext::PMESSAGE) {
  68. if (isset($this->_subscriptionCallback)) {
  69. $callback = $this->_subscriptionCallback;
  70. $callback($message);
  71. }
  72. continue;
  73. }
  74. if (isset($this->_callbacks[$message->channel])) {
  75. $callback = $this->_callbacks[$message->channel];
  76. $callback($message->payload);
  77. }
  78. else if (isset($this->_defaultCallback)) {
  79. $callback = $this->_defaultCallback;
  80. $callback($message);
  81. }
  82. }
  83. }
  84. public function stop()
  85. {
  86. $this->_pubSubContext->closeContext();
  87. }
  88. }