Skip to content

Commit 6efa651

Browse files
committed
chore: add workerstop
1 parent eed8b7c commit 6efa651

File tree

7 files changed

+325
-229
lines changed

7 files changed

+325
-229
lines changed

src/Queue/Adapter/Swoole.php

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
namespace Utopia\Queue\Adapter;
44

55
use Swoole\Constant;
6+
use Swoole\Process;
67
use Swoole\Process\Pool;
7-
use Utopia\CLI\Console;
8+
use Utopia\Console;
89
use Utopia\Queue\Adapter;
910
use Utopia\Queue\Consumer;
1011

@@ -15,8 +16,12 @@ class Swoole extends Adapter
1516
/** @var callable */
1617
private $onStop;
1718

18-
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
19-
{
19+
public function __construct(
20+
Consumer $consumer,
21+
int $workerNum,
22+
string $queue,
23+
string $namespace = "utopia-queue",
24+
) {
2025
parent::__construct($workerNum, $queue, $namespace);
2126

2227
$this->consumer = $consumer;
@@ -25,25 +30,22 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s
2530

2631
public function start(): self
2732
{
28-
$this->pool->set(['enable_coroutine' => true]);
29-
30-
// Register signal handlers in the main process before starting pool
31-
if (extension_loaded('pcntl')) {
32-
pcntl_signal(SIGTERM, function () {
33-
Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown...");
34-
$this->stop();
35-
});
36-
37-
pcntl_signal(SIGINT, function () {
38-
Console::info("[Swoole] Received SIGINT, initiating graceful shutdown...");
39-
$this->stop();
40-
});
33+
$this->pool->set(["enable_coroutine" => true]);
34+
35+
// Register signal handlers
36+
Process::signal(SIGTERM, function () {
37+
Console::info(
38+
"[Swoole] Received SIGTERM, initiating graceful shutdown...",
39+
);
40+
$this->stop();
41+
});
4142

42-
// Enable async signals
43-
pcntl_async_signals(true);
44-
} else {
45-
Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully.");
46-
}
43+
Process::signal(SIGINT, function () {
44+
Console::info(
45+
"[Swoole] Received SIGINT, initiating graceful shutdown...",
46+
);
47+
$this->stop();
48+
});
4749

4850
$this->pool->start();
4951
return $this;
@@ -52,7 +54,7 @@ public function start(): self
5254
public function stop(): self
5355
{
5456
if ($this->onStop) {
55-
call_user_func($this->onStop);
57+
\call_user_func($this->onStop);
5658
}
5759

5860
Console::info("[Swoole] Shutting down process pool...");
@@ -63,23 +65,26 @@ public function stop(): self
6365

6466
public function workerStart(callable $callback): self
6567
{
66-
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
67-
// Register signal handlers in each worker process for graceful shutdown
68-
if (extension_loaded('pcntl')) {
69-
pcntl_signal(SIGTERM, function () use ($workerId) {
70-
Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer...");
71-
$this->consumer->close();
72-
});
73-
74-
pcntl_signal(SIGINT, function () use ($workerId) {
75-
Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer...");
76-
$this->consumer->close();
77-
});
78-
79-
pcntl_async_signals(true);
80-
}
81-
82-
call_user_func($callback, $workerId);
68+
$this->pool->on(Constant::EVENT_WORKER_START, function (
69+
Pool $pool,
70+
string $workerId,
71+
) use ($callback) {
72+
// Register signal handlers in worker
73+
Process::signal(SIGTERM, function () {
74+
Console::info(
75+
"[Swoole] Received SIGTERM, initiating graceful shutdown...",
76+
);
77+
$this->stop();
78+
});
79+
80+
Process::signal(SIGINT, function () {
81+
Console::info(
82+
"[Swoole] Received SIGINT, initiating graceful shutdown...",
83+
);
84+
$this->stop();
85+
});
86+
87+
\call_user_func($callback, $workerId);
8388
});
8489

8590
return $this;
@@ -88,8 +93,11 @@ public function workerStart(callable $callback): self
8893
public function workerStop(callable $callback): self
8994
{
9095
$this->onStop = $callback;
91-
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
92-
call_user_func($callback, $workerId);
96+
$this->pool->on(Constant::EVENT_WORKER_STOP, function (
97+
Pool $pool,
98+
string $workerId,
99+
) use ($callback) {
100+
\call_user_func($callback, $workerId);
93101
});
94102

95103
return $this;

src/Queue/Broker/Pool.php

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
public function __construct(
1313
private ?UtopiaPool $publisher = null,
1414
private ?UtopiaPool $consumer = null,
15-
) {
16-
}
15+
) {}
1716

1817
public function enqueue(Queue $queue, array $payload): bool
1918
{
@@ -30,8 +29,12 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
3029
return $this->delegatePublish(__FUNCTION__, \func_get_args());
3130
}
3231

33-
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
34-
{
32+
public function consume(
33+
Queue $queue,
34+
callable $messageCallback,
35+
callable $successCallback,
36+
callable $errorCallback,
37+
): void {
3538
$this->delegateConsumer(__FUNCTION__, \func_get_args());
3639
}
3740

@@ -42,14 +45,20 @@ public function close(): void
4245

4346
protected function delegatePublish(string $method, array $args): mixed
4447
{
45-
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
48+
return $this->publisher?->use(function (Publisher $adapter) use (
49+
$method,
50+
$args,
51+
) {
4652
return $adapter->$method(...$args);
4753
});
4854
}
4955

5056
protected function delegateConsumer(string $method, array $args): mixed
5157
{
52-
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
58+
return $this->consumer?->use(function (Consumer $adapter) use (
59+
$method,
60+
$args,
61+
) {
5362
return $adapter->$method(...$args);
5463
});
5564
}

0 commit comments

Comments
 (0)