123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
- <?php
- /*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
- namespace Predis\PubSub;
- use Predis\Client;
- use Predis\PubSub\Consumer as PubSubConsumer;
- use PredisTestCase;
/**
 * Tests for the PUB/SUB consumer.
 *
 * Tests in the "disconnected" group run against mocked connections, while
 * tests in the "connected" group use the Redis server identified by the
 * REDIS_SERVER_* constants.
 *
 * @group realm-pubsub
 */
class ConsumerTest extends PredisTestCase
{
    /**
     * Creating a consumer must fail when the command factory reports that the
     * required commands are not supported.
     *
     * @group disconnected
     * @expectedException \Predis\NotSupportedException
     * @expectedExceptionMessage PUB/SUB commands are not supported by the current command factory.
     */
    public function testPubSubConsumerRequirePubSubRelatedCommand()
    {
        $commands = $this->getMockBuilder('Predis\Command\FactoryInterface')->getMock();
        $commands
            ->expects($this->any())
            ->method('supportsCommands')
            ->will($this->returnValue(false));

        $client = new Client(null, array('commands' => $commands));

        new PubSubConsumer($client);
    }

    /**
     * Creating a consumer must fail when the client wraps a cluster connection.
     *
     * @group disconnected
     * @expectedException \Predis\NotSupportedException
     * @expectedExceptionMessage Cannot initialize a PUB/SUB consumer over aggregate connections.
     */
    public function testPubSubConsumerDoesNotWorkOnClusters()
    {
        $cluster = $this->getMockBuilder('Predis\Connection\Cluster\ClusterInterface')->getMock();
        $client = new Client($cluster);

        new PubSubConsumer($client);
    }

    /**
     * Without subscription options the constructor must not execute any
     * command through the client.
     *
     * @group disconnected
     */
    public function testConstructorWithoutSubscriptionsDoesNotStartConsumer()
    {
        $connection = $this->createConnectionDouble();

        $client = $this->createClientDouble(array('executeCommand'), $connection);
        $client->expects($this->never())->method('executeCommand');

        new PubSubConsumer($client);
    }

    /**
     * With both "subscribe" and "psubscribe" options the constructor must
     * create two commands and write two requests to the connection.
     *
     * @group disconnected
     */
    public function testConstructorWithSubscriptionsStartsConsumer()
    {
        $commands = $this->getCommandFactory();

        $connection = $this->createConnectionDouble();
        $connection->expects($this->exactly(2))->method('writeRequest');

        $client = $this->createClientDouble(array('createCommand', 'writeRequest'), $connection);
        $client
            ->expects($this->exactly(2))
            ->method('createCommand')
            ->with($this->logicalOr($this->equalTo('subscribe'), $this->equalTo('psubscribe')))
            ->will($this->returnCallback(function ($id, $args) use ($commands) {
                // Delegate to the real factory so that actual command instances are built.
                return $commands->createCommand($id, $args);
            }));

        $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');

        new PubSubConsumer($client, $options);
    }

    /**
     * stop(true) must disconnect the client without writing further requests.
     *
     * @group disconnected
     */
    public function testStoppingConsumerWithTrueClosesConnection()
    {
        $connection = $this->createConnectionDouble();

        $client = $this->createClientDouble(array('disconnect'), $connection);
        $client
            ->expects($this->once())
            ->method('disconnect');

        $pubsub = new PubSubConsumer($client, array('subscribe' => 'channel:foo'));

        // Registered only after construction: the constructor itself is
        // allowed to write the initial subscription request.
        $connection->expects($this->never())->method('writeRequest');

        $pubsub->stop(true);
    }

    /**
     * stop(false) must write one UNSUBSCRIBE and one PUNSUBSCRIBE request.
     *
     * @group disconnected
     */
    public function testStoppingConsumerWithFalseSendsUnsubscriptions()
    {
        $commands = $this->getCommandFactory();
        $classUnsubscribe = $commands->getCommandClass('unsubscribe');
        $classPunsubscribe = $commands->getCommandClass('punsubscribe');

        $connection = $this->createConnectionDouble();
        $client = $this->createClientDouble(array('disconnect'), $connection);

        $options = array('subscribe' => 'channel:foo', 'psubscribe' => 'channels:*');
        $pubsub = new PubSubConsumer($client, $options);

        // Registered only after construction so that the requests written by
        // the constructor are not counted.
        $connection
            ->expects($this->exactly(2))
            ->method('writeRequest')
            ->with($this->logicalOr(
                $this->isInstanceOf($classUnsubscribe),
                $this->isInstanceOf($classPunsubscribe)
            ));

        $pubsub->stop(false);
    }

    /**
     * A consumer created without subscriptions is not valid and next()
     * returns NULL.
     *
     * @group disconnected
     */
    public function testIsNotValidWhenNotSubscribed()
    {
        $connection = $this->createConnectionDouble();
        $client = $this->createClientDouble(array('disconnect'), $connection);

        $pubsub = new PubSubConsumer($client);

        $this->assertFalse($pubsub->valid());
        $this->assertNull($pubsub->next());
    }

    /**
     * A "pong" message with an empty payload is exposed as kind and payload.
     *
     * @group disconnected
     */
    public function testHandlesPongMessages()
    {
        $connection = $this->createConnectionDoubleReading(array('pong', ''));
        $pubsub = new PubSubConsumer(new Client($connection), array('subscribe' => 'channel:foo'));

        $message = $pubsub->current();

        $this->assertSame('pong', $message->kind);
        $this->assertSame('', $message->payload);
    }

    /**
     * A "pong" message carrying a payload exposes that payload unchanged.
     *
     * @group disconnected
     */
    public function testHandlesPongMessagesWithPayload()
    {
        $connection = $this->createConnectionDoubleReading(array('pong', 'foobar'));
        $pubsub = new PubSubConsumer(new Client($connection), array('subscribe' => 'channel:foo'));

        $message = $pubsub->current();

        $this->assertSame('pong', $message->kind);
        $this->assertSame('foobar', $message->payload);
    }

    /**
     * A "message" reply is mapped to kind, channel and payload.
     *
     * @group disconnected
     */
    public function testReadsMessageFromConnection()
    {
        $connection = $this->createConnectionDoubleReading(
            array('message', 'channel:foo', 'message from channel')
        );
        $pubsub = new PubSubConsumer(new Client($connection), array('subscribe' => 'channel:foo'));

        $message = $pubsub->current();

        $this->assertSame('message', $message->kind);
        $this->assertSame('channel:foo', $message->channel);
        $this->assertSame('message from channel', $message->payload);
    }

    /**
     * A "pmessage" reply is mapped to kind, pattern, channel and payload.
     *
     * @group disconnected
     */
    public function testReadsPmessageFromConnection()
    {
        $connection = $this->createConnectionDoubleReading(
            array('pmessage', 'channel:*', 'channel:foo', 'message from channel')
        );
        $pubsub = new PubSubConsumer(new Client($connection), array('psubscribe' => 'channel:*'));

        $message = $pubsub->current();

        $this->assertSame('pmessage', $message->kind);
        $this->assertSame('channel:*', $message->pattern);
        $this->assertSame('channel:foo', $message->channel);
        $this->assertSame('message from channel', $message->payload);
    }

    /**
     * A "subscribe" reply is mapped to kind, channel and payload.
     *
     * @group disconnected
     */
    public function testReadsSubscriptionMessageFromConnection()
    {
        $connection = $this->createConnectionDoubleReading(array('subscribe', 'channel:foo', 1));
        $pubsub = new PubSubConsumer(new Client($connection), array('subscribe' => 'channel:foo'));

        $message = $pubsub->current();

        $this->assertSame('subscribe', $message->kind);
        $this->assertSame('channel:foo', $message->channel);
        $this->assertSame(1, $message->payload);
    }

    /**
     * An "unsubscribe" reply is mapped to kind, channel and payload.
     *
     * @group disconnected
     */
    public function testReadsUnsubscriptionMessageFromConnection()
    {
        $connection = $this->createConnectionDoubleReading(array('unsubscribe', 'channel:foo', 1));
        $pubsub = new PubSubConsumer(new Client($connection), array('subscribe' => 'channel:foo'));

        $message = $pubsub->current();

        $this->assertSame('unsubscribe', $message->kind);
        $this->assertSame('channel:foo', $message->channel);
        $this->assertSame(1, $message->payload);
    }

    /**
     * The consumer is valid before, and no longer valid after, reading an
     * "unsubscribe" reply whose payload is 0.
     *
     * @group disconnected
     */
    public function testUnsubscriptionMessageWithZeroChannelCountInvalidatesConsumer()
    {
        $connection = $this->createConnectionDoubleReading(array('unsubscribe', 'channel:foo', 0));
        $pubsub = new PubSubConsumer(new Client($connection), array('subscribe' => 'channel:foo'));

        $this->assertTrue($pubsub->valid());

        $message = $pubsub->current();

        $this->assertSame('unsubscribe', $message->kind);
        $this->assertSame('channel:foo', $message->channel);
        $this->assertSame(0, $message->payload);

        $this->assertFalse($pubsub->valid());
    }

    /**
     * getClient() returns the very same client instance given to the constructor.
     *
     * @group disconnected
     */
    public function testGetUnderlyingClientInstance()
    {
        $client = new Client($this->createConnectionDouble());
        $pubsub = new PubSubConsumer($client);

        $this->assertSame($client, $pubsub->getClient());
    }

    // ******************************************************************** //
    // ---- INTEGRATION TESTS --------------------------------------------- //
    // ******************************************************************** //

    /**
     * Publishes three messages from one client and consumes them with another
     * one, then verifies the consumer connection is still usable.
     *
     * @group connected
     * @requiresRedisVersion >= 2.0.0
     */
    public function testPubSubAgainstRedisServer()
    {
        // The 2 seconds timeout prevents the suite from hanging on a broken test.
        $producer = $this->createConnectedClient(2);
        $consumer = $this->createConnectedClient(2);

        $pubsub = new PubSubConsumer($consumer);
        $pubsub->subscribe('channel:foo');

        $this->publishTestMessages($producer);

        $messages = $this->consumeUntilQuit($pubsub);

        $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
        $this->assertFalse($pubsub->valid());
        $this->assertEquals('ECHO', $consumer->echo('ECHO'));
    }

    /**
     * Same scenario as above but with blocking reads on the consumer: the
     * process is forked, the parent consumes and the child produces.
     *
     * @group connected
     * @requiresRedisVersion >= 2.0.0
     * @requires extension pcntl
     * @requires extension posix
     */
    public function testPubSubAgainstRedisServerBlocking()
    {
        // Create the consumer before forking so the child can disconnect it;
        // a read_write_timeout of -1 sets blocking reads.
        $consumer = $this->createConnectedClient(-1);

        $childPID = pcntl_fork();

        if ($childPID === -1) {
            // -1 is truthy: without this guard a failed fork would take the
            // parent path and wait on a blocking read with no producer.
            $this->markTestSkipped('Unable to fork the producer process.');
        }

        if ($childPID === 0) {
            // Child: producer. read_write_timeout = 2 because it does not do
            // blocking reads anyway.
            $producer = $this->createConnectedClient(2);
            $this->publishTestMessages($producer);

            // Sleep, giving the consumer a chance to respond to the QUIT message.
            sleep(1);

            // Disconnect the consumer because otherwise it could remain stuck
            // in a blocking read if it failed to respond to the QUIT message.
            $consumer->disconnect();

            exit(0);
        }

        // Parent: consumer.
        $pubsub = new PubSubConsumer($consumer);
        $pubsub->subscribe('channel:foo');

        $messages = $this->consumeUntilQuit($pubsub);

        // Kill and reap the child before asserting, so that a failing
        // assertion cannot leave it behind.
        posix_kill($childPID, SIGKILL);
        pcntl_waitpid($childPID, $status);

        $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
        $this->assertFalse($pubsub->valid());
        $this->assertEquals('ECHO', $consumer->echo('ECHO'));
    }

    // ******************************************************************** //
    // ---- HELPERS ------------------------------------------------------- //
    // ******************************************************************** //

    /**
     * Creates a mock of a single-node connection.
     *
     * @return \Predis\Connection\NodeConnectionInterface
     */
    private function createConnectionDouble()
    {
        return $this->getMockBuilder('Predis\Connection\NodeConnectionInterface')->getMock();
    }

    /**
     * Creates a mock of a single-node connection that expects exactly one
     * read() call and returns the given raw message from it.
     *
     * @param array $rawMessage Raw PUB/SUB message returned by read().
     *
     * @return \Predis\Connection\NodeConnectionInterface
     */
    private function createConnectionDoubleReading(array $rawMessage)
    {
        $connection = $this->createConnectionDouble();
        $connection
            ->expects($this->once())
            ->method('read')
            ->will($this->returnValue($rawMessage));

        return $connection;
    }

    /**
     * Creates a partial mock of the client built on top of a connection.
     *
     * @param array $methods    Names of the client methods to replace.
     * @param mixed $connection Connection passed to the client constructor.
     *
     * @return \Predis\Client
     */
    private function createClientDouble(array $methods, $connection)
    {
        return $this->getMockBuilder('Predis\Client')
            ->setMethods($methods)
            ->setConstructorArgs(array($connection))
            ->getMock();
    }

    /**
     * Creates a client for the server defined by the REDIS_SERVER_* constants
     * and connects it.
     *
     * @param int $readWriteTimeout Value for the "read_write_timeout" parameter.
     *
     * @return \Predis\Client
     */
    private function createConnectedClient($readWriteTimeout)
    {
        $client = new Client(array(
            'host' => REDIS_SERVER_HOST,
            'port' => REDIS_SERVER_PORT,
            'database' => REDIS_SERVER_DBNUM,
            'read_write_timeout' => $readWriteTimeout,
        ));
        $client->connect();

        return $client;
    }

    /**
     * Publishes "message1", "message2" and "QUIT" on channel:foo.
     *
     * @param Client $producer Client used to publish the messages.
     */
    private function publishTestMessages(Client $producer)
    {
        $producer->publish('channel:foo', 'message1');
        $producer->publish('channel:foo', 'message2');
        $producer->publish('channel:foo', 'QUIT');
    }

    /**
     * Iterates over the consumer collecting the payload of every "message"
     * entry and stops the consumer once the "QUIT" payload is received.
     *
     * @param PubSubConsumer $pubsub Consumer to iterate over.
     *
     * @return array Collected payloads, in order of arrival.
     */
    private function consumeUntilQuit(PubSubConsumer $pubsub)
    {
        $payloads = array();

        foreach ($pubsub as $message) {
            // Skip anything that is not a published message (e.g. the
            // subscription confirmation).
            if ($message->kind !== 'message') {
                continue;
            }

            $payload = $message->payload;
            $payloads[] = $payload;

            if ($payload === 'QUIT') {
                $pubsub->stop();
            }
        }

        return $payloads;
    }
}
|