DispatcherLoop.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. <?php
  2. namespace Predis;
  3. class DispatcherLoop {
  4. private $_client;
  5. private $_pubSubContext;
  6. private $_callbacks;
  7. private $_defaultCallback;
  8. private $_subscriptionCallback;
  9. public function __construct(Client $client) {
  10. $this->_callbacks = array();
  11. $this->_client = $client;
  12. $this->_pubSubContext = $client->pubSub();
  13. }
  14. protected function validateCallback($callback) {
  15. if (!is_callable($callback)) {
  16. throw new ClientException(
  17. "The callback parameter must be a valid callable object"
  18. );
  19. }
  20. }
  21. public function getPubSubContext() {
  22. return $this->_pubSubContext;
  23. }
  24. public function subscriptionCallback($callback = null) {
  25. if (isset($callback)) {
  26. $this->validateCallback($callback);
  27. }
  28. $this->_subscriptionCallback = $callback;
  29. }
  30. public function defaultCallback($callback = null) {
  31. if (isset($callback)) {
  32. $this->validateCallback($callback);
  33. }
  34. $this->_subscriptionCallback = $callback;
  35. }
  36. public function attachCallback($channel, $callback) {
  37. $this->validateCallback($callback);
  38. $this->_callbacks[$channel] = $callback;
  39. $this->_pubSubContext->subscribe($channel);
  40. }
  41. public function detachCallback($channel) {
  42. if (isset($this->_callbacks[$channel])) {
  43. unset($this->_callbacks[$channel]);
  44. $this->_pubSubContext->unsubscribe($channel);
  45. }
  46. }
  47. public function run() {
  48. foreach ($this->_pubSubContext as $message) {
  49. $kind = $message->kind;
  50. if ($kind !== PubSubContext::MESSAGE && $kind !== PubSubContext::PMESSAGE) {
  51. if (isset($this->_subscriptionCallback)) {
  52. $callback = $this->_subscriptionCallback;
  53. $callback($message);
  54. }
  55. continue;
  56. }
  57. if (isset($this->_callbacks[$message->channel])) {
  58. $callback = $this->_callbacks[$message->channel];
  59. $callback($message->payload);
  60. }
  61. else if (isset($this->_defaultCallback)) {
  62. $callback = $this->_defaultCallback;
  63. $callback($message);
  64. }
  65. }
  66. }
  67. public function stop() {
  68. $this->_pubSubContext->closeContext();
  69. }
  70. }