diff --git a/src/Server/TCP/Connection.php b/src/Server/TCP/Connection.php index d1b2067..cf264e1 100644 --- a/src/Server/TCP/Connection.php +++ b/src/Server/TCP/Connection.php @@ -3,6 +3,7 @@ namespace Utopia\Proxy\Server\TCP; use Swoole\Coroutine\Client; +use Swoole\Coroutine\Socket; /** * Per-connection state struct. @@ -16,6 +17,14 @@ class Connection { public ?Client $backend = null; + /** + * Socket exported from the backend Client for the forward coroutine. + * Kept here so onClose can close it directly — after exportSocket() + * the Client no longer owns the fd, so Client::close() alone cannot + * unblock a coroutine parked on an untimed recv(). + */ + public ?Socket $backendSocket = null; + public int $port = 0; public int $inbound = 0; @@ -25,6 +34,7 @@ class Connection public function reset(): void { $this->backend = null; + $this->backendSocket = null; $this->port = 0; $this->inbound = 0; $this->outbound = 0; diff --git a/src/Server/TCP/Swoole.php b/src/Server/TCP/Swoole.php index e4359a7..ffd3e7a 100644 --- a/src/Server/TCP/Swoole.php +++ b/src/Server/TCP/Swoole.php @@ -4,7 +4,6 @@ use Swoole\Constant; use Swoole\Coroutine; -use Swoole\Coroutine\Client; use Swoole\Coroutine\Socket; use Swoole\Server; use Swoole\Server\Port; @@ -332,7 +331,7 @@ public function onReceive(Server $server, int $fd, string $data, int $port): voi // forward loop entirely. Otherwise fall back to the coroutine // path as before. if (!$adapter->activateSockmap($fd)) { - $this->forward($server, $fd, $backend); + $this->forward($server, $fd, $connection); } } catch (\Exception $e) { @@ -349,20 +348,25 @@ public function onReceive(Server $server, int $fd, string $data, int $port): voi * read loop because the backend socket is not registered with the * server's reactor. */ - protected function forward(Server $server, int $clientFd, Client $backend): void + protected function forward(Server $server, int $clientFd, Connection $connection): void { $bufferSize = $this->config->receiveBufferSize; - $exported = $backend->exportSocket(); + $exported = $connection->backend?->exportSocket(); if (!$exported instanceof Socket) { $server->close($clientFd); return; } $backendSocket = $exported; + $connection->backendSocket = $backendSocket; + // Read with no timeout: the exported backend socket inherits the + // connect timeout as its read timeout, so a bare recv() would return + // false after a few idle seconds and tear down the session. Dead + // peers are detected via TCP keepalive and FIN/RST instead. \go(static function () use ($server, $clientFd, $backendSocket, $bufferSize): void { while ($server->exist($clientFd)) { /** @var string|false $data */ - $data = $backendSocket->recv($bufferSize); + $data = $backendSocket->recv($bufferSize, -1); if ($data === false || $data === '') { break; } @@ -412,6 +416,11 @@ public function onClose(Server $server, int $fd, int $reactorId): void $adapter->closeConnection($fd); } + // Close the exported backend socket to unblock the forward + // coroutine's untimed recv() — Client::close() above cannot, since + // the Client no longer owns the fd after exportSocket(). + $connection->backendSocket?->close(); + $connection->reset(); } diff --git a/src/Server/TCP/Swoole/Coroutine.php b/src/Server/TCP/Swoole/Coroutine.php index c9b8838..d9ad04c 100644 --- a/src/Server/TCP/Swoole/Coroutine.php +++ b/src/Server/TCP/Swoole/Coroutine.php @@ -226,10 +226,16 @@ protected function handleConnectionData(Connection $connection, int $port): void /** @var Socket $backendSocket */ $backendSocket = $backendClient->exportSocket(); + // Pipe reads must not inherit socket read timeouts: Client::connect() + // re-stamps the connect timeout as the socket's read timeout, so a + // bare recv() would return false after a few idle seconds — + // indistinguishable from a closed peer — and silently tear down one + // direction of a long-lived session. Use -1 (no timeout) and rely on + // TCP keepalive and FIN/RST for dead-peer detection. \go(function () use ($clientSocket, $backendSocket, $bufferSize, $done): void { while (true) { /** @var string|false $data */ - $data = $backendSocket->recv($bufferSize); + $data = $backendSocket->recv($bufferSize, -1); if ($data === false || $data === '') { break; } @@ -237,6 +243,9 @@ protected function handleConnectionData(Connection $connection, int $port): void break; } } + // Unblock the client read loop — it has no timeout, so a dead + // backend would otherwise leave it parked forever on an idle client. + $clientSocket->close(); $done->push(true); }); @@ -251,7 +260,7 @@ protected function handleConnectionData(Connection $connection, int $port): void while (true) { /** @var string|false $data */ - $data = $clientSocket->recv($bufferSize); + $data = $clientSocket->recv($bufferSize, -1); if ($data === false || $data === '') { break; } @@ -261,8 +270,9 @@ protected function handleConnectionData(Connection $connection, int $port): void } $backendSocket->close(); + // The relay coroutine closes the client socket before signalling, + // so no further close is needed once pop() returns. $done->pop(); - $clientSocket->close(); $adapter->closeConnection($clientId); diff --git a/tests/Integration/CoroutinePipeIdleTest.php b/tests/Integration/CoroutinePipeIdleTest.php new file mode 100644 index 0000000..6135dcb --- /dev/null +++ b/tests/Integration/CoroutinePipeIdleTest.php @@ -0,0 +1,126 @@ +markTestSkipped('ext-swoole is required.'); + } + } + + public function testBackendRelaySurvivesIdleBeyondConnectTimeout(): void + { + $received = []; + $error = null; + + Coroutine\run(function () use (&$received, &$error): void { + $server = null; + $client = null; + + try { + // Backend that answers, goes idle for longer than the proxy's + // connect/read timeouts, then sends a delayed payload. + $listener = new Socket(\AF_INET, \SOCK_STREAM, 0); + $listener->bind('127.0.0.1', 0); + $listener->listen(128); + + /** @var array{port: int} $address */ + $address = $listener->getsockname(); + $backendPort = $address['port']; + + Coroutine::create(function () use ($listener): void { + /** @var Socket $peer */ + $peer = $listener->accept(5.0); + $peer->recv(4096, 5.0); + $peer->send('first'); + Coroutine::sleep(1.0); + $peer->send('second'); + $peer->recv(4096, 5.0); + $peer->close(); + $listener->close(); + }); + + [$server, $proxyPort] = $this->startProxy($backendPort); + $server->start(); + Coroutine::sleep(0.05); + + $client = new Socket(\AF_INET, \SOCK_STREAM, 0); + $this->assertTrue($client->connect('127.0.0.1', $proxyPort, 2.0)); + $this->assertNotFalse($client->send('init')); + + $received[] = $client->recv(4096, 2.0); + + // The backend stays silent for 1.0s here — longer than the + // 0.4s timeouts. Without the fix the backend→client relay + // breaks during this idle window and 'second' never arrives. + $received[] = $client->recv(4096, 3.0); + } catch (\Throwable $e) { + $error = $e; + } finally { + $client?->close(); + $server?->shutdown(); + } + }); + + if ($error !== null) { + throw $error; + } + + $this->assertSame('first', $received[0] ?? null); + $this->assertSame('second', $received[1] ?? null); + } + + /** + * Bind the proxy on a random port, retrying on collision. + * + * @return array{0: CoroutineServer, 1: int} + */ + private function startProxy(int $backendPort): array + { + $resolver = new Fixed("127.0.0.1:{$backendPort}"); + $attempts = 0; + + while (true) { + $proxyPort = \random_int(20_000, 60_000); + + $config = new Config( + ports: [$proxyPort], + host: '127.0.0.1', + timeout: 0.4, + connectTimeout: 0.4, + skipValidation: true, + ); + + try { + return [new CoroutineServer($resolver, $config), $proxyPort]; + } catch (\Swoole\Exception $e) { + if (++$attempts >= 10) { + throw $e; + } + } + } + } +}