* * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Predis\PubSub; use Predis\Client; use Predis\Helpers; use Predis\ClientException; use Predis\NotSupportedException; /** * Client-side abstraction of a Publish / Subscribe context. * * @author Daniele Alessandri */ class PubSubContext implements \Iterator { const SUBSCRIBE = 'subscribe'; const UNSUBSCRIBE = 'unsubscribe'; const PSUBSCRIBE = 'psubscribe'; const PUNSUBSCRIBE = 'punsubscribe'; const MESSAGE = 'message'; const PMESSAGE = 'pmessage'; const STATUS_VALID = 1; // 0b0001 const STATUS_SUBSCRIBED = 2; // 0b0010 const STATUS_PSUBSCRIBED = 4; // 0b0100 private $client; private $position; private $options; /** * @param Client Client instance used by the context. * @param array Options for the context initialization. */ public function __construct(Client $client, Array $options = null) { $this->checkCapabilities($client); $this->options = $options ?: array(); $this->client = $client; $this->statusFlags = self::STATUS_VALID; $this->genericSubscribeInit('subscribe'); $this->genericSubscribeInit('psubscribe'); } /** * Automatically closes the context when PHP's garbage collector kicks in. */ public function __destruct() { $this->closeContext(true); } /** * Checks if the passed client instance satisfies the required conditions * needed to initialize a Publish / Subscribe context. * * @param Client Client instance used by the context. */ private function checkCapabilities(Client $client) { if (Helpers::isCluster($client->getConnection())) { throw new NotSupportedException('Cannot initialize a PUB/SUB context over a cluster of connections'); } $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); if ($client->getProfile()->supportsCommands($commands) === false) { throw new NotSupportedException('The current profile does not support PUB/SUB related commands'); } } /** * This method shares the logic to handle both SUBSCRIBE and PSUBSCRIBE. * * @param string $subscribeAction Type of subscription. */ private function genericSubscribeInit($subscribeAction) { if (isset($this->options[$subscribeAction])) { $this->$subscribeAction($this->options[$subscribeAction]); } } /** * Checks if the specified flag is valid in the state of the context. * * @param int $value Flag. * @return Boolean */ private function isFlagSet($value) { return ($this->statusFlags & $value) === $value; } /** * Subscribes to the specified channels. * * @param mixed $arg,... One or more channel names. */ public function subscribe(/* arguments */) { $this->writeCommand(self::SUBSCRIBE, func_get_args()); $this->statusFlags |= self::STATUS_SUBSCRIBED; } /** * Unsubscribes from the specified channels. * * @param mixed $arg,... One or more channel names. */ public function unsubscribe(/* arguments */) { $this->writeCommand(self::UNSUBSCRIBE, func_get_args()); } /** * Subscribes to the specified channels using a pattern. * * @param mixed $arg,... One or more channel name patterns. */ public function psubscribe(/* arguments */) { $this->writeCommand(self::PSUBSCRIBE, func_get_args()); $this->statusFlags |= self::STATUS_PSUBSCRIBED; } /** * Unsubscribes from the specified channels using a pattern. * * @param mixed $arg,... One or more channel name patterns. */ public function punsubscribe(/* arguments */) { $this->writeCommand(self::PUNSUBSCRIBE, func_get_args()); } /** * Closes the context by unsubscribing from all the subscribed channels. * Optionally, the context can be forcefully closed by dropping the * underlying connection. * * @param Boolean $force Forcefully close the context by closing the connection. * @return Boolean Returns false if there are no pending messages. */ public function closeContext($force = false) { if (!$this->valid()) { return false; } if ($force) { $this->invalidate(); $this->client->disconnect(); } else { if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) { $this->unsubscribe(); } if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) { $this->punsubscribe(); } } return !$force; } /** * Writes a Redis command on the underlying connection. * * @param string $method ID of the command. * @param array $arguments List of arguments. */ private function writeCommand($method, $arguments) { $arguments = Helpers::filterArrayArguments($arguments); $command = $this->client->createCommand($method, $arguments); $this->client->getConnection()->writeCommand($command); } /** * {@inheritdoc} */ public function rewind() { // NOOP } /** * Returns the last message payload retrieved from the server and generated * by one of the active subscriptions. * * @return array */ public function current() { return $this->getValue(); } /** * {@inheritdoc} */ public function key() { return $this->position; } /** * {@inheritdoc} */ public function next() { if ($this->valid()) { $this->position++; } return $this->position; } /** * Checks if the the context is still in a valid state to continue. * * @return Boolean */ public function valid() { $isValid = $this->isFlagSet(self::STATUS_VALID); $subscriptionFlags = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED; $hasSubscriptions = ($this->statusFlags & $subscriptionFlags) > 0; return $isValid && $hasSubscriptions; } /** * Resets the state of the context. */ private function invalidate() { $this->statusFlags = 0; // 0b0000; } /** * Waits for a new message from the server generated by one of the active * subscriptions and returns it when available. * * @return array */ private function getValue() { $response = $this->client->getConnection()->read(); switch ($response[0]) { case self::SUBSCRIBE: case self::UNSUBSCRIBE: case self::PSUBSCRIBE: case self::PUNSUBSCRIBE: if ($response[2] === 0) { $this->invalidate(); } case self::MESSAGE: return (object) array( 'kind' => $response[0], 'channel' => $response[1], 'payload' => $response[2], ); case self::PMESSAGE: return (object) array( 'kind' => $response[0], 'pattern' => $response[1], 'channel' => $response[2], 'payload' => $response[3], ); default: $message = "Received an unknown message type {$response[0]} inside of a pubsub context"; throw new ClientException($message); } } }