[ISSUE #10545] Support queue selector for transactional sends#10548
[ISSUE #10545] Support queue selector for transactional sends#10548Alaske wants to merge 2 commits into
Conversation
|
this PR is ready for review. The change is limited to the Java client and does not modify broker/store/remoting protocol behavior. I verified the focused client tests with JDK 8 locally; details are included in the PR description. Could you please approve the GitHub Actions workflows when you have time? |
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Adds MessageQueueSelector support for transactional message sending in the Java client, allowing users to control which queue the transactional half-message is sent to.
Findings
Overall this is a well-structured and well-tested PR. The implementation correctly:
-
Uses a Java 8
defaultmethod inMQProducerto preserve binary compatibility -
Keeps
selectorArgandtransactionArgcleanly separated -
Reuses existing selector queue resolution and validation logic
-
Applies namespace wrapping in
TransactionMQProducerconsistently with the existingsendMessageInTransaction -
Updates the API schema file
-
[Info]
DefaultMQProducerImpl.java:1439— The null check forselectoris performed in bothTransactionMQProducer.sendMessageInTransaction()(line ~109) andDefaultMQProducerImpl.sendMessageInTransaction()(line ~1439). The check in the impl is technically unreachable when called throughTransactionMQProducer, but it's good defensive programming if the impl method is ever called directly. No action needed. -
[Info]
DefaultMQProducerTest.java:785— The testassertSendMessageInTransactionByQueueSelectorverifies thatDefaultMQProducer(notTransactionMQProducer) rejects the selector-based transactional send. The test name could be slightly more descriptive, e.g.assertDefaultMQProducerRejectsSendMessageInTransactionByQueueSelector, to distinguish it from the happy-path test. This is purely cosmetic.
Suggestions
No blocking suggestions. The implementation is clean, the test coverage is thorough (null selector, different-topic rejection, arg separation, end-transaction routing), and backward compatibility is properly maintained.
Cross-repo Note
No changes needed in apache/rocketmq-clients — this is a Java client-only API addition and does not affect the gRPC protocol or broker behavior.
Automated review by github-manager-bot
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #10548 +/- ##
=============================================
- Coverage 48.27% 48.17% -0.10%
+ Complexity 13435 13418 -17
=============================================
Files 1377 1379 +2
Lines 100844 100838 -6
Branches 13036 13038 +2
=============================================
- Hits 48678 48582 -96
- Misses 46217 46294 +77
- Partials 5949 5962 +13 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
| * @throws MQClientException if there is any client error. | ||
| */ | ||
| @Override | ||
| public TransactionSendResult sendMessageInTransaction(Message msg, |
There was a problem hiding this comment.
Since this method already has the same default implementation in MQProducer, do we still need to duplicate it in DefaultMQProducer?
There was a problem hiding this comment.
Runtime-wise, the override is not strictly required because MQProducer already provides the same default implementation.
I kept it in DefaultMQProducer for two reasons:
- To keep the transactional-send rejection explicit and consistent with the existing
sendMessageInTransaction(Message, Object)method inDefaultMQProducer. - To make the public API change visible in
DefaultMQProducer.schema. The current schema tool collects declared methods from the class/superclasses, but does not include interface default methods.
If you prefer to avoid the duplicated override, I can remove it and adjust the schema expectation accordingly.
There was a problem hiding this comment.
Thanks for the explanation. I think we should keep the style consistent. If we prefer to explicitly implement this method in DefaultMQProducer, should we avoid making it a default method in MQProducer and keep it consistent with the other interface methods?
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Adds MessageQueueSelector support for transactional message sending. Users can now control which queue the transactional half-message is sent to, while keeping selectorArg and transactionArg separate.
Findings
API Design
- [Info]
MQProducer.java:87-103— Newdefaultmethod on the interface preserves binary compatibility for third-partyMQProducerimplementations. Good choice. - [Info]
DefaultMQProducer.java:953-973— Correctly throwsRuntimeExceptiondirecting users toTransactionMQProducer, consistent with the existingsendMessageInTransaction(msg, arg)pattern. - [Info]
TransactionMQProducer.java:104-121— Validates bothtransactionListenerandselectorfor null before proceeding. Namespace wrapping is applied. Clean.
Implementation
- [Info]
DefaultMQProducerImpl.java:1436-1448— The existing public method delegates to the new private 5-parameter method withselector=null, selectorArg=null. This preserves the original behavior exactly. - [Info]
DefaultMQProducerImpl.java:1463-1469— Whenselector != null, usesinvokeMessageQueueSelectorto resolve the queue, then callssend(msg, mq)for the specific-queue path. This correctly reuses existing selector validation (including the topic mismatch check tested intestSendMessageInTransactionBySelectorWithDifferentTopicQueue). - [Info] The separation of
selectorArg(passed only toMessageQueueSelector.select()) andtransactionArg(passed only toTransactionListener.executeLocalTransaction()) is clean and avoids the ambiguity of a single shared argument.
Tests
- [Info]
DefaultMQProducerTest.java— Comprehensive test coverage:assertSendMessageInTransactionByQueueSelector: VerifiesDefaultMQProducerrejects transactional sendstestSendMessageInTransactionByQueueSelector: Full integration test verifying selector arg routing, transaction arg routing, queue selection, and end-transaction routingtestSendMessageInTransactionByNullQueueSelector: Null selector rejectiontestSendMessageInTransactionBySelectorWithDifferentTopicQueue: Topic mismatch rejection
- [Info]
api/client.producer.DefaultMQProducer.schema— Public API schema updated to include the new method.
Suggestions
- [Warning]
DefaultMQProducerImpl.java:1463-1469— The selector-based path usesthis.send(msg, mq)which goes through the specific-queue send path. This path does not perform the broker/queue reselection retry that the defaultthis.send(msg)path does (as noted in the Javadoc). This is documented and intentional, but worth confirming that users understand the trade-off: they get deterministic queue selection at the cost of no automatic retry on broker unavailability. - [Info] The
@Test(expected = RuntimeException.class)onassertSendMessageInTransactionByQueueSelectoris a bit loose — it catches anyRuntimeExceptionrather than asserting a specific message. ThetestSendMessageInTransactionByNullQueueSelectortest does this better with explicit message assertion. Minor style point.
Verdict
Well-designed API extension with proper backward compatibility, clean argument separation, and thorough test coverage. LGTM.
Automated review by github-manager-bot
Which Issue(s) This PR Fixes
Fixes #10545
Brief Description
This PR adds
MessageQueueSelectorsupport for transactional message sending in the Java client.Currently normal message sends can use
MessageQueueSelector, but transactional sends only support:This PR adds a selector-aware overload so users can control the real queue used by the transactional half message:
How Did You Implement It
defaultmethod toMQProducerto preserve binary compatibility for third-party implementations.DefaultMQProducerbehavior unchanged by explicitly rejecting transactional sends and directing users toTransactionMQProducer.TransactionMQProducer.DefaultMQProducerImpl.selectorArgandtransactionArgseparate:selectorArgis passed only toMessageQueueSelector.transactionArgis passed only toTransactionListener#executeLocalTransaction.REAL_TOPIC/REAL_QIDfor half messages and restores the real queue when committing.How to Verify It
Added unit tests covering:
TransactionMQProducersends transactional messages to the queue selected byMessageQueueSelector.selectorArgis passed to the selector andtransactionArgis passed to the local transaction listener.DefaultMQProducerstill rejects transactional sends.Verified with JDK 8:
mvn -pl client \ -Dtest=DefaultMQProducerTest#assertSendMessageInTransactionByQueueSelector+testSendMessageInTransactionByQueueSelector+testSendMessageInTransactionByNullQueueSelector+testSendMessageInTransactionBySelectorWithDifferentTopicQueue \ -DskipITs testResult:
The public API schema for
DefaultMQProducerwas also updated.