Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/Pools/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ abstract public function count(): int;
* Execute a callback with lock protection if the adapter supports it
*
* @param callable $callback
* @param int $timeout Timeout in seconds
* @return mixed
*/
abstract public function synchronized(callable $callback, int $timeout): mixed;
abstract public function synchronized(callable $callback): mixed;
}
4 changes: 1 addition & 3 deletions src/Pools/Adapter/Stack.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ public function count(): int
* Executes the callback without acquiring a lock.
*
* This implementation does not provide mutual exclusion.
* The `$timeout` parameter is ignored.
*
* @param callable $callback Callback to execute.
* @param int $timeout Ignored.
* @return mixed The value returned by the callback.
*/
public function synchronized(callable $callback, int $timeout): mixed
public function synchronized(callable $callback): mixed
{
return $callback();
}
Expand Down
22 changes: 8 additions & 14 deletions src/Pools/Adapter/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,18 @@

use Utopia\Pools\Adapter;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Lock;

class Swoole extends Adapter
{
protected Channel $pool;

protected Channel $lock;
protected Lock $lock;
public function initialize(int $size): static
{
$this->pool = new Channel($size);

// With channels, the current coroutine suspends and yields control to the event loop,
// allowing other coroutines to continue executing.
// Using a blocking lock freezes the worker thread, causing all coroutines in that
// worker to stop making progress.
$this->lock = new Channel(1);
$this->lock->push(true);
$this->pool = new Channel($size);
$this->lock = new Lock();
Comment thread
ArnabChatterjee20k marked this conversation as resolved.

return $this;
}
Expand Down Expand Up @@ -56,24 +52,22 @@ public function count(): int
* afterward, even if the callback throws an exception.
*
* @param callable $callback Callback to execute within the critical section.
* @param int $timeout Maximum time (in seconds) to wait for the lock.
* @return mixed The value returned by the callback.
*
* @throws \RuntimeException If the lock cannot be acquired within the timeout.
*/
public function synchronized(callable $callback, int $timeout): mixed
public function synchronized(callable $callback): mixed
{
$acquired = $this->lock->pop($timeout);
$acquired = $this->lock->lock();

if (!$acquired) {
throw new \RuntimeException("Failed to acquire lock within {$timeout} seconds");
throw new \RuntimeException("Failed to acquire lock");
}

try {
return $callback();
} finally {
// Guaranteed to have space here; avoid timeouts so the token isn't lost.
$this->lock->push(true);
$this->lock->unlock();
}
}
}
12 changes: 6 additions & 6 deletions src/Pools/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,19 @@ public function pop(): Connection
return true;
}
return false;
}, timeout: $this->getSynchronizationTimeout());
});

if ($shouldCreateConnections) {
try {
$connection = $this->createConnection();
$this->pool->synchronized(function () use ($connection) {
$this->active[$connection->getID()] = $connection;
}, timeout: $this->getSynchronizationTimeout());
});
return $connection;
} catch (\Exception $e) {
$this->pool->synchronized(function () {
$this->connectionsCreated--;
}, timeout: $this->getSynchronizationTimeout());
});
throw $e;
}
}
Expand All @@ -296,7 +296,7 @@ public function pop(): Connection
if ($connection instanceof Connection) {
$this->pool->synchronized(function () use ($connection) {
$this->active[$connection->getID()] = $connection;
}, timeout: $this->getSynchronizationTimeout());
});
return $connection;
}
}
Expand Down Expand Up @@ -406,15 +406,15 @@ private function destroyConnection(?Connection $connection = null): static
return true;
};
return false;
}, timeout: $this->getSynchronizationTimeout());
});

if ($shouldCreate) {
try {
$this->pool->push($this->createConnection());
} catch (Exception $e) {
$this->pool->synchronized(function () {
$this->connectionsCreated--;
}, timeout: $this->getSynchronizationTimeout());
});
throw $e;
}
}
Expand Down
5 changes: 5 additions & 0 deletions tests/Pools/Adapter/SwooleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,9 @@ public function testSwooleCoroutineStressTest(): void
$this->assertSame(10, $pool->count(), 'Pool should have all connections back');
});
}
public function testInitOutsideCoroutineNotThrowAnyError(): void
{
$pool = new Pool(new Swoole(), 'test', 1, fn () => 'x');

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to the appwrite Realtime pubsub

$this->assertInstanceOf(Pool::class, $pool);
}
}
Loading