ConsumerTest.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\PubSub;
  11. use Predis\Client;
  12. use Predis\Profile;
  13. use Predis\PubSub\Consumer as PubSubConsumer;
  14. use PredisTestCase;
  15. /**
  16. * @group realm-pubsub
  17. */
  18. class ConsumerTest extends PredisTestCase
  19. {
  20. /**
  21. * @group disconnected
  22. * @expectedException \Predis\NotSupportedException
  23. * @expectedExceptionMessage The current profile does not support PUB/SUB related commands.
  24. */
  25. public function testPubSubConsumerRequirePubSubRelatedCommand()
  26. {
  27. $profile = $this->getMock('Predis\Profile\ProfileInterface');
  28. $profile->expects($this->any())
  29. ->method('supportsCommands')
  30. ->will($this->returnValue(false));
  31. $client = new Client(null, array('profile' => $profile));
  32. new PubSubConsumer($client);
  33. }
  34. /**
  35. * @group disconnected
  36. * @expectedException \Predis\NotSupportedException
  37. * @expectedExceptionMessage Cannot initialize a PUB/SUB consumer over aggregate connections.
  38. */
  39. public function testPubSubConsumerDoesNotWorkOnClusters()
  40. {
  41. $cluster = $this->getMock('Predis\Connection\Aggregate\ClusterInterface');
  42. $client = new Client($cluster);
  43. new PubSubConsumer($client);
  44. }
  45. /**
  46. * @group disconnected
  47. */
  48. public function testConstructorWithoutSubscriptionsDoesNotStartConsumer()
  49. {
  50. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  51. $client = $this->getMock('Predis\Client', array('executeCommand'), array($connection));
  52. $client->expects($this->never())->method('executeCommand');
  53. new PubSubConsumer($client);
  54. }
  55. /**
  56. * @group disconnected
  57. */
  58. public function testConstructorWithSubscriptionsStartsConsumer()
  59. {
  60. $profile = Profile\Factory::get(REDIS_SERVER_VERSION);
  61. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  62. $connection->expects($this->exactly(2))->method('writeRequest');
  63. $client = $this->getMock('Predis\Client', array('createCommand', 'writeRequest'), array($connection));
  64. $client->expects($this->exactly(2))
  65. ->method('createCommand')
  66. ->with($this->logicalOr($this->equalTo('subscribe'), $this->equalTo('psubscribe')))
  67. ->will($this->returnCallback(function ($id, $args) use ($profile) {
  68. return $profile->createCommand($id, $args);
  69. }));
  70. $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
  71. new PubSubConsumer($client, $options);
  72. }
  73. /**
  74. * @group disconnected
  75. */
  76. public function testStoppingConsumerWithTrueClosesConnection()
  77. {
  78. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  79. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  80. $client->expects($this->exactly(1))->method('disconnect');
  81. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  82. $connection->expects($this->never())->method('writeRequest');
  83. $pubsub->stop(true);
  84. }
  85. /**
  86. * @group disconnected
  87. */
  88. public function testStoppingConsumerWithFalseSendsUnsubscriptions()
  89. {
  90. $profile = Profile\Factory::get(REDIS_SERVER_VERSION);
  91. $classUnsubscribe = $profile->getCommandClass('unsubscribe');
  92. $classPunsubscribe = $profile->getCommandClass('punsubscribe');
  93. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  94. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  95. $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
  96. $pubsub = new PubSubConsumer($client, $options);
  97. $connection->expects($this->exactly(2))
  98. ->method('writeRequest')
  99. ->with($this->logicalOr(
  100. $this->isInstanceOf($classUnsubscribe),
  101. $this->isInstanceOf($classPunsubscribe)
  102. ));
  103. $pubsub->stop(false);
  104. }
  105. /**
  106. * @group disconnected
  107. */
  108. public function testIsNotValidWhenNotSubscribed()
  109. {
  110. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  111. $client = $this->getMock('Predis\Client', array('disconnect'), array($connection));
  112. $pubsub = new PubSubConsumer($client);
  113. $this->assertFalse($pubsub->valid());
  114. $this->assertNull($pubsub->next());
  115. }
  116. /**
  117. * @group disconnected
  118. */
  119. public function testHandlesPongMessages()
  120. {
  121. $rawmessage = array('pong', '');
  122. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  123. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  124. $client = new Client($connection);
  125. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  126. $message = $pubsub->current();
  127. $this->assertSame('pong', $message->kind);
  128. $this->assertSame('', $message->payload);
  129. }
  130. /**
  131. * @group disconnected
  132. */
  133. public function testHandlesPongMessagesWithPayload()
  134. {
  135. $rawmessage = array('pong', 'foobar');
  136. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  137. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  138. $client = new Client($connection);
  139. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  140. $message = $pubsub->current();
  141. $this->assertSame('pong', $message->kind);
  142. $this->assertSame('foobar', $message->payload);
  143. }
  144. /**
  145. * @group disconnected
  146. */
  147. public function testReadsMessageFromConnection()
  148. {
  149. $rawmessage = array('message', 'channel:foo', 'message from channel');
  150. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  151. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  152. $client = new Client($connection);
  153. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  154. $message = $pubsub->current();
  155. $this->assertSame('message', $message->kind);
  156. $this->assertSame('channel:foo', $message->channel);
  157. $this->assertSame('message from channel', $message->payload);
  158. }
  159. /**
  160. * @group disconnected
  161. */
  162. public function testReadsPmessageFromConnection()
  163. {
  164. $rawmessage = array('pmessage', 'channel:*', 'channel:foo', 'message from channel');
  165. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  166. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  167. $client = new Client($connection);
  168. $pubsub = new PubSubConsumer($client, array('psubscribe' => 'channel:*'));
  169. $message = $pubsub->current();
  170. $this->assertSame('pmessage', $message->kind);
  171. $this->assertSame('channel:*', $message->pattern);
  172. $this->assertSame('channel:foo', $message->channel);
  173. $this->assertSame('message from channel', $message->payload);
  174. }
  175. /**
  176. * @group disconnected
  177. */
  178. public function testReadsSubscriptionMessageFromConnection()
  179. {
  180. $rawmessage = array('subscribe', 'channel:foo', 1);
  181. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  182. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  183. $client = new Client($connection);
  184. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  185. $message = $pubsub->current();
  186. $this->assertSame('subscribe', $message->kind);
  187. $this->assertSame('channel:foo', $message->channel);
  188. $this->assertSame(1, $message->payload);
  189. }
  190. /**
  191. * @group disconnected
  192. */
  193. public function testReadsUnsubscriptionMessageFromConnection()
  194. {
  195. $rawmessage = array('unsubscribe', 'channel:foo', 1);
  196. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  197. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  198. $client = new Client($connection);
  199. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  200. $message = $pubsub->current();
  201. $this->assertSame('unsubscribe', $message->kind);
  202. $this->assertSame('channel:foo', $message->channel);
  203. $this->assertSame(1, $message->payload);
  204. }
  205. /**
  206. * @group disconnected
  207. */
  208. public function testUnsubscriptionMessageWithZeroChannelCountInvalidatesConsumer()
  209. {
  210. $rawmessage = array('unsubscribe', 'channel:foo', 0);
  211. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  212. $connection->expects($this->once())->method('read')->will($this->returnValue($rawmessage));
  213. $client = new Client($connection);
  214. $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));
  215. $this->assertTrue($pubsub->valid());
  216. $message = $pubsub->current();
  217. $this->assertSame('unsubscribe', $message->kind);
  218. $this->assertSame('channel:foo', $message->channel);
  219. $this->assertSame(0, $message->payload);
  220. $this->assertFalse($pubsub->valid());
  221. }
  222. /**
  223. * @group disconnected
  224. */
  225. public function testGetUnderlyingClientInstance()
  226. {
  227. $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
  228. $client = new Client($connection);
  229. $pubsub = new PubSubConsumer($client);
  230. $this->assertSame($client, $pubsub->getClient());
  231. }
  232. // ******************************************************************** //
  233. // ---- INTEGRATION TESTS --------------------------------------------- //
  234. // ******************************************************************** //
  235. /**
  236. * @group connected
  237. */
  238. public function testPubSubAgainstRedisServer()
  239. {
  240. $parameters = array(
  241. 'host' => REDIS_SERVER_HOST,
  242. 'port' => REDIS_SERVER_PORT,
  243. 'database' => REDIS_SERVER_DBNUM,
  244. // Prevents suite from handing on broken test
  245. 'read_write_timeout' => 2,
  246. );
  247. $options = array('profile' => REDIS_SERVER_VERSION);
  248. $messages = array();
  249. $producer = new Client($parameters, $options);
  250. $producer->connect();
  251. $consumer = new Client($parameters, $options);
  252. $consumer->connect();
  253. $pubsub = new PubSubConsumer($consumer);
  254. $pubsub->subscribe('channel:foo');
  255. $producer->publish('channel:foo', 'message1');
  256. $producer->publish('channel:foo', 'message2');
  257. $producer->publish('channel:foo', 'QUIT');
  258. foreach ($pubsub as $message) {
  259. if ($message->kind !== 'message') {
  260. continue;
  261. }
  262. $messages[] = ($payload = $message->payload);
  263. if ($payload === 'QUIT') {
  264. $pubsub->stop();
  265. }
  266. }
  267. $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
  268. $this->assertFalse($pubsub->valid());
  269. $this->assertEquals('ECHO', $consumer->echo('ECHO'));
  270. }
  271. /**
  272. * @group connected
  273. * @requires extension pcntl
  274. */
  275. public function testPubSubAgainstRedisServerBlocking()
  276. {
  277. $parameters = array(
  278. 'host' => REDIS_SERVER_HOST,
  279. 'port' => REDIS_SERVER_PORT,
  280. 'database' => REDIS_SERVER_DBNUM,
  281. 'read_write_timeout' => -1, // -1 to set blocking reads
  282. );
  283. $options = array('profile' => REDIS_SERVER_VERSION);
  284. // create consumer before forking so the child can disconnect it
  285. $consumer = new Client($parameters, $options);
  286. $consumer->connect();
  287. /*
  288. * fork
  289. * parent: consumer
  290. * child: producer
  291. */
  292. if ($childPID = pcntl_fork()) {
  293. $messages = array();
  294. $pubsub = new PubSubConsumer($consumer);
  295. $pubsub->subscribe('channel:foo');
  296. foreach ($pubsub as $message) {
  297. if ($message->kind !== 'message') {
  298. continue;
  299. }
  300. $messages[] = ($payload = $message->payload);
  301. if ($payload === 'QUIT') {
  302. $pubsub->stop();
  303. }
  304. }
  305. $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
  306. $this->assertFalse($pubsub->valid());
  307. $this->assertEquals('ECHO', $consumer->echo('ECHO'));
  308. // kill the child
  309. posix_kill($childPID, SIGKILL);
  310. } else {
  311. // create producer, read_write_timeout = 2 because it doesn't do blocking reads anyway
  312. $producer = new Client(array_replace($parameters, array('read_write_timeout' => 2)), $options);
  313. $producer->connect();
  314. $producer->publish('channel:foo', 'message1');
  315. $producer->publish('channel:foo', 'message2');
  316. $producer->publish('channel:foo', 'QUIT');
  317. // sleep, giving the consumer a chance to respond to the QUIT message
  318. sleep(1);
  319. // disconnect the consumer because otherwise it could remain stuck in blocking read
  320. // if it failed to respond to the QUIT message
  321. $consumer->disconnect();
  322. // exit child
  323. exit(0);
  324. }
  325. }
  326. }