Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Server/TCP/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Utopia\Proxy\Server\TCP;

use Swoole\Coroutine\Client;
use Swoole\Coroutine\Socket;

/**
* Per-connection state struct.
Expand All @@ -16,6 +17,14 @@ class Connection
{
public ?Client $backend = null;

/**
* Socket exported from the backend Client for the forward coroutine.
* Kept here so onClose can close it directly — after exportSocket()
* the Client no longer owns the fd, so Client::close() alone cannot
* unblock a coroutine parked on an untimed recv().
*/
public ?Socket $backendSocket = null;

public int $port = 0;

public int $inbound = 0;
Expand All @@ -25,6 +34,7 @@ class Connection
public function reset(): void
{
$this->backend = null;
$this->backendSocket = null;
$this->port = 0;
$this->inbound = 0;
$this->outbound = 0;
Expand Down
19 changes: 14 additions & 5 deletions src/Server/TCP/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Swoole\Constant;
use Swoole\Coroutine;
use Swoole\Coroutine\Client;
use Swoole\Coroutine\Socket;
use Swoole\Server;
use Swoole\Server\Port;
Expand Down Expand Up @@ -332,7 +331,7 @@ public function onReceive(Server $server, int $fd, string $data, int $port): voi
// forward loop entirely. Otherwise fall back to the coroutine
// path as before.
if (!$adapter->activateSockmap($fd)) {
$this->forward($server, $fd, $backend);
$this->forward($server, $fd, $connection);
}

} catch (\Exception $e) {
Expand All @@ -349,20 +348,25 @@ public function onReceive(Server $server, int $fd, string $data, int $port): voi
* read loop because the backend socket is not registered with the
* server's reactor.
*/
protected function forward(Server $server, int $clientFd, Client $backend): void
protected function forward(Server $server, int $clientFd, Connection $connection): void
{
$bufferSize = $this->config->receiveBufferSize;
$exported = $backend->exportSocket();
$exported = $connection->backend?->exportSocket();
if (!$exported instanceof Socket) {
$server->close($clientFd);

return;
}
$backendSocket = $exported;
$connection->backendSocket = $backendSocket;
// Read with no timeout: the exported backend socket inherits the
// connect timeout as its read timeout, so a bare recv() would return
// false after a few idle seconds and tear down the session. Dead
// peers are detected via TCP keepalive and FIN/RST instead.
\go(static function () use ($server, $clientFd, $backendSocket, $bufferSize): void {
while ($server->exist($clientFd)) {
/** @var string|false $data */
$data = $backendSocket->recv($bufferSize);
$data = $backendSocket->recv($bufferSize, -1);
if ($data === false || $data === '') {
break;
}
Expand Down Expand Up @@ -412,6 +416,11 @@ public function onClose(Server $server, int $fd, int $reactorId): void
$adapter->closeConnection($fd);
}

// Close the exported backend socket to unblock the forward
// coroutine's untimed recv() — Client::close() above cannot, since
// the Client no longer owns the fd after exportSocket().
$connection->backendSocket?->close();

$connection->reset();
}

Expand Down
16 changes: 13 additions & 3 deletions src/Server/TCP/Swoole/Coroutine.php
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,26 @@ protected function handleConnectionData(Connection $connection, int $port): void
/** @var Socket $backendSocket */
$backendSocket = $backendClient->exportSocket();

// Pipe reads must not inherit socket read timeouts: Client::connect()
// re-stamps the connect timeout as the socket's read timeout, so a
// bare recv() would return false after a few idle seconds —
// indistinguishable from a closed peer — and silently tear down one
// direction of a long-lived session. Use -1 (no timeout) and rely on
// TCP keepalive and FIN/RST for dead-peer detection.
\go(function () use ($clientSocket, $backendSocket, $bufferSize, $done): void {
while (true) {
/** @var string|false $data */
$data = $backendSocket->recv($bufferSize);
$data = $backendSocket->recv($bufferSize, -1);
if ($data === false || $data === '') {
break;
}
if ($clientSocket->sendAll($data) === false) {
break;
}
}
// Unblock the client read loop — it has no timeout, so a dead
// backend would otherwise leave it parked forever on an idle client.
$clientSocket->close();
$done->push(true);
});

Expand All @@ -251,7 +260,7 @@ protected function handleConnectionData(Connection $connection, int $port): void

while (true) {
/** @var string|false $data */
$data = $clientSocket->recv($bufferSize);
$data = $clientSocket->recv($bufferSize, -1);
if ($data === false || $data === '') {
break;
}
Expand All @@ -261,8 +270,9 @@ protected function handleConnectionData(Connection $connection, int $port): void
}

$backendSocket->close();
// The relay coroutine closes the client socket before signalling,
// so no further close is needed once pop() returns.
$done->pop();
$clientSocket->close();

$adapter->closeConnection($clientId);

Expand Down
126 changes: 126 additions & 0 deletions tests/Integration/CoroutinePipeIdleTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
<?php

namespace Utopia\Tests\Integration;

use PHPUnit\Framework\TestCase;
use Swoole\Coroutine;
use Swoole\Coroutine\Socket;
use Utopia\Proxy\Resolver\Fixed;
use Utopia\Proxy\Server\TCP\Config;
use Utopia\Proxy\Server\TCP\Swoole\Coroutine as CoroutineServer;

/**
* Regression test for the pipe-loop timeout bug: relay recv() calls with no
* timeout argument inherit the socket's read timeout. The backend client
* socket gets the connect timeout re-stamped as its read timeout by
* Client::connect(), so recv() returned false (ETIMEDOUT) after a few idle
* seconds — indistinguishable from a closed peer — and the backend→client
* relay silently died on idle long-lived sessions.
*
* The fix reads with recv($bufferSize, -1) and relies on TCP keepalive and
* FIN/RST for dead-peer detection.
*
* @group integration
*/
class CoroutinePipeIdleTest extends TestCase
{
protected function setUp(): void
{
if (! \extension_loaded('swoole')) {
$this->markTestSkipped('ext-swoole is required.');
}
}

public function testBackendRelaySurvivesIdleBeyondConnectTimeout(): void
{
$received = [];
$error = null;

Coroutine\run(function () use (&$received, &$error): void {
$server = null;
$client = null;

try {
// Backend that answers, goes idle for longer than the proxy's
// connect/read timeouts, then sends a delayed payload.
$listener = new Socket(\AF_INET, \SOCK_STREAM, 0);
$listener->bind('127.0.0.1', 0);
$listener->listen(128);

/** @var array{port: int} $address */
$address = $listener->getsockname();
$backendPort = $address['port'];

Coroutine::create(function () use ($listener): void {
/** @var Socket $peer */
$peer = $listener->accept(5.0);
$peer->recv(4096, 5.0);
$peer->send('first');
Coroutine::sleep(1.0);
$peer->send('second');
$peer->recv(4096, 5.0);
$peer->close();
$listener->close();
});

[$server, $proxyPort] = $this->startProxy($backendPort);
$server->start();
Coroutine::sleep(0.05);

$client = new Socket(\AF_INET, \SOCK_STREAM, 0);
$this->assertTrue($client->connect('127.0.0.1', $proxyPort, 2.0));
$this->assertNotFalse($client->send('init'));

$received[] = $client->recv(4096, 2.0);

// The backend stays silent for 1.0s here — longer than the
// 0.4s timeouts. Without the fix the backend→client relay
// breaks during this idle window and 'second' never arrives.
$received[] = $client->recv(4096, 3.0);
} catch (\Throwable $e) {
$error = $e;
} finally {
$client?->close();
$server?->shutdown();
}
});

if ($error !== null) {
throw $error;
}

$this->assertSame('first', $received[0] ?? null);
$this->assertSame('second', $received[1] ?? null);
}

/**
* Bind the proxy on a random port, retrying on collision.
*
* @return array{0: CoroutineServer, 1: int}
*/
private function startProxy(int $backendPort): array
{
$resolver = new Fixed("127.0.0.1:{$backendPort}");
$attempts = 0;

while (true) {
$proxyPort = \random_int(20_000, 60_000);

$config = new Config(
ports: [$proxyPort],
host: '127.0.0.1',
timeout: 0.4,
connectTimeout: 0.4,
skipValidation: true,
);

try {
return [new CoroutineServer($resolver, $config), $proxyPort];
} catch (\Swoole\Exception $e) {
if (++$attempts >= 10) {
throw $e;
}
}
}
}
}
Loading