<?php
// PubSubContext.php
namespace Predis;
  /**
   * Iterator-style consumer for Redis PUB/SUB traffic on a single connection.
   *
   * Subscriptions are issued with subscribe() / psubscribe(); every call to
   * current() then performs one read on the client's connection and returns
   * the decoded reply as a plain object. The iterator stays valid while the
   * context is flagged valid and at least one kind of subscription is flagged
   * active.
   *
   * Typical use is a foreach loop over the context. rewind() is a no-op, so
   * the stream cannot be replayed.
   */
  class PubSubContext implements \Iterator
  {
      const SUBSCRIBE    = 'subscribe';
      const UNSUBSCRIBE  = 'unsubscribe';
      const PSUBSCRIBE   = 'psubscribe';
      const PUNSUBSCRIBE = 'punsubscribe';
      const MESSAGE      = 'message';
      const PMESSAGE     = 'pmessage';

      const STATUS_VALID       = 0x0001;
      const STATUS_SUBSCRIBED  = 0x0010;
      const STATUS_PSUBSCRIBED = 0x0100;

      /** @var Client Client whose connection carries the PUB/SUB traffic. */
      private $_client;

      /** @var array Options passed to the constructor (empty array when none). */
      private $_options;

      /** @var int Iterator key; incremented by next() while the context is valid. */
      private $_position = 0;

      /**
       * Bit mask built from the STATUS_* constants.
       *
       * Declared explicitly: it used to be created on the fly as a dynamic
       * property, which is deprecated since PHP 8.2 and produced an
       * undefined-property notice when read before the constructor assigned it.
       *
       * @var int
       */
      private $_statusFlags = self::STATUS_VALID;

      /**
       * @param Client     $client  Client to consume messages from.
       * @param array|null $options Optional initial subscriptions under the keys
       *                            'subscribe' and/or 'psubscribe'; each value is
       *                            either a single entry or an array of entries.
       *
       * @throws ClientException When the client is backed by a cluster or its
       *                         profile lacks the PUB/SUB commands.
       */
      public function __construct(Client $client, array $options = null)
      {
          $this->checkCapabilities($client);

          $this->_options     = $options ?: array();
          $this->_client      = $client;
          $this->_statusFlags = self::STATUS_VALID;

          $this->genericSubscribeInit('subscribe');
          $this->genericSubscribeInit('psubscribe');
      }

      /**
       * Requests unsubscription from everything when the object is destroyed.
       */
      public function __destruct()
      {
          $this->closeContext();
      }

      /**
       * Verifies that the client can be used for PUB/SUB.
       *
       * @param Client $client Client to inspect.
       *
       * @throws ClientException See the constructor.
       */
      private function checkCapabilities(Client $client)
      {
          if (Utils::isCluster($client->getConnection())) {
              throw new ClientException(
                  'Cannot initialize a PUB/SUB context over a cluster of connections'
              );
          }

          $profile  = $client->getProfile();
          $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');

          if ($profile->supportsCommands($commands) === false) {
              throw new ClientException(
                  'The current profile does not support PUB/SUB related commands'
              );
          }
      }

      /**
       * Issues the subscriptions listed in the options under the given key.
       *
       * @param string $subscribeAction Option key, which is also the name of the
       *                                method to invoke ('subscribe' or
       *                                'psubscribe').
       */
      private function genericSubscribeInit($subscribeAction)
      {
          if (!isset($this->_options[$subscribeAction])) {
              return;
          }

          $targets = $this->_options[$subscribeAction];

          if (is_array($targets)) {
              // One command is written per listed entry.
              foreach ($targets as $subscription) {
                  $this->$subscribeAction($subscription);
              }
          } else {
              $this->$subscribeAction($targets);
          }
      }

      /**
       * Tells whether every bit of $value is set in the status flags.
       *
       * @param int $value One STATUS_* constant or a combination of them.
       *
       * @return bool
       */
      private function isFlagSet($value)
      {
          return ($this->_statusFlags & $value) === $value;
      }

      /**
       * Writes a SUBSCRIBE command and flags channel subscriptions as active.
       *
       * Accepts either a variable list of arguments or a single array.
       */
      public function subscribe(/* arguments */)
      {
          $this->writeCommand(self::SUBSCRIBE, func_get_args());
          $this->_statusFlags |= self::STATUS_SUBSCRIBED;
      }

      /**
       * Writes an UNSUBSCRIBE command.
       *
       * Accepts either a variable list of arguments or a single array. The
       * status flags are not touched here; they are reset by getValue() when a
       * reply reports zero in its third element.
       */
      public function unsubscribe(/* arguments */)
      {
          $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
      }

      /**
       * Writes a PSUBSCRIBE command and flags pattern subscriptions as active.
       *
       * Accepts either a variable list of arguments or a single array.
       */
      public function psubscribe(/* arguments */)
      {
          $this->writeCommand(self::PSUBSCRIBE, func_get_args());
          $this->_statusFlags |= self::STATUS_PSUBSCRIBED;
      }

      /**
       * Writes a PUNSUBSCRIBE command.
       *
       * Accepts either a variable list of arguments or a single array.
       */
      public function punsubscribe(/* arguments */)
      {
          $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
      }

      /**
       * Writes argument-less UNSUBSCRIBE / PUNSUBSCRIBE commands for each kind
       * of subscription still flagged active.
       *
       * Only the commands are written; this method does not read the replies.
       * Does nothing when the context is no longer valid.
       */
      public function closeContext()
      {
          if (!$this->valid()) {
              return;
          }

          if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
              $this->unsubscribe();
          }

          if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
              $this->punsubscribe();
          }
      }

      /**
       * Builds a command through the client and writes it to the connection
       * without reading a reply.
       *
       * @param string $method    Command identifier.
       * @param array  $arguments Arguments as collected by func_get_args(); a
       *                          single array argument is unwrapped so callers
       *                          may pass either a list or an array.
       */
      private function writeCommand($method, $arguments)
      {
          if (count($arguments) === 1 && is_array($arguments[0])) {
              $arguments = $arguments[0];
          }

          $command = $this->_client->createCommand($method, $arguments);
          $this->_client->getConnection()->writeCommand($command);
      }

      /**
       * No-op: nothing is buffered, so there is nothing to rewind to.
       */
      public function rewind()
      {
          // NOOP
      }

      /**
       * Reads the next reply from the connection and returns it decoded.
       *
       * @return object See getValue().
       */
      public function current()
      {
          return $this->getValue();
      }

      /**
       * @return int Current iterator position, starting at 0.
       */
      public function key()
      {
          return $this->_position;
      }

      /**
       * Advances the position while the context is flagged valid.
       *
       * @return int The position after the call (kept for existing callers).
       */
      public function next()
      {
          if ($this->isFlagSet(self::STATUS_VALID)) {
              $this->_position++;
          }

          return $this->_position;
      }

      /**
       * @return bool True while the context is flagged valid and at least one
       *              kind of subscription is flagged active.
       */
      public function valid()
      {
          $subscriptions = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED;

          return $this->isFlagSet(self::STATUS_VALID)
              && ($this->_statusFlags & $subscriptions) > 0;
      }

      /**
       * Clears every status flag, which ends the iteration.
       */
      private function invalidate()
      {
          $this->_statusFlags = 0x0000;
      }

      /**
       * Reads one reply from the connection and converts it to an object.
       *
       * (Un)subscribe replies and 'message' replies yield the properties kind,
       * channel and payload; 'pmessage' replies yield kind, pattern, channel
       * and payload.
       *
       * @return object
       *
       * @throws ClientException When the first element of the reply is not one
       *                         of the known kinds.
       */
      private function getValue()
      {
          $connection = $this->_client->getConnection();
          $protocol   = $connection->getProtocol();
          $response   = $protocol->read($connection);

          switch ($response[0]) {
              case self::SUBSCRIBE:
              case self::UNSUBSCRIBE:
              case self::PSUBSCRIBE:
              case self::PUNSUBSCRIBE:
                  // In the Redis PUB/SUB protocol the third element of these
                  // replies is the number of subscriptions still active; at
                  // zero there is nothing left to wait for.
                  if ($response[2] === 0) {
                      $this->invalidate();
                  }
                  // Intentional fall-through: these replies are returned with
                  // the same three-field layout as a regular message.

              case self::MESSAGE:
                  return (object) array(
                      'kind'    => $response[0],
                      'channel' => $response[1],
                      'payload' => $response[2],
                  );

              case self::PMESSAGE:
                  return (object) array(
                      'kind'    => $response[0],
                      'pattern' => $response[1],
                      'channel' => $response[2],
                      'payload' => $response[3],
                  );

              default:
                  throw new ClientException(
                      "Received an unknown message type {$response[0]} inside of a pubsub context"
                  );
          }
      }
  }