|
@@ -21,21 +21,9 @@ use Predis\NotSupportedException;
|
|
|
*
|
|
|
* @author Daniele Alessandri <suppakilla@gmail.com>
|
|
|
*/
|
|
|
-class PubSubContext implements \Iterator
|
|
|
+class PubSubContext extends AbstractPubSubContext
|
|
|
{
|
|
|
- const SUBSCRIBE = 'subscribe';
|
|
|
- const UNSUBSCRIBE = 'unsubscribe';
|
|
|
- const PSUBSCRIBE = 'psubscribe';
|
|
|
- const PUNSUBSCRIBE = 'punsubscribe';
|
|
|
- const MESSAGE = 'message';
|
|
|
- const PMESSAGE = 'pmessage';
|
|
|
-
|
|
|
- const STATUS_VALID = 1; // 0b0001
|
|
|
- const STATUS_SUBSCRIBED = 2; // 0b0010
|
|
|
- const STATUS_PSUBSCRIBED = 4; // 0b0100
|
|
|
-
|
|
|
private $client;
|
|
|
- private $position;
|
|
|
private $options;
|
|
|
|
|
|
/**
|
|
@@ -47,20 +35,11 @@ class PubSubContext implements \Iterator
|
|
|
$this->checkCapabilities($client);
|
|
|
$this->options = $options ?: array();
|
|
|
$this->client = $client;
|
|
|
- $this->statusFlags = self::STATUS_VALID;
|
|
|
|
|
|
$this->genericSubscribeInit('subscribe');
|
|
|
$this->genericSubscribeInit('psubscribe');
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Automatically closes the context when PHP's garbage collector kicks in.
|
|
|
- */
|
|
|
- public function __destruct()
|
|
|
- {
|
|
|
- $this->closeContext(true);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Checks if the passed client instance satisfies the required conditions
|
|
|
* needed to initialize a Publish / Subscribe context.
|
|
@@ -92,95 +71,9 @@ class PubSubContext implements \Iterator
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Checks if the specified flag is valid in the state of the context.
|
|
|
- *
|
|
|
- * @param int $value Flag.
|
|
|
- * @return Boolean
|
|
|
- */
|
|
|
- private function isFlagSet($value)
|
|
|
- {
|
|
|
- return ($this->statusFlags & $value) === $value;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Subscribes to the specified channels.
|
|
|
- *
|
|
|
- * @param mixed $arg,... One or more channel names.
|
|
|
- */
|
|
|
- public function subscribe(/* arguments */)
|
|
|
- {
|
|
|
- $this->writeCommand(self::SUBSCRIBE, func_get_args());
|
|
|
- $this->statusFlags |= self::STATUS_SUBSCRIBED;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Unsubscribes from the specified channels.
|
|
|
- *
|
|
|
- * @param mixed $arg,... One or more channel names.
|
|
|
- */
|
|
|
- public function unsubscribe(/* arguments */)
|
|
|
- {
|
|
|
- $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Subscribes to the specified channels using a pattern.
|
|
|
- *
|
|
|
- * @param mixed $arg,... One or more channel name patterns.
|
|
|
- */
|
|
|
- public function psubscribe(/* arguments */)
|
|
|
- {
|
|
|
- $this->writeCommand(self::PSUBSCRIBE, func_get_args());
|
|
|
- $this->statusFlags |= self::STATUS_PSUBSCRIBED;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Unsubscribes from the specified channels using a pattern.
|
|
|
- *
|
|
|
- * @param mixed $arg,... One or more channel name patterns.
|
|
|
- */
|
|
|
- public function punsubscribe(/* arguments */)
|
|
|
- {
|
|
|
- $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Closes the context by unsubscribing from all the subscribed channels.
|
|
|
- * Optionally, the context can be forcefully closed by dropping the
|
|
|
- * underlying connection.
|
|
|
- *
|
|
|
- * @param Boolean $force Forcefully close the context by closing the connection.
|
|
|
- * @return Boolean Returns false if there are no pending messages.
|
|
|
- */
|
|
|
- public function closeContext($force = false)
|
|
|
- {
|
|
|
- if (!$this->valid()) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- if ($force) {
|
|
|
- $this->invalidate();
|
|
|
- $this->client->disconnect();
|
|
|
- }
|
|
|
- else {
|
|
|
- if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
|
|
|
- $this->unsubscribe();
|
|
|
- }
|
|
|
- if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
|
|
|
- $this->punsubscribe();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return !$force;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Writes a Redis command on the underlying connection.
|
|
|
- *
|
|
|
- * @param string $method ID of the command.
|
|
|
- * @param array $arguments List of arguments.
|
|
|
+ * {@inheritdoc}
|
|
|
*/
|
|
|
- private function writeCommand($method, $arguments)
|
|
|
+ protected function writeCommand($method, $arguments)
|
|
|
{
|
|
|
$arguments = Helpers::filterArrayArguments($arguments);
|
|
|
$command = $this->client->createCommand($method, $arguments);
|
|
@@ -190,71 +83,15 @@ class PubSubContext implements \Iterator
|
|
|
/**
|
|
|
* {@inheritdoc}
|
|
|
*/
|
|
|
- public function rewind()
|
|
|
- {
|
|
|
- // NOOP
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the last message payload retrieved from the server and generated
|
|
|
- * by one of the active subscriptions.
|
|
|
- *
|
|
|
- * @return array
|
|
|
- */
|
|
|
- public function current()
|
|
|
- {
|
|
|
- return $this->getValue();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * {@inheritdoc}
|
|
|
- */
|
|
|
- public function key()
|
|
|
+ protected function disconnect()
|
|
|
{
|
|
|
- return $this->position;
|
|
|
+ $this->client->disconnect();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@inheritdoc}
|
|
|
*/
|
|
|
- public function next()
|
|
|
- {
|
|
|
- if ($this->valid()) {
|
|
|
- $this->position++;
|
|
|
- }
|
|
|
-
|
|
|
- return $this->position;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Checks if the the context is still in a valid state to continue.
|
|
|
- *
|
|
|
- * @return Boolean
|
|
|
- */
|
|
|
- public function valid()
|
|
|
- {
|
|
|
- $isValid = $this->isFlagSet(self::STATUS_VALID);
|
|
|
- $subscriptionFlags = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED;
|
|
|
- $hasSubscriptions = ($this->statusFlags & $subscriptionFlags) > 0;
|
|
|
-
|
|
|
- return $isValid && $hasSubscriptions;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Resets the state of the context.
|
|
|
- */
|
|
|
- private function invalidate()
|
|
|
- {
|
|
|
- $this->statusFlags = 0; // 0b0000;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Waits for a new message from the server generated by one of the active
|
|
|
- * subscriptions and returns it when available.
|
|
|
- *
|
|
|
- * @return array
|
|
|
- */
|
|
|
- private function getValue()
|
|
|
+ protected function getValue()
|
|
|
{
|
|
|
$response = $this->client->getConnection()->read();
|
|
|
|