|
@@ -84,29 +84,19 @@ class StreamConnection extends AbstractConnection
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Initializes a TCP stream resource.
|
|
|
+ * Creates a connected stream socket resource.
|
|
|
*
|
|
|
- * @param ParametersInterface $parameters Initialization parameters for the connection.
|
|
|
+ * @param ParametersInterface $parameters Connection parameters.
|
|
|
+ * @param string $address Address for stream_socket_client().
|
|
|
+ * @param int $flags Flags for stream_socket_client().
|
|
|
*
|
|
|
* @return resource
|
|
|
*/
|
|
|
- protected function tcpStreamInitializer(ParametersInterface $parameters)
|
|
|
+ protected function createStreamSocket(ParametersInterface $parameters, $address, $flags)
|
|
|
{
|
|
|
- $uri = "tcp://{$parameters->host}:{$parameters->port}";
|
|
|
- $flags = STREAM_CLIENT_CONNECT;
|
|
|
-
|
|
|
- if (isset($parameters->async_connect) && (bool) $parameters->async_connect) {
|
|
|
- $flags |= STREAM_CLIENT_ASYNC_CONNECT;
|
|
|
- }
|
|
|
-
|
|
|
- if (isset($parameters->persistent) && (bool) $parameters->persistent) {
|
|
|
- $flags |= STREAM_CLIENT_PERSISTENT;
|
|
|
- $uri .= strpos($path = $parameters->path, '/') === 0 ? $path : "/$path";
|
|
|
- }
|
|
|
-
|
|
|
- $resource = @stream_socket_client($uri, $errno, $errstr, (float) $parameters->timeout, $flags);
|
|
|
+ $timeout = (float) $parameters->timeout;
|
|
|
|
|
|
- if (!$resource) {
|
|
|
+ if (!$resource = @stream_socket_client($address, $errno, $errstr, $timeout, $flags)) {
|
|
|
$this->onConnectionError(trim($errstr), $errno);
|
|
|
}
|
|
|
|
|
@@ -126,6 +116,32 @@ class StreamConnection extends AbstractConnection
|
|
|
return $resource;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Initializes a TCP stream resource.
|
|
|
+ *
|
|
|
+ * @param ParametersInterface $parameters Initialization parameters for the connection.
|
|
|
+ *
|
|
|
+ * @return resource
|
|
|
+ */
|
|
|
+ protected function tcpStreamInitializer(ParametersInterface $parameters)
|
|
|
+ {
|
|
|
+ $address = "tcp://{$parameters->host}:{$parameters->port}";
|
|
|
+ $flags = STREAM_CLIENT_CONNECT;
|
|
|
+
|
|
|
+ if (isset($parameters->async_connect) && $parameters->async_connect) {
|
|
|
+ $flags |= STREAM_CLIENT_ASYNC_CONNECT;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isset($parameters->persistent) && $parameters->persistent) {
|
|
|
+ $flags |= STREAM_CLIENT_PERSISTENT;
|
|
|
+ $address .= strpos($path = $parameters->path, '/') === 0 ? $path : "/$path";
|
|
|
+ }
|
|
|
+
|
|
|
+ $resource = $this->createStreamSocket($parameters, $address, $flags);
|
|
|
+
|
|
|
+ return $resource;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Initializes a UNIX stream resource.
|
|
|
*
|
|
@@ -136,29 +152,17 @@ class StreamConnection extends AbstractConnection
|
|
|
protected function unixStreamInitializer(ParametersInterface $parameters)
|
|
|
{
|
|
|
if (!isset($parameters->path)) {
|
|
|
- throw new InvalidArgumentException('Missing UNIX domain socket path.');
|
|
|
+ throw new \InvalidArgumentException('Missing UNIX domain socket path.');
|
|
|
}
|
|
|
|
|
|
- $uri = "unix://{$parameters->path}";
|
|
|
+ $address = "unix://{$parameters->path}";
|
|
|
$flags = STREAM_CLIENT_CONNECT;
|
|
|
|
|
|
- if ((bool) $parameters->persistent) {
|
|
|
+ if (isset($parameters->persistent) && $parameters->persistent) {
|
|
|
$flags |= STREAM_CLIENT_PERSISTENT;
|
|
|
}
|
|
|
|
|
|
- $resource = @stream_socket_client($uri, $errno, $errstr, (float) $parameters->timeout, $flags);
|
|
|
-
|
|
|
- if (!$resource) {
|
|
|
- $this->onConnectionError(trim($errstr), $errno);
|
|
|
- }
|
|
|
-
|
|
|
- if (isset($parameters->read_write_timeout)) {
|
|
|
- $rwtimeout = (float) $parameters->read_write_timeout;
|
|
|
- $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
|
|
|
- $timeoutSeconds = floor($rwtimeout);
|
|
|
- $timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
|
|
|
- stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
|
|
|
- }
|
|
|
+ $resource = $this->createStreamSocket($parameters, $address, $flags);
|
|
|
|
|
|
return $resource;
|
|
|
}
|