PubSubContext.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\ClientInterface;
  12. use Predis\Helpers;
  13. use Predis\ClientException;
  14. use Predis\NotSupportedException;
  /**
   * Publish / Subscribe context that talks to the server through a client
   * instance and turns the raw responses read from its connection into
   * message objects.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class PubSubContext extends AbstractPubSubContext
  {
      /** Client instance whose connection carries the PUB/SUB traffic. */
      private $client;

      /** Initialization options; always an array, possibly empty. */
      private $options;

      /**
       * Validates the client, stores it together with the options and issues
       * the initial subscriptions requested through the options, if any.
       *
       * @param ClientInterface Client instance used by the context.
       * @param array Options for the context initialization. The 'subscribe'
       *              and 'psubscribe' keys, when set, are passed to the methods
       *              of the same name.
       */
      public function __construct(ClientInterface $client, Array $options = null)
      {
          // Validation comes first so nothing is stored for an unusable client.
          $this->checkCapabilities($client);

          if (!$options) {
              $options = array();
          }

          $this->client = $client;
          $this->options = $options;

          foreach (array('subscribe', 'psubscribe') as $subscribeAction) {
              $this->genericSubscribeInit($subscribeAction);
          }
      }

      /**
       * Verifies that the given client can be used for a Publish / Subscribe
       * context: its connection must not be a cluster and its profile must
       * support every PUB/SUB related command.
       *
       * @param ClientInterface Client instance used by the context.
       * @throws NotSupportedException When one of the conditions is not met.
       */
      private function checkCapabilities(ClientInterface $client)
      {
          $connection = $client->getConnection();

          if (Helpers::isCluster($connection)) {
              throw new NotSupportedException('Cannot initialize a PUB/SUB context over a cluster of connections');
          }

          $profile = $client->getProfile();
          $required = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');

          if (false === $profile->supportsCommands($required)) {
              throw new NotSupportedException('The current profile does not support PUB/SUB related commands');
          }
      }

      /**
       * Common initialization path for SUBSCRIBE and PSUBSCRIBE: when the
       * options contain an entry named after the action, the method with that
       * same name is invoked on this context with the entry as its argument.
       *
       * @param string $subscribeAction Type of subscription.
       */
      private function genericSubscribeInit($subscribeAction)
      {
          if (!isset($this->options[$subscribeAction])) {
              return;
          }

          $targets = $this->options[$subscribeAction];

          // Dynamic dispatch: $subscribeAction doubles as the method name
          // (presumably subscribe() / psubscribe() from the parent class).
          $this->$subscribeAction($targets);
      }

      /**
       * {@inheritdoc}
       *
       * Builds the command through the client and writes it on the client's
       * connection.
       */
      protected function writeCommand($method, $arguments)
      {
          $filtered = Helpers::filterArrayArguments($arguments);
          $command = $this->client->createCommand($method, $filtered);

          $connection = $this->client->getConnection();
          $connection->writeCommand($command);
      }

      /**
       * {@inheritdoc}
       *
       * Delegates the disconnection to the underlying client.
       */
      protected function disconnect()
      {
          $this->client->disconnect();
      }

      /**
       * {@inheritdoc}
       *
       * Reads one response from the client's connection and converts it into
       * an object whose fields depend on the kind of the response.
       *
       * @throws ClientException When the kind of the response is not recognized.
       */
      protected function getValue()
      {
          $response = $this->client->getConnection()->read();
          $kind = $response[0];

          // Non-strict lookup on purpose: it applies the same loose comparison
          // rules a switch statement would use on these constants.
          $isSubscriptionEvent = in_array($kind, array(
              self::SUBSCRIBE,
              self::UNSUBSCRIBE,
              self::PSUBSCRIBE,
              self::PUNSUBSCRIBE,
          ));

          // The third element of a (p)subscribe / (p)unsubscribe response is
          // presumably the number of remaining subscriptions: once it is the
          // integer 0 the context gets invalidated.
          if ($isSubscriptionEvent && $response[2] === 0) {
              $this->invalidate();
          }

          // Subscription events are handed back in the same shape as plain
          // messages.
          if ($isSubscriptionEvent || $kind == self::MESSAGE) {
              return (object) array(
                  'kind' => $response[0],
                  'channel' => $response[1],
                  'payload' => $response[2],
              );
          }

          if ($kind == self::PMESSAGE) {
              return (object) array(
                  'kind' => $response[0],
                  'pattern' => $response[1],
                  'channel' => $response[2],
                  'payload' => $response[3],
              );
          }

          throw new ClientException("Received an unknown message type {$response[0]} inside of a pubsub context");
      }
  }