Skip to content

[ISSUE #10545] Support queue selector for transactional sends#10548

Open
Alaske wants to merge 2 commits into
apache:developfrom
Alaske:feature/yibai/transactional_mesg_queue
Open

[ISSUE #10545] Support queue selector for transactional sends#10548
Alaske wants to merge 2 commits into
apache:developfrom
Alaske:feature/yibai/transactional_mesg_queue

Conversation

@Alaske

@Alaske Alaske commented Jun 25, 2026

Copy link
Copy Markdown

Which Issue(s) This PR Fixes

Fixes #10545

Brief Description

This PR adds MessageQueueSelector support for transactional message sending in the Java client.

Currently normal message sends can use MessageQueueSelector, but transactional sends only support:

sendMessageInTransaction(Message msg, Object arg)

This PR adds a selector-aware overload so users can control the real queue used by the transactional half message:

sendMessageInTransaction(
    Message msg,
    MessageQueueSelector selector,
    Object selectorArg,
    Object transactionArg
)

How Did You Implement It

  • Added a Java 8 default method to MQProducer to preserve binary compatibility for third-party implementations.
  • Kept DefaultMQProducer behavior unchanged by explicitly rejecting transactional sends and directing users to TransactionMQProducer.
  • Implemented the new overload in TransactionMQProducer.
  • Added a selector-aware transaction send path in DefaultMQProducerImpl.
  • Kept selectorArg and transactionArg separate:
    • selectorArg is passed only to MessageQueueSelector.
    • transactionArg is passed only to TransactionListener#executeLocalTransaction.
  • Reused the existing selector queue resolution and specified-queue send validation so selected queues follow the same validation semantics as normal selector sends.
  • Did not change broker, store, or remoting protocol behavior. The existing broker transaction flow already stores REAL_TOPIC / REAL_QID for half messages and restores the real queue when committing.

How to Verify It

Added unit tests covering:

  • TransactionMQProducer sends transactional messages to the queue selected by MessageQueueSelector.
  • selectorArg is passed to the selector and transactionArg is passed to the local transaction listener.
  • DefaultMQProducer still rejects transactional sends.
  • Null selector is rejected.
  • A selector returning a queue with a different topic is rejected consistently with normal selector send behavior.
  • End transaction routing uses the selected broker/queue.

Verified with JDK 8:

mvn -pl client \
  -Dtest=DefaultMQProducerTest#assertSendMessageInTransactionByQueueSelector+testSendMessageInTransactionByQueueSelector+testSendMessageInTransactionByNullQueueSelector+testSendMessageInTransactionBySelectorWithDifferentTopicQueue \
  -DskipITs test

Result:

Tests run: 4, Failures: 0, Errors: 0, Skipped: 0
Checkstyle: 0 violations
SpotBugs: 0 issues

The public API schema for DefaultMQProducer was also updated.

@Alaske

Alaske commented Jun 25, 2026

Copy link
Copy Markdown
Author

this PR is ready for review.

The change is limited to the Java client and does not modify broker/store/remoting protocol behavior. I verified the focused client tests with JDK 8 locally; details are included in the PR description.

Could you please approve the GitHub Actions workflows when you have time?

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

Adds MessageQueueSelector support for transactional message sending in the Java client, allowing users to control which queue the transactional half-message is sent to.

Findings

Overall this is a well-structured and well-tested PR. The implementation correctly:

  • Uses a Java 8 default method in MQProducer to preserve binary compatibility

  • Keeps selectorArg and transactionArg cleanly separated

  • Reuses existing selector queue resolution and validation logic

  • Applies namespace wrapping in TransactionMQProducer consistently with the existing sendMessageInTransaction

  • Updates the API schema file

  • [Info] DefaultMQProducerImpl.java:1439 — The null check for selector is performed in both TransactionMQProducer.sendMessageInTransaction() (line ~109) and DefaultMQProducerImpl.sendMessageInTransaction() (line ~1439). The check in the impl is technically unreachable when called through TransactionMQProducer, but it's good defensive programming if the impl method is ever called directly. No action needed.

  • [Info] DefaultMQProducerTest.java:785 — The test assertSendMessageInTransactionByQueueSelector verifies that DefaultMQProducer (not TransactionMQProducer) rejects the selector-based transactional send. The test name could be slightly more descriptive, e.g. assertDefaultMQProducerRejectsSendMessageInTransactionByQueueSelector, to distinguish it from the happy-path test. This is purely cosmetic.

Suggestions

No blocking suggestions. The implementation is clean, the test coverage is thorough (null selector, different-topic rejection, arg separation, end-transaction routing), and backward compatibility is properly maintained.

Cross-repo Note

No changes needed in apache/rocketmq-clients — this is a Java client-only API addition and does not affect the gRPC protocol or broker behavior.


Automated review by github-manager-bot

@Alaske Alaske changed the title Support queue selector for transactional sends (#10545) [[ISSUE #10545]] Support queue selector for transactional sends Jun 26, 2026
@Alaske Alaske changed the title [[ISSUE #10545]] Support queue selector for transactional sends [ISSUE #10545] Support queue selector for transactional sends Jun 26, 2026
@codecov-commenter

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 70.58824% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.17%. Comparing base (2e6632f) to head (8588038).
⚠️ Report is 1 commits behind head on develop.

Files with missing lines Patch % Lines
...mq/client/impl/producer/DefaultMQProducerImpl.java 77.77% 1 Missing and 1 partial ⚠️
...ocketmq/client/producer/TransactionMQProducer.java 66.66% 1 Missing and 1 partial ⚠️
...rg/apache/rocketmq/client/producer/MQProducer.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             develop   #10548      +/-   ##
=============================================
- Coverage      48.27%   48.17%   -0.10%     
+ Complexity     13435    13418      -17     
=============================================
  Files           1377     1379       +2     
  Lines         100844   100838       -6     
  Branches       13036    13038       +2     
=============================================
- Hits           48678    48582      -96     
- Misses         46217    46294      +77     
- Partials        5949     5962      +13     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

* @throws MQClientException if there is any client error.
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this method already has the same default implementation in MQProducer, do we still need to duplicate it in DefaultMQProducer?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Runtime-wise, the override is not strictly required because MQProducer already provides the same default implementation.

I kept it in DefaultMQProducer for two reasons:

  1. To keep the transactional-send rejection explicit and consistent with the existing sendMessageInTransaction(Message, Object) method in DefaultMQProducer.
  2. To make the public API change visible in DefaultMQProducer.schema. The current schema tool collects declared methods from the class/superclasses, but does not include interface default methods.

If you prefer to avoid the duplicated override, I can remove it and adjust the schema expectation accordingly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. I think we should keep the style consistent. If we prefer to explicitly implement this method in DefaultMQProducer, should we avoid making it a default method in MQProducer and keep it consistent with the other interface methods?

@Alaske Alaske requested review from RockteMQ-AI and yx9o June 26, 2026 09:00

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

Adds MessageQueueSelector support for transactional message sending. Users can now control which queue the transactional half-message is sent to, while keeping selectorArg and transactionArg separate.

Findings

API Design

  • [Info] MQProducer.java:87-103 — New default method on the interface preserves binary compatibility for third-party MQProducer implementations. Good choice.
  • [Info] DefaultMQProducer.java:953-973 — Correctly throws RuntimeException directing users to TransactionMQProducer, consistent with the existing sendMessageInTransaction(msg, arg) pattern.
  • [Info] TransactionMQProducer.java:104-121 — Validates both transactionListener and selector for null before proceeding. Namespace wrapping is applied. Clean.

Implementation

  • [Info] DefaultMQProducerImpl.java:1436-1448 — The existing public method delegates to the new private 5-parameter method with selector=null, selectorArg=null. This preserves the original behavior exactly.
  • [Info] DefaultMQProducerImpl.java:1463-1469 — When selector != null, uses invokeMessageQueueSelector to resolve the queue, then calls send(msg, mq) for the specific-queue path. This correctly reuses existing selector validation (including the topic mismatch check tested in testSendMessageInTransactionBySelectorWithDifferentTopicQueue).
  • [Info] The separation of selectorArg (passed only to MessageQueueSelector.select()) and transactionArg (passed only to TransactionListener.executeLocalTransaction()) is clean and avoids the ambiguity of a single shared argument.

Tests

  • [Info] DefaultMQProducerTest.java — Comprehensive test coverage:
    • assertSendMessageInTransactionByQueueSelector: Verifies DefaultMQProducer rejects transactional sends
    • testSendMessageInTransactionByQueueSelector: Full integration test verifying selector arg routing, transaction arg routing, queue selection, and end-transaction routing
    • testSendMessageInTransactionByNullQueueSelector: Null selector rejection
    • testSendMessageInTransactionBySelectorWithDifferentTopicQueue: Topic mismatch rejection
  • [Info] api/client.producer.DefaultMQProducer.schema — Public API schema updated to include the new method.

Suggestions

  • [Warning] DefaultMQProducerImpl.java:1463-1469 — The selector-based path uses this.send(msg, mq) which goes through the specific-queue send path. This path does not perform the broker/queue reselection retry that the default this.send(msg) path does (as noted in the Javadoc). This is documented and intentional, but worth confirming that users understand the trade-off: they get deterministic queue selection at the cost of no automatic retry on broker unavailability.
  • [Info] The @Test(expected = RuntimeException.class) on assertSendMessageInTransactionByQueueSelector is a bit loose — it catches any RuntimeException rather than asserting a specific message. The testSendMessageInTransactionByNullQueueSelector test does this better with explicit message assertion. Minor style point.

Verdict

Well-designed API extension with proper backward compatibility, clean argument separation, and thorough test coverage. LGTM.


Automated review by github-manager-bot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enhancement] Support MessageQueueSelector for transactional messages

4 participants