Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ services:
container_name: tests
build: .
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
- swoole
- swoole-amqp
Expand All @@ -16,8 +17,9 @@ services:
build: ./tests/Queue/servers/Swoole/.
command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
- redis

Expand All @@ -26,8 +28,9 @@ services:
build: ./tests/Queue/servers/SwooleRedisCluster/.
command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
redis-cluster-0:
condition: service_healthy
Expand All @@ -37,8 +40,9 @@ services:
build: ./tests/Queue/servers/AMQP/.
command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
amqp:
condition: service_healthy
Expand All @@ -48,8 +52,9 @@ services:
build: ./tests/Queue/servers/Workerman/.
command: php /usr/src/code/tests/Queue/servers/Workerman/worker.php start
volumes:
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
- redis

Expand Down
5 changes: 4 additions & 1 deletion pint.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{
"preset": "psr12"
"preset": "psr12",
"rules": {
"single_quote": true
}
}
51 changes: 45 additions & 6 deletions src/Queue/Adapter/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,26 @@

namespace Utopia\Queue\Adapter;

use Swoole\Constant;
use Swoole\Process;
use Swoole\Process\Pool;

use Utopia\Console;
use Utopia\Queue\Adapter;
use Utopia\Queue\Consumer;

use function Swoole\Coroutine\go;

class Swoole extends Adapter
{
protected Pool $pool;

public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
{
public function __construct(
Consumer $consumer,
int $workerNum,
string $queue,
string $namespace = 'utopia-queue',
) {
parent::__construct($workerNum, $queue, $namespace);

$this->consumer = $consumer;
Expand All @@ -20,30 +30,59 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s

public function start(): self
{
// Enable coroutine hooks for Redis and other extensions
$this->pool->set(['enable_coroutine' => true]);

$this->pool->start();
return $this;
}

public function stop(): self
{
Console::info('[Swoole] Shutting down process pool...');
$this->pool->shutdown();
Console::success('[Swoole] Process pool stopped.');
return $this;
}

public function workerStart(callable $callback): self
{
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
$this->pool->on(Constant::EVENT_WORKER_START, function (
Pool $pool,
string $workerId,
) use ($callback) {
// Register signal handlers for graceful shutdown
Process::signal(SIGTERM, function () use ($workerId) {
Console::info(
"[Swoole] Worker {$workerId} received SIGTERM, stopping consumer...",
);
$this->consumer->close();
});

Process::signal(SIGINT, function () use ($workerId) {
Console::info(
"[Swoole] Worker {$workerId} received SIGINT, stopping consumer...",
);
$this->consumer->close();
});

// Run consume loop in a coroutine to allow event loop to process signals
// The coroutine container waits for all child coroutines before worker exits
go(function () use ($callback, $workerId) {
\call_user_func($callback, $workerId);
});
});

return $this;
}

public function workerStop(callable $callback): self
{
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
$this->pool->on(Constant::EVENT_WORKER_STOP, function (
Pool $pool,
string $workerId,
) use ($callback) {
\call_user_func($callback, $workerId);
});

return $this;
Expand Down
5 changes: 3 additions & 2 deletions src/Queue/Broker/AMQP.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));

// 2. Declare the working queue and configure the DLX for receiving rejected messages.
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ['x-dead-letter-exchange' => "{$queue->namespace}.failed"])));
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);

// 3. Declare the dead-letter-queue and bind it to the DLX.
Expand All @@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe

public function close(): void
{
$this->channel?->stopConsume();
$this->channel?->getConnection()?->close();
}

Expand Down Expand Up @@ -161,7 +162,7 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
{
$queueName = $queue->name;
if ($failedJobs) {
$queueName = $queueName . ".failed";
$queueName = $queueName . '.failed';
}

$client = new Client();
Expand Down
20 changes: 15 additions & 5 deletions src/Queue/Broker/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
return $this->delegatePublish(__FUNCTION__, \func_get_args());
}

public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
{
public function consume(
Queue $queue,
callable $messageCallback,
callable $successCallback,
callable $errorCallback,
): void {
$this->delegateConsumer(__FUNCTION__, \func_get_args());
}

public function close(): void
{
$this->delegateConsumer(__FUNCTION__, \func_get_args());
// TODO: Implement closing all connections in the pool
}

protected function delegatePublish(string $method, array $args): mixed
{
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
return $this->publisher?->use(function (Publisher $adapter) use (
$method,
$args,
) {
return $adapter->$method(...$args);
});
}

protected function delegateConsumer(string $method, array $args): mixed
{
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
return $this->consumer?->use(function (Consumer $adapter) use (
$method,
$args,
) {
return $adapter->$method(...$args);
});
}
Expand Down
1 change: 1 addition & 0 deletions src/Queue/Broker/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
public function close(): void
{
$this->closed = true;
$this->connection->close();
}

public function enqueue(Queue $queue, array $payload): bool
Expand Down
1 change: 1 addition & 0 deletions src/Queue/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ public function setArray(string $key, array $value): bool;
public function increment(string $key): int;
public function decrement(string $key): int;
public function ping(): bool;
public function close(): void;
}
6 changes: 6 additions & 0 deletions src/Queue/Connection/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ public function ping(): bool
}
}

public function close(): void
{
$this->redis?->close();
$this->redis = null;
}

protected function getRedis(): \Redis
{
if ($this->redis) {
Expand Down
6 changes: 6 additions & 0 deletions src/Queue/Connection/RedisCluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ public function ping(): bool
}
}

public function close(): void
{
$this->redis?->close();
$this->redis = null;
}

protected function getRedis(): \RedisCluster
{
if ($this->redis) {
Expand Down
2 changes: 1 addition & 1 deletion src/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public function __construct(
public string $namespace = 'utopia-queue',
) {
if (empty($this->name)) {
throw new \InvalidArgumentException("Cannot create queue with empty name.");
throw new \InvalidArgumentException('Cannot create queue with empty name.');
}
}
}
Loading