Consumer.php 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\ClientException;
  12. use Predis\ClientInterface;
  13. use Predis\Command\Command;
  14. use Predis\Connection\AggregateConnectionInterface;
  15. use Predis\NotSupportedException;
  16. /**
  17. * PUB/SUB consumer abstraction.
  18. *
  19. * @author Daniele Alessandri <suppakilla@gmail.com>
  20. */
  class Consumer extends AbstractConsumer
  {
      /**
       * Client whose connection carries the PUB/SUB traffic.
       */
      private $client;

      /**
       * Initialization options; always an array once the constructor has run.
       */
      private $options;

      /**
       * Validates the client, stores it together with the options and then
       * issues the initial subscriptions requested through the "subscribe"
       * and "psubscribe" option keys (in that order).
       *
       * @param ClientInterface $client  Client instance used by the consumer.
       * @param array           $options Options for the consumer initialization.
       *
       * @throws NotSupportedException When the client fails the capability check.
       */
      public function __construct(ClientInterface $client, array $options = null)
      {
          $this->checkCapabilities($client);

          // Any falsy value (null or an empty array) collapses to an empty array.
          $this->options = empty($options) ? array() : $options;
          $this->client = $client;

          // The client must already be stored here: subscribing is done
          // through methods of this object.
          foreach (array('subscribe', 'psubscribe') as $action) {
              $this->genericSubscribeInit($action);
          }
      }

      /**
       * Gives access to the client instance the consumer was built with.
       *
       * @return ClientInterface
       */
      public function getClient()
      {
          return $this->client;
      }

      /**
       * Ensures that the client can be used for PUB/SUB: its connection must
       * not be an aggregate one and its command factory must support all of
       * the PUB/SUB commands listed below.
       *
       * @param ClientInterface $client Client instance used by the consumer.
       *
       * @throws NotSupportedException
       */
      private function checkCapabilities(ClientInterface $client)
      {
          $connection = $client->getConnection();

          if ($connection instanceof AggregateConnectionInterface) {
              throw new NotSupportedException(
                  'Cannot initialize a PUB/SUB consumer over aggregate connections.'
              );
          }

          $required = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
          $factory = $client->getCommandFactory();

          // Only a strict boolean false is treated as "not supported".
          if (false === $factory->supportsCommands($required)) {
              throw new NotSupportedException(
                  'PUB/SUB commands are not supported by the current command factory.'
              );
          }
      }

      /**
       * Common initialization path for SUBSCRIBE and PSUBSCRIBE: when the
       * options contain an entry named after the action, the method of this
       * object with the same name is invoked with that entry as its argument.
       *
       * @param string $subscribeAction Type of subscription.
       */
      private function genericSubscribeInit($subscribeAction)
      {
          if (!isset($this->options[$subscribeAction])) {
              return;
          }

          $targets = $this->options[$subscribeAction];

          // Dynamic dispatch: the action name doubles as the method name.
          $this->{$subscribeAction}($targets);
      }

      /**
       * {@inheritdoc}
       *
       * Creates a command for the given method through the client, after
       * normalizing the arguments, and writes it to the client's connection.
       */
      protected function writeRequest($method, $arguments)
      {
          $connection = $this->client->getConnection();
          $command = $this->client->createCommand(
              $method,
              Command::normalizeArguments($arguments)
          );

          $connection->writeRequest($command);
      }

      /**
       * {@inheritdoc}
       *
       * Delegates to the disconnect() method of the underlying client.
       */
      protected function disconnect()
      {
          $this->client->disconnect();
      }

      /**
       * {@inheritdoc}
       *
       * Reads one reply from the connection and converts it into an object
       * whose properties depend on the kind of message found in the first
       * element of the reply.
       *
       * @throws ClientException When the message kind is not a known one.
       */
      protected function getValue()
      {
          $response = $this->client->getConnection()->read();
          $kind = $response[0];

          switch ($kind) {
              case self::SUBSCRIBE:
              case self::UNSUBSCRIBE:
              case self::PSUBSCRIBE:
              case self::PUNSUBSCRIBE:
                  // Third element exactly integer 0: presumably the count of
                  // remaining subscriptions, so the consumer is invalidated.
                  if (0 === $response[2]) {
                      $this->invalidate();
                  }
                  // Deliberate fall-through: (un)subscription notifications
                  // are exposed with the same shape as regular messages.
                  // no break
              case self::MESSAGE:
                  $fields = array(
                      'kind' => $kind,
                      'channel' => $response[1],
                      'payload' => $response[2],
                  );
                  break;

              case self::PMESSAGE:
                  $fields = array(
                      'kind' => $kind,
                      'pattern' => $response[1],
                      'channel' => $response[2],
                      'payload' => $response[3],
                  );
                  break;

              case self::PONG:
                  $fields = array(
                      'kind' => $kind,
                      'payload' => $response[1],
                  );
                  break;

              default:
                  throw new ClientException(
                      "Unknown message type '{$response[0]}' received in the PUB/SUB context."
                  );
          }

          return (object) $fields;
      }
  }