Skip to content

Cpp support lite#1273

Open
zhaohai666 wants to merge 11 commits into
apache:masterfrom
zhaohai666:feature/cpp-support-lite-push
Open

Cpp support lite#1273
zhaohai666 wants to merge 11 commits into
apache:masterfrom
zhaohai666:feature/cpp-support-lite-push

Conversation

@zhaohai666

Copy link
Copy Markdown
Contributor

Cpp support lite push

@zhaohai666 zhaohai666 changed the title Cpp support lite push Cpp support lite Jun 12, 2026
@zhaohai666 zhaohai666 mentioned this pull request Jun 12, 2026
4 tasks

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

Adds "lite push consumer" and "lite producer" support to the C++ client, including new APIs, subscription management, and examples.

Findings

  • [Info] This is a large feature PR (2539 additions, 39 files). The changes span CMake/Bazel build configs, new header/source files, examples, and tests.
  • [Info] LitePushConsumer / LitePushConsumerImpl — New consumer type with simplified API. Thread safety annotations (GUARDED_BY, LOCKS_EXCLUDED) are used consistently.
  • [Info] LiteSubscriptionManager — Manages subscriptions for lite push consumer. Includes unit tests.
  • [Info] thread_annotations_compat.h — Compatibility header for abseil thread safety macros. Good approach for supporting both old and new abseil versions.
  • [Warning] The PR description is very brief ("Cpp support lite"). Consider adding more detail about:
    • What "lite" means in this context (lightweight? simplified API?)
    • How it differs from the existing PushConsumer
    • Any protocol-level changes or new gRPC methods
  • [Warning] OffsetOption.h — New file. Ensure the offset option semantics are documented and consistent with other language clients.
  • [Info] Tests — LiteMessageTest.cpp and LiteSubscriptionManagerTest.cpp provide coverage for the new functionality.

Suggestions

  • Expand the PR description to explain the "lite" concept and its relationship to existing consumer types.
  • Consider adding a README or design doc for the lite push consumer feature.
  • Cross-repo: If "lite" is a new protocol feature, ensure the Proxy side (apache/rocketmq) supports it.

Overall: Significant feature addition with good test coverage. The brief description makes it harder to review, but the code quality appears solid.


Automated review by github-manager-bot

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot (Re-review after new commits)

Summary

Adds C++ LitePushConsumer support — a lightweight push consumer that binds to a parent topic and allows dynamic subscription/unsubscription of lite topics at runtime. Includes LiteSubscriptionManager for server-side sync, new error codes, Message API extensions, examples, and tests.

Findings

  • [Warning] LiteSubscriptionManager.cpp:73-76 — The periodic sync lambda captures raw this:

    auto sync_functor = [this]() { syncAllLiteSubscription(); };

    The comment acknowledges LiteSubscriptionManager does not inherit enable_shared_from_this. This is safe only if shutdown() reliably cancels the task before destruction completes. If the scheduler fires between shutdown() and actual destruction, or if shutdown() is not called (e.g., exception during start()), this is a use-after-free. Consider either:

    1. Inheriting from enable_shared_from_this and using weak_from_this()
    2. Or adding a guard in the lambda (e.g., check a destroyed flag under mutex)
  • [Info] LitePushConsumerImpl.cpp:37-40 — Constructor subscribes to bind topic with SUB_ALL ("*"), then start() creates the LiteSubscriptionManager. The ordering is correct — parent subscribe() before manager creation.

  • [Info] LiteSubscriptionManager.cpp:97-117subscribeLite() correctly validates the topic, checks quota, syncs with server, then updates local state. The two-phase approach (server sync → local update) means a failed server sync does not corrupt local state.

  • [Info] LiteSubscriptionManager.cpp:222-249syncLiteSubscription() iterates over all endpoints and sends the sync request to each. The callback captures &ec by reference, which is safe since the calls appear to be sequential within the loop.

  • [Info] Message.cpp — Adds lite_topic_ field and withLiteTopic()/liteTopic() accessors. Clean addition to the existing builder pattern.

  • [Info] OffsetOption.h — New enum for offset control (LAST, MIN, MAX, OFFSET, TAIL_N, TIMESTAMP). Well-structured with type() and value() accessors.

  • [Info] Error codes: IllegalLiteTopic, LiteSubscriptionQuotaExceeded — follow existing naming convention.

  • [Info] Tests cover lite subscription sync, quota enforcement, and lite message creation.

Cross-repo Note

This PR is in apache/rocketmq-clients (C++ SDK). The corresponding server-side lite topic support should be in apache/rocketmq. Ensure the SyncLiteSubscription RPC and LiteSubscriptionAction proto definitions are aligned between client and server.

Verdict

Solid implementation of the lite push consumer pattern. The main concern is the raw this capture in the periodic sync scheduler — consider adding a safety guard. Otherwise well-structured with proper mutex protection and error handling.


Automated review by github-manager-bot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants