diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index f68742949ff..bcdcabd0a88 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1433,6 +1433,21 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) public TransactionSendResult sendMessageInTransaction(final Message msg, final TransactionListener localTransactionListener, final Object arg) throws MQClientException { + return sendMessageInTransaction(msg, localTransactionListener, null, null, arg); + } + + public TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object arg) + throws MQClientException { + if (selector == null) { + throw new MQClientException("MessageQueueSelector is null", null); + } + return sendMessageInTransaction(msg, null, selector, selectorArg, arg); + } + + private TransactionSendResult sendMessageInTransaction(final Message msg, + final TransactionListener localTransactionListener, final MessageQueueSelector selector, + final Object selectorArg, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionListener && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); @@ -1445,7 +1460,13 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { - sendResult = this.send(msg); + if (selector == null) { + sendResult = this.send(msg); + } else { + MessageQueue mq = this.invokeMessageQueueSelector(msg, selector, selectorArg, + this.defaultMQProducer.getSendMsgTimeout()); + sendResult = this.send(msg, mq); + } } catch (Exception e) { throw new MQClientException("send message Exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 2091bbabbff..fc422005bb8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -950,6 +950,27 @@ public TransactionSendResult sendMessageInTransaction(Message msg, throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } + /** + * This method is used to send transactional messages with a message queue selector. + * + *
The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.
+ * + * @param msg Transactional message to send. + * @param selector Message queue selector. + * @param selectorArg Argument used by the selector. + * @param transactionArg Argument used along with local transaction executor. + * @return Transaction result. + * @throws MQClientException if there is any client error. + */ + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, + MessageQueueSelector selector, Object selectorArg, Object transactionArg) throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + /** * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 4286fdd7f96..13a652dcc42 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -84,6 +84,26 @@ void sendOneway(final Message msg, final MessageQueueSelector selector, final Ob TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; + /** + * Send a transactional message with a message queue selector. + * + *The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.
+ * + * @param msg transactional message to send. + * @param selector message queue selector. + * @param selectorArg argument used by the selector. + * @param transactionArg argument used by the local transaction executor. + * @return transaction result. + * @throws MQClientException if the message cannot be sent. + */ + default TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + //for batch SendResult send(final CollectionThe {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.
+ * + * @param msg transactional message to send. + * @param selector message queue selector. + * @param selectorArg argument used by the selector. + * @param transactionArg argument used by the local transaction executor. + * @return transaction result. + * @throws MQClientException if the message cannot be sent. + */ + @Override + public TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException { + if (null == this.transactionListener) { + throw new MQClientException("TransactionListener is null", null); + } + if (null == selector) { + throw new MQClientException("MessageQueueSelector is null", null); + } + + msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, selector, selectorArg, transactionArg); + } + public TransactionCheckListener getTransactionCheckListener() { return transactionCheckListener; } diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 33cf0df390d..ff549b1db2c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -29,14 +29,17 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.latency.MQFaultStrategy; +import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.compression.CompressionType; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; @@ -50,6 +53,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.lang.reflect.Field; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -63,6 +67,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -77,6 +82,8 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -777,6 +784,145 @@ public void assertSendMessageInTransaction() throws MQClientException { assertNull(result); } + @Test(expected = RuntimeException.class) + public void assertSendMessageInTransactionByQueueSelector() throws MQClientException { + MessageQueueSelector selector = mock(MessageQueueSelector.class); + TransactionSendResult result = producer.sendMessageInTransaction(message, selector, 1, 2); + assertNull(result); + } + + @Test + public void testSendMessageInTransactionByQueueSelector() throws Exception { + final String transactionGroup = producerGroupPrefix + "_transaction_" + System.nanoTime(); + TransactionMQProducer transactionProducer = new TransactionMQProducer(transactionGroup); + final AtomicReference