ConsumerTest.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\Client;
  12. use Predis\PubSub\Consumer as PubSubConsumer;
  13. use PredisTestCase;
  14. /**
  15. * @group realm-pubsub
  16. */
  17. class ConsumerTest extends PredisTestCase
  18. {
  /**
   * A consumer cannot be created when the command factory reports that the
   * requested commands are not supported.
   *
   * @group disconnected
   * @expectedException \Predis\NotSupportedException
   * @expectedExceptionMessage PUB/SUB commands are not supported by the current command factory.
   */
  public function testPubSubConsumerRequirePubSubRelatedCommand()
  {
      // Factory double that answers "not supported" to every supportsCommands() call.
      $factory = $this->getMock('Predis\Command\FactoryInterface');
      $supportsCommands = $factory->expects($this->any())->method('supportsCommands');
      $supportsCommands->will($this->returnValue(false));

      $options = array('commands' => $factory);

      new PubSubConsumer(new Client(null, $options));
  }
  /**
   * A consumer built on a client that wraps a cluster connection must be
   * rejected.
   *
   * @group disconnected
   * @expectedException \Predis\NotSupportedException
   * @expectedExceptionMessage Cannot initialize a PUB/SUB consumer over aggregate connections.
   */
  public function testPubSubConsumerDoesNotWorkOnClusters()
  {
      $aggregate = $this->getMock('Predis\Connection\Cluster\ClusterInterface');

      new PubSubConsumer(new Client($aggregate));
  }
  /**
   * Creating a consumer without subscription options must not execute any
   * command through the client.
   *
   * @group disconnected
   */
  public function testConstructorWithoutSubscriptionsDoesNotStartConsumer()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');

      $mockedMethods = array('executeCommand');
      $constructorArgs = array($node);

      $client = $this->getMock('Predis\Client', $mockedMethods, $constructorArgs);
      $client
          ->expects($this->never())
          ->method('executeCommand');

      new PubSubConsumer($client);
  }
  /**
   * Creating a consumer with both `subscribe` and `psubscribe` options must
   * create two commands on the client and write two requests to the connection.
   *
   * @group disconnected
   */
  public function testConstructorWithSubscriptionsStartsConsumer()
  {
      $factory = $this->getCommandFactory();

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node
          ->expects($this->exactly(2))
          ->method('writeRequest');

      // createCommand() is mocked on the client and delegated to the real factory.
      $buildCommand = function ($commandID, $arguments) use ($factory) {
          return $factory->createCommand($commandID, $arguments);
      };

      $client = $this->getMock('Predis\Client', array('createCommand', 'writeRequest'), array($node));
      $client
          ->expects($this->exactly(2))
          ->method('createCommand')
          ->with($this->logicalOr($this->equalTo('subscribe'), $this->equalTo('psubscribe')))
          ->will($this->returnCallback($buildCommand));

      $subscriptions = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');

      new PubSubConsumer($client, $subscriptions);
  }
  /**
   * stop(true) must disconnect the client exactly once and must not write any
   * request to the connection.
   *
   * @group disconnected
   */
  public function testStoppingConsumerWithTrueClosesConnection()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');

      $client = $this->getMock('Predis\Client', array('disconnect'), array($node));
      $client->expects($this->once())->method('disconnect');

      $consumer = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));

      // Deliberately registered after the consumer has been built: the
      // expectation is meant to cover only what stop() does.
      $node->expects($this->never())->method('writeRequest');

      $consumer->stop(true);
  }
  /**
   * stop(false) must write exactly two requests to the connection, each being
   * an instance of the `unsubscribe` or `punsubscribe` command class.
   *
   * @group disconnected
   */
  public function testStoppingConsumerWithFalseSendsUnsubscriptions()
  {
      $factory = $this->getCommandFactory();
      $unsubscribeClass = $factory->getCommandClass('unsubscribe');
      $punsubscribeClass = $factory->getCommandClass('punsubscribe');

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $client = $this->getMock('Predis\Client', array('disconnect'), array($node));

      $subscriptions = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
      $consumer = new PubSubConsumer($client, $subscriptions);

      // Expectation registered after construction, right before stopping.
      $isUnsubscription = $this->logicalOr(
          $this->isInstanceOf($unsubscribeClass),
          $this->isInstanceOf($punsubscribeClass)
      );
      $node
          ->expects($this->exactly(2))
          ->method('writeRequest')
          ->with($isUnsubscription);

      $consumer->stop(false);
  }
  /**
   * A consumer created without subscriptions is not valid and next() returns
   * null.
   *
   * @group disconnected
   */
  public function testIsNotValidWhenNotSubscribed()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $client = $this->getMock('Predis\Client', array('disconnect'), array($node));

      $consumer = new PubSubConsumer($client);

      $this->assertFalse($consumer->valid());
      $this->assertNull($consumer->next());
  }
  /**
   * A `pong` reply with an empty string read from the connection is exposed
   * as a message of kind `pong` with an empty payload.
   *
   * @group disconnected
   */
  public function testHandlesPongMessages()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue(array('pong', '')));

      $consumer = new PubSubConsumer(new Client($node), array('subscribe' => 'channel:foo'));
      $pong = $consumer->current();

      $this->assertSame('pong', $pong->kind);
      $this->assertSame('', $pong->payload);
  }
  /**
   * A `pong` reply carrying a payload read from the connection is exposed as
   * a message of kind `pong` with that same payload.
   *
   * @group disconnected
   */
  public function testHandlesPongMessagesWithPayload()
  {
      $reply = array('pong', 'foobar');

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $readOnce = $node->expects($this->once())->method('read');
      $readOnce->will($this->returnValue($reply));

      $client = new Client($node);
      $consumer = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));

      $pong = $consumer->current();

      $this->assertSame('pong', $pong->kind);
      $this->assertSame('foobar', $pong->payload);
  }
  /**
   * A `message` reply read from the connection is mapped to an object
   * exposing its kind, channel and payload.
   *
   * @group disconnected
   */
  public function testReadsMessageFromConnection()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue(array('message', 'channel:foo', 'message from channel')));

      $subscriptions = array('subscribe' => 'channel:foo');
      $consumer = new PubSubConsumer(new Client($node), $subscriptions);

      $received = $consumer->current();

      $this->assertSame('message', $received->kind);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame('message from channel', $received->payload);
  }
  /**
   * A `pmessage` reply read from the connection is mapped to an object
   * exposing its kind, pattern, channel and payload.
   *
   * @group disconnected
   */
  public function testReadsPmessageFromConnection()
  {
      $reply = array('pmessage', 'channel:*', 'channel:foo', 'message from channel');

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $readOnce = $node->expects($this->once())->method('read');
      $readOnce->will($this->returnValue($reply));

      $consumer = new PubSubConsumer(new Client($node), array('psubscribe' => 'channel:*'));

      $received = $consumer->current();

      $this->assertSame('pmessage', $received->kind);
      $this->assertSame('channel:*', $received->pattern);
      $this->assertSame('channel:foo', $received->channel);
      $this->assertSame('message from channel', $received->payload);
  }
  /**
   * A `subscribe` reply read from the connection is mapped to an object whose
   * channel is the second element and whose payload is the integer third
   * element of the reply.
   *
   * @group disconnected
   */
  public function testReadsSubscriptionMessageFromConnection()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue(array('subscribe', 'channel:foo', 1)));

      $client = new Client($node);
      $consumer = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));

      $notification = $consumer->current();

      $this->assertSame('subscribe', $notification->kind);
      $this->assertSame('channel:foo', $notification->channel);
      $this->assertSame(1, $notification->payload);
  }
  /**
   * An `unsubscribe` reply read from the connection is mapped to an object
   * whose channel is the second element and whose payload is the integer
   * third element of the reply.
   *
   * @group disconnected
   */
  public function testReadsUnsubscriptionMessageFromConnection()
  {
      $reply = array('unsubscribe', 'channel:foo', 1);

      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $readOnce = $node->expects($this->once())->method('read');
      $readOnce->will($this->returnValue($reply));

      $consumer = new PubSubConsumer(new Client($node), array('subscribe' => 'channel:foo'));

      $notification = $consumer->current();

      $this->assertSame('unsubscribe', $notification->kind);
      $this->assertSame('channel:foo', $notification->channel);
      $this->assertSame(1, $notification->payload);
  }
  /**
   * The consumer is valid before reading; after reading an `unsubscribe`
   * reply whose third element is 0 it must report itself as no longer valid.
   *
   * @group disconnected
   */
  public function testUnsubscriptionMessageWithZeroChannelCountInvalidatesConsumer()
  {
      $node = $this->getMock('Predis\Connection\NodeConnectionInterface');
      $node
          ->expects($this->once())
          ->method('read')
          ->will($this->returnValue(array('unsubscribe', 'channel:foo', 0)));

      $consumer = new PubSubConsumer(new Client($node), array('subscribe' => 'channel:foo'));

      // State before the reply is read.
      $this->assertTrue($consumer->valid());

      $notification = $consumer->current();

      $this->assertSame('unsubscribe', $notification->kind);
      $this->assertSame('channel:foo', $notification->channel);
      $this->assertSame(0, $notification->payload);

      // State after the reply has been read.
      $this->assertFalse($consumer->valid());
  }
  /**
   * getClient() returns the very same client instance the consumer was
   * created with.
   *
   * @group disconnected
   */
  public function testGetUnderlyingClientInstance()
  {
      $client = new Client($this->getMock('Predis\Connection\NodeConnectionInterface'));
      $consumer = new PubSubConsumer($client);

      $this->assertSame($client, $consumer->getClient());
  }
  257. // ******************************************************************** //
  258. // ---- INTEGRATION TESTS --------------------------------------------- //
  259. // ******************************************************************** //
  /**
   * Round trip against a real server: three messages are published on a
   * channel and the consumer must receive them in order, stop on `QUIT`, and
   * leave its client usable for regular commands afterwards.
   *
   * @group connected
   * @requiresRedisVersion >= 2.0.0
   */
  public function testPubSubAgainstRedisServer()
  {
      $parameters = array(
          'host' => REDIS_SERVER_HOST,
          'port' => REDIS_SERVER_PORT,
          'database' => REDIS_SERVER_DBNUM,
          // Prevents the suite from hanging on a broken test
          'read_write_timeout' => 2,
      );

      $received = array();

      $producer = new Client($parameters);
      $producer->connect();

      $consumer = new Client($parameters);
      $consumer->connect();

      $pubsub = new PubSubConsumer($consumer);
      $pubsub->subscribe('channel:foo');

      foreach (array('message1', 'message2', 'QUIT') as $outgoing) {
          $producer->publish('channel:foo', $outgoing);
      }

      foreach ($pubsub as $message) {
          // Only regular messages are collected; everything else is ignored.
          if ($message->kind === 'message') {
              $payload = $message->payload;
              $received[] = $payload;

              if ($payload === 'QUIT') {
                  $pubsub->stop();
              }
          }
      }

      $this->assertSame(array('message1', 'message2', 'QUIT'), $received);
      $this->assertFalse($pubsub->valid());
      $this->assertEquals('ECHO', $consumer->echo('ECHO'));
  }
  /**
   * Round trip against a real server using blocking reads on the consumer.
   *
   * The process is forked: the parent acts as the consumer and the child as
   * the producer. The parent must receive the three published messages in
   * order, stop on `QUIT`, and still be able to run a regular command.
   *
   * The posix extension is required in addition to pcntl because the parent
   * terminates the child with posix_kill().
   *
   * @group connected
   * @requiresRedisVersion >= 2.0.0
   * @requires extension pcntl
   * @requires extension posix
   */
  public function testPubSubAgainstRedisServerBlocking()
  {
      $parameters = array(
          'host' => REDIS_SERVER_HOST,
          'port' => REDIS_SERVER_PORT,
          'database' => REDIS_SERVER_DBNUM,
          'read_write_timeout' => -1, // -1 to set blocking reads
      );

      // create consumer before forking so the child can disconnect it
      $consumer = new Client($parameters);
      $consumer->connect();

      /*
       * fork
       * parent: consumer
       * child: producer
       */
      $childPID = pcntl_fork();

      if ($childPID === -1) {
          // pcntl_fork() returns -1 on failure. That value is truthy, so
          // without this guard the consumer branch would run with no producer
          // (blocking forever on reads) and posix_kill(-1, SIGKILL) would
          // signal every process the current user is allowed to signal.
          $consumer->disconnect();
          $this->markTestSkipped('Unable to fork a child process to act as the producer.');
      }

      if ($childPID > 0) {
          $messages = array();

          $pubsub = new PubSubConsumer($consumer);
          $pubsub->subscribe('channel:foo');

          foreach ($pubsub as $message) {
              if ($message->kind !== 'message') {
                  continue;
              }

              $messages[] = ($payload = $message->payload);

              if ($payload === 'QUIT') {
                  $pubsub->stop();
              }
          }

          $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
          $this->assertFalse($pubsub->valid());
          $this->assertEquals('ECHO', $consumer->echo('ECHO'));

          // kill the child and reap it so it does not linger as a zombie
          posix_kill($childPID, SIGKILL);
          pcntl_waitpid($childPID, $status);
      } else {
          // create producer, read_write_timeout = 2 because it doesn't do blocking reads anyway
          $producer = new Client(array_replace($parameters, array('read_write_timeout' => 2)));
          $producer->connect();

          $producer->publish('channel:foo', 'message1');
          $producer->publish('channel:foo', 'message2');
          $producer->publish('channel:foo', 'QUIT');

          // sleep, giving the consumer a chance to respond to the QUIT message
          sleep(1);

          // disconnect the consumer because otherwise it could remain stuck in blocking read
          // if it failed to respond to the QUIT message
          $consumer->disconnect();

          // exit child
          exit(0);
      }
  }
  351. }