ConsumerTest.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use PredisTestCase;
  12. use Predis\Client;
  13. use Predis\Profile;
  14. use Predis\PubSub\Consumer as PubSubConsumer;
  15. /**
  16. * @group realm-pubsub
  17. */
  18. class ConsumerTest extends PredisTestCase
  19. {
  20. /**
  21. * @group disconnected
  22. * @expectedException Predis\NotSupportedException
  23. * @expectedExceptionMessage The current profile does not support PUB/SUB related commands.
  24. */
  25. public function testPubSubConsumerRequirePubSubRelatedCommand()
  26. {
  27. $profile = $this->getMock('Predis\Profile\ProfileInterface');
  28. $profile->expects($this->any())
  29. ->method('supportsCommands')
  30. ->will($this->returnValue(false));
  31. $client = new Client(null, array('profile' => $profile));
  32. $pubsub = new PubSubConsumer($client);
  33. }
  34. /**
  35. * @group disconnected
  36. * @expectedException Predis\NotSupportedException
  37. * @expectedExceptionMessage Cannot initialize a PUB/SUB consumer over aggregate connections.
  38. */
  39. public function testPubSubConsumerDoesNotWorkOnClusters()
  40. {
  41. $cluster = $this->getMock('Predis\Connection\Aggregate\ClusterInterface');
  42. $client = new Client($cluster);
  43. $pubsub = new PubSubConsumer($client);
  44. }
  45. /**
  46. * @group disconnected
  47. */
  48. public function testConstructorWithoutSubscriptionsDoesNotStartConsumer()
  49. {
  50. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  51. $client = $this->getMock('Predis\Client', array('executeCommand'), array($connection));
  52. $client->expects($this->never())->method('executeCommand');
  53. $pubsub = new PubSubConsumer($client);
  54. }
  55. /**
  56. * @group disconnected
  57. */
  58. public function testConstructorWithSubscriptionsStartsConsumer()
  59. {
  60. $profile = Profile\Factory::get(REDIS_SERVER_VERSION);
  61. $cmdSubscribe = $profile->createCommand('subscribe', array('channel:foo'));
  62. $cmdPsubscribe = $profile->createCommand('psubscribe', array('channels:*'));
  63. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  64. $connection->expects($this->exactly(2))->method('writeRequest');
  65. $client = $this->getMock('Predis\Client', array('createCommand', 'writeRequest'), array($connection));
  66. $client->expects($this->exactly(2))
  67. ->method('createCommand')
  68. ->with($this->logicalOr($this->equalTo('subscribe'), $this->equalTo('psubscribe')))
  69. ->will($this->returnCallback(function ($id, $args) use ($profile) {
  70. return $profile->createCommand($id, $args);
  71. }));
  72. $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
  73. $pubsub = new PubSubConsumer($client, $options);
  74. }
  75. /**
  76. * @group disconnected
  77. */
  78. public function testStoppingConsumerWithTrueClosesConnection()
  79. {
  80. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  81. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  82. $client->expects($this->exactly(1))->method('disconnect');
  83. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  84. $connection->expects($this->never())->method('writeRequest');
  85. $pubsub->stop(true);
  86. }
  87. /**
  88. * @group disconnected
  89. */
  90. public function testStoppingConsumerWithFalseSendsUnsubscriptions()
  91. {
  92. $profile = Profile\Factory::get(REDIS_SERVER_VERSION);
  93. $classUnsubscribe = $profile->getCommandClass('unsubscribe');
  94. $classPunsubscribe = $profile->getCommandClass('punsubscribe');
  95. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  96. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  97. $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
  98. $pubsub = new PubSubConsumer($client, $options);
  99. $connection->expects($this->exactly(2))
  100. ->method('writeRequest')
  101. ->with($this->logicalOr(
  102. $this->isInstanceOf($classUnsubscribe),
  103. $this->isInstanceOf($classPunsubscribe)
  104. ));
  105. $pubsub->stop(false);
  106. }
  107. /**
  108. * @group disconnected
  109. */
  110. public function testIsNotValidWhenNotSubscribed()
  111. {
  112. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  113. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  114. $pubsub = new PubSubConsumer($client);
  115. $this->assertFalse($pubsub->valid());
  116. $this->assertNull($pubsub->next());
  117. }
  118. /**
  119. * @group disconnected
  120. */
  121. public function testHandlesPongMessages()
  122. {
  123. $rawmessage = array('pong', '');
  124. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  125. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  126. $client = new Client($connection);
  127. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  128. $message = $pubsub->current();
  129. $this->assertSame('pong', $message->kind);
  130. $this->assertSame('', $message->payload);
  131. }
  132. /**
  133. * @group disconnected
  134. */
  135. public function testHandlesPongMessagesWithPayload()
  136. {
  137. $rawmessage = array('pong', 'foobar');
  138. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  139. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  140. $client = new Client($connection);
  141. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  142. $message = $pubsub->current();
  143. $this->assertSame('pong', $message->kind);
  144. $this->assertSame('foobar', $message->payload);
  145. }
  146. /**
  147. * @group disconnected
  148. */
  149. public function testReadsMessageFromConnection()
  150. {
  151. $rawmessage = array('message', 'channel:foo', 'message from channel');
  152. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  153. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  154. $client = new Client($connection);
  155. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  156. $message = $pubsub->current();
  157. $this->assertSame('message', $message->kind);
  158. $this->assertSame('channel:foo', $message->channel);
  159. $this->assertSame('message from channel', $message->payload);
  160. }
  161. /**
  162. * @group disconnected
  163. */
  164. public function testReadsPmessageFromConnection()
  165. {
  166. $rawmessage = array('pmessage', 'channel:*', 'channel:foo', 'message from channel');
  167. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  168. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  169. $client = new Client($connection);
  170. $pubsub = new PubSubConsumer($client, array('psubscribe' => 'channel:*'));
  171. $message = $pubsub->current();
  172. $this->assertSame('pmessage', $message->kind);
  173. $this->assertSame('channel:*', $message->pattern);
  174. $this->assertSame('channel:foo', $message->channel);
  175. $this->assertSame('message from channel', $message->payload);
  176. }
  177. /**
  178. * @group disconnected
  179. */
  180. public function testReadsSubscriptionMessageFromConnection()
  181. {
  182. $rawmessage = array('subscribe', 'channel:foo', 1);
  183. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  184. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  185. $client = new Client($connection);
  186. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  187. $message = $pubsub->current();
  188. $this->assertSame('subscribe', $message->kind);
  189. $this->assertSame('channel:foo', $message->channel);
  190. $this->assertSame(1, $message->payload);
  191. }
  192. /**
  193. * @group disconnected
  194. */
  195. public function testReadsUnsubscriptionMessageFromConnection()
  196. {
  197. $rawmessage = array('unsubscribe', 'channel:foo', 1);
  198. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  199. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  200. $client = new Client($connection);
  201. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  202. $message = $pubsub->current();
  203. $this->assertSame('unsubscribe', $message->kind);
  204. $this->assertSame('channel:foo', $message->channel);
  205. $this->assertSame(1, $message->payload);
  206. }
  207. /**
  208. * @group disconnected
  209. */
  210. public function testUnsubscriptionMessageWithZeroChannelCountInvalidatesConsumer()
  211. {
  212. $rawmessage = array('unsubscribe', 'channel:foo', 0);
  213. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  214. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  215. $client = new Client($connection);
  216. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  217. $this->assertTrue($pubsub->valid());
  218. $message = $pubsub->current();
  219. $this->assertSame('unsubscribe', $message->kind);
  220. $this->assertSame('channel:foo', $message->channel);
  221. $this->assertSame(0, $message->payload);
  222. $this->assertFalse($pubsub->valid());
  223. }
  224. /**
  225. * @group disconnected
  226. */
  227. public function testGetUnderlyingClientInstance()
  228. {
  229. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  230. $client = new Client($connection);
  231. $pubsub = new PubSubConsumer($client);
  232. $this->assertSame($client, $pubsub->getClient());
  233. }
  234. // ******************************************************************** //
  235. // ---- INTEGRATION TESTS --------------------------------------------- //
  236. // ******************************************************************** //
  237. /**
  238. * @group connected
  239. */
  240. public function testPubSubAgainstRedisServer()
  241. {
  242. $parameters = array(
  243. 'host' => REDIS_SERVER_HOST,
  244. 'port' => REDIS_SERVER_PORT,
  245. 'database' => REDIS_SERVER_DBNUM,
  246. // Prevents suite from handing on broken test
  247. 'read_write_timeout' => 2,
  248. );
  249. $options = array('profile' => REDIS_SERVER_VERSION);
  250. $messages = array();
  251. $producer = new Client($parameters, $options);
  252. $producer->connect();
  253. $consumer = new Client($parameters, $options);
  254. $consumer->connect();
  255. $pubsub = new PubSubConsumer($consumer);
  256. $pubsub->subscribe('channel:foo');
  257. $producer->publish('channel:foo', 'message1');
  258. $producer->publish('channel:foo', 'message2');
  259. $producer->publish('channel:foo', 'QUIT');
  260. foreach ($pubsub as $message) {
  261. if ($message->kind !== 'message') {
  262. continue;
  263. }
  264. $messages[] = ($payload = $message->payload);
  265. if ($payload === 'QUIT') {
  266. $pubsub->stop();
  267. }
  268. }
  269. $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
  270. $this->assertFalse($pubsub->valid());
  271. $this->assertEquals('ECHO', $consumer->echo('ECHO'));
  272. }
  273. }