PubSubContext.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. <?php
  2. namespace Predis;
  3. class PubSubContext implements \Iterator {
  4. const SUBSCRIBE = 'subscribe';
  5. const UNSUBSCRIBE = 'unsubscribe';
  6. const PSUBSCRIBE = 'psubscribe';
  7. const PUNSUBSCRIBE = 'punsubscribe';
  8. const MESSAGE = 'message';
  9. const PMESSAGE = 'pmessage';
  10. const STATUS_VALID = 0x0001;
  11. const STATUS_SUBSCRIBED = 0x0010;
  12. const STATUS_PSUBSCRIBED = 0x0100;
  13. private $_client;
  14. private $_position;
  15. private $_options;
  16. public function __construct(Client $client, Array $options = null) {
  17. $this->checkCapabilities($client);
  18. $this->_options = $options ?: array();
  19. $this->_client = $client;
  20. $this->_statusFlags = self::STATUS_VALID;
  21. $this->genericSubscribeInit('subscribe');
  22. $this->genericSubscribeInit('psubscribe');
  23. }
  24. public function __destruct() {
  25. $this->closeContext();
  26. }
  27. private function checkCapabilities(Client $client) {
  28. if (Helpers::isCluster($client->getConnection())) {
  29. throw new ClientException(
  30. 'Cannot initialize a PUB/SUB context over a cluster of connections'
  31. );
  32. }
  33. $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
  34. if ($client->getProfile()->supportsCommands($commands) === false) {
  35. throw new ClientException(
  36. 'The current profile does not support PUB/SUB related commands'
  37. );
  38. }
  39. }
  40. private function genericSubscribeInit($subscribeAction) {
  41. if (isset($this->_options[$subscribeAction])) {
  42. $this->$subscribeAction($this->_options[$subscribeAction]);
  43. }
  44. }
  45. private function isFlagSet($value) {
  46. return ($this->_statusFlags & $value) === $value;
  47. }
  48. public function subscribe(/* arguments */) {
  49. $this->writeCommand(self::SUBSCRIBE, func_get_args());
  50. $this->_statusFlags |= self::STATUS_SUBSCRIBED;
  51. }
  52. public function unsubscribe(/* arguments */) {
  53. $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
  54. }
  55. public function psubscribe(/* arguments */) {
  56. $this->writeCommand(self::PSUBSCRIBE, func_get_args());
  57. $this->_statusFlags |= self::STATUS_PSUBSCRIBED;
  58. }
  59. public function punsubscribe(/* arguments */) {
  60. $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
  61. }
  62. public function closeContext() {
  63. if ($this->valid()) {
  64. if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
  65. $this->unsubscribe();
  66. }
  67. if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
  68. $this->punsubscribe();
  69. }
  70. }
  71. }
  72. private function writeCommand($method, $arguments) {
  73. $arguments = Helpers::filterArrayArguments($arguments);
  74. $command = $this->_client->createCommand($method, $arguments);
  75. $this->_client->getConnection()->writeCommand($command);
  76. }
  77. public function rewind() {
  78. // NOOP
  79. }
  80. public function current() {
  81. return $this->getValue();
  82. }
  83. public function key() {
  84. return $this->_position;
  85. }
  86. public function next() {
  87. if ($this->isFlagSet(self::STATUS_VALID)) {
  88. $this->_position++;
  89. }
  90. return $this->_position;
  91. }
  92. public function valid() {
  93. $isValid = $this->isFlagSet(self::STATUS_VALID);
  94. $subscriptionFlags = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED;
  95. $hasSubscriptions = ($this->_statusFlags & $subscriptionFlags) > 0;
  96. return $isValid && $hasSubscriptions;
  97. }
  98. private function invalidate() {
  99. $this->_statusFlags = 0x0000;
  100. }
  101. private function getValue() {
  102. $response = $this->_client->getConnection()->read();
  103. switch ($response[0]) {
  104. case self::SUBSCRIBE:
  105. case self::UNSUBSCRIBE:
  106. case self::PSUBSCRIBE:
  107. case self::PUNSUBSCRIBE:
  108. if ($response[2] === 0) {
  109. $this->invalidate();
  110. }
  111. case self::MESSAGE:
  112. return (object) array(
  113. 'kind' => $response[0],
  114. 'channel' => $response[1],
  115. 'payload' => $response[2],
  116. );
  117. case self::PMESSAGE:
  118. return (object) array(
  119. 'kind' => $response[0],
  120. 'pattern' => $response[1],
  121. 'channel' => $response[2],
  122. 'payload' => $response[3],
  123. );
  124. default:
  125. throw new ClientException(
  126. "Received an unknown message type {$response[0]} inside of a pubsub context"
  127. );
  128. }
  129. }
  130. }