ConsumerTest.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\Client;
  12. use Predis\PubSub\Consumer as PubSubConsumer;
  13. use PredisTestCase;
  14. /**
  15. * @group realm-pubsub
  16. */
  17. class ConsumerTest extends PredisTestCase
  18. {
  /**
   * A consumer cannot be built when the command factory answers that the
   * commands it is asked about are not supported.
   *
   * @group disconnected
   * @expectedException \Predis\NotSupportedException
   * @expectedExceptionMessage PUB/SUB commands are not supported by the current command factory.
   */
  public function testPubSubConsumerRequirePubSubRelatedCommand()
  {
      // Factory stub replying "false" to every supportsCommands() query.
      $factory = $this->getMock('Predis\Command\FactoryInterface');
      $supportsCommands = $factory->expects($this->any())->method('supportsCommands');
      $supportsCommands->will($this->returnValue(false));

      $options = array('commands' => $factory);

      new PubSubConsumer(new Client(null, $options));
  }
  /**
   * Creating a consumer on top of a client that wraps a cluster connection
   * is expected to raise a NotSupportedException.
   *
   * @group disconnected
   * @expectedException \Predis\NotSupportedException
   * @expectedExceptionMessage Cannot initialize a PUB/SUB consumer over aggregate connections.
   */
  public function testPubSubConsumerDoesNotWorkOnClusters()
  {
      $aggregate = $this->getMock('Predis\Connection\Cluster\ClusterInterface');

      new PubSubConsumer(new Client($aggregate));
  }
  /**
   * Constructing a consumer without subscription options must not call
   * executeCommand() on the client.
   *
   * @group disconnected
   */
  public function testConstructorWithoutSubscriptionsDoesNotStartConsumer()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');

      $client = $this->getMock('Predis\Client', array('executeCommand'), array($node));
      $client
          ->expects($this->never())
          ->method('executeCommand');

      new PubSubConsumer($client);
  }
  /**
   * Passing both "subscribe" and "psubscribe" options to the constructor is
   * expected to create two commands and write two requests to the connection.
   *
   * @group disconnected
   */
  public function testConstructorWithSubscriptionsStartsConsumer()
  {
      $factory = $this->getCommandFactory();

      // Forwards command creation to the factory returned by getCommandFactory().
      $buildCommand = function ($id, $args) use ($factory) {
          return $factory->createCommand($id, $args);
      };

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node->expects($this->exactly(2))->method('writeRequest');

      $client = $this->getMock('Predis\Client', array('createCommand', 'writeRequest'), array($node));
      $acceptedIds = $this->logicalOr($this->equalTo('subscribe'), $this->equalTo('psubscribe'));
      $client->expects($this->exactly(2))
          ->method('createCommand')
          ->with($acceptedIds)
          ->will($this->returnCallback($buildCommand));

      new PubSubConsumer($client, array(
          'subscribe' => 'channel:foo',
          'psubscribe' => 'channels:*',
      ));
  }
  /**
   * stop(true) is expected to call disconnect() on the client exactly once
   * and to write nothing to the connection.
   *
   * @group disconnected
   */
  public function testStoppingConsumerWithTrueClosesConnection()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');

      $client = $this->getMock('Predis\Client', array('disconnect'), array($node));
      $client
          ->expects($this->exactly(1))
          ->method('disconnect');

      $consumer = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));

      // Registered after the consumer has been constructed, so it constrains
      // the stop() call below rather than the constructor.
      $node
          ->expects($this->never())
          ->method('writeRequest');

      $consumer->stop(true);
  }
  /**
   * stop(false) on a consumer created with both "subscribe" and "psubscribe"
   * options is expected to write two requests, each an instance of the
   * factory's "unsubscribe" or "punsubscribe" command class.
   *
   * @group disconnected
   */
  public function testStoppingConsumerWithFalseSendsUnsubscriptions()
  {
      $factory = $this->getCommandFactory();
      $unsubscribeClass = $factory->getCommandClass('unsubscribe');
      $punsubscribeClass = $factory->getCommandClass('punsubscribe');

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $client = $this->getMock('Predis\Client', array('disconnect'), array($node));

      $consumer = new PubSubConsumer($client, array(
          'subscribe' => 'channel:foo',
          'psubscribe' => 'channels:*',
      ));

      // Set up after construction so only the requests written by stop() are matched.
      $eitherUnsubscription = $this->logicalOr(
          $this->isInstanceOf($unsubscribeClass),
          $this->isInstanceOf($punsubscribeClass)
      );
      $node->expects($this->exactly(2))
          ->method('writeRequest')
          ->with($eitherUnsubscription);

      $consumer->stop(false);
  }
  /**
   * A consumer created without subscriptions reports valid() as false and
   * next() returns null.
   *
   * @group disconnected
   */
  public function testIsNotValidWhenNotSubscribed()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $client = $this->getMock('Predis\Client', array('disconnect'), array($node));

      $consumer = new PubSubConsumer($client);

      $isValid = $consumer->valid();
      $this->assertFalse($isValid);

      $next = $consumer->next();
      $this->assertNull($next);
  }
  /**
   * A raw array('pong', '') read from the connection is exposed as a message
   * of kind "pong" with an empty string payload.
   *
   * @group disconnected
   */
  public function testHandlesPongMessages()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node->expects($this->once())
          ->method('read')
          ->will($this->returnValue(array('pong', '')));

      $consumer = new PubSubConsumer(new Client($node), array('subscribe' => 'channel:foo'));
      $received = $consumer->current();

      $this->assertSame('pong', $received->kind);
      $this->assertSame('', $received->payload);
  }
  /**
   * A raw array('pong', 'foobar') read from the connection is exposed as a
   * message of kind "pong" carrying "foobar" as its payload.
   *
   * @group disconnected
   */
  public function testHandlesPongMessagesWithPayload()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node->expects($this->once())
          ->method('read')
          ->will($this->returnValue(array('pong', 'foobar')));

      $consumer = new PubSubConsumer(new Client($node), array('subscribe' => 'channel:foo'));
      $received = $consumer->current();

      $this->assertSame('pong', $received->kind);
      $this->assertSame('foobar', $received->payload);
  }
  /**
   * A three-element "message" reply read from the connection is mapped to
   * the kind, channel and payload properties of the current message.
   *
   * @group disconnected
   */
  public function testReadsMessageFromConnection()
  {
      $reply = array('message', 'channel:foo', 'message from channel');

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $readCall = $node->expects($this->once())->method('read');
      $readCall->will($this->returnValue($reply));

      $consumer = new PubSubConsumer(new Client($node), array('subscribe' => 'channel:foo'));
      $received = $consumer->current();

      $this->assertSame('message', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame('message from channel', $received->payload);
  }
  /**
   * A four-element "pmessage" reply read from the connection is mapped to
   * the kind, pattern, channel and payload properties of the current message.
   *
   * @group disconnected
   */
  public function testReadsPmessageFromConnection()
  {
      $reply = array('pmessage', 'channel:*', 'channel:foo', 'message from channel');

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $readCall = $node->expects($this->once())->method('read');
      $readCall->will($this->returnValue($reply));

      $consumer = new PubSubConsumer(new Client($node), array('psubscribe' => 'channel:*'));
      $received = $consumer->current();

      $this->assertSame('pmessage', $received->kind);
      $this->assertSame('channel:*', $received->pattern);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame('message from channel', $received->payload);
  }
  /**
   * A "subscribe" reply read from the connection is exposed with its channel
   * and with the integer 1 as payload.
   *
   * @group disconnected
   */
  public function testReadsSubscriptionMessageFromConnection()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node->expects($this->once())
          ->method('read')
          ->will($this->returnValue(array('subscribe', 'channel:foo', 1)));

      $client = new Client($node);
      $consumer = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));

      $received = $consumer->current();

      $this->assertSame('subscribe', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame(1, $received->payload);
  }
  /**
   * An "unsubscribe" reply read from the connection is exposed with its
   * channel and with the integer 1 as payload.
   *
   * @group disconnected
   */
  public function testReadsUnsubscriptionMessageFromConnection()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node->expects($this->once())
          ->method('read')
          ->will($this->returnValue(array('unsubscribe', 'channel:foo', 1)));

      $client = new Client($node);
      $consumer = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));

      $received = $consumer->current();

      $this->assertSame('unsubscribe', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame(1, $received->payload);
  }
  /**
   * The consumer is valid before reading an "unsubscribe" reply whose count
   * is 0, and is no longer valid once that reply has been read.
   *
   * @group disconnected
   */
  public function testUnsubscriptionMessageWithZeroChannelCountInvalidatesConsumer()
  {
      $reply = array('unsubscribe', 'channel:foo', 0);

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $readCall = $node->expects($this->once())->method('read');
      $readCall->will($this->returnValue($reply));

      $consumer = new PubSubConsumer(new Client($node), array('subscribe' => 'channel:foo'));

      // Still subscribed: nothing has been read yet.
      $this->assertTrue($consumer->valid());

      $received = $consumer->current();

      $this->assertSame('unsubscribe', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame(0, $received->payload);

      // The reply with count 0 has been consumed.
      $this->assertFalse($consumer->valid());
  }
  /**
   * getClient() returns the very same client instance that was passed to
   * the consumer's constructor.
   *
   * @group disconnected
   */
  public function testGetUnderlyingClientInstance()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $client = new Client($node);

      $consumer = new PubSubConsumer($client);
      $underlying = $consumer->getClient();

      $this->assertSame($client, $underlying);
  }
  231. // ******************************************************************** //
  232. // ---- INTEGRATION TESTS --------------------------------------------- //
  233. // ******************************************************************** //
  /**
   * Publishes three messages on "channel:foo" from one client and consumes
   * them from another one, stopping the consumer when "QUIT" is received.
   * After the loop the consumer connection must still be usable for ECHO.
   *
   * @group connected
   * @requiresRedisVersion >= 2.0.0
   */
  public function testPubSubAgainstRedisServer()
  {
      $parameters = array(
          'host' => REDIS_SERVER_HOST,
          'port' => REDIS_SERVER_PORT,
          'database' => REDIS_SERVER_DBNUM,
          // Keeps a broken test from hanging the whole suite.
          'read_write_timeout' => 2,
      );

      $producer = new Client($parameters);
      $producer->connect();

      $consumer = new Client($parameters);
      $consumer->connect();

      $pubsub = new PubSubConsumer($consumer);
      $pubsub->subscribe('channel:foo');

      foreach (array('message1', 'message2', 'QUIT') as $outgoing) {
          $producer->publish('channel:foo', $outgoing);
      }

      $received = array();

      foreach ($pubsub as $message) {
          // Only plain messages are collected; other kinds are skipped.
          if ($message->kind === 'message') {
              $payload = $message->payload;
              $received[] = $payload;

              if ($payload === 'QUIT') {
                  $pubsub->stop();
              }
          }
      }

      $this->assertSame(array('message1', 'message2', 'QUIT'), $received);
      $this->assertFalse($pubsub->valid());
      $this->assertEquals('ECHO', $consumer->echo('ECHO'));
  }
  /**
   * Consumes messages over a connection configured for blocking reads while
   * a forked child process publishes them.
   *
   * The parent process subscribes to "channel:foo" and iterates the consumer
   * until "QUIT" is received; the child publishes "message1", "message2" and
   * "QUIT", waits one second, disconnects its copy of the consumer client
   * and exits.
   *
   * @group connected
   * @requiresRedisVersion >= 2.0.0
   * @requires extension pcntl
   * @requires extension posix
   */
  public function testPubSubAgainstRedisServerBlocking()
  {
      $parameters = array(
          'host' => REDIS_SERVER_HOST,
          'port' => REDIS_SERVER_PORT,
          'database' => REDIS_SERVER_DBNUM,
          'read_write_timeout' => -1, // -1 to set blocking reads
      );

      // create consumer before forking so the child can disconnect it
      $consumer = new Client($parameters);
      $consumer->connect();

      $childPID = pcntl_fork();

      if ($childPID === -1) {
          // pcntl_fork() returns -1 on failure. Treating that value as "parent"
          // would leave this process blocked on a read nobody publishes to, and
          // posix_kill(-1, SIGKILL) would signal every process the current user
          // is allowed to signal.
          $consumer->disconnect();
          $this->markTestSkipped('Unable to fork the producer process.');
      }

      if ($childPID === 0) {
          // child: producer
          // read_write_timeout = 2 because it doesn't do blocking reads anyway
          $producer = new Client(array_replace($parameters, array('read_write_timeout' => 2)));
          $producer->connect();

          $producer->publish('channel:foo', 'message1');
          $producer->publish('channel:foo', 'message2');
          $producer->publish('channel:foo', 'QUIT');

          // sleep, giving the consumer a chance to respond to the QUIT message
          sleep(1);

          // disconnect the consumer because otherwise it could remain stuck in blocking read
          // if it failed to respond to the QUIT message
          $consumer->disconnect();

          // exit child
          exit(0);
      }

      // parent: consumer
      // NOTE(review): the child may publish before the SUBSCRIBE below has been
      // processed, in which case the messages would be missed and the blocking
      // read would not return -- consider subscribing before forking. TODO confirm.
      $messages = array();

      $pubsub = new PubSubConsumer($consumer);
      $pubsub->subscribe('channel:foo');

      foreach ($pubsub as $message) {
          if ($message->kind !== 'message') {
              continue;
          }

          $messages[] = ($payload = $message->payload);

          if ($payload === 'QUIT') {
              $pubsub->stop();
          }
      }

      $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
      $this->assertFalse($pubsub->valid());
      $this->assertEquals('ECHO', $consumer->echo('ECHO'));

      // kill the child and reap it so that no zombie process is left behind
      posix_kill($childPID, SIGKILL);
      pcntl_waitpid($childPID, $status);
  }
  325. }