PubSubContext.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. <?php
  2. namespace Predis;
  3. class PubSubContext implements \Iterator {
  4. const SUBSCRIBE = 'subscribe';
  5. const UNSUBSCRIBE = 'unsubscribe';
  6. const PSUBSCRIBE = 'psubscribe';
  7. const PUNSUBSCRIBE = 'punsubscribe';
  8. const MESSAGE = 'message';
  9. const PMESSAGE = 'pmessage';
  10. const STATUS_VALID = 0x0001;
  11. const STATUS_SUBSCRIBED = 0x0010;
  12. const STATUS_PSUBSCRIBED = 0x0100;
  13. private $_client, $_position, $_options;
  14. public function __construct(Client $client, Array $options = null) {
  15. $this->checkCapabilities($client);
  16. $this->_options = $options ?: array();
  17. $this->_client = $client;
  18. $this->_statusFlags = self::STATUS_VALID;
  19. $this->genericSubscribeInit('subscribe');
  20. $this->genericSubscribeInit('psubscribe');
  21. }
  22. public function __destruct() {
  23. $this->closeContext();
  24. }
  25. private function checkCapabilities(Client $client) {
  26. if (Helpers::isCluster($client->getConnection())) {
  27. throw new ClientException(
  28. 'Cannot initialize a PUB/SUB context over a cluster of connections'
  29. );
  30. }
  31. $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
  32. if ($client->getProfile()->supportsCommands($commands) === false) {
  33. throw new ClientException(
  34. 'The current profile does not support PUB/SUB related commands'
  35. );
  36. }
  37. }
  38. private function genericSubscribeInit($subscribeAction) {
  39. if (isset($this->_options[$subscribeAction])) {
  40. $this->$subscribeAction($this->_options[$subscribeAction]);
  41. }
  42. }
  43. private function isFlagSet($value) {
  44. return ($this->_statusFlags & $value) === $value;
  45. }
  46. public function subscribe(/* arguments */) {
  47. $this->writeCommand(self::SUBSCRIBE, func_get_args());
  48. $this->_statusFlags |= self::STATUS_SUBSCRIBED;
  49. }
  50. public function unsubscribe(/* arguments */) {
  51. $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
  52. }
  53. public function psubscribe(/* arguments */) {
  54. $this->writeCommand(self::PSUBSCRIBE, func_get_args());
  55. $this->_statusFlags |= self::STATUS_PSUBSCRIBED;
  56. }
  57. public function punsubscribe(/* arguments */) {
  58. $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
  59. }
  60. public function closeContext() {
  61. if ($this->valid()) {
  62. if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
  63. $this->unsubscribe();
  64. }
  65. if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
  66. $this->punsubscribe();
  67. }
  68. }
  69. }
  70. private function writeCommand($method, $arguments) {
  71. if (count($arguments) === 1 && is_array($arguments[0])) {
  72. $arguments = $arguments[0];
  73. }
  74. $command = $this->_client->createCommand($method, $arguments);
  75. $this->_client->getConnection()->writeCommand($command);
  76. }
  77. public function rewind() {
  78. // NOOP
  79. }
  80. public function current() {
  81. return $this->getValue();
  82. }
  83. public function key() {
  84. return $this->_position;
  85. }
  86. public function next() {
  87. if ($this->isFlagSet(self::STATUS_VALID)) {
  88. $this->_position++;
  89. }
  90. return $this->_position;
  91. }
  92. public function valid() {
  93. $isValid = $this->isFlagSet(self::STATUS_VALID);
  94. $subscriptionFlags = self::STATUS_SUBSCRIBED + self::STATUS_PSUBSCRIBED;
  95. $hasSubscriptions = ($this->_statusFlags & $subscriptionFlags) > 0;
  96. return $isValid && $hasSubscriptions;
  97. }
  98. private function invalidate() {
  99. $this->_statusFlags = 0x0000;
  100. }
  101. private function getValue() {
  102. $response = $this->_client->getConnection()->read();
  103. switch ($response[0]) {
  104. case self::SUBSCRIBE:
  105. case self::UNSUBSCRIBE:
  106. case self::PSUBSCRIBE:
  107. case self::PUNSUBSCRIBE:
  108. if ($response[2] === 0) {
  109. $this->invalidate();
  110. }
  111. case self::MESSAGE:
  112. return (object) array(
  113. 'kind' => $response[0],
  114. 'channel' => $response[1],
  115. 'payload' => $response[2],
  116. );
  117. case self::PMESSAGE:
  118. return (object) array(
  119. 'kind' => $response[0],
  120. 'pattern' => $response[1],
  121. 'channel' => $response[2],
  122. 'payload' => $response[3],
  123. );
  124. default:
  125. throw new ClientException(
  126. "Received an unknown message type {$response[0]} inside of a pubsub context"
  127. );
  128. }
  129. }
  130. }