DispatcherLoop.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. <?php
  2. namespace Predis;
  /**
   * Event-style dispatcher built on top of a publish/subscribe context.
   *
   * Callbacks are registered per channel; run() then iterates over the
   * messages produced by the pub/sub context and hands each one to the
   * matching callback.
   */
  class DispatcherLoop
  {
      /** @var Client Client instance this dispatcher was created from. */
      private $_client;

      /** @var mixed Pub/sub context obtained from the client; iterated by run(). */
      private $_pubSubContext;

      /** @var array Map of channel name => callable. */
      private $_callbacks;

      /** @var callable|null Fallback for messages on channels with no registered callback. */
      private $_defaultCallback;

      /** @var callable|null Receiver for everything that is not a MESSAGE / PMESSAGE. */
      private $_subscriptionCallback;

      /**
       * Creates the dispatcher and opens a pub/sub context on the given client.
       *
       * @param Client $client Client used to obtain the pub/sub context.
       */
      public function __construct(Client $client)
      {
          $this->_callbacks = array();
          $this->_client = $client;
          $this->_pubSubContext = $client->pubSub();
      }

      /**
       * Ensures that the given value can be invoked as a callback.
       *
       * @param mixed $callback Value to check.
       *
       * @throws ClientException When $callback is not callable.
       */
      protected function validateCallback($callback)
      {
          if (!is_callable($callback)) {
              throw new ClientException(
                  "The callback parameter must be a valid callable object"
              );
          }
      }

      /**
       * Returns the underlying pub/sub context.
       *
       * @return mixed The object returned by Client::pubSub() at construction.
       */
      public function getPubSubContext()
      {
          return $this->_pubSubContext;
      }

      /**
       * Sets (or, with null, clears) the callback that run() invokes for
       * messages whose kind is neither MESSAGE nor PMESSAGE. The callback
       * receives the whole message object.
       *
       * @param callable|null $callback Callback to install, or null to remove it.
       *
       * @throws ClientException When a non-null, non-callable value is given.
       */
      public function subscriptionCallback($callback = null)
      {
          if (isset($callback)) {
              $this->validateCallback($callback);
          }

          $this->_subscriptionCallback = $callback;
      }

      /**
       * Sets (or, with null, clears) the fallback callback that run() invokes
       * for MESSAGE / PMESSAGE messages arriving on a channel that has no
       * callback attached. The callback receives the whole message object.
       *
       * @param callable|null $callback Callback to install, or null to remove it.
       *
       * @throws ClientException When a non-null, non-callable value is given.
       */
      public function defaultCallback($callback = null)
      {
          if (isset($callback)) {
              $this->validateCallback($callback);
          }

          // Must target the default slot: writing to _subscriptionCallback here
          // would leave the fallback branch of run() permanently dead and
          // clobber the subscription-event callback.
          $this->_defaultCallback = $callback;
      }

      /**
       * Registers a callback for a channel and subscribes to that channel.
       * An existing callback for the same channel is replaced.
       *
       * @param string   $channel  Channel name.
       * @param callable $callback Invoked by run() with the message payload.
       *
       * @throws ClientException When $callback is not callable.
       */
      public function attachCallback($channel, $callback)
      {
          $this->validateCallback($callback);

          $this->_callbacks[$channel] = $callback;
          $this->_pubSubContext->subscribe($channel);
      }

      /**
       * Removes the callback registered for a channel and unsubscribes from it.
       * Does nothing when no callback is registered for the channel.
       *
       * @param string $channel Channel name.
       */
      public function detachCallback($channel)
      {
          if (isset($this->_callbacks[$channel])) {
              unset($this->_callbacks[$channel]);
              $this->_pubSubContext->unsubscribe($channel);
          }
      }

      /**
       * Iterates over the pub/sub context and dispatches every message.
       *
       * - Kinds other than MESSAGE / PMESSAGE go to the subscription callback
       *   (if one is set) as the whole message object.
       * - MESSAGE / PMESSAGE on a channel with an attached callback: that
       *   callback receives only the message payload.
       * - MESSAGE / PMESSAGE on any other channel: the default callback (if
       *   one is set) receives the whole message object.
       *
       * Messages with no applicable callback are dropped.
       */
      public function run()
      {
          foreach ($this->_pubSubContext as $message) {
              $kind = $message->kind;
              $isPayloadMessage = $kind === PubSubContext::MESSAGE
                  || $kind === PubSubContext::PMESSAGE;

              if (!$isPayloadMessage) {
                  if (isset($this->_subscriptionCallback)) {
                      $callback = $this->_subscriptionCallback;
                      $callback($message);
                  }

                  continue;
              }

              if (isset($this->_callbacks[$message->channel])) {
                  // Channel-specific callbacks get the payload only.
                  $callback = $this->_callbacks[$message->channel];
                  $callback($message->payload);
              } elseif (isset($this->_defaultCallback)) {
                  // The fallback gets the full message so it can inspect the channel.
                  $callback = $this->_defaultCallback;
                  $callback($message);
              }
          }
      }

      /**
       * Stops the loop by closing the underlying pub/sub context.
       */
      public function stop()
      {
          $this->_pubSubContext->closeContext();
      }
  }