* * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Predis\Pipeline; use Predis\ClientInterface; use Predis\BasicClientInterface; use Predis\ExecutableContextInterface; use Predis\Command\CommandInterface; use Predis\Helpers; use Predis\ClientException; /** * Abstraction of a pipeline context where write and read operations * of commands and their replies over the network are pipelined. * * @author Daniele Alessandri */ class PipelineContext implements BasicClientInterface, ExecutableContextInterface { private $client; private $executor; private $pipeline = array(); private $replies = array(); private $running = false; /** * @param ClientInterface Client instance used by the context. * @param array Options for the context initialization. */ public function __construct(ClientInterface $client, Array $options = null) { $this->client = $client; $this->executor = $this->createExecutor($client, $options ?: array()); } /** * Returns a pipeline executor depending on the kind of the underlying * connection and the passed options. * * @param ClientInterface Client instance used by the context. * @param array Options for the context initialization. * @return PipelineExecutorInterface */ protected function createExecutor(ClientInterface $client, Array $options) { if (isset($options['executor'])) { $executor = $options['executor']; if (is_callable($executor)) { $executor = call_user_func($executor, $client, $options); } if (!$executor instanceof PipelineExecutorInterface) { $message = 'The executor option accepts only instances of Predis\Pipeline\PipelineExecutorInterface'; throw new \InvalidArgumentException($message); } return $executor; } $clientOpts = $client->getOptions(); $useExceptions = isset($clientOpts->exceptions) ? $clientOpts->exceptions : true; return new StandardExecutor($useExceptions); } protected function getDefaultExecutor() { $clientOpts = $client->getOptions(); $useExceptions = isset($clientOpts->exceptions) ? $clientOpts->exceptions : true; $executor = new StandardExecutor($useExceptions); } /** * Queues a command into the pipeline buffer. * * @param string $method Command ID. * @param array $arguments Arguments for the command. * @return PipelineContext */ public function __call($method, $arguments) { $command = $this->client->createCommand($method, $arguments); $this->recordCommand($command); return $this; } /** * Queues a command instance into the pipeline buffer. * * @param CommandInterface $command Command to queue in the buffer. */ protected function recordCommand(CommandInterface $command) { $this->pipeline[] = $command; } /** * Queues a command instance into the pipeline buffer. * * @param CommandInterface $command Command to queue in the buffer. */ public function executeCommand(CommandInterface $command) { $this->recordCommand($command); } /** * Flushes the buffer that holds the queued commands. * * @param Boolean $send Specifies if the commands in the buffer should be sent to Redis. * @return PipelineContext */ public function flushPipeline($send = true) { if (count($this->pipeline) > 0) { if ($send) { $connection = $this->client->getConnection(); $replies = $this->executor->execute($connection, $this->pipeline); $this->replies = array_merge($this->replies, $replies); } $this->pipeline = array(); } return $this; } /** * Marks the running status of the pipeline. * * @param Boolean $bool True if the pipeline is running. * False if the pipeline is not running. */ private function setRunning($bool) { if ($bool === true && $this->running === true) { throw new ClientException("This pipeline is already opened"); } $this->running = $bool; } /** * Handles the actual execution of the whole pipeline. * * @param mixed $callable Optional callback for execution. * @return array */ public function execute($callable = null) { if ($callable && !is_callable($callable)) { throw new \InvalidArgumentException('Argument passed must be a callable object'); } $this->setRunning(true); $pipelineBlockException = null; try { if ($callable !== null) { call_user_func($callable, $this); } $this->flushPipeline(); } catch (\Exception $exception) { $pipelineBlockException = $exception; } $this->setRunning(false); if ($pipelineBlockException !== null) { throw $pipelineBlockException; } return $this->replies; } /** * Returns the underlying client instance used by the pipeline object. * * @return ClientInterface */ public function getClient() { return $this->client; } /** * Returns the underlying pipeline executor used by the pipeline object. * * @return PipelineExecutorInterface */ public function getExecutor() { return $this->executor; } }