Skip to content

Conversation

@loks0n
Copy link
Contributor

@loks0n loks0n commented Sep 5, 2025

Summary by CodeRabbit

  • New Features

    • Worker stop lifecycle: added worker-stop hooks and observable "Worker Stopped" events; expanded lifecycle logging.
    • Connection close API: new explicit close method for connections to support resource teardown.
  • Bug Fixes

    • AMQP: ensure consumption is stopped before closing to prevent hanging consumers.
  • Chores

    • Standardized container volume mappings for test and service environments.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai

This comment was marked as spam.

coderabbitai[bot]

This comment was marked as outdated.

coderabbitai[bot]

This comment was marked as outdated.

@loks0n loks0n requested a review from abnegate September 10, 2025 14:47
@loks0n loks0n changed the title feat: add workerStop handling feat: add workerStop handler, graceful exits Sep 10, 2025
coderabbitai[bot]

This comment was marked as outdated.

@loks0n loks0n requested review from Meldiron and Copilot October 7, 2025 12:25
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Graceful shutdown and lifecycle hooks for queue workers across adapters, plus test container updates for signal support.

  • Add per-worker stop hooks and worker stop event handling.
  • Introduce signal-based graceful shutdown for Swoole and ensure AMQP consumers stop before closing.
  • Standardize test container volumes and enable pcntl in images.

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
tests/Queue/servers/Workerman/worker.php Adds workerStop hook usage for test worker lifecycle.
tests/Queue/servers/SwooleRedisCluster/worker.php Adds workerStop hook usage for cluster worker lifecycle.
tests/Queue/servers/Swoole/worker.php Adds workerStop hook usage for Swoole worker lifecycle.
tests/Queue/servers/AMQP/worker.php Adds workerStop hook usage for AMQP worker lifecycle.
tests/Queue/servers/SwooleRedisCluster/Dockerfile Enables pcntl for proper signal handling in tests.
tests/Queue/servers/Swoole/Dockerfile Enables pcntl for proper signal handling in tests.
tests/Queue/servers/AMQP/Dockerfile Enables pcntl for proper signal handling in tests.
src/Queue/Server.php Adds workerStop Hook API and integrates stop handling into worker lifecycle.
src/Queue/Broker/AMQP.php Attempts to stop consumption before closing channel/connection.
src/Queue/Adapter/Swoole.php Adds signal handlers and proper EVENT_* usage; introduces main-process stop path.
docker-compose.yml Standardizes container code paths and volumes for tests.

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 15 to 17
/** @var callable */
private $onStop;
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The property $onStop is declared without a nullable type or default value. This should be declared as private ?callable $onStop = null; to be explicit about its nullable nature and avoid potential issues.

Suggested change
/** @var callable */
private $onStop;
private ?callable $onStop = null;

Copilot uses AI. Check for mistakes.
@loks0n loks0n force-pushed the feat-workerstop branch 2 times, most recently from 6efa651 to 8dba579 Compare December 16, 2025 11:58
@loks0n loks0n requested a review from Copilot December 16, 2025 11:58
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
src/Queue/Adapter/Swoole.php (4)

93-104: Mirror the parameter type fixes in workerStop().

Apply the same parameter corrections as in workerStart(): rename $pool to $_pool and type $workerId as int.

Apply this diff:

-        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {
             \call_user_func($callback, $workerId);
         });

54-64: Remove the direct onStop invocation from stop().

Calling $this->onStop() without a $workerId parameter will cause an ArgumentCountError, as the callback registered via workerStop() expects a $workerId argument (see line 100 and Server.php line 377). The EVENT_WORKER_STOP handler already invokes the callback correctly for each worker.

Apply this diff:

 public function stop(): self
 {
-    if ($this->onStop) {
-        \call_user_func($this->onStop);
-    }
-
     Console::info("[Swoole] Shutting down process pool...");
     $this->pool->shutdown();
     Console::success("[Swoole] Process pool stopped.");
     return $this;
 }

66-91: Fix parameter types and silence PHPMD warning.

Two issues with the event handler signature:

  1. The $pool parameter is unused and should be renamed to $_pool to silence PHPMD.
  2. Swoole passes $workerId as int, not string.

Apply this diff:

-        $this->pool->on(Constant::EVENT_WORKER_START, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_START, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {
             // Register signal handlers in worker
             Process::signal(SIGTERM, function () {

16-17: Type the $onStop property explicitly.

The property should use a typed declaration instead of a docblock to prevent accidental non-callable assignments and clarify intent.

Apply this diff:

-    /** @var callable */
-    private $onStop;
+    private ?callable $onStop = null;
src/Queue/Server.php (1)

106-110: Guard against undefined index when accessing resource callbacks.

Line 109 accesses self::$resourcesCallbacks[$name]["reset"] before line 111 checks if $name exists in the array. This can trigger an undefined index warning.

Apply this diff to reorder the checks:

         if (
             !\array_key_exists($name, $this->resources) ||
             $fresh ||
-            self::$resourcesCallbacks[$name]["reset"]
+            (isset(self::$resourcesCallbacks[$name]) && self::$resourcesCallbacks[$name]["reset"])
         ) {
             if (!\array_key_exists($name, self::$resourcesCallbacks)) {
🧹 Nitpick comments (1)
src/Queue/Server.php (1)

428-443: LGTM!

The workerStop() and getWorkerStop() methods mirror the workerStart() pattern correctly, enabling registration and retrieval of multiple stop hooks.

Optional: The docblock on line 437 could be refined to "Returns worker stop hooks." for better grammar, but this is a minor nitpick.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e078822 and 8dba579.

📒 Files selected for processing (12)
  • docker-compose.yml (5 hunks)
  • src/Queue/Adapter/Swoole.php (2 hunks)
  • src/Queue/Broker/AMQP.php (1 hunks)
  • src/Queue/Broker/Pool.php (2 hunks)
  • src/Queue/Server.php (11 hunks)
  • tests/Queue/servers/AMQP/Dockerfile (1 hunks)
  • tests/Queue/servers/AMQP/worker.php (1 hunks)
  • tests/Queue/servers/Swoole/Dockerfile (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/Queue/Broker/Pool.php
🚧 Files skipped from review as they are similar to previous changes (5)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile
  • tests/Queue/servers/Swoole/Dockerfile
  • src/Queue/Broker/AMQP.php
  • tests/Queue/servers/Workerman/worker.php
  • tests/Queue/servers/AMQP/worker.php
🧰 Additional context used
🧬 Code graph analysis (2)
src/Queue/Adapter/Swoole.php (2)
src/Queue/Adapter.php (6)
  • Adapter (5-49)
  • __construct (12-16)
  • stop (28-28)
  • start (22-22)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Server.php (5)
  • __construct (84-88)
  • stop (236-247)
  • start (266-401)
  • workerStart (407-413)
  • workerStop (428-434)
tests/Queue/servers/Swoole/worker.php (5)
src/Queue/Server.php (4)
  • Server (14-530)
  • job (90-94)
  • workerStart (407-413)
  • workerStop (428-434)
src/Queue/Adapter.php (3)
  • Adapter (5-49)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (12-110)
  • workerStart (66-91)
  • workerStop (93-104)
src/Queue/Connection/Redis.php (1)
  • Redis (7-184)
tests/Queue/servers/tests.php (1)
  • handleRequest (5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

69-69: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


97-97: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

🔇 Additional comments (14)
tests/Queue/servers/AMQP/Dockerfile (1)

5-5: LGTM!

Installing the PCNTL extension is necessary for the signal-handling features introduced in this PR.

docker-compose.yml (1)

6-8: LGTM!

Standardizing volume mount paths to /usr/src/code across all services improves consistency and maintainability.

Also applies to: 20-22, 31-33, 43-45, 55-57

tests/Queue/servers/Swoole/worker.php (3)

3-9: LGTM!

The updated require paths and explicit imports improve code clarity and align with the refactored queue infrastructure.


11-26: LGTM!

The refactored server setup is clean and follows the new chained API pattern. The use of handleRequest(...) with the spread operator is appropriate.


28-30: LGTM!

The new workerStop() hook correctly registers a shutdown action, mirroring the workerStart() pattern. This enables graceful cleanup when workers terminate.

tests/Queue/servers/SwooleRedisCluster/worker.php (2)

3-21: LGTM!

The refactored setup correctly instantiates a Redis cluster-backed consumer and follows the same clean API pattern as the other test workers.


30-36: LGTM!

Both workerStart() and workerStop() hooks are correctly implemented and consistent with the other test worker implementations.

src/Queue/Adapter/Swoole.php (1)

31-52: LGTM!

The signal handling using Process::signal for graceful shutdown is appropriate for Swoole. The implementation correctly registers handlers for SIGTERM and SIGINT in the master process.

src/Queue/Server.php (6)

33-63: LGTM!

The transition from single hooks to arrays (workerStartHooks, workerStopHooks) enables registration of multiple lifecycle hooks, improving extensibility.


156-166: LGTM!

The formatting improvements and explicit string keys for the resource callback array enhance code clarity.


269-275: LGTM!

The workerStart callback correctly iterates over all registered hooks and invokes them with the proper arguments. The typed $workerId parameter is appropriate.


277-374: LGTM!

The consumer callback correctly handles the job lifecycle with init hooks, job execution, shutdown hooks, and error handling. The telemetry recording for wait time and process duration is appropriate.


377-391: LGTM!

The workerStop callback correctly iterates over registered stop hooks and closes the consumer connection gracefully. This is a key addition for enabling clean worker shutdown.


407-422: LGTM!

The workerStart() method correctly creates and registers hooks in the array, and getWorkerStart() provides access to the registered hooks. The pattern enables multiple start hooks.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

{
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
call_user_func($callback, $workerId);
$this->onStop = $callback;
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workerStop callback is being stored in both the $onStop property and registered with the pool's EVENT_WORKER_STOP event. However, in the stop() method, $onStop is called which will invoke the callback with no arguments, while the EVENT_WORKER_STOP expects the callback to receive $workerId. This creates inconsistent invocation - the callback will be called once without arguments in stop(), and once with $workerId from the pool event. Consider either removing line 95 or restructuring to avoid double invocation with different signatures.

Copilot uses AI. Check for mistakes.
Comment on lines 72 to 86
// Register signal handlers in worker
Process::signal(SIGTERM, function () {
Console::info(
"[Swoole] Received SIGTERM, initiating graceful shutdown...",
);
$this->stop();
});

Process::signal(SIGINT, function () {
Console::info(
"[Swoole] Received SIGINT, initiating graceful shutdown...",
);
$this->stop();
});

Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Signal handlers registered within the workerStart callback will override the signal handlers registered in the start() method (lines 36-48). In Swoole, signal handlers registered in worker processes override those in the master process. This means the master process signals won't work as expected. Consider removing the duplicate signal registration in workerStart since the master process should handle shutdown coordination.

Suggested change
// Register signal handlers in worker
Process::signal(SIGTERM, function () {
Console::info(
"[Swoole] Received SIGTERM, initiating graceful shutdown...",
);
$this->stop();
});
Process::signal(SIGINT, function () {
Console::info(
"[Swoole] Received SIGINT, initiating graceful shutdown...",
);
$this->stop();
});
// Worker initialization logic

Copilot uses AI. Check for mistakes.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
tests/Queue/servers/Workerman/worker.php (1)

12-12: Typo in queue name: "wokerman" should be "workerman".

Apply this diff:

-$adapter = new Workerman($consumer, 12, "wokerman");
+$adapter = new Workerman($consumer, 12, "workerman");
src/Queue/Adapter/Swoole.php (2)

61-64: Fix parameter types: rename unused $pool and type $workerId as int.

The Pool parameter is unused (triggers PHPMD warning) and should be renamed to $_pool. Additionally, Swoole's Pool event callbacks pass $workerId as an int, not a string.

Apply this diff:

-        $this->pool->on(Constant::EVENT_WORKER_START, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_START, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {

88-91: Fix parameter types: rename unused $pool and type $workerId as int.

Same issue as in workerStart: the $pool parameter is unused and should be prefixed with an underscore, and $workerId should be typed as int to match Swoole's signature.

Apply this diff:

-        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {
🧹 Nitpick comments (2)
src/Queue/Server.php (2)

333-333: Fix misleading comment.

The comment says "Global init hooks" but this code is in the shutdown (success) callback, so it should say "Global shutdown hooks".

Apply this diff:

-                                // Global init hooks
+                                // Global shutdown hooks

346-346: Fix misleading comment.

The comment says "Group init hooks" but this code is in the shutdown (success) callback, so it should say "Group shutdown hooks".

Apply this diff:

-                                // Group init hooks
+                                // Group shutdown hooks
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8dba579 and d62ecb0.

📒 Files selected for processing (7)
  • src/Queue/Adapter/Swoole.php (2 hunks)
  • src/Queue/Broker/Pool.php (2 hunks)
  • src/Queue/Server.php (11 hunks)
  • tests/Queue/servers/AMQP/worker.php (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/Queue/Broker/Pool.php
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (3)
tests/Queue/servers/AMQP/worker.php (4)
src/Queue/Broker/AMQP.php (1)
  • AMQP (20-226)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (12-102)
  • workerStart (59-84)
  • workerStop (86-96)
src/Queue/Server.php (4)
  • Server (14-538)
  • job (90-94)
  • workerStart (415-421)
  • workerStop (436-442)
tests/Queue/servers/tests.php (1)
  • handleRequest (5-47)
src/Queue/Server.php (4)
src/Queue/Job.php (1)
  • hook (24-29)
src/Queue/Consumer.php (1)
  • consume (17-22)
src/Queue/Message.php (4)
  • Message (5-81)
  • getPid (52-55)
  • getTimestamp (62-65)
  • getPayload (67-70)
src/Queue/Adapter/Workerman.php (1)
  • workerStop (42-49)
tests/Queue/servers/Workerman/worker.php (5)
src/Queue/Adapter/Workerman.php (3)
  • Workerman (9-55)
  • workerStart (34-41)
  • workerStop (42-49)
src/Queue/Broker/Redis.php (1)
  • Redis (11-167)
src/Queue/Connection/Redis.php (1)
  • Redis (7-184)
src/Queue/Server.php (5)
  • Server (14-538)
  • job (90-94)
  • error (531-537)
  • workerStart (415-421)
  • workerStop (436-442)
tests/Queue/servers/tests.php (1)
  • handleRequest (5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

62-62: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


89-89: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Adapter Tests (Workerman)
  • GitHub Check: Adapter Tests (SwooleRedisCluster)

@loks0n loks0n force-pushed the feat-workerstop branch 2 times, most recently from 92916aa to 957765c Compare December 16, 2025 13:29
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
tests/Queue/servers/Workerman/worker.php (1)

12-12: Typo in queue name: "wokerman" should be "workerman".

src/Queue/Server.php (1)

332-346: Fix misleading comments in shutdown callback.

The comments say "init hooks" but this code is in the shutdown callback context.

                     foreach ($this->shutdownHooks as $hook) {
-                                // Global init hooks
+                                // Global shutdown hooks
                         if (\in_array("*", $hook->getGroups())) {
                             foreach ($this->shutdownHooks as $hook) {
-                                // Group init hooks
+                                // Group shutdown hooks
                                 if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (4)
src/Queue/Broker/Pool.php (1)

42-45: Incomplete close() implementation may lead to resource leaks.

The Pool's close() method is a no-op with a TODO. When Pool::close() is called (e.g., during graceful shutdown), the underlying connections in the pool won't be closed, potentially causing resource leaks.

Do you want me to open an issue to track implementing proper connection pool cleanup?

src/Queue/Adapter/Swoole.php (2)

50-53: Rename unused $pool parameter to suppress PHPMD warning.

The $pool parameter is required by Swoole's callback signature but isn't used. Prefix it with underscore to signal intentional non-use.

     $this->pool->on(Constant::EVENT_WORKER_START, function (
-        Pool $pool,
+        Pool $_pool,
         string $workerId,
     ) use ($callback) {

81-84: Same fix: rename unused $pool to $_pool.

     $this->pool->on(Constant::EVENT_WORKER_STOP, function (
-        Pool $pool,
+        Pool $_pool,
         string $workerId,
     ) use ($callback) {
src/Queue/Server.php (1)

423-451: Consider adding specific return type hints in docblocks.

The methods return arrays of Hook objects. The docblocks could be more specific for IDE support:

 /**
  * Returns Worker starts hooks.
- * @return array
+ * @return array<Hook>
  */
 public function getWorkerStart(): array
 /**
  * Returns Worker stops hooks.
- * @return array
+ * @return array<Hook>
  */
 public function getWorkerStop(): array
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d62ecb0 and 957765c.

📒 Files selected for processing (11)
  • src/Queue/Adapter/Swoole.php (2 hunks)
  • src/Queue/Broker/Pool.php (1 hunks)
  • src/Queue/Broker/Redis.php (1 hunks)
  • src/Queue/Connection.php (1 hunks)
  • src/Queue/Connection/Redis.php (1 hunks)
  • src/Queue/Connection/RedisCluster.php (1 hunks)
  • src/Queue/Server.php (11 hunks)
  • tests/Queue/servers/AMQP/worker.php (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
src/Queue/Connection/Redis.php (4)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (92-96)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (92-96)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Connection.php (4)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (92-96)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Broker/Pool.php (5)
src/Queue/Broker/Redis.php (2)
  • consume (19-90)
  • close (92-96)
src/Queue/Queue.php (1)
  • Queue (5-15)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Broker/Redis.php (6)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
tests/Queue/servers/Swoole/worker.php (3)
src/Queue/Adapter/Swoole.php (3)
  • Swoole (15-95)
  • workerStart (48-77)
  • workerStop (79-89)
src/Queue/Broker/Redis.php (1)
  • Redis (11-168)
src/Queue/Connection/Redis.php (1)
  • Redis (7-190)
src/Queue/Adapter/Swoole.php (1)
src/Queue/Server.php (5)
  • start (266-409)
  • stop (236-247)
  • shutdown (224-230)
  • workerStart (415-421)
  • workerStop (436-442)
tests/Queue/servers/Workerman/worker.php (3)
src/Queue/Adapter.php (3)
  • Adapter (5-49)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (3)
  • Workerman (9-55)
  • workerStart (34-41)
  • workerStop (42-49)
tests/Queue/servers/tests.php (1)
  • handleRequest (5-47)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

51-51: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


82-82: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

🔇 Additional comments (16)
tests/Queue/servers/Workerman/worker.php (1)

24-30: LGTM!

The workerStart and workerStop hooks are correctly implemented and align with the new Server API for lifecycle management.

src/Queue/Broker/Redis.php (1)

92-96: LGTM!

The close() method correctly sets the closed flag first to stop the consume loop, then closes the underlying connection. This ensures graceful shutdown without resource leaks.

src/Queue/Connection.php (1)

28-28: LGTM!

Adding close() to the Connection interface establishes a consistent cleanup contract. Both Redis and RedisCluster implementations correctly implement this method.

src/Queue/Connection/RedisCluster.php (1)

169-173: LGTM!

The implementation correctly uses the null-safe operator and nullifies the reference after closing, which is consistent with the Redis connection implementation and allows proper garbage collection.

src/Queue/Connection/Redis.php (1)

172-176: LGTM!

The close() implementation follows the same pattern as RedisCluster, providing consistent behavior across connection types.

src/Queue/Broker/Pool.php (1)

33-38: LGTM!

The multi-line formatting for consume() parameters and the delegation closures improves readability without changing behavior.

Also applies to: 47-54, 57-64

tests/Queue/servers/SwooleRedisCluster/worker.php (2)

11-19: LGTM!

The Redis broker construction with RedisCluster connection and Swoole adapter setup is correct and follows the pattern established in other test workers.


30-36: LGTM!

The workerStart and workerStop hooks are correctly wired up, consistent with the new Server lifecycle API.

tests/Queue/servers/Swoole/worker.php (2)

11-13: LGTM!

The Redis broker and Swoole adapter construction follows the correct pattern.


24-30: LGTM!

The workerStart and workerStop lifecycle hooks are correctly implemented, matching the pattern in other test workers.

tests/Queue/servers/AMQP/worker.php (1)

1-31: LGTM!

The test worker script cleanly demonstrates the new lifecycle hooks. The use of first-class callable syntax for the job handler and the addition of workerStart and workerStop hooks are well-structured.

src/Queue/Adapter/Swoole.php (2)

54-74: Signal handling and coroutine approach look solid.

Using Process::signal for Swoole-native signal handling and wrapping the callback in go() allows the event loop to process signals while the consume loop runs. Closing the consumer on signal receipt enables graceful shutdown.


40-46: LGTM!

Good observability by logging shutdown progress.

src/Queue/Server.php (3)

51-63: Clean migration to array-based hook management.

The shift from single hook properties to arrays (workerStartHooks, workerStopHooks) enables multiple lifecycle hooks per event, which is more flexible and aligns with how other hooks (errorHooks, initHooks, shutdownHooks) are already managed.


377-399: Robust workerStop implementation with proper error isolation.

Wrapping each hook in its own try-catch ensures one failing hook doesn't prevent others from running. The finally block guarantees the consumer connection is closed regardless of hook execution outcomes—exactly the right pattern for graceful shutdown.


266-275: LGTM!

The workerStart flow correctly iterates over the hooks array and invokes each hook with proper argument resolution.

Comment on lines 10 to 29
$consumer = new AMQP(host: "amqp", port: 5672, user: "amqp", password: "amqp");
$adapter = new Swoole($consumer, 12, "amqp");
$server = new Server($adapter);

$server->job()
->inject('message')
->action(function (Message $message) {
handleRequest($message);
});
$server->job()->inject("message")->action(handleRequest(...));

$server
->error()
->inject('error')
->inject("error")
->action(function ($th) {
echo $th->getMessage() . PHP_EOL;
});

$server
->workerStart()
->action(function () {
echo "Worker Started" . PHP_EOL;
});
$server->workerStart()->action(function () {
echo "Worker Started" . PHP_EOL;
});

$server->workerStop()->action(function () {
echo "Worker Stopped" . PHP_EOL;
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single quotes

RUN apk add autoconf build-base
RUN apk add autoconf build-base

RUN docker-php-ext-install pcntl
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need it? I can't see it being used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

required in older implementation, removing

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (4)
tests/Queue/servers/Workerman/worker.php (1)

12-12: Fix the typo in the queue name.

The queue name is 'wokerman' but should be 'workerman'. This was previously flagged in a past review but remains unfixed.

Apply this diff:

-$adapter = new Workerman($consumer, 12, 'wokerman');
+$adapter = new Workerman($consumer, 12, 'workerman');
src/Queue/Adapter/Swoole.php (1)

50-53: Fix $workerId type and silence PHPMD unused $pool warning.

Swoole's Pool events pass int $workerId, not string. The $pool parameter is unused and should be prefixed with _ to indicate intentional non-use.

-        $this->pool->on(Constant::EVENT_WORKER_START, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_START, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {
-        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
-            Pool $pool,
-            string $workerId,
-        ) use ($callback) {
+        $this->pool->on(Constant::EVENT_WORKER_STOP, function (
+            Pool $_pool,
+            int $workerId,
+        ) use ($callback) {

Also applies to: 81-84

src/Queue/Server.php (2)

331-341: Fix misleading comment: "Global init hooks" → "Global shutdown hooks".

This block is inside the shutdown callback, not init.

                 if ($this->job->getHook()) {
                     foreach ($this->shutdownHooks as $hook) {
-                        // Global init hooks
+                        // Global shutdown hooks
                         if (\in_array('*', $hook->getGroups())) {

344-355: Fix misleading comment: "Group init hooks" → "Group shutdown hooks".

Same issue — this is in the shutdown callback.

                 foreach ($this->job->getGroups() as $group) {
                     foreach ($this->shutdownHooks as $hook) {
-                        // Group init hooks
+                        // Group shutdown hooks
                         if (\in_array($group, $hook->getGroups())) {
🧹 Nitpick comments (2)
src/Queue/Broker/Pool.php (1)

42-45: Implement pool connection cleanup for graceful shutdown.

The TODO indicates that closing pooled connections is not yet implemented. Given this PR's focus on graceful exits and worker-stop handlers, this gap means pooled publisher and consumer connections won't be properly released during shutdown, potentially causing resource leaks.

Would you like me to implement the pool cleanup logic? It should iterate through active connections in both publisher and consumer pools and call their close() methods.

src/Queue/Server.php (1)

427-430: Consider adding array<Hook> return type hints for getters.

The docblocks specify @return array, but the typed return could be more specific.

     /**
      * Returns Worker starts hooks.
-     * @return array
+     * @return array<Hook>
      */
     public function getWorkerStart(): array

     /**
      * Returns Worker stops hooks.
-     * @return array
+     * @return array<Hook>
      */
     public function getWorkerStop(): array

Also applies to: 448-451

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 957765c and 1939710.

📒 Files selected for processing (18)
  • pint.json (1 hunks)
  • src/Queue/Adapter/Swoole.php (2 hunks)
  • src/Queue/Broker/AMQP.php (3 hunks)
  • src/Queue/Broker/Pool.php (1 hunks)
  • src/Queue/Broker/Redis.php (1 hunks)
  • src/Queue/Connection.php (1 hunks)
  • src/Queue/Connection/Redis.php (1 hunks)
  • src/Queue/Connection/RedisCluster.php (1 hunks)
  • src/Queue/Queue.php (1 hunks)
  • src/Queue/Server.php (10 hunks)
  • tests/Queue/servers/AMQP/Dockerfile (1 hunks)
  • tests/Queue/servers/AMQP/worker.php (2 hunks)
  • tests/Queue/servers/Swoole/Dockerfile (1 hunks)
  • tests/Queue/servers/Swoole/worker.php (2 hunks)
  • tests/Queue/servers/SwooleRedisCluster/Dockerfile (1 hunks)
  • tests/Queue/servers/SwooleRedisCluster/worker.php (2 hunks)
  • tests/Queue/servers/Workerman/Dockerfile (1 hunks)
  • tests/Queue/servers/Workerman/worker.php (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • tests/Queue/servers/Workerman/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/Queue/Connection/Redis.php
  • tests/Queue/servers/Swoole/worker.php
  • src/Queue/Connection.php
  • tests/Queue/servers/Swoole/Dockerfile
  • tests/Queue/servers/SwooleRedisCluster/worker.php
🧰 Additional context used
🧬 Code graph analysis (5)
src/Queue/Connection/RedisCluster.php (6)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Broker/Redis.php (1)
  • close (92-96)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Broker/Redis.php (6)
src/Queue/Broker/AMQP.php (1)
  • close (132-136)
src/Queue/Broker/Pool.php (1)
  • close (42-45)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Consumer.php (1)
  • close (27-27)
src/Queue/Server.php (5)
src/Queue/Consumer.php (2)
  • consume (17-22)
  • close (27-27)
src/Queue/Adapter.php (1)
  • workerStop (42-42)
src/Queue/Connection.php (1)
  • close (28-28)
src/Queue/Connection/Redis.php (1)
  • close (172-176)
src/Queue/Connection/RedisCluster.php (1)
  • close (169-173)
src/Queue/Adapter/Swoole.php (2)
src/Queue/Server.php (4)
  • start (266-409)
  • stop (236-247)
  • shutdown (224-230)
  • workerStart (415-421)
src/Queue/Consumer.php (1)
  • close (27-27)
tests/Queue/servers/Workerman/worker.php (2)
src/Queue/Adapter.php (3)
  • Adapter (5-49)
  • workerStart (35-35)
  • workerStop (42-42)
src/Queue/Adapter/Workerman.php (3)
  • Workerman (9-55)
  • workerStart (34-41)
  • workerStop (42-49)
🪛 PHPMD (2.15.0)
src/Queue/Adapter/Swoole.php

51-51: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)


82-82: Avoid unused parameters such as '$pool'. (undefined)

(UnusedFormalParameter)

🔇 Additional comments (19)
src/Queue/Queue.php (1)

12-12: LGTM!

The string quote change aligns with the new single_quote rule in pint.json.

tests/Queue/servers/SwooleRedisCluster/Dockerfile (1)

5-5: LGTM!

Formatting-only change (trailing newline).

pint.json (1)

2-5: LGTM!

Adding the single_quote rule enforces consistent string quoting across the codebase.

src/Queue/Broker/Redis.php (1)

92-96: LGTM!

Properly closes the underlying connection to ensure resource cleanup, consistent with the Connection interface's close() method.

tests/Queue/servers/AMQP/Dockerfile (1)

3-3: LGTM!

Formatting-only change (trailing newline).

src/Queue/Connection/RedisCluster.php (1)

169-173: LGTM!

Properly implements the Connection interface's close() method, ensuring the underlying RedisCluster connection is closed and resources are released.

src/Queue/Broker/AMQP.php (2)

114-114: LGTM!

String quote changes align with the new single_quote rule.

Also applies to: 165-165


132-136: The code is safe—stopConsume() exists in php-amqplib 3.7.0.

AMQPChannel::stopConsume() is a documented public method designed to stop the consume() loop, and has been available since version 3.2.0. Using this method in the close() function will not result in a fatal error; it is the appropriate way to safely interrupt consumer loops before closing the connection.

tests/Queue/servers/Workerman/worker.php (1)

15-15: LGTM!

The refactored API usage with inject('message')->action(handleRequest(...)) and the new workerStart() and workerStop() lifecycle hooks align with the updated Server API.

Also applies to: 24-30

src/Queue/Adapter/Swoole.php (4)

5-13: LGTM on new imports.

The additions of Swoole\Constant, Swoole\Process, Utopia\Console, and the go coroutine function are appropriate for the signal handling and coroutine-based execution introduced in this file.


40-46: LGTM on graceful shutdown logging.

Adding console output before and after shutdown() improves observability during pool termination.


54-67: LGTM on per-worker signal handling.

Registering SIGTERM/SIGINT handlers that call $this->consumer->close() is the correct approach for graceful worker shutdown. Workers close their own consumer and let the pool manage process lifecycle, addressing earlier feedback about not calling stop() from workers.


69-73: LGTM on coroutine wrapping.

Wrapping the callback in go() allows the Swoole event loop to process signals while the consume loop runs.

src/Queue/Server.php (3)

51-63: LGTM on array-based hook properties.

Converting workerStartHook and workerStopHook from single hooks to arrays (workerStartHooks, workerStopHooks) enables multiple hook registration, which is a sensible enhancement.


377-399: LGTM on workerStop lifecycle handling.

The try/finally structure ensures the consumer connection is always closed even if a hook throws. Per-hook exception handling with logging is a good resilience pattern.


415-421: LGTM on hook registration methods.

workerStart() and workerStop() now return Hook instances that are appended to their respective arrays, aligning with the pattern used by init(), shutdown(), and error().

Also applies to: 436-442

tests/Queue/servers/AMQP/worker.php (3)

6-12: LGTM on streamlined imports and instantiation.

Direct class imports and simplified instantiation improve readability.


14-14: LGTM on first-class callable syntax.

Using handleRequest(...) is cleaner than wrapping in a closure.


23-29: LGTM on workerStart/workerStop hooks.

This demonstrates the new array-based hook API with the fluent ->action() pattern.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants