|
@@ -340,4 +340,71 @@ class ConsumerTest extends PredisTestCase
|
|
|
$this->assertFalse($pubsub->valid());
|
|
|
$this->assertEquals('ECHO', $consumer->echo('ECHO'));
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @group connected
|
|
|
+ */
|
|
|
+ public function testPubSubAgainstRedisServerBlocking()
|
|
|
+ {
|
|
|
+ $parameters = array(
|
|
|
+ 'host' => REDIS_SERVER_HOST,
|
|
|
+ 'port' => REDIS_SERVER_PORT,
|
|
|
+ 'database' => REDIS_SERVER_DBNUM,
|
|
|
+ 'read_write_timeout' => -1, // -1 to set blocking reads
|
|
|
+ );
|
|
|
+
|
|
|
+ $options = array('profile' => REDIS_SERVER_VERSION);
|
|
|
+
|
|
|
+ // create consumer before forking so the child can disconnect it
|
|
|
+ $consumer = new Client($parameters, $options);
|
|
|
+ $consumer->connect();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * fork
|
|
|
+ * parent: consumer
|
|
|
+ * child: producer
|
|
|
+ */
|
|
|
+ if ($childPID = pcntl_fork()) {
|
|
|
+ $messages = array();
|
|
|
+
|
|
|
+ $pubsub = new PubSubConsumer($consumer);
|
|
|
+ $pubsub->subscribe('channel:foo');
|
|
|
+
|
|
|
+ foreach ($pubsub as $message) {
|
|
|
+ if ($message->kind !== 'message') {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ $messages[] = ($payload = $message->payload);
|
|
|
+ if ($payload === 'QUIT') {
|
|
|
+ $pubsub->stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ $this->assertSame(array('message1', 'message2', 'QUIT'), $messages);
|
|
|
+ $this->assertFalse($pubsub->valid());
|
|
|
+ $this->assertEquals('ECHO', $consumer->echo('ECHO'));
|
|
|
+
|
|
|
+ // kill the child
|
|
|
+ posix_kill($childPID, SIGKILL);
|
|
|
+ } else {
|
|
|
+ // create producer, read_write_timeout = 2 because it doesn't do blocking reads anyway
|
|
|
+ $producer = new Client(array_replace($parameters, array('read_write_timeout' => 2)), $options);
|
|
|
+ $producer->connect();
|
|
|
+
|
|
|
+ $producer->publish('channel:foo', 'message1');
|
|
|
+ $producer->publish('channel:foo', 'message2');
|
|
|
+
|
|
|
+ $producer->publish('channel:foo', 'QUIT');
|
|
|
+
|
|
|
+ // sleep, giving the consumer a chance to respond to the QUIT message
|
|
|
+ sleep(1);
|
|
|
+
|
|
|
+ // disconnect the consumer because otherwise it could remain stuck in blocking read
|
|
|
+ // if it failed to respond to the QUIT message
|
|
|
+ $consumer->disconnect();
|
|
|
+
|
|
|
+ // exit child
|
|
|
+ exit(0);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|