* * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */

namespace Predis\Pipeline;

use Predis\Client;
use Predis\ClientException;
use Predis\Profile;
use Predis\Response;
use PredisTestCase;

/**
 * Test case for Predis\Pipeline\Pipeline.
 *
 * Tests in the "disconnected" group run against mocked connections (or a
 * client that is never asked to send anything), while tests in the "connected"
 * group use the client returned by getClient() to perform integration tests.
 */
class PipelineTest extends PredisTestCase
{
    /**
     * The pipeline exposes the very same client instance it was built with.
     *
     * @group disconnected
     */
    public function testConstructor()
    {
        $client = new Client();
        $pipeline = new Pipeline($client);

        $this->assertSame($client, $pipeline->getClient());
    }

    /**
     * Queuing commands on the pipeline must not write to, or read from, the
     * connection as long as execute() is not invoked.
     *
     * @group disconnected
     */
    public function testCallDoesNotSendCommandsWithoutExecute()
    {
        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->never())->method('writeRequest');
        $connection->expects($this->never())->method('readResponse');

        $pipeline = new Pipeline(new Client($connection));

        $pipeline->echo('one');
        $pipeline->echo('two');
        $pipeline->echo('three');
    }

    /**
     * Queuing a command returns the pipeline itself so that calls can be
     * chained, still without touching the connection.
     *
     * @group disconnected
     */
    public function testCallReturnsPipelineForFluentInterface()
    {
        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->never())->method('writeRequest');
        $connection->expects($this->never())->method('readResponse');

        $pipeline = new Pipeline(new Client($connection));

        $this->assertSame($pipeline, $pipeline->echo('one'));
        $this->assertSame($pipeline, $pipeline->echo('one')->echo('two')->echo('three'));
    }

    /**
     * A response object returned by the connection is handed back as-is (same
     * instance) in the array returned by execute().
     *
     * @group disconnected
     */
    public function testDoesNotParseComplexResponseObjects()
    {
        $object = $this->getMock('Predis\Response\ResponseInterface');

        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->once())
                   ->method('readResponse')
                   ->will($this->returnValue($object));

        $pipeline = new Pipeline(new Client($connection));

        $pipeline->ping();

        $this->assertSame(array($object), $pipeline->execute());
    }

    /**
     * With default client options an error response makes execute() throw.
     * Two commands are queued but readResponse is expected only once: the
     * failure on the first response must stop any further read.
     *
     * @group disconnected
     * @expectedException \Predis\Response\ServerException
     * @expectedExceptionMessage ERR Test error
     */
    public function testThrowsServerExceptionOnResponseErrorByDefault()
    {
        $error = new Response\Error('ERR Test error');

        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->once())
                   ->method('readResponse')
                   ->will($this->returnValue($error));

        $pipeline = new Pipeline(new Client($connection));

        $pipeline->ping();
        $pipeline->ping();
        $pipeline->execute();
    }

    /**
     * When the client is created with the 'exceptions' option set to false,
     * error responses are returned in the result array instead of being thrown.
     *
     * @group disconnected
     */
    public function testReturnsResponseErrorWithClientExceptionsSetToFalse()
    {
        $error = $this->getMock('Predis\Response\ErrorInterface');

        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->exactly(2))
                   ->method('readResponse')
                   ->will($this->returnValue($error));

        $client = new Client($connection, array('exceptions' => false));

        $pipeline = new Pipeline($client);

        $pipeline->ping();
        $pipeline->ping();

        $this->assertSame(array($error, $error), $pipeline->execute());
    }

    /**
     * executeCommand() returns the pipeline itself for a command instance.
     *
     * NOTE: despite its name, this test exercises executeCommand(), not
     * execute().
     *
     * @group disconnected
     */
    public function testExecuteReturnsPipelineForFluentInterface()
    {
        $profile = Profile\Factory::getDefault();
        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $pipeline = new Pipeline(new Client($connection));
        $command = $profile->createCommand('echo', array('one'));

        $this->assertSame($pipeline, $pipeline->executeCommand($command));
    }

    /**
     * Queuing command instances through executeCommand() must not write to,
     * or read from, the connection as long as execute() is not invoked.
     *
     * @group disconnected
     */
    public function testExecuteCommandDoesNotSendCommandsWithoutExecute()
    {
        $profile = Profile\Factory::getDefault();

        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->never())->method('writeRequest');
        $connection->expects($this->never())->method('readResponse');

        $pipeline = new Pipeline(new Client($connection));

        $pipeline->executeCommand($profile->createCommand('echo', array('one')));
        $pipeline->executeCommand($profile->createCommand('echo', array('two')));
        $pipeline->executeCommand($profile->createCommand('echo', array('three')));
    }

    /**
     * execute() on a pipeline with nothing queued returns an empty array and
     * leaves the connection untouched.
     *
     * @group disconnected
     */
    public function testExecuteWithEmptyBuffer()
    {
        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->never())->method('writeRequest');
        $connection->expects($this->never())->method('readResponse');

        $pipeline = new Pipeline(new Client($connection));

        $this->assertSame(array(), $pipeline->execute());
    }

    /**
     * Three queued ECHO commands result in exactly three writes and three
     * reads, and execute() returns all three responses even though
     * flushPipeline() was explicitly called beforehand.
     *
     * @group disconnected
     */
    public function testExecuteWithFilledBuffer()
    {
        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->exactly(3))
                   ->method('writeRequest');
        $connection->expects($this->exactly(3))
                   ->method('readResponse')
                   ->will($this->returnCallback($this->getReadCallback()));

        $pipeline = new Pipeline(new Client($connection));

        $pipeline->echo('one');
        $pipeline->echo('two');
        $pipeline->echo('three');
        $pipeline->flushPipeline();

        $this->assertSame(array('one', 'two', 'three'), $pipeline->execute());
    }

    /**
     * flushPipeline(false) discards the queued commands: the following
     * execute() returns an empty array.
     *
     * @group disconnected
     */
    public function testFlushWithFalseArgumentDiscardsBuffer()
    {
        $pipeline = new Pipeline(new Client());

        $pipeline->echo('one');
        $pipeline->echo('two');
        $pipeline->echo('three');
        $pipeline->flushPipeline(false);

        $this->assertSame(array(), $pipeline->execute());
    }

    /**
     * Flushing midway and then queuing more commands still yields the
     * responses of all four commands, in order, from execute().
     *
     * @group disconnected
     */
    public function testFlushHandlesPartialBuffers()
    {
        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->exactly(4))
                   ->method('writeRequest');
        $connection->expects($this->exactly(4))
                   ->method('readResponse')
                   ->will($this->returnCallback($this->getReadCallback()));

        $pipeline = new Pipeline(new Client($connection));

        $pipeline->echo('one');
        $pipeline->echo('two');
        $pipeline->flushPipeline();
        $pipeline->echo('three');
        $pipeline->echo('four');

        $this->assertSame(array('one', 'two', 'three', 'four'), $pipeline->execute());
    }

    /**
     * On a replication connection the pipeline switches to 'master' exactly
     * once while sending the three queued commands.
     *
     * @group disconnected
     */
    public function testSwitchesToMasterWithReplicationConnection()
    {
        $pong = new Response\Status('PONG');

        $connection = $this->getMock('Predis\Connection\Aggregate\ReplicationInterface');
        $connection->expects($this->once())
                   ->method('switchTo')
                   ->with('master');
        $connection->expects($this->exactly(3))
                   ->method('writeRequest');
        $connection->expects($this->exactly(3))
                   ->method('readResponse')
                   ->will($this->returnValue($pong));

        $pipeline = new Pipeline(new Client($connection));

        $pipeline->ping();
        $pipeline->ping();
        $pipeline->ping();

        $this->assertSame(array($pong, $pong, $pong), $pipeline->execute());
    }

    /**
     * execute() accepts a callable and passes the pipeline instance to it.
     *
     * @group disconnected
     */
    public function testExecuteAcceptsCallableArgument()
    {
        // $this is captured through a local variable instead of being used
        // directly inside the closure.
        $test = $this;
        $pipeline = new Pipeline(new Client());

        $callable = function ($pipe) use ($test, $pipeline) {
            $test->assertSame($pipeline, $pipe);
            $pipe->flushPipeline(false);
        };

        $pipeline->execute($callable);
    }

    /**
     * Passing something that is not callable to execute() is rejected.
     *
     * @group disconnected
     * @expectedException InvalidArgumentException
     */
    public function testExecuteDoesNotAcceptNonCallableArgument()
    {
        $noncallable = new \stdClass();

        $pipeline = new Pipeline(new Client());
        $pipeline->execute($noncallable);
    }

    /**
     * Calling execute() from inside the callable given to execute() raises a
     * client exception.
     *
     * @group disconnected
     * @expectedException \Predis\ClientException
     */
    public function testExecuteInsideCallableArgumentThrowsException()
    {
        $pipeline = new Pipeline(new Client());

        $pipeline->execute(function ($pipe) {
            $pipe->execute();
        });
    }

    /**
     * Commands queued inside the callable are sent and their responses are
     * returned, in order, by execute().
     *
     * @group disconnected
     */
    public function testExecuteWithCallableArgumentRunsPipelineInCallable()
    {
        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->exactly(4))
                   ->method('writeRequest');
        $connection->expects($this->exactly(4))
                   ->method('readResponse')
                   ->will($this->returnCallback($this->getReadCallback()));

        $pipeline = new Pipeline(new Client($connection));

        $responses = $pipeline->execute(function ($pipe) {
            $pipe->echo('one');
            $pipe->echo('two');
            $pipe->echo('three');
            $pipe->echo('four');
        });

        $this->assertSame(array('one', 'two', 'three', 'four'), $responses);
    }

    /**
     * An exception thrown inside the callable propagates out of execute():
     * nothing is written or read on the connection and no responses are
     * returned to the caller.
     *
     * @group disconnected
     */
    public function testExecuteWithCallableArgumentHandlesExceptions()
    {
        $connection = $this->getMock('Predis\Connection\NodeConnectionInterface');
        $connection->expects($this->never())->method('writeRequest');
        $connection->expects($this->never())->method('readResponse');

        $pipeline = new Pipeline(new Client($connection));

        $exception = null;
        $responses = null;

        try {
            $responses = $pipeline->execute(function ($pipe) {
                $pipe->echo('one');
                $pipe->echo('two');
                throw new ClientException('TEST');
            });
        } catch (\Exception $exception) {
            // NOOP
        }

        $this->assertInstanceOf('Predis\ClientException', $exception);
        $this->assertSame('TEST', $exception->getMessage());
        $this->assertNull($responses);
    }

    // ******************************************************************** //
    // ---- INTEGRATION TESTS --------------------------------------------- //
    // ******************************************************************** //

    /**
     * Chained ECHO commands followed by execute() return the echoed values.
     *
     * @group connected
     */
    public function testIntegrationWithFluentInterface()
    {
        $pipeline = $this->getClient()->pipeline();

        $results = $pipeline->echo('one')
                            ->echo('two')
                            ->echo('three')
                            ->execute();

        $this->assertSame(array('one', 'two', 'three'), $results);
    }

    /**
     * Commands queued inside a callable passed to the client's pipeline()
     * method are executed and their effects are visible afterwards.
     *
     * @group connected
     */
    public function testIntegrationWithCallableBlock()
    {
        $client = $this->getClient();

        $results = $client->pipeline(function ($pipe) {
            $pipe->set('foo', 'bar');
            $pipe->get('foo');
        });

        $this->assertEquals(array('OK', 'bar'), $results);
        $this->assertTrue($client->exists('foo'));
    }

    /**
     * A command issued directly on the underlying client while the pipeline
     * block is running gets its own response and does not end up in the
     * pipeline results.
     *
     * @group connected
     */
    public function testOutOfBandMessagesInsidePipeline()
    {
        $oob = null;
        $client = $this->getClient();

        $results = $client->pipeline(function ($pipe) use (&$oob) {
            $pipe->set('foo', 'bar');
            $oob = $pipe->getClient()->echo('oob message');
            $pipe->get('foo');
        });

        $this->assertEquals(array('OK', 'bar'), $results);
        $this->assertSame('oob message', $oob);
        $this->assertTrue($client->exists('foo'));
    }

    /**
     * A client exception thrown inside the callable propagates to the caller
     * and the command queued before it is not applied ('foo' does not exist).
     *
     * @group connected
     */
    public function testIntegrationWithClientExceptionInCallableBlock()
    {
        $exception = null;

        $client = $this->getClient();

        try {
            $client->pipeline(function ($pipe) {
                $pipe->set('foo', 'bar');
                throw new ClientException('TEST');
            });
        } catch (\Exception $exception) {
            // NOOP
        }

        $this->assertInstanceOf('Predis\ClientException', $exception);
        $this->assertSame('TEST', $exception->getMessage());
        $this->assertFalse($client->exists('foo'));
    }

    /**
     * A server-side error surfaces as a ServerException, yet the commands
     * queued both before and after the failing one are still applied.
     *
     * @group connected
     */
    public function testIntegrationWithServerExceptionInCallableBlock()
    {
        $exception = null;

        $client = $this->getClient();

        try {
            $client->pipeline(function ($pipe) {
                $pipe->set('foo', 'bar');
                // LPUSH on a string key fails, but won't stop
                // the pipeline from sending the remaining commands.
                $pipe->lpush('foo', 'bar');
                $pipe->set('hoge', 'piyo');
            });
        } catch (\Exception $exception) {
            // NOOP
        }

        $this->assertInstanceOf('Predis\Response\ServerException', $exception);
        $this->assertTrue($client->exists('foo'));
        $this->assertTrue($client->exists('hoge'));
    }

    /**
     * With the 'exceptions' option set to false a server-side error is
     * returned in place, between the responses of the surrounding commands.
     *
     * @group connected
     */
    public function testIntegrationWithServerErrorInCallableBlock()
    {
        $client = $this->getClient(array(), array('exceptions' => false));

        $results = $client->pipeline(function ($pipe) {
            $pipe->set('foo', 'bar');
            $pipe->lpush('foo', 'bar'); // LPUSH on a string key fails.
            $pipe->get('foo');
        });

        $this->assertEquals('OK', $results[0]);
        $this->assertInstanceOf('Predis\Response\Error', $results[1]);
        $this->assertSame('bar', $results[2]);
    }

    // ******************************************************************** //
    // ---- HELPER METHODS ------------------------------------------------ //
    // ******************************************************************** //

    /**
     * Returns a client instance connected to the specified Redis
     * server instance to perform integration tests.
     *
     * Simply delegates to createClient() with the given arguments.
     *
     * @param array $parameters Additional connection parameters.
     * @param array $options    Additional client options.
     *
     * @return Client
     */
    protected function getClient(array $parameters = array(), array $options = array())
    {
        return $this->createClient($parameters, $options);
    }

    /**
     * Helper method that returns a callback used to emulate the response to an
     * ECHO command.
     *
     * The callback returns the first argument of the command it receives and
     * throws an InvalidArgumentException when the command ID is not ECHO.
     *
     * @return \Closure
     */
    protected function getReadCallback()
    {
        return function ($command) {
            if (($id = $command->getId()) !== 'ECHO') {
                throw new \InvalidArgumentException("Expected ECHO, got {$id}");
            }

            list($echoed) = $command->getArguments();

            return $echoed;
        };
    }
}