-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add workerStop handler, graceful exits #53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This comment was marked as spam.
This comment was marked as spam.
a8a21e0 to
f51092a
Compare
f51092a to
6a7d673
Compare
6a7d673 to
e078822
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Graceful shutdown and lifecycle hooks for queue workers across adapters, plus test container updates for signal support.
- Add per-worker stop hooks and worker stop event handling.
- Introduce signal-based graceful shutdown for Swoole and ensure AMQP consumers stop before closing.
- Standardize test container volumes and enable pcntl in images.
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/Queue/servers/Workerman/worker.php | Adds workerStop hook usage for test worker lifecycle. |
| tests/Queue/servers/SwooleRedisCluster/worker.php | Adds workerStop hook usage for cluster worker lifecycle. |
| tests/Queue/servers/Swoole/worker.php | Adds workerStop hook usage for Swoole worker lifecycle. |
| tests/Queue/servers/AMQP/worker.php | Adds workerStop hook usage for AMQP worker lifecycle. |
| tests/Queue/servers/SwooleRedisCluster/Dockerfile | Enables pcntl for proper signal handling in tests. |
| tests/Queue/servers/Swoole/Dockerfile | Enables pcntl for proper signal handling in tests. |
| tests/Queue/servers/AMQP/Dockerfile | Enables pcntl for proper signal handling in tests. |
| src/Queue/Server.php | Adds workerStop Hook API and integrates stop handling into worker lifecycle. |
| src/Queue/Broker/AMQP.php | Attempts to stop consumption before closing channel/connection. |
| src/Queue/Adapter/Swoole.php | Adds signal handlers and proper EVENT_* usage; introduces main-process stop path. |
| docker-compose.yml | Standardizes container code paths and volumes for tests. |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/Queue/Adapter/Swoole.php
Outdated
| /** @var callable */ | ||
| private $onStop; |
Copilot
AI
Dec 2, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The property $onStop is declared without a nullable type or default value. This should be declared as private ?callable $onStop = null; to be explicit about its nullable nature and avoid potential issues.
| /** @var callable */ | |
| private $onStop; | |
| private ?callable $onStop = null; |
6efa651 to
8dba579
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (5)
src/Queue/Adapter/Swoole.php (4)
93-104: Mirror the parameter type fixes inworkerStop().Apply the same parameter corrections as in
workerStart(): rename$poolto$_pooland type$workerIdasint.Apply this diff:
- $this->pool->on(Constant::EVENT_WORKER_STOP, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_STOP, function ( + Pool $_pool, + int $workerId, + ) use ($callback) { \call_user_func($callback, $workerId); });
54-64: Remove the directonStopinvocation fromstop().Calling
$this->onStop()without a$workerIdparameter will cause anArgumentCountError, as the callback registered viaworkerStop()expects a$workerIdargument (see line 100 andServer.phpline 377). TheEVENT_WORKER_STOPhandler already invokes the callback correctly for each worker.Apply this diff:
public function stop(): self { - if ($this->onStop) { - \call_user_func($this->onStop); - } - Console::info("[Swoole] Shutting down process pool..."); $this->pool->shutdown(); Console::success("[Swoole] Process pool stopped."); return $this; }
66-91: Fix parameter types and silence PHPMD warning.Two issues with the event handler signature:
- The
$poolparameter is unused and should be renamed to$_poolto silence PHPMD.- Swoole passes
$workerIdasint, notstring.Apply this diff:
- $this->pool->on(Constant::EVENT_WORKER_START, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function ( + Pool $_pool, + int $workerId, + ) use ($callback) { // Register signal handlers in worker Process::signal(SIGTERM, function () {
16-17: Type the$onStopproperty explicitly.The property should use a typed declaration instead of a docblock to prevent accidental non-callable assignments and clarify intent.
Apply this diff:
- /** @var callable */ - private $onStop; + private ?callable $onStop = null;src/Queue/Server.php (1)
106-110: Guard against undefined index when accessing resource callbacks.Line 109 accesses
self::$resourcesCallbacks[$name]["reset"]before line 111 checks if$nameexists in the array. This can trigger an undefined index warning.Apply this diff to reorder the checks:
if ( !\array_key_exists($name, $this->resources) || $fresh || - self::$resourcesCallbacks[$name]["reset"] + (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]["reset"]) ) { if (!\array_key_exists($name, self::$resourcesCallbacks)) {
🧹 Nitpick comments (1)
src/Queue/Server.php (1)
428-443: LGTM!The
workerStop()andgetWorkerStop()methods mirror theworkerStart()pattern correctly, enabling registration and retrieval of multiple stop hooks.Optional: The docblock on line 437 could be refined to "Returns worker stop hooks." for better grammar, but this is a minor nitpick.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
docker-compose.yml(5 hunks)src/Queue/Adapter/Swoole.php(2 hunks)src/Queue/Broker/AMQP.php(1 hunks)src/Queue/Broker/Pool.php(2 hunks)src/Queue/Server.php(11 hunks)tests/Queue/servers/AMQP/Dockerfile(1 hunks)tests/Queue/servers/AMQP/worker.php(1 hunks)tests/Queue/servers/Swoole/Dockerfile(1 hunks)tests/Queue/servers/Swoole/worker.php(1 hunks)tests/Queue/servers/SwooleRedisCluster/Dockerfile(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php(1 hunks)tests/Queue/servers/Workerman/worker.php(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/Queue/Broker/Pool.php
🚧 Files skipped from review as they are similar to previous changes (5)
- tests/Queue/servers/SwooleRedisCluster/Dockerfile
- tests/Queue/servers/Swoole/Dockerfile
- src/Queue/Broker/AMQP.php
- tests/Queue/servers/Workerman/worker.php
- tests/Queue/servers/AMQP/worker.php
🧰 Additional context used
🧬 Code graph analysis (2)
src/Queue/Adapter/Swoole.php (2)
src/Queue/Adapter.php (6)
Adapter(5-49)__construct(12-16)stop(28-28)start(22-22)workerStart(35-35)workerStop(42-42)src/Queue/Server.php (5)
__construct(84-88)stop(236-247)start(266-401)workerStart(407-413)workerStop(428-434)
tests/Queue/servers/Swoole/worker.php (5)
src/Queue/Server.php (4)
Server(14-530)job(90-94)workerStart(407-413)workerStop(428-434)src/Queue/Adapter.php (3)
Adapter(5-49)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Swoole.php (3)
Swoole(12-110)workerStart(66-91)workerStop(93-104)src/Queue/Connection/Redis.php (1)
Redis(7-184)tests/Queue/servers/tests.php (1)
handleRequest(5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
69-69: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
97-97: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
🔇 Additional comments (14)
tests/Queue/servers/AMQP/Dockerfile (1)
5-5: LGTM!Installing the PCNTL extension is necessary for the signal-handling features introduced in this PR.
docker-compose.yml (1)
6-8: LGTM!Standardizing volume mount paths to
/usr/src/codeacross all services improves consistency and maintainability.Also applies to: 20-22, 31-33, 43-45, 55-57
tests/Queue/servers/Swoole/worker.php (3)
3-9: LGTM!The updated require paths and explicit imports improve code clarity and align with the refactored queue infrastructure.
11-26: LGTM!The refactored server setup is clean and follows the new chained API pattern. The use of
handleRequest(...)with the spread operator is appropriate.
28-30: LGTM!The new
workerStop()hook correctly registers a shutdown action, mirroring theworkerStart()pattern. This enables graceful cleanup when workers terminate.tests/Queue/servers/SwooleRedisCluster/worker.php (2)
3-21: LGTM!The refactored setup correctly instantiates a Redis cluster-backed consumer and follows the same clean API pattern as the other test workers.
30-36: LGTM!Both
workerStart()andworkerStop()hooks are correctly implemented and consistent with the other test worker implementations.src/Queue/Adapter/Swoole.php (1)
31-52: LGTM!The signal handling using
Process::signalfor graceful shutdown is appropriate for Swoole. The implementation correctly registers handlers for SIGTERM and SIGINT in the master process.src/Queue/Server.php (6)
33-63: LGTM!The transition from single hooks to arrays (
workerStartHooks,workerStopHooks) enables registration of multiple lifecycle hooks, improving extensibility.
156-166: LGTM!The formatting improvements and explicit string keys for the resource callback array enhance code clarity.
269-275: LGTM!The
workerStartcallback correctly iterates over all registered hooks and invokes them with the proper arguments. The typed$workerIdparameter is appropriate.
277-374: LGTM!The consumer callback correctly handles the job lifecycle with init hooks, job execution, shutdown hooks, and error handling. The telemetry recording for wait time and process duration is appropriate.
377-391: LGTM!The
workerStopcallback correctly iterates over registered stop hooks and closes the consumer connection gracefully. This is a key addition for enabling clean worker shutdown.
407-422: LGTM!The
workerStart()method correctly creates and registers hooks in the array, andgetWorkerStart()provides access to the registered hooks. The pattern enables multiple start hooks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/Queue/Adapter/Swoole.php
Outdated
| { | ||
| $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { | ||
| call_user_func($callback, $workerId); | ||
| $this->onStop = $callback; |
Copilot
AI
Dec 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The workerStop callback is being stored in both the $onStop property and registered with the pool's EVENT_WORKER_STOP event. However, in the stop() method, $onStop is called which will invoke the callback with no arguments, while the EVENT_WORKER_STOP expects the callback to receive $workerId. This creates inconsistent invocation - the callback will be called once without arguments in stop(), and once with $workerId from the pool event. Consider either removing line 95 or restructuring to avoid double invocation with different signatures.
src/Queue/Adapter/Swoole.php
Outdated
| // Register signal handlers in worker | ||
| Process::signal(SIGTERM, function () { | ||
| Console::info( | ||
| "[Swoole] Received SIGTERM, initiating graceful shutdown...", | ||
| ); | ||
| $this->stop(); | ||
| }); | ||
|
|
||
| Process::signal(SIGINT, function () { | ||
| Console::info( | ||
| "[Swoole] Received SIGINT, initiating graceful shutdown...", | ||
| ); | ||
| $this->stop(); | ||
| }); | ||
|
|
Copilot
AI
Dec 16, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signal handlers registered within the workerStart callback will override the signal handlers registered in the start() method (lines 36-48). In Swoole, signal handlers registered in worker processes override those in the master process. This means the master process signals won't work as expected. Consider removing the duplicate signal registration in workerStart since the master process should handle shutdown coordination.
| // Register signal handlers in worker | |
| Process::signal(SIGTERM, function () { | |
| Console::info( | |
| "[Swoole] Received SIGTERM, initiating graceful shutdown...", | |
| ); | |
| $this->stop(); | |
| }); | |
| Process::signal(SIGINT, function () { | |
| Console::info( | |
| "[Swoole] Received SIGINT, initiating graceful shutdown...", | |
| ); | |
| $this->stop(); | |
| }); | |
| // Worker initialization logic |
8dba579 to
d62ecb0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
tests/Queue/servers/Workerman/worker.php (1)
12-12: Typo in queue name: "wokerman" should be "workerman".Apply this diff:
-$adapter = new Workerman($consumer, 12, "wokerman"); +$adapter = new Workerman($consumer, 12, "workerman");src/Queue/Adapter/Swoole.php (2)
61-64: Fix parameter types: rename unused$pooland type$workerIdas int.The
Poolparameter is unused (triggers PHPMD warning) and should be renamed to$_pool. Additionally, Swoole'sPoolevent callbacks pass$workerIdas anint, not astring.Apply this diff:
- $this->pool->on(Constant::EVENT_WORKER_START, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function ( + Pool $_pool, + int $workerId, + ) use ($callback) {
88-91: Fix parameter types: rename unused$pooland type$workerIdas int.Same issue as in
workerStart: the$poolparameter is unused and should be prefixed with an underscore, and$workerIdshould be typed asintto match Swoole's signature.Apply this diff:
- $this->pool->on(Constant::EVENT_WORKER_STOP, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_STOP, function ( + Pool $_pool, + int $workerId, + ) use ($callback) {
🧹 Nitpick comments (2)
src/Queue/Server.php (2)
333-333: Fix misleading comment.The comment says "Global init hooks" but this code is in the shutdown (success) callback, so it should say "Global shutdown hooks".
Apply this diff:
- // Global init hooks + // Global shutdown hooks
346-346: Fix misleading comment.The comment says "Group init hooks" but this code is in the shutdown (success) callback, so it should say "Group shutdown hooks".
Apply this diff:
- // Group init hooks + // Group shutdown hooks
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
src/Queue/Adapter/Swoole.php(2 hunks)src/Queue/Broker/Pool.php(2 hunks)src/Queue/Server.php(11 hunks)tests/Queue/servers/AMQP/worker.php(1 hunks)tests/Queue/servers/Swoole/worker.php(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php(1 hunks)tests/Queue/servers/Workerman/worker.php(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/Queue/Broker/Pool.php
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (3)
tests/Queue/servers/AMQP/worker.php (4)
src/Queue/Broker/AMQP.php (1)
AMQP(20-226)src/Queue/Adapter/Swoole.php (3)
Swoole(12-102)workerStart(59-84)workerStop(86-96)src/Queue/Server.php (4)
Server(14-538)job(90-94)workerStart(415-421)workerStop(436-442)tests/Queue/servers/tests.php (1)
handleRequest(5-47)
src/Queue/Server.php (4)
src/Queue/Job.php (1)
hook(24-29)src/Queue/Consumer.php (1)
consume(17-22)src/Queue/Message.php (4)
Message(5-81)getPid(52-55)getTimestamp(62-65)getPayload(67-70)src/Queue/Adapter/Workerman.php (1)
workerStop(42-49)
tests/Queue/servers/Workerman/worker.php (5)
src/Queue/Adapter/Workerman.php (3)
Workerman(9-55)workerStart(34-41)workerStop(42-49)src/Queue/Broker/Redis.php (1)
Redis(11-167)src/Queue/Connection/Redis.php (1)
Redis(7-184)src/Queue/Server.php (5)
Server(14-538)job(90-94)error(531-537)workerStart(415-421)workerStop(436-442)tests/Queue/servers/tests.php (1)
handleRequest(5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
62-62: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
89-89: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Adapter Tests (Workerman)
- GitHub Check: Adapter Tests (SwooleRedisCluster)
92916aa to
957765c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
tests/Queue/servers/Workerman/worker.php (1)
12-12: Typo in queue name: "wokerman" should be "workerman".src/Queue/Server.php (1)
332-346: Fix misleading comments in shutdown callback.The comments say "init hooks" but this code is in the shutdown callback context.
foreach ($this->shutdownHooks as $hook) { - // Global init hooks + // Global shutdown hooks if (\in_array("*", $hook->getGroups())) {foreach ($this->shutdownHooks as $hook) { - // Group init hooks + // Group shutdown hooks if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (4)
src/Queue/Broker/Pool.php (1)
42-45: Incompleteclose()implementation may lead to resource leaks.The Pool's
close()method is a no-op with a TODO. WhenPool::close()is called (e.g., during graceful shutdown), the underlying connections in the pool won't be closed, potentially causing resource leaks.Do you want me to open an issue to track implementing proper connection pool cleanup?
src/Queue/Adapter/Swoole.php (2)
50-53: Rename unused$poolparameter to suppress PHPMD warning.The
$poolparameter is required by Swoole's callback signature but isn't used. Prefix it with underscore to signal intentional non-use.$this->pool->on(Constant::EVENT_WORKER_START, function ( - Pool $pool, + Pool $_pool, string $workerId, ) use ($callback) {
81-84: Same fix: rename unused$poolto$_pool.$this->pool->on(Constant::EVENT_WORKER_STOP, function ( - Pool $pool, + Pool $_pool, string $workerId, ) use ($callback) {src/Queue/Server.php (1)
423-451: Consider adding specific return type hints in docblocks.The methods return arrays of Hook objects. The docblocks could be more specific for IDE support:
/** * Returns Worker starts hooks. - * @return array + * @return array<Hook> */ public function getWorkerStart(): array/** * Returns Worker stops hooks. - * @return array + * @return array<Hook> */ public function getWorkerStop(): array
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
src/Queue/Adapter/Swoole.php(2 hunks)src/Queue/Broker/Pool.php(1 hunks)src/Queue/Broker/Redis.php(1 hunks)src/Queue/Connection.php(1 hunks)src/Queue/Connection/Redis.php(1 hunks)src/Queue/Connection/RedisCluster.php(1 hunks)src/Queue/Server.php(11 hunks)tests/Queue/servers/AMQP/worker.php(1 hunks)tests/Queue/servers/Swoole/worker.php(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php(1 hunks)tests/Queue/servers/Workerman/worker.php(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
src/Queue/Connection/Redis.php (4)
src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(92-96)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(92-96)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Consumer.php (1)
close(27-27)src/Queue/Broker/AMQP.php (1)
close(132-136)
src/Queue/Connection.php (4)
src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(92-96)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Broker/Pool.php (5)
src/Queue/Broker/Redis.php (2)
consume(19-90)close(92-96)src/Queue/Queue.php (1)
Queue(5-15)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Broker/Redis.php (6)
src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)src/Queue/Consumer.php (1)
close(27-27)src/Queue/Broker/AMQP.php (1)
close(132-136)
tests/Queue/servers/Swoole/worker.php (3)
src/Queue/Adapter/Swoole.php (3)
Swoole(15-95)workerStart(48-77)workerStop(79-89)src/Queue/Broker/Redis.php (1)
Redis(11-168)src/Queue/Connection/Redis.php (1)
Redis(7-190)
src/Queue/Adapter/Swoole.php (1)
src/Queue/Server.php (5)
start(266-409)stop(236-247)shutdown(224-230)workerStart(415-421)workerStop(436-442)
tests/Queue/servers/Workerman/worker.php (3)
src/Queue/Adapter.php (3)
Adapter(5-49)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Workerman.php (3)
Workerman(9-55)workerStart(34-41)workerStop(42-49)tests/Queue/servers/tests.php (1)
handleRequest(5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
51-51: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
82-82: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
🔇 Additional comments (16)
tests/Queue/servers/Workerman/worker.php (1)
24-30: LGTM!The workerStart and workerStop hooks are correctly implemented and align with the new Server API for lifecycle management.
src/Queue/Broker/Redis.php (1)
92-96: LGTM!The close() method correctly sets the closed flag first to stop the consume loop, then closes the underlying connection. This ensures graceful shutdown without resource leaks.
src/Queue/Connection.php (1)
28-28: LGTM!Adding
close()to the Connection interface establishes a consistent cleanup contract. BothRedisandRedisClusterimplementations correctly implement this method.src/Queue/Connection/RedisCluster.php (1)
169-173: LGTM!The implementation correctly uses the null-safe operator and nullifies the reference after closing, which is consistent with the Redis connection implementation and allows proper garbage collection.
src/Queue/Connection/Redis.php (1)
172-176: LGTM!The close() implementation follows the same pattern as RedisCluster, providing consistent behavior across connection types.
src/Queue/Broker/Pool.php (1)
33-38: LGTM!The multi-line formatting for
consume()parameters and the delegation closures improves readability without changing behavior.Also applies to: 47-54, 57-64
tests/Queue/servers/SwooleRedisCluster/worker.php (2)
11-19: LGTM!The Redis broker construction with RedisCluster connection and Swoole adapter setup is correct and follows the pattern established in other test workers.
30-36: LGTM!The workerStart and workerStop hooks are correctly wired up, consistent with the new Server lifecycle API.
tests/Queue/servers/Swoole/worker.php (2)
11-13: LGTM!The Redis broker and Swoole adapter construction follows the correct pattern.
24-30: LGTM!The workerStart and workerStop lifecycle hooks are correctly implemented, matching the pattern in other test workers.
tests/Queue/servers/AMQP/worker.php (1)
1-31: LGTM!The test worker script cleanly demonstrates the new lifecycle hooks. The use of first-class callable syntax for the job handler and the addition of
workerStartandworkerStophooks are well-structured.src/Queue/Adapter/Swoole.php (2)
54-74: Signal handling and coroutine approach look solid.Using
Process::signalfor Swoole-native signal handling and wrapping the callback ingo()allows the event loop to process signals while the consume loop runs. Closing the consumer on signal receipt enables graceful shutdown.
40-46: LGTM!Good observability by logging shutdown progress.
src/Queue/Server.php (3)
51-63: Clean migration to array-based hook management.The shift from single hook properties to arrays (
workerStartHooks,workerStopHooks) enables multiple lifecycle hooks per event, which is more flexible and aligns with how other hooks (errorHooks,initHooks,shutdownHooks) are already managed.
377-399: Robust workerStop implementation with proper error isolation.Wrapping each hook in its own try-catch ensures one failing hook doesn't prevent others from running. The
finallyblock guarantees the consumer connection is closed regardless of hook execution outcomes—exactly the right pattern for graceful shutdown.
266-275: LGTM!The
workerStartflow correctly iterates over the hooks array and invokes each hook with proper argument resolution.
tests/Queue/servers/AMQP/worker.php
Outdated
| $consumer = new AMQP(host: "amqp", port: 5672, user: "amqp", password: "amqp"); | ||
| $adapter = new Swoole($consumer, 12, "amqp"); | ||
| $server = new Server($adapter); | ||
|
|
||
| $server->job() | ||
| ->inject('message') | ||
| ->action(function (Message $message) { | ||
| handleRequest($message); | ||
| }); | ||
| $server->job()->inject("message")->action(handleRequest(...)); | ||
|
|
||
| $server | ||
| ->error() | ||
| ->inject('error') | ||
| ->inject("error") | ||
| ->action(function ($th) { | ||
| echo $th->getMessage() . PHP_EOL; | ||
| }); | ||
|
|
||
| $server | ||
| ->workerStart() | ||
| ->action(function () { | ||
| echo "Worker Started" . PHP_EOL; | ||
| }); | ||
| $server->workerStart()->action(function () { | ||
| echo "Worker Started" . PHP_EOL; | ||
| }); | ||
|
|
||
| $server->workerStop()->action(function () { | ||
| echo "Worker Stopped" . PHP_EOL; | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single quotes
tests/Queue/servers/AMQP/Dockerfile
Outdated
| RUN apk add autoconf build-base | ||
| RUN apk add autoconf build-base | ||
|
|
||
| RUN docker-php-ext-install pcntl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need it? I can't see it being used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
required in older implementation, removing
957765c to
1939710
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (4)
tests/Queue/servers/Workerman/worker.php (1)
12-12: Fix the typo in the queue name.The queue name is 'wokerman' but should be 'workerman'. This was previously flagged in a past review but remains unfixed.
Apply this diff:
-$adapter = new Workerman($consumer, 12, 'wokerman'); +$adapter = new Workerman($consumer, 12, 'workerman');src/Queue/Adapter/Swoole.php (1)
50-53: Fix$workerIdtype and silence PHPMD unused$poolwarning.Swoole's Pool events pass
int $workerId, notstring. The$poolparameter is unused and should be prefixed with_to indicate intentional non-use.- $this->pool->on(Constant::EVENT_WORKER_START, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function ( + Pool $_pool, + int $workerId, + ) use ($callback) {- $this->pool->on(Constant::EVENT_WORKER_STOP, function ( - Pool $pool, - string $workerId, - ) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_STOP, function ( + Pool $_pool, + int $workerId, + ) use ($callback) {Also applies to: 81-84
src/Queue/Server.php (2)
331-341: Fix misleading comment: "Global init hooks" → "Global shutdown hooks".This block is inside the shutdown callback, not init.
if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { - // Global init hooks + // Global shutdown hooks if (\in_array('*', $hook->getGroups())) {
344-355: Fix misleading comment: "Group init hooks" → "Group shutdown hooks".Same issue — this is in the shutdown callback.
foreach ($this->job->getGroups() as $group) { foreach ($this->shutdownHooks as $hook) { - // Group init hooks + // Group shutdown hooks if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (2)
src/Queue/Broker/Pool.php (1)
42-45: Implement pool connection cleanup for graceful shutdown.The TODO indicates that closing pooled connections is not yet implemented. Given this PR's focus on graceful exits and worker-stop handlers, this gap means pooled publisher and consumer connections won't be properly released during shutdown, potentially causing resource leaks.
Would you like me to implement the pool cleanup logic? It should iterate through active connections in both publisher and consumer pools and call their
close()methods.src/Queue/Server.php (1)
427-430: Consider addingarray<Hook>return type hints for getters.The docblocks specify
@return array, but the typed return could be more specific./** * Returns Worker starts hooks. - * @return array + * @return array<Hook> */ public function getWorkerStart(): array /** * Returns Worker stops hooks. - * @return array + * @return array<Hook> */ public function getWorkerStop(): arrayAlso applies to: 448-451
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
pint.json(1 hunks)src/Queue/Adapter/Swoole.php(2 hunks)src/Queue/Broker/AMQP.php(3 hunks)src/Queue/Broker/Pool.php(1 hunks)src/Queue/Broker/Redis.php(1 hunks)src/Queue/Connection.php(1 hunks)src/Queue/Connection/Redis.php(1 hunks)src/Queue/Connection/RedisCluster.php(1 hunks)src/Queue/Queue.php(1 hunks)src/Queue/Server.php(10 hunks)tests/Queue/servers/AMQP/Dockerfile(1 hunks)tests/Queue/servers/AMQP/worker.php(2 hunks)tests/Queue/servers/Swoole/Dockerfile(1 hunks)tests/Queue/servers/Swoole/worker.php(2 hunks)tests/Queue/servers/SwooleRedisCluster/Dockerfile(1 hunks)tests/Queue/servers/SwooleRedisCluster/worker.php(2 hunks)tests/Queue/servers/Workerman/Dockerfile(1 hunks)tests/Queue/servers/Workerman/worker.php(2 hunks)
✅ Files skipped from review due to trivial changes (1)
- tests/Queue/servers/Workerman/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (5)
- src/Queue/Connection/Redis.php
- tests/Queue/servers/Swoole/worker.php
- src/Queue/Connection.php
- tests/Queue/servers/Swoole/Dockerfile
- tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (5)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Broker/Redis.php (1)
close(92-96)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Consumer.php (1)
close(27-27)
src/Queue/Broker/Redis.php (6)
src/Queue/Broker/AMQP.php (1)
close(132-136)src/Queue/Broker/Pool.php (1)
close(42-45)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)src/Queue/Consumer.php (1)
close(27-27)
src/Queue/Server.php (5)
src/Queue/Consumer.php (2)
consume(17-22)close(27-27)src/Queue/Adapter.php (1)
workerStop(42-42)src/Queue/Connection.php (1)
close(28-28)src/Queue/Connection/Redis.php (1)
close(172-176)src/Queue/Connection/RedisCluster.php (1)
close(169-173)
src/Queue/Adapter/Swoole.php (2)
src/Queue/Server.php (4)
start(266-409)stop(236-247)shutdown(224-230)workerStart(415-421)src/Queue/Consumer.php (1)
close(27-27)
tests/Queue/servers/Workerman/worker.php (2)
src/Queue/Adapter.php (3)
Adapter(5-49)workerStart(35-35)workerStop(42-42)src/Queue/Adapter/Workerman.php (3)
Workerman(9-55)workerStart(34-41)workerStop(42-49)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php
51-51: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
82-82: Avoid unused parameters such as '$pool'. (undefined)
(UnusedFormalParameter)
🔇 Additional comments (19)
src/Queue/Queue.php (1)
12-12: LGTM!The string quote change aligns with the new
single_quoterule in pint.json.tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)
5-5: LGTM!Formatting-only change (trailing newline).
pint.json (1)
2-5: LGTM!Adding the
single_quoterule enforces consistent string quoting across the codebase.src/Queue/Broker/Redis.php (1)
92-96: LGTM!Properly closes the underlying connection to ensure resource cleanup, consistent with the Connection interface's close() method.
tests/Queue/servers/AMQP/Dockerfile (1)
3-3: LGTM!Formatting-only change (trailing newline).
src/Queue/Connection/RedisCluster.php (1)
169-173: LGTM!Properly implements the Connection interface's close() method, ensuring the underlying RedisCluster connection is closed and resources are released.
src/Queue/Broker/AMQP.php (2)
114-114: LGTM!String quote changes align with the new
single_quoterule.Also applies to: 165-165
132-136: The code is safe—stopConsume() exists in php-amqplib 3.7.0.AMQPChannel::stopConsume() is a documented public method designed to stop the consume() loop, and has been available since version 3.2.0. Using this method in the close() function will not result in a fatal error; it is the appropriate way to safely interrupt consumer loops before closing the connection.
tests/Queue/servers/Workerman/worker.php (1)
15-15: LGTM!The refactored API usage with
inject('message')->action(handleRequest(...))and the newworkerStart()andworkerStop()lifecycle hooks align with the updated Server API.Also applies to: 24-30
src/Queue/Adapter/Swoole.php (4)
5-13: LGTM on new imports.The additions of
Swoole\Constant,Swoole\Process,Utopia\Console, and thegocoroutine function are appropriate for the signal handling and coroutine-based execution introduced in this file.
40-46: LGTM on graceful shutdown logging.Adding console output before and after
shutdown()improves observability during pool termination.
54-67: LGTM on per-worker signal handling.Registering SIGTERM/SIGINT handlers that call
$this->consumer->close()is the correct approach for graceful worker shutdown. Workers close their own consumer and let the pool manage process lifecycle, addressing earlier feedback about not callingstop()from workers.
69-73: LGTM on coroutine wrapping.Wrapping the callback in
go()allows the Swoole event loop to process signals while the consume loop runs.src/Queue/Server.php (3)
51-63: LGTM on array-based hook properties.Converting
workerStartHookandworkerStopHookfrom single hooks to arrays (workerStartHooks,workerStopHooks) enables multiple hook registration, which is a sensible enhancement.
377-399: LGTM on workerStop lifecycle handling.The try/finally structure ensures the consumer connection is always closed even if a hook throws. Per-hook exception handling with logging is a good resilience pattern.
415-421: LGTM on hook registration methods.
workerStart()andworkerStop()now returnHookinstances that are appended to their respective arrays, aligning with the pattern used byinit(),shutdown(), anderror().Also applies to: 436-442
tests/Queue/servers/AMQP/worker.php (3)
6-12: LGTM on streamlined imports and instantiation.Direct class imports and simplified instantiation improve readability.
14-14: LGTM on first-class callable syntax.Using
handleRequest(...)is cleaner than wrapping in a closure.
23-29: LGTM on workerStart/workerStop hooks.This demonstrates the new array-based hook API with the fluent
->action()pattern.
Summary by CodeRabbit
New Features
Bug Fixes
Chores
✏️ Tip: You can customize this high-level summary in your review settings.