Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"phpunit/phpunit": "11.*",
"laravel/pint": "1.*",
"phpstan/phpstan": "1.*",
"swoole/ide-helper": "5.1.2"
"swoole/ide-helper": "6.*"
},
"suggests": {
"ext-mongodb": "Needed to support MongoDB database pools",
Expand Down
14 changes: 7 additions & 7 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions src/Pools/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ abstract public function count(): int;
* Execute a callback with lock protection if the adapter supports it
*
* @param callable $callback
* @param int $timeout Timeout in seconds
* @return mixed
*/
abstract public function synchronized(callable $callback, int $timeout): mixed;
abstract public function synchronized(callable $callback): mixed;
}
4 changes: 1 addition & 3 deletions src/Pools/Adapter/Stack.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ public function count(): int
* Executes the callback without acquiring a lock.
*
* This implementation does not provide mutual exclusion.
* The `$timeout` parameter is ignored.
*
* @param callable $callback Callback to execute.
* @param int $timeout Ignored.
* @return mixed The value returned by the callback.
*/
public function synchronized(callable $callback, int $timeout): mixed
public function synchronized(callable $callback): mixed
{
return $callback();
}
Expand Down
26 changes: 12 additions & 14 deletions src/Pools/Adapter/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,20 @@

use Utopia\Pools\Adapter;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Lock;

class Swoole extends Adapter
{
protected Channel $pool;

protected Channel $lock;
// @phpstan-ignore-next-line
Comment thread
abnegate marked this conversation as resolved.
Outdated
protected Lock $lock;
public function initialize(int $size): static
{
$this->pool = new Channel($size);

// With channels, the current coroutine suspends and yields control to the event loop,
// allowing other coroutines to continue executing.
// Using a blocking lock freezes the worker thread, causing all coroutines in that
// worker to stop making progress.
$this->lock = new Channel(1);
$this->lock->push(true);
$this->pool = new Channel($size);
// @phpstan-ignore-next-line`
$this->lock = new Lock();
Comment thread
ArnabChatterjee20k marked this conversation as resolved.

return $this;
}
Expand Down Expand Up @@ -56,24 +54,24 @@ public function count(): int
* afterward, even if the callback throws an exception.
*
* @param callable $callback Callback to execute within the critical section.
* @param int $timeout Maximum time (in seconds) to wait for the lock.
* @return mixed The value returned by the callback.
*
* @throws \RuntimeException If the lock cannot be acquired within the timeout.
*/
public function synchronized(callable $callback, int $timeout): mixed
public function synchronized(callable $callback): mixed
{
$acquired = $this->lock->pop($timeout);
// @phpstan-ignore-next-line
$acquired = $this->lock->lock();

if (!$acquired) {
throw new \RuntimeException("Failed to acquire lock within {$timeout} seconds");
throw new \RuntimeException("Failed to acquire lock");
}

try {
return $callback();
} finally {
// Guaranteed to have space here; avoid timeouts so the token isn't lost.
$this->lock->push(true);
// @phpstan-ignore-next-line
$this->lock->unlock();
}
}
}
12 changes: 6 additions & 6 deletions src/Pools/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,19 @@ public function pop(): Connection
return true;
}
return false;
}, timeout: $this->getSynchronizationTimeout());
});

if ($shouldCreateConnections) {
try {
$connection = $this->createConnection();
$this->pool->synchronized(function () use ($connection) {
$this->active[$connection->getID()] = $connection;
}, timeout: $this->getSynchronizationTimeout());
});
return $connection;
} catch (\Exception $e) {
$this->pool->synchronized(function () {
$this->connectionsCreated--;
}, timeout: $this->getSynchronizationTimeout());
});
throw $e;
}
}
Expand All @@ -296,7 +296,7 @@ public function pop(): Connection
if ($connection instanceof Connection) {
$this->pool->synchronized(function () use ($connection) {
$this->active[$connection->getID()] = $connection;
}, timeout: $this->getSynchronizationTimeout());
});
return $connection;
}
}
Expand Down Expand Up @@ -406,15 +406,15 @@ private function destroyConnection(?Connection $connection = null): static
return true;
};
return false;
}, timeout: $this->getSynchronizationTimeout());
});

if ($shouldCreate) {
try {
$this->pool->push($this->createConnection());
} catch (Exception $e) {
$this->pool->synchronized(function () {
$this->connectionsCreated--;
}, timeout: $this->getSynchronizationTimeout());
});
throw $e;
}
}
Expand Down
5 changes: 5 additions & 0 deletions tests/Pools/Adapter/SwooleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,9 @@ public function testSwooleCoroutineStressTest(): void
$this->assertSame(10, $pool->count(), 'Pool should have all connections back');
});
}
public function testInitOutsideCoroutineNotThrowAnyError(): void
{
$pool = new Pool(new Swoole(), 'test', 1, fn () => 'x');

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to the appwrite Realtime pubsub

$this->assertInstanceOf(Pool::class, $pool);
}
}