PubSubContext.php 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\Client;
  12. use Predis\Helpers;
  13. use Predis\ClientException;
  /**
   * Client-side abstraction of a Publish / Subscribe context.
   *
   * The context is consumed as an iterator: each call to current() reads one
   * reply from the client's connection and returns it as an object. Iteration
   * stays valid while the context has not been invalidated and at least one
   * SUBSCRIBE or PSUBSCRIBE command has been issued through it.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class PubSubContext implements \Iterator
  {
      // Command IDs / reply kinds handled by the context.
      const SUBSCRIBE = 'subscribe';
      const UNSUBSCRIBE = 'unsubscribe';
      const PSUBSCRIBE = 'psubscribe';
      const PUNSUBSCRIBE = 'punsubscribe';
      const MESSAGE = 'message';
      const PMESSAGE = 'pmessage';

      // Bit flags OR-ed together into $statusFlags.
      const STATUS_VALID = 0x0001;
      const STATUS_SUBSCRIBED = 0x0010;
      const STATUS_PSUBSCRIBED = 0x0100;

      private $client;
      private $options;

      // Stays null until the first call to next(), which increments it to 1.
      private $position = null;

      // Declared with a default so that the destructor can safely inspect the
      // state even when the constructor throws before finishing, and so that
      // the flags are not created as a public dynamic property.
      private $statusFlags = self::STATUS_VALID;

      /**
       * Validates the client, stores the options and issues the initial
       * SUBSCRIBE / PSUBSCRIBE commands when the 'subscribe' / 'psubscribe'
       * options are set.
       *
       * @param Client $client Client instance used by the context.
       * @param array $options Options for the context initialization.
       * @throws ClientException When the client cannot be used for PUB/SUB.
       */
      public function __construct(Client $client, array $options = null)
      {
          $this->checkCapabilities($client);

          $this->options = $options ?: array();
          $this->client = $client;

          $this->genericSubscribeInit('subscribe');
          $this->genericSubscribeInit('psubscribe');
      }

      /**
       * Automatically closes the context when the object is destroyed.
       */
      public function __destruct()
      {
          $this->closeContext();
      }

      /**
       * Checks if the passed client instance satisfies the required conditions
       * needed to initialize a Publish / Subscribe context.
       *
       * @param Client $client Client instance used by the context.
       * @throws ClientException When the connection is a cluster or the profile
       *                         lacks one of the PUB/SUB commands.
       */
      private function checkCapabilities(Client $client)
      {
          if (Helpers::isCluster($client->getConnection())) {
              throw new ClientException(
                  'Cannot initialize a PUB/SUB context over a cluster of connections'
              );
          }

          $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');

          if ($client->getProfile()->supportsCommands($commands) === false) {
              throw new ClientException(
                  'The current profile does not support PUB/SUB related commands'
              );
          }
      }

      /**
       * This method shares the logic to handle both SUBSCRIBE and PSUBSCRIBE.
       *
       * When an option named after the action exists, the method with the same
       * name is invoked with the option value as its single argument.
       *
       * @param string $subscribeAction Type of subscription ('subscribe' or 'psubscribe').
       */
      private function genericSubscribeInit($subscribeAction)
      {
          if (isset($this->options[$subscribeAction])) {
              $this->$subscribeAction($this->options[$subscribeAction]);
          }
      }

      /**
       * Checks if every bit of the specified flag is set in the context state.
       *
       * @param int $value Flag.
       * @return Boolean
       */
      private function isFlagSet($value)
      {
          return ($this->statusFlags & $value) === $value;
      }

      /**
       * Subscribes to the specified channels.
       *
       * @param mixed $arg,... One or more channel names.
       */
      public function subscribe(/* arguments */)
      {
          $this->writeCommand(self::SUBSCRIBE, func_get_args());
          $this->statusFlags |= self::STATUS_SUBSCRIBED;
      }

      /**
       * Unsubscribes from the specified channels.
       *
       * @param mixed $arg,... One or more channel names.
       */
      public function unsubscribe(/* arguments */)
      {
          $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
      }

      /**
       * Subscribes to the specified channels using a pattern.
       *
       * @param mixed $arg,... One or more channel name patterns.
       */
      public function psubscribe(/* arguments */)
      {
          $this->writeCommand(self::PSUBSCRIBE, func_get_args());
          $this->statusFlags |= self::STATUS_PSUBSCRIBED;
      }

      /**
       * Unsubscribes from the specified channels using a pattern.
       *
       * @param mixed $arg,... One or more channel name patterns.
       */
      public function punsubscribe(/* arguments */)
      {
          $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
      }

      /**
       * Closes the context by sending UNSUBSCRIBE and / or PUNSUBSCRIBE without
       * arguments, depending on which kinds of subscription were issued.
       *
       * Nothing is sent when the context is no longer valid.
       */
      public function closeContext()
      {
          if ($this->valid()) {
              if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
                  $this->unsubscribe();
              }
              if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
                  $this->punsubscribe();
              }
          }
      }

      /**
       * Writes a Redis command on the underlying connection.
       *
       * @param string $method ID of the command.
       * @param array $arguments List of arguments.
       */
      private function writeCommand($method, $arguments)
      {
          $arguments = Helpers::filterArrayArguments($arguments);
          $command = $this->client->createCommand($method, $arguments);
          $this->client->getConnection()->writeCommand($command);
      }

      // NOTE: the #[\ReturnTypeWillChange] markers below silence the PHP 8.1+
      // deprecation about missing return types on Iterator methods. Older PHP
      // versions parse these lines as plain "#" comments.

      /**
       * {@inheritdoc}
       */
      #[\ReturnTypeWillChange]
      public function rewind()
      {
          // NOOP
      }

      /**
       * Returns the last message payload retrieved from the server and generated
       * by one of the active subscriptions.
       *
       * Every call reads a new reply from the connection.
       *
       * @return object
       */
      #[\ReturnTypeWillChange]
      public function current()
      {
          return $this->getValue();
      }

      /**
       * {@inheritdoc}
       */
      #[\ReturnTypeWillChange]
      public function key()
      {
          return $this->position;
      }

      /**
       * Advances the position counter as long as the context has not been
       * invalidated.
       *
       * @return int|null The current position.
       */
      #[\ReturnTypeWillChange]
      public function next()
      {
          if ($this->isFlagSet(self::STATUS_VALID)) {
              $this->position++;
          }

          return $this->position;
      }

      /**
       * Checks if the the context is still in a valid state to continue, that is
       * the VALID flag is set and at least one subscription flag is set.
       *
       * @return Boolean
       */
      #[\ReturnTypeWillChange]
      public function valid()
      {
          $isValid = $this->isFlagSet(self::STATUS_VALID);
          $subscriptionFlags = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED;
          $hasSubscriptions = ($this->statusFlags & $subscriptionFlags) > 0;

          return $isValid && $hasSubscriptions;
      }

      /**
       * Resets the state of the context by clearing every status flag.
       */
      private function invalidate()
      {
          $this->statusFlags = 0x0000;
      }

      /**
       * Waits for a new message from the server generated by one of the active
       * subscriptions and returns it when available.
       *
       * Subscription replies and plain messages are returned as objects with the
       * properties kind, channel and payload; pattern messages additionally carry
       * a pattern property.
       *
       * @return object
       * @throws ClientException When the reply kind is not recognized.
       */
      private function getValue()
      {
          $response = $this->client->getConnection()->read();

          switch ($response[0]) {
              case self::SUBSCRIBE:
              case self::UNSUBSCRIBE:
              case self::PSUBSCRIBE:
              case self::PUNSUBSCRIBE:
                  // A third element equal to integer 0 ends the iteration
                  // (presumably the count of remaining subscriptions).
                  if ($response[2] === 0) {
                      $this->invalidate();
                  }
                  // Subscription replies are handed back to the caller in the
                  // same shape as regular messages, hence the fallthrough.
                  // no break

              case self::MESSAGE:
                  return (object) array(
                      'kind' => $response[0],
                      'channel' => $response[1],
                      'payload' => $response[2],
                  );

              case self::PMESSAGE:
                  return (object) array(
                      'kind' => $response[0],
                      'pattern' => $response[1],
                      'channel' => $response[2],
                      'payload' => $response[3],
                  );

              default:
                  throw new ClientException(
                      "Received an unknown message type {$response[0]} inside of a pubsub context"
                  );
          }
      }
  }