ConsumerTest.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use PredisTestCase;
  12. use Predis\Client;
  13. use Predis\Profile;
  14. use Predis\PubSub\Consumer as PubSubConsumer;
  15. /**
  16. * @group realm-pubsub
  17. */
  18. class ConsumerTest extends PredisTestCase
  19. {
  /**
   * Building a consumer must fail when the profile attached to the client
   * reports that the required commands are not supported.
   *
   * @group disconnected
   * @expectedException Predis\NotSupportedException
   * @expectedExceptionMessage The current profile does not support PUB/SUB related commands
   */
  public function testPubSubConsumerRequirePubSubRelatedCommand()
  {
      // Profile stub answering "not supported" to every supportsCommands() call.
      $unsupportingProfile = $this->getMock('Predis\Profile\ProfileInterface');
      $unsupportingProfile
          ->expects($this->any())
          ->method('supportsCommands')
          ->will($this->returnValue(false));

      $clientOptions = array('profile' => $unsupportingProfile);
      $client = new Client(null, $clientOptions);

      // The constructor is the call expected to raise the exception.
      $pubsub = new PubSubConsumer($client);
  }
  /**
   * Building a consumer on top of a client that wraps a cluster connection
   * must be rejected.
   *
   * @group disconnected
   * @expectedException Predis\NotSupportedException
   * @expectedExceptionMessage Cannot initialize a PUB/SUB consumer when using aggregated connections
   */
  public function testPubSubConsumerDoesNotWorkOnClusters()
  {
      $client = new Client(
          $this->getMock('Predis\Connection\ClusterConnectionInterface')
      );

      // The constructor is the call expected to raise the exception.
      $pubsub = new PubSubConsumer($client);
  }
  /**
   * Without subscription options the constructor must not execute any
   * command through the client.
   *
   * @group disconnected
   */
  public function testConstructorWithoutSubscriptionsDoesNotStartConsumer()
  {
      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');

      // Partial client mock: only executeCommand() is replaced.
      $mockedMethods = array('executeCommand');
      $constructorArgs = array($connection);
      $client = $this->getMock('Predis\Client', $mockedMethods, $constructorArgs);

      $client
          ->expects($this->never())
          ->method('executeCommand');

      $pubsub = new PubSubConsumer($client);
  }
  /**
   * Passing both "subscribe" and "psubscribe" options to the constructor
   * must create one command for each of them and write two requests on
   * the underlying connection.
   *
   * @group disconnected
   */
  public function testConstructorWithSubscriptionsStartsConsumer()
  {
      $profile = Profile\Factory::get(REDIS_SERVER_VERSION);

      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
      $connection->expects($this->exactly(2))->method('writeRequest');

      $client = $this->getMock('Predis\Client', array('createCommand', 'writeRequest'), array($connection));
      $client->expects($this->exactly(2))
             ->method('createCommand')
             ->with($this->logicalOr($this->equalTo('subscribe'), $this->equalTo('psubscribe')))
             // Delegate to the real profile so actual command instances are produced.
             ->will($this->returnCallback(function ($id, $args) use ($profile) {
                 return $profile->createCommand($id, $args);
             }));

      $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
      $pubsub = new PubSubConsumer($client, $options);
  }
  /**
   * Calling stop(true) on a subscribed consumer must disconnect the client
   * exactly once and must not write any further request on the connection.
   *
   * @group disconnected
   */
  public function testStoppingConsumerWithTrueClosesConnection()
  {
      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');

      $mockedMethods = array('disconnect');
      $client = $this->getMock('Predis\Client', $mockedMethods, array($connection));
      $client
          ->expects($this->once())
          ->method('disconnect');

      $subscriptions = array('subscribe' => 'channel:foo');
      $pubsub = new PubSubConsumer($client, $subscriptions);

      // Registered only after construction, so that requests written by the
      // constructor itself are not counted against this expectation.
      $connection
          ->expects($this->never())
          ->method('writeRequest');

      $pubsub->stop(true);
  }
  /**
   * Calling stop(false) on a consumer holding both a channel and a pattern
   * subscription must write two requests, each an instance of either the
   * unsubscribe or the punsubscribe command class.
   *
   * @group disconnected
   */
  public function testStoppingConsumerWithFalseSendsUnsubscriptions()
  {
      $profile = Profile\Factory::get(REDIS_SERVER_VERSION);
      $unsubscribeClass = $profile->getCommandClass('unsubscribe');
      $punsubscribeClass = $profile->getCommandClass('punsubscribe');

      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
      $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));

      $subscriptions = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
      $pubsub = new PubSubConsumer($client, $subscriptions);

      // Expectation registered after construction: only the requests issued
      // by stop() are matched against it.
      $isUnsubscription = $this->logicalOr(
          $this->isInstanceOf($unsubscribeClass),
          $this->isInstanceOf($punsubscribeClass)
      );
      $connection
          ->expects($this->exactly(2))
          ->method('writeRequest')
          ->with($isUnsubscription);

      $pubsub->stop(false);
  }
  /**
   * A consumer created without subscriptions reports itself as not valid
   * and next() returns NULL.
   *
   * @group disconnected
   */
  public function testIsNotValidWhenNotSubscribed()
  {
      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
      $constructorArgs = array($connection);
      $client = $this->getMock('Predis\Client', array('disconnect'), $constructorArgs);

      $pubsub = new PubSubConsumer($client);

      $isValid = $pubsub->valid();
      $this->assertFalse($isValid);

      $advanced = $pubsub->next();
      $this->assertNull($advanced);
  }
  /**
   * A "message" payload read from the connection is exposed by current()
   * as an object carrying kind, channel and payload.
   *
   * @group disconnected
   */
  public function testReadsMessageFromConnection()
  {
      $reply = array('message', 'channel:foo', 'message from channel');

      // The connection hands out the canned reply on its single read().
      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
      $connection
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue($reply));

      $subscriptions = array('subscribe' => 'channel:foo');
      $pubsub = new PubSubConsumer(new Client($connection), $subscriptions);

      $received = $pubsub->current();

      $this->assertSame('message', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame('message from channel', $received->payload);
  }
  /**
   * A "pmessage" payload read from the connection is exposed by current()
   * as an object carrying kind, pattern, channel and payload.
   *
   * @group disconnected
   */
  public function testReadsPmessageFromConnection()
  {
      $reply = array('pmessage', 'channel:*', 'channel:foo', 'message from channel');

      // The connection hands out the canned reply on its single read().
      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
      $connection
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue($reply));

      $subscriptions = array('psubscribe' => 'channel:*');
      $pubsub = new PubSubConsumer(new Client($connection), $subscriptions);

      $received = $pubsub->current();

      $this->assertSame('pmessage', $received->kind);
      $this->assertSame('channel:*', $received->pattern);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame('message from channel', $received->payload);
  }
  /**
   * A "subscribe" payload read from the connection is exposed by current()
   * with the channel name and the integer carried by the reply as payload.
   *
   * @group disconnected
   */
  public function testReadsSubscriptionMessageFromConnection()
  {
      $reply = array('subscribe', 'channel:foo', 1);

      // The connection hands out the canned reply on its single read().
      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
      $connection
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue($reply));

      $subscriptions = array('subscribe' => 'channel:foo');
      $pubsub = new PubSubConsumer(new Client($connection), $subscriptions);

      $received = $pubsub->current();

      $this->assertSame('subscribe', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame(1, $received->payload);
  }
  /**
   * An "unsubscribe" payload read from the connection is exposed by
   * current() with the channel name and the integer carried by the reply
   * as payload.
   *
   * @group disconnected
   */
  public function testReadsUnsubscriptionMessageFromConnection()
  {
      $reply = array('unsubscribe', 'channel:foo', 1);

      // The connection hands out the canned reply on its single read().
      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
      $connection
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue($reply));

      $subscriptions = array('subscribe' => 'channel:foo');
      $pubsub = new PubSubConsumer(new Client($connection), $subscriptions);

      $received = $pubsub->current();

      $this->assertSame('unsubscribe', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame(1, $received->payload);
  }
  /**
   * The consumer is valid right after subscribing, and stops being valid
   * once an "unsubscribe" payload carrying a count of 0 has been read.
   *
   * @group disconnected
   */
  public function testUnsubscriptionMessageWithZeroChannelCountInvalidatesConsumer()
  {
      $reply = array('unsubscribe', 'channel:foo', 0);

      // The connection hands out the canned reply on its single read().
      $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
      $connection
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue($reply));

      $subscriptions = array('subscribe' => 'channel:foo');
      $pubsub = new PubSubConsumer(new Client($connection), $subscriptions);

      // State before the unsubscription payload is consumed.
      $this->assertTrue($pubsub->valid());

      $received = $pubsub->current();

      $this->assertSame('unsubscribe', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame(0, $received->payload);

      // State after the unsubscription payload has been consumed.
      $this->assertFalse($pubsub->valid());
  }
  /**
   * getClient() returns the very same client instance that was handed to
   * the constructor.
   *
   * @group disconnected
   */
  public function testGetUnderlyingClientInstance()
  {
      $client = new Client(
          $this->getMock('Predis\Connection\SingleConnectionInterface')
      );

      $pubsub = new PubSubConsumer($client);
      $exposedClient = $pubsub->getClient();

      $this->assertSame($client, $exposedClient);
  }
  206. // ******************************************************************** //
  207. // ---- INTEGRATION TESTS --------------------------------------------- //
  208. // ******************************************************************** //
  /**
   * End-to-end check: one client publishes three messages on a channel
   * while a second client consumes them through a PUB/SUB consumer and
   * stops when the "QUIT" payload arrives.
   *
   * @group connected
   */
  public function testPubSubAgainstRedisServer()
  {
      $parameters = array(
          'host' => REDIS_SERVER_HOST,
          'port' => REDIS_SERVER_PORT,
          'database' => REDIS_SERVER_DBNUM,
          // Prevents suite from hanging on broken test
          'read_write_timeout' => 2,
      );
      $options = array('profile' => REDIS_SERVER_VERSION);

      // Two independent connections: one publishing, one consuming.
      $producer = new Client($parameters, $options);
      $producer->connect();

      $consumer = new Client($parameters, $options);
      $consumer->connect();

      $pubsub = new PubSubConsumer($consumer);
      $pubsub->subscribe('channel:foo');

      foreach (array('message1', 'message2', 'QUIT') as $outgoing) {
          $producer->publish('channel:foo', $outgoing);
      }

      $messages = array();

      foreach ($pubsub as $message) {
          // Only "message" payloads are collected; other kinds are skipped.
          if ($message->kind === 'message') {
              $payload = $message->payload;
              $messages[] = $payload;

              if ($payload === 'QUIT') {
                  $pubsub->stop();
              }
          }
      }

      $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
      $this->assertFalse($pubsub->valid());

      // The consuming client must still answer regular commands afterwards.
      $this->assertEquals('PONG', $consumer->ping());
  }
  245. }