<?php
// PubSubContext.php (4.7 KB)
namespace Predis;
  /**
   * Iterator over the messages of a PUB/SUB session opened through a client.
   *
   * Subscription commands are written straight to the client's connection and
   * each call to current() reads one reply from that same connection. The
   * context stays iterable while it is flagged valid and at least one of the
   * SUBSCRIBE / PSUBSCRIBE flags has been raised.
   */
  class PubSubContext implements \Iterator {
      const SUBSCRIBE    = 'subscribe';
      const UNSUBSCRIBE  = 'unsubscribe';
      const PSUBSCRIBE   = 'psubscribe';
      const PUNSUBSCRIBE = 'punsubscribe';
      const MESSAGE      = 'message';
      const PMESSAGE     = 'pmessage';

      // Bit flags stored in $_statusFlags.
      const STATUS_VALID       = 0x0001;
      const STATUS_SUBSCRIBED  = 0x0010;
      const STATUS_PSUBSCRIBED = 0x0100;

      /** @var Client Client whose connection carries the PUB/SUB traffic. */
      private $_client;

      /** @var array Options given to the constructor (empty array when none). */
      private $_options;

      /** @var int Iterator key; starts at 0 so key() is an integer on the first pass. */
      private $_position = 0;

      /**
       * @var int Bitmask of the STATUS_* constants.
       *
       * Declared with a default so that __destruct() can safely evaluate
       * valid() even when the constructor threw before assigning it.
       */
      private $_statusFlags = 0x0000;

      /**
       * Validates the client, stores it, and issues any initial subscriptions.
       *
       * @param Client     $client  Client used to create and send commands.
       * @param array|null $options Optional settings; the 'subscribe' and
       *                            'psubscribe' keys, when set, are forwarded
       *                            to subscribe() and psubscribe().
       *
       * @throws ClientException When the client fails the capability checks.
       */
      public function __construct(Client $client, array $options = null) {
          $this->checkCapabilities($client);
          $this->_options = $options ?: array();
          $this->_client = $client;
          $this->_statusFlags = self::STATUS_VALID;
          $this->genericSubscribeInit(self::SUBSCRIBE);
          $this->genericSubscribeInit(self::PSUBSCRIBE);
      }

      /**
       * Sends the pending unsubscriptions when the object is released.
       */
      public function __destruct() {
          $this->closeContext();
      }

      /**
       * Rejects clients that cannot host a PUB/SUB context.
       *
       * @param Client $client Client to inspect.
       *
       * @throws ClientException When the connection is a cluster, or when the
       *                         profile lacks any of the PUB/SUB commands.
       */
      private function checkCapabilities(Client $client) {
          if (Helpers::isCluster($client->getConnection())) {
              throw new ClientException(
                  'Cannot initialize a PUB/SUB context over a cluster of connections'
              );
          }

          $profile = $client->getProfile();
          $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
          if ($profile->supportsCommands($commands) === false) {
              throw new ClientException(
                  'The current profile does not support PUB/SUB related commands'
              );
          }
      }

      /**
       * Invokes the method named $subscribeAction with the option of the same name.
       *
       * @param string $subscribeAction Both the option key and the method name
       *                                ('subscribe' or 'psubscribe').
       */
      private function genericSubscribeInit($subscribeAction) {
          if (isset($this->_options[$subscribeAction])) {
              $this->$subscribeAction($this->_options[$subscribeAction]);
          }
      }

      /**
       * Tells whether every bit of $value is set in the status bitmask.
       *
       * @param int $value One or more STATUS_* bits.
       *
       * @return bool
       */
      private function isFlagSet($value) {
          return ($this->_statusFlags & $value) === $value;
      }

      /**
       * Sends SUBSCRIBE with the given arguments and raises STATUS_SUBSCRIBED.
       *
       * Accepts a variable argument list or a single array of arguments.
       */
      public function subscribe(/* arguments */) {
          $this->writeCommand(self::SUBSCRIBE, func_get_args());
          $this->_statusFlags |= self::STATUS_SUBSCRIBED;
      }

      /**
       * Sends UNSUBSCRIBE with the given arguments (possibly none).
       */
      public function unsubscribe(/* arguments */) {
          $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
      }

      /**
       * Sends PSUBSCRIBE with the given arguments and raises STATUS_PSUBSCRIBED.
       *
       * Accepts a variable argument list or a single array of arguments.
       */
      public function psubscribe(/* arguments */) {
          $this->writeCommand(self::PSUBSCRIBE, func_get_args());
          $this->_statusFlags |= self::STATUS_PSUBSCRIBED;
      }

      /**
       * Sends PUNSUBSCRIBE with the given arguments (possibly none).
       */
      public function punsubscribe(/* arguments */) {
          $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
      }

      /**
       * Sends argument-less UNSUBSCRIBE / PUNSUBSCRIBE for whichever kinds of
       * subscription were raised, provided the context is still valid.
       */
      public function closeContext() {
          if ($this->valid()) {
              if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
                  $this->unsubscribe();
              }
              if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
                  $this->punsubscribe();
              }
          }
      }

      /**
       * Builds a command through the client and writes it to the connection
       * without reading a reply.
       *
       * @param string $method    Command identifier.
       * @param array  $arguments Command arguments; a lone array argument is
       *                          unwrapped and used as the argument list.
       */
      private function writeCommand($method, $arguments) {
          if (count($arguments) === 1 && is_array($arguments[0])) {
              $arguments = $arguments[0];
          }
          $command = $this->_client->createCommand($method, $arguments);
          $this->_client->getConnection()->writeCommand($command);
      }

      /**
       * Intentionally does nothing: the position is never reset.
       */
      #[\ReturnTypeWillChange]
      public function rewind() {
          // NOOP
      }

      /**
       * Reads the next reply from the connection and returns it as an object.
       *
       * @return object See getValue() for the exposed fields.
       *
       * @throws ClientException On an unknown reply kind.
       */
      #[\ReturnTypeWillChange]
      public function current() {
          return $this->getValue();
      }

      /**
       * @return int Current position.
       */
      #[\ReturnTypeWillChange]
      public function key() {
          return $this->_position;
      }

      /**
       * Advances the position while the context is flagged valid.
       *
       * @return int The position after the call.
       */
      #[\ReturnTypeWillChange]
      public function next() {
          if ($this->isFlagSet(self::STATUS_VALID)) {
              $this->_position++;
          }
          return $this->_position;
      }

      /**
       * @return bool True while STATUS_VALID is set together with at least one
       *              of STATUS_SUBSCRIBED / STATUS_PSUBSCRIBED.
       */
      #[\ReturnTypeWillChange]
      public function valid() {
          $subscriptions = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED;

          return $this->isFlagSet(self::STATUS_VALID)
              && ($this->_statusFlags & $subscriptions) > 0;
      }

      /**
       * Clears every status flag, which ends the iteration.
       */
      private function invalidate() {
          $this->_statusFlags = 0x0000;
      }

      /**
       * Reads one reply from the connection and maps it onto a plain object.
       *
       * (p)subscribe / (p)unsubscribe / message replies expose `kind`,
       * `channel` and `payload`; pmessage replies expose `kind`, `pattern`,
       * `channel` and `payload`.
       *
       * @return object
       *
       * @throws ClientException When the first element of the reply is not a
       *                         known kind.
       */
      private function getValue() {
          $response = $this->_client->getConnection()->read();

          switch ($response[0]) {
              case self::SUBSCRIBE:
              case self::UNSUBSCRIBE:
              case self::PSUBSCRIBE:
              case self::PUNSUBSCRIBE:
                  // An integer 0 in the third slot invalidates the context;
                  // presumably this is the count of remaining subscriptions
                  // reported by the server (confirm against the protocol).
                  if ($response[2] === 0) {
                      $this->invalidate();
                  }
                  // Intentional fall-through: these replies are returned with
                  // the same three-field shape as a plain message.

              case self::MESSAGE:
                  return (object) array(
                      'kind'    => $response[0],
                      'channel' => $response[1],
                      'payload' => $response[2],
                  );

              case self::PMESSAGE:
                  return (object) array(
                      'kind'    => $response[0],
                      'pattern' => $response[1],
                      'channel' => $response[2],
                      'payload' => $response[3],
                  );

              default:
                  throw new ClientException(
                      "Received an unknown message type {$response[0]} inside of a pubsub context"
                  );
          }
      }
  }