PubSubContextTest.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use PredisTestCase;
  12. use Predis\Client;
  13. use Predis\Profile\ServerProfile;
  14. /**
  15. * @group realm-pubsub
  16. */
  17. class PubSubContextTest extends PredisTestCase
  18. {
  19. /**
  20. * @group disconnected
  21. * @expectedException Predis\NotSupportedException
  22. * @expectedExceptionMessage The current profile does not support PUB/SUB related commands
  23. */
  24. public function testPubSubContextRequirePubSubRelatedCommand()
  25. {
  26. $profile = $this->getMock('Predis\Profile\ServerProfileInterface');
  27. $profile->expects($this->any())
  28. ->method('supportsCommands')
  29. ->will($this->returnValue(false));
  30. $client = new Client(null, array('profile' => $profile));
  31. $pubsub = new PubSubContext($client);
  32. }
  33. /**
  34. * @group disconnected
  35. * @expectedException Predis\NotSupportedException
  36. * @expectedExceptionMessage Cannot initialize a PUB/SUB context when using aggregated connections
  37. */
  38. public function testPubSubContextDoesNotWorkOnClusters()
  39. {
  40. $cluster = $this->getMock('Predis\Connection\ClusterConnectionInterface');
  41. $client = new Client($cluster);
  42. $pubsub = new PubSubContext($client);
  43. }
  44. /**
  45. * @group disconnected
  46. */
  47. public function testConstructorWithoutSubscriptionsDoesNotOpenContext()
  48. {
  49. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  50. $client = $this->getMock('Predis\Client', array('executeCommand'), array($connection));
  51. $client->expects($this->never())->method('executeCommand');
  52. $pubsub = new PubSubContext($client);
  53. }
  54. /**
  55. * @group disconnected
  56. */
  57. public function testConstructorWithSubscriptionsOpensContext()
  58. {
  59. $profile = ServerProfile::get(REDIS_SERVER_VERSION);
  60. $cmdSubscribe = $profile->createCommand('subscribe', array('channel:foo'));
  61. $cmdPsubscribe = $profile->createCommand('psubscribe', array('channels:*'));
  62. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  63. $connection->expects($this->exactly(2))->method('writeCommand');
  64. $client = $this->getMock('Predis\Client', array('createCommand', 'writeCommand'), array($connection));
  65. $client->expects($this->exactly(2))
  66. ->method('createCommand')
  67. ->with($this->logicalOr($this->equalTo('subscribe'), $this->equalTo('psubscribe')))
  68. ->will($this->returnCallback(function ($id, $args) use ($profile) {
  69. return $profile->createCommand($id, $args);
  70. }));
  71. $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
  72. $pubsub = new PubSubContext($client, $options);
  73. }
  74. /**
  75. * @group disconnected
  76. */
  77. public function testClosingContextWithTrueClosesConnection()
  78. {
  79. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  80. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  81. $client->expects($this->exactly(1))->method('disconnect');
  82. $pubsub = new PubSubContext($client, array('subscribe' => 'channel:foo'));
  83. $connection->expects($this->never())->method('writeCommand');
  84. $pubsub->closeContext(true);
  85. }
  86. /**
  87. * @group disconnected
  88. */
  89. public function testClosingContextWithFalseSendsUnsubscriptions()
  90. {
  91. $profile = ServerProfile::get(REDIS_SERVER_VERSION);
  92. $classUnsubscribe = $profile->getCommandClass('unsubscribe');
  93. $classPunsubscribe = $profile->getCommandClass('punsubscribe');
  94. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  95. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  96. $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
  97. $pubsub = new PubSubContext($client, $options);
  98. $connection->expects($this->exactly(2))
  99. ->method('writeCommand')
  100. ->with($this->logicalOr(
  101. $this->isInstanceOf($classUnsubscribe),
  102. $this->isInstanceOf($classPunsubscribe)
  103. ));
  104. $pubsub->closeContext(false);
  105. }
  106. /**
  107. * @group disconnected
  108. */
  109. public function testIsNotValidWhenNotSubscribed()
  110. {
  111. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  112. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  113. $pubsub = new PubSubContext($client);
  114. $this->assertFalse($pubsub->valid());
  115. $this->assertNull($pubsub->next());
  116. }
  117. public function testHandlesPongMessages()
  118. {
  119. $rawmessage = array('pong', '');
  120. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  121. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  122. $client = new Client($connection);
  123. $pubsub = new PubSubContext($client, array('subscribe' => 'channel:foo'));
  124. $message = $pubsub->current();
  125. $this->assertSame('pong', $message->kind);
  126. $this->assertSame('', $message->payload);
  127. }
  128. /**
  129. * @group disconnected
  130. */
  131. public function testHandlesPongMessagesWithPayload()
  132. {
  133. $rawmessage = array('pong', 'foobar');
  134. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  135. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  136. $client = new Client($connection);
  137. $pubsub = new PubSubContext($client, array('subscribe' => 'channel:foo'));
  138. $message = $pubsub->current();
  139. $this->assertSame('pong', $message->kind);
  140. $this->assertSame('foobar', $message->payload);
  141. }
  142. /**
  143. * @group disconnected
  144. */
  145. public function testReadsMessageFromConnection()
  146. {
  147. $rawmessage = array('message', 'channel:foo', 'message from channel');
  148. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  149. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  150. $client = new Client($connection);
  151. $pubsub = new PubSubContext($client, array('subscribe' => 'channel:foo'));
  152. $message = $pubsub->current();
  153. $this->assertSame('message', $message->kind);
  154. $this->assertSame('channel:foo', $message->channel);
  155. $this->assertSame('message from channel', $message->payload);
  156. }
  157. /**
  158. * @group disconnected
  159. */
  160. public function testReadsPmessageFromConnection()
  161. {
  162. $rawmessage = array('pmessage', 'channel:*', 'channel:foo', 'message from channel');
  163. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  164. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  165. $client = new Client($connection);
  166. $pubsub = new PubSubContext($client, array('psubscribe' => 'channel:*'));
  167. $message = $pubsub->current();
  168. $this->assertSame('pmessage', $message->kind);
  169. $this->assertSame('channel:*', $message->pattern);
  170. $this->assertSame('channel:foo', $message->channel);
  171. $this->assertSame('message from channel', $message->payload);
  172. }
  173. /**
  174. * @group disconnected
  175. */
  176. public function testReadsSubscriptionMessageFromConnection()
  177. {
  178. $rawmessage = array('subscribe', 'channel:foo', 1);
  179. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  180. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  181. $client = new Client($connection);
  182. $pubsub = new PubSubContext($client, array('subscribe' => 'channel:foo'));
  183. $message = $pubsub->current();
  184. $this->assertSame('subscribe', $message->kind);
  185. $this->assertSame('channel:foo', $message->channel);
  186. $this->assertSame(1, $message->payload);
  187. }
  188. /**
  189. * @group disconnected
  190. */
  191. public function testReadsUnsubscriptionMessageFromConnection()
  192. {
  193. $rawmessage = array('unsubscribe', 'channel:foo', 1);
  194. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  195. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  196. $client = new Client($connection);
  197. $pubsub = new PubSubContext($client, array('subscribe' => 'channel:foo'));
  198. $message = $pubsub->current();
  199. $this->assertSame('unsubscribe', $message->kind);
  200. $this->assertSame('channel:foo', $message->channel);
  201. $this->assertSame(1, $message->payload);
  202. }
  203. /**
  204. * @group disconnected
  205. */
  206. public function testUnsubscriptionMessageWithZeroChannelCountInvalidatesContext()
  207. {
  208. $rawmessage = array('unsubscribe', 'channel:foo', 0);
  209. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  210. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  211. $client = new Client($connection);
  212. $pubsub = new PubSubContext($client, array('subscribe' => 'channel:foo'));
  213. $this->assertTrue($pubsub->valid());
  214. $message = $pubsub->current();
  215. $this->assertSame('unsubscribe', $message->kind);
  216. $this->assertSame('channel:foo', $message->channel);
  217. $this->assertSame(0, $message->payload);
  218. $this->assertFalse($pubsub->valid());
  219. }
  220. /**
  221. * @group disconnected
  222. */
  223. public function testGetUnderlyingClientInstance()
  224. {
  225. $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
  226. $client = new Client($connection);
  227. $pubsub = new PubSubContext($client);
  228. $this->assertSame($client, $pubsub->getClient());
  229. }
  230. // ******************************************************************** //
  231. // ---- INTEGRATION TESTS --------------------------------------------- //
  232. // ******************************************************************** //
  233. /**
  234. * @group connected
  235. */
  236. public function testPubSubAgainstRedisServer()
  237. {
  238. $parameters = array(
  239. 'host' => REDIS_SERVER_HOST,
  240. 'port' => REDIS_SERVER_PORT,
  241. 'database' => REDIS_SERVER_DBNUM,
  242. // Prevents suite from handing on broken test
  243. 'read_write_timeout' => 2,
  244. );
  245. $options = array('profile' => REDIS_SERVER_VERSION);
  246. $messages = array();
  247. $producer = new Client($parameters, $options);
  248. $producer->connect();
  249. $consumer = new Client($parameters, $options);
  250. $consumer->connect();
  251. $pubsub = new PubSubContext($consumer);
  252. $pubsub->subscribe('channel:foo');
  253. $producer->publish('channel:foo', 'message1');
  254. $producer->publish('channel:foo', 'message2');
  255. $producer->publish('channel:foo', 'QUIT');
  256. foreach ($pubsub as $message) {
  257. if ($message->kind !== 'message') {
  258. continue;
  259. }
  260. $messages[] = ($payload = $message->payload);
  261. if ($payload === 'QUIT') {
  262. $pubsub->closeContext();
  263. }
  264. }
  265. $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
  266. $this->assertFalse($pubsub->valid());
  267. $this->assertEquals('ECHO', $consumer->echo('ECHO'));
  268. }
  269. }