Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/Adapter/TCP.php
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,27 @@ public function getConnection(string $data, int $fd): Client
}

$result = $this->route($data);
[$host, $port] = self::parseEndpoint($result->endpoint, $this->port);
$initialData = $this->transformInitialData($data, $result);

return $this->connect($result, $fd, $initialData);
}

/**
* Establish and cache a backend connection from an already resolved route.
*
* Protocol-aware callers can resolve after custom negotiation, then reuse
* the same backend dialing and socket-option path as getConnection().
*
* @throws \Exception
*/
public function connect(ConnectionResult $result, int $fd, string $initialData = ''): Client
{
if (isset($this->connections[$fd])) {
return $this->connections[$fd];
}

[$host, $port] = self::parseEndpoint($result->endpoint, $this->port);

$client = new Client(SWOOLE_SOCK_TCP);
$client->set([
'timeout' => $this->timeout,
Expand Down
1 change: 1 addition & 0 deletions src/Server/TCP/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public function __construct(
public readonly int $cacheTTL = 0,
public readonly ?TLS $tls = null,
public readonly ?\Closure $adapterFactory = null,
public readonly ?\Closure $connectionHandler = null,
) {
$cpus = \swoole_cpu_num();
$this->workers = $workers ?? $cpus;
Expand Down
5 changes: 1 addition & 4 deletions src/Server/TCP/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*
* One instance lives in an SplFixedArray slot keyed by file descriptor while
* a client is connected. Replaces the previous map of three independent
* associative arrays (backends, ports, pending TLS) with a single cache-line
* associative arrays (backends and ports) with a single cache-line
* friendly object lookup.
*/
class Connection
Expand All @@ -18,8 +18,6 @@ class Connection

public int $port = 0;

public bool $pendingTls = false;

public int $inbound = 0;

public int $outbound = 0;
Expand All @@ -28,7 +26,6 @@ public function reset(): void
{
$this->backend = null;
$this->port = 0;
$this->pendingTls = false;
$this->inbound = 0;
$this->outbound = 0;
}
Expand Down
45 changes: 23 additions & 22 deletions src/Server/TCP/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,9 @@
* PROCESS mode. This matches HAProxy's nbthread-per-core model and roughly
* doubles small-request throughput on CPU-bound forwarding workloads.
*
* Supports optional TLS termination:
* - PostgreSQL: STARTTLS via SSLRequest/SSLResponse handshake
* - MySQL: SSL capability flag in server greeting
*
* When TLS is enabled, the server uses SWOOLE_SOCK_TCP | SWOOLE_SSL socket type
* and Swoole handles the TLS handshake natively. For PostgreSQL STARTTLS, the
* proxy intercepts the SSLRequest message, responds with 'S', and Swoole
* upgrades the connection to TLS before forwarding the subsequent startup message.
* Supports optional TLS termination for protocols that start with TLS
* immediately after accept. Protocols that negotiate before routing should
* use the coroutine server with a custom connection handler.
*
* Example:
* ```php
Expand Down Expand Up @@ -286,10 +281,6 @@ public function onWorkerStart(Server $server, int $workerId): void

/**
* Main receive handler
*
* When TLS is enabled, handles protocol-specific SSL negotiation:
* - PostgreSQL: Intercepts SSLRequest, responds 'S', Swoole upgrades to TLS
* - MySQL: Swoole handles SSL natively via SWOOLE_SSL socket type
*/
public function onReceive(Server $server, int $fd, string $data, int $port): void
{
Expand Down Expand Up @@ -319,16 +310,6 @@ public function onReceive(Server $server, int $fd, string $data, int $port): voi
$this->connections[$fd] = $connection;
}

// Handle PostgreSQL STARTTLS: SSLRequest comes before the real startup message.
if ($this->tlsContext !== null && $port === 5432 && TLS::isPostgreSQLSSLRequest($data)) {
$server->send($fd, TLS::PG_SSL_RESPONSE_OK);
$connection->pendingTls = true;

return;
}

$connection->pendingTls = false;

try {
$adapter = $this->adapters[$port] ?? null;
if ($adapter === null) {
Expand Down Expand Up @@ -393,6 +374,26 @@ protected function forward(Server $server, int $clientFd, Client $backend): void
});
}

/**
* @return array<string, mixed>
*/
public function stats(): array
{
$stats = $this->server->stats();
if (!\is_array($stats)) {
return [];
}

$normalized = [];
foreach ($stats as $key => $value) {
if (\is_string($key)) {
$normalized[$key] = $value;
}
}

return $normalized;
}

public function onClose(Server $server, int $fd, int $reactorId): void
{
if ($this->config->logConnections) {
Expand Down
95 changes: 70 additions & 25 deletions src/Server/TCP/Swoole/Coroutine.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
* on most workloads — this one is kept for users who need coroutine-level
* control over each connection (e.g. custom protocol state machines).
*
* Supports optional TLS termination:
* - PostgreSQL: STARTTLS via SSLRequest/SSLResponse handshake
* - MySQL: SSL capability flag in server greeting
* Supports optional custom per-connection handling for protocols that need
* application-level negotiation before the proxy can route a backend.
*
* Example:
* ```php
Expand All @@ -44,12 +43,16 @@ class Coroutine
/** @var array<int, TCPAdapter> */
protected array $adapters = [];

protected int $connections = 0;

protected Config $config;

protected ?TLSContext $tlsContext = null;

protected ?Resolver $resolver;

protected ?int $gcTimer = null;

public function __construct(
?Resolver $resolver = null,
?Config $config = null,
Expand Down Expand Up @@ -106,10 +109,10 @@ protected function configureServers(): void
'log_level' => $this->config->logLevel,
]);

$ssl = $this->tlsContext !== null;

foreach ($this->config->ports as $port) {
$server = new CoroutineServer($this->config->host, $port, $ssl, $this->config->enableReusePort);
// Custom handlers may need plaintext protocol bytes before TLS.
// Keep the listener plain so callers can decide when to upgrade.
$server = new CoroutineServer($this->config->host, $port, false, $this->config->enableReusePort);

$settings = [
'open_tcp_nodelay' => true,
Expand All @@ -122,10 +125,6 @@ protected function configureServers(): void
'buffer_output_size' => $this->config->bufferOutputSize,
];

if ($this->tlsContext !== null) {
$settings = \array_merge($settings, $this->tlsContext->toSwooleConfig());
}

$server->set($settings);

$server->handle(function (Connection $connection) use ($port): void {
Expand Down Expand Up @@ -159,7 +158,7 @@ public function onStart(): void
public function onWorkerStart(int $workerId = 0): void
{
\gc_disable();
Timer::tick($this->config->gcIntervalMs, static function (): void {
$this->gcTimer = Timer::tick($this->config->gcIntervalMs, static function (): void {
\gc_collect_cycles();
});

Expand All @@ -169,6 +168,17 @@ public function onWorkerStart(int $workerId = 0): void
}

protected function handleConnection(Connection $connection, int $port): void
{
$this->connections++;

try {
$this->handleConnectionData($connection, $port);
} finally {
$this->connections--;
}
}

protected function handleConnectionData(Connection $connection, int $port): void
{
/** @var Socket $clientSocket */
$clientSocket = $connection->exportSocket();
Expand All @@ -180,6 +190,20 @@ protected function handleConnection(Connection $connection, int $port): void
Console::log("Client #{$clientId} connected to port {$port}");
}

if ($this->config->connectionHandler !== null) {
$handled = ($this->config->connectionHandler)(
$connection,
$port,
$adapter,
$this->config,
$this->tlsContext,
);

if ($handled === true) {
return;
}
}

/** @var string|false $data */
$data = $clientSocket->recv($bufferSize);
if ($data === false || $data === '') {
Expand All @@ -188,20 +212,6 @@ protected function handleConnection(Connection $connection, int $port): void
return;
}

// PostgreSQL STARTTLS: clients send an SSLRequest before the startup
// message. Respond with 'S' and read the real startup packet.
if ($this->tlsContext !== null && $port === 5432 && TLS::isPostgreSQLSSLRequest($data)) {
$clientSocket->sendAll(TLS::PG_SSL_RESPONSE_OK);

/** @var string|false $data */
$data = $clientSocket->recv($bufferSize);
if ($data === false || $data === '') {
$clientSocket->close();

return;
}
}

$done = new Channel(1);

try {
Expand Down Expand Up @@ -261,6 +271,29 @@ protected function handleConnection(Connection $connection, int $port): void
}
}

/**
* @return array{connection_num: int}
*/
public function stats(): array
{
return [
'connection_num' => $this->connections,
];
}

protected function startTLS(Socket $socket): bool
{
if ($this->tlsContext === null) {
return false;
}

if (!$socket->setProtocol($this->tlsContext->toSwooleProtocolConfig())) {
return false;
}

return $socket->sslHandshake();
}

public function start(): void
{
$runner = function (): void {
Expand All @@ -283,6 +316,18 @@ public function start(): void
SwooleCoroutine\run($runner);
}

public function shutdown(): void
{
if ($this->gcTimer !== null) {
Timer::clear($this->gcTimer);
$this->gcTimer = null;
}

foreach ($this->servers as $server) {
$server->shutdown();
}
}

/**
* Report whether the JIT is actually enabled inside this Swoole worker.
*
Expand Down
65 changes: 3 additions & 62 deletions src/Server/TCP/TLS.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
* Holds certificate paths, protocol constraints, cipher configuration,
* and mTLS (mutual TLS) settings for TLS-terminated TCP connections.
*
* Supports:
* - PostgreSQL STARTTLS (SSLRequest upgrade from plaintext)
* - MySQL SSL handshake (SSL capability flag in server greeting)
* This class only describes TLS material and policy. Protocols that negotiate
* TLS after plaintext bytes should implement that negotiation outside the
* generic proxy core.
*
* Example:
* ```php
Expand All @@ -27,28 +27,6 @@
*/
class TLS
{
/**
* PostgreSQL SSLRequest message (8 bytes):
* - Int32(8): message length
* - Int32(80877103): SSL request code
*/
public const PG_SSL_REQUEST = "\x00\x00\x00\x08\x04\xd2\x16\x2f";

/**
* PostgreSQL SSLResponse: server willing to accept SSL
*/
public const PG_SSL_RESPONSE_OK = 'S';

/**
* PostgreSQL SSLResponse: server unwilling to accept SSL
*/
public const PG_SSL_RESPONSE_REJECT = 'N';

/**
* MySQL capability flag: CLIENT_SSL (0x00000800)
*/
public const MYSQL_CLIENT_SSL_FLAG = 0x00000800;

/**
* Default cipher suites — strong, modern, broadly compatible
*/
Expand Down Expand Up @@ -123,41 +101,4 @@ public function isMutual(): bool
return $this->requireClientCert && $this->ca !== '';
}

/**
* Detect whether a raw data packet is a PostgreSQL SSLRequest message
*
* The SSLRequest is exactly 8 bytes:
* - Int32(8): length
* - Int32(80877103): SSL request code (0x04D2162F)
*/
public static function isPostgreSQLSSLRequest(string $data): bool
{
return strlen($data) === 8 && $data === self::PG_SSL_REQUEST;
}

/**
* Detect whether a raw data packet is a MySQL SSL handshake request
*
* After receiving the server greeting with SSL capability flag,
* the client sends an SSL request packet. This is identified by:
* - Packet length >= 4 bytes (header)
* - Capability flags in bytes 4-7 include CLIENT_SSL (0x0800)
* - Sequence ID = 1 (byte 3)
*/
public static function isMySQLSSLRequest(string $data): bool
{
if (strlen($data) < 36) {
return false;
}

// Sequence ID should be 1 (client response to server greeting)
if (ord($data[3]) !== 1) {
return false;
}

// Read capability flags (little-endian uint16 at offset 4)
$capLow = ord($data[4]) | (ord($data[5]) << 8);

return ($capLow & self::MYSQL_CLIENT_SSL_FLAG) !== 0;
}
}
Loading
Loading