checkCapabilities($client); $this->_options = $options ?: array(); $this->_client = $client; $this->_statusFlags = self::STATUS_VALID; $this->genericSubscribeInit('subscribe'); $this->genericSubscribeInit('psubscribe'); } public function __destruct() { $this->closeContext(); } private function checkCapabilities(Client $client) { if (Utils::isCluster($client->getConnection())) { throw new ClientException( 'Cannot initialize a PUB/SUB context over a cluster of connections' ); } $profile = $client->getProfile(); $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); if ($profile->supportsCommands($commands) === false) { throw new ClientException( 'The current profile does not support PUB/SUB related commands' ); } } private function genericSubscribeInit($subscribeAction) { if (isset($this->_options[$subscribeAction])) { if (is_array($this->_options[$subscribeAction])) { foreach ($this->_options[$subscribeAction] as $subscription) { $this->$subscribeAction($subscription); } } else { $this->$subscribeAction($this->_options[$subscribeAction]); } } } private function isFlagSet($value) { return ($this->_statusFlags & $value) === $value; } public function subscribe(/* arguments */) { $this->writeCommand(self::SUBSCRIBE, func_get_args()); $this->_statusFlags |= self::STATUS_SUBSCRIBED; } public function unsubscribe(/* arguments */) { $this->writeCommand(self::UNSUBSCRIBE, func_get_args()); } public function psubscribe(/* arguments */) { $this->writeCommand(self::PSUBSCRIBE, func_get_args()); $this->_statusFlags |= self::STATUS_PSUBSCRIBED; } public function punsubscribe(/* arguments */) { $this->writeCommand(self::PUNSUBSCRIBE, func_get_args()); } public function closeContext() { if ($this->valid()) { if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) { $this->unsubscribe(); } if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) { $this->punsubscribe(); } } } private function writeCommand($method, $arguments) { if (count($arguments) === 1 && is_array($arguments[0])) { $arguments = $arguments[0]; } $command = $this->_client->createCommand($method, $arguments); $this->_client->getConnection()->writeCommand($command); } public function rewind() { // NOOP } public function current() { return $this->getValue(); } public function key() { return $this->_position; } public function next() { if ($this->isFlagSet(self::STATUS_VALID)) { $this->_position++; } return $this->_position; } public function valid() { $subscriptions = self::STATUS_SUBSCRIBED + self::STATUS_PSUBSCRIBED; return $this->isFlagSet(self::STATUS_VALID) && ($this->_statusFlags & $subscriptions) > 0; } private function invalidate() { $this->_statusFlags = 0x0000; } private function getValue() { $connection = $this->_client->getConnection(); $protocol = $connection->getProtocol(); $response = $protocol->read($connection); switch ($response[0]) { case self::SUBSCRIBE: case self::UNSUBSCRIBE: case self::PSUBSCRIBE: case self::PUNSUBSCRIBE: if ($response[2] === 0) { $this->invalidate(); } case self::MESSAGE: return (object) array( 'kind' => $response[0], 'channel' => $response[1], 'payload' => $response[2], ); case self::PMESSAGE: return (object) array( 'kind' => $response[0], 'pattern' => $response[1], 'channel' => $response[2], 'payload' => $response[3], ); default: throw new ClientException( "Received an unknown message type {$response[0]} inside of a pubsub context" ); } } }