diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index f68742949ff..bcdcabd0a88 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1433,6 +1433,21 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) public TransactionSendResult sendMessageInTransaction(final Message msg, final TransactionListener localTransactionListener, final Object arg) throws MQClientException { + return sendMessageInTransaction(msg, localTransactionListener, null, null, arg); + } + + public TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object arg) + throws MQClientException { + if (selector == null) { + throw new MQClientException("MessageQueueSelector is null", null); + } + return sendMessageInTransaction(msg, null, selector, selectorArg, arg); + } + + private TransactionSendResult sendMessageInTransaction(final Message msg, + final TransactionListener localTransactionListener, final MessageQueueSelector selector, + final Object selectorArg, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionListener && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); @@ -1445,7 +1460,13 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { - sendResult = this.send(msg); + if (selector == null) { + sendResult = this.send(msg); + } else { + MessageQueue mq = this.invokeMessageQueueSelector(msg, selector, selectorArg, + this.defaultMQProducer.getSendMsgTimeout()); + sendResult = this.send(msg, mq); + } } catch (Exception e) { throw new MQClientException("send message Exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 2091bbabbff..fc422005bb8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -950,6 +950,27 @@ public TransactionSendResult sendMessageInTransaction(Message msg, throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } + /** + * This method is used to send transactional messages with a message queue selector. + * + *

The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.

+ * + * @param msg Transactional message to send. + * @param selector Message queue selector. + * @param selectorArg Argument used by the selector. + * @param transactionArg Argument used along with local transaction executor. + * @return Transaction result. + * @throws MQClientException if there is any client error. + */ + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, + MessageQueueSelector selector, Object selectorArg, Object transactionArg) throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + /** * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 4286fdd7f96..13a652dcc42 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -84,6 +84,26 @@ void sendOneway(final Message msg, final MessageQueueSelector selector, final Ob TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; + /** + * Send a transactional message with a message queue selector. + * + *

The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.

+ * + * @param msg transactional message to send. + * @param selector message queue selector. + * @param selectorArg argument used by the selector. + * @param transactionArg argument used by the local transaction executor. + * @return transaction result. + * @throws MQClientException if the message cannot be sent. + */ + default TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + //for batch SendResult send(final Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index 5c7b437809a..1c83bb20de1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -89,6 +89,35 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); } + /** + * Send a transactional message with a message queue selector. + * + *

The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.

+ * + * @param msg transactional message to send. + * @param selector message queue selector. + * @param selectorArg argument used by the selector. + * @param transactionArg argument used by the local transaction executor. + * @return transaction result. + * @throws MQClientException if the message cannot be sent. + */ + @Override + public TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException { + if (null == this.transactionListener) { + throw new MQClientException("TransactionListener is null", null); + } + if (null == selector) { + throw new MQClientException("MessageQueueSelector is null", null); + } + + msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, selector, selectorArg, transactionArg); + } + public TransactionCheckListener getTransactionCheckListener() { return transactionCheckListener; } diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 33cf0df390d..ff549b1db2c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -29,14 +29,17 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.latency.MQFaultStrategy; +import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.compression.CompressionType; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; @@ -50,6 +53,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.lang.reflect.Field; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -63,6 +67,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -77,6 +82,8 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -777,6 +784,145 @@ public void assertSendMessageInTransaction() throws MQClientException { assertNull(result); } + @Test(expected = RuntimeException.class) + public void assertSendMessageInTransactionByQueueSelector() throws MQClientException { + MessageQueueSelector selector = mock(MessageQueueSelector.class); + TransactionSendResult result = producer.sendMessageInTransaction(message, selector, 1, 2); + assertNull(result); + } + + @Test + public void testSendMessageInTransactionByQueueSelector() throws Exception { + final String transactionGroup = producerGroupPrefix + "_transaction_" + System.nanoTime(); + TransactionMQProducer transactionProducer = new TransactionMQProducer(transactionGroup); + final AtomicReference actualTransactionArg = new AtomicReference<>(); + transactionProducer.setTransactionListener(new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + actualTransactionArg.set(arg); + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }); + transactionProducer.setNamesrvAddr("127.0.0.1:9876"); + prepareTransactionProducer(transactionProducer); + + try { + final AtomicReference requestHeaderRef = new AtomicReference<>(); + doAnswer(invocation -> { + SendMessageRequestHeader requestHeader = invocation.getArgument(3); + requestHeaderRef.set(requestHeader); + SendResult sendResult = createSendResult(SendStatus.SEND_OK); + sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1)); + sendResult.setMessageQueue(new MessageQueue(topic, "BrokerA", requestHeader.getQueueId())); + return sendResult; + }).when(mQClientAPIImpl).sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), + nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)); + + final AtomicReference endTransactionBrokerAddr = new AtomicReference<>(); + final AtomicReference endTransactionRequestHeader = new AtomicReference<>(); + doAnswer(invocation -> { + endTransactionBrokerAddr.set(invocation.getArgument(0)); + endTransactionRequestHeader.set(invocation.getArgument(1)); + return null; + }).when(mQClientAPIImpl).endTransactionOneway(anyString(), any(EndTransactionRequestHeader.class), + nullable(String.class), anyLong()); + + final AtomicReference actualSelectorArg = new AtomicReference<>(); + TransactionSendResult result = transactionProducer.sendMessageInTransaction(message, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + actualSelectorArg.set(arg); + return mqs.get(2); + } + }, "selectorArg", "transactionArg"); + + assertThat(actualSelectorArg.get()).isEqualTo("selectorArg"); + assertThat(actualTransactionArg.get()).isEqualTo("transactionArg"); + assertThat(requestHeaderRef.get().getQueueId()).isEqualTo(2); + assertThat(result.getMessageQueue()).isEqualTo(new MessageQueue(topic, "BrokerA", 2)); + assertThat(endTransactionBrokerAddr.get()).isEqualTo("127.0.0.1:10911"); + assertThat(endTransactionRequestHeader.get().getBrokerName()).isEqualTo("BrokerA"); + } finally { + transactionProducer.getDefaultMQProducerImpl().destroyTransactionEnv(); + } + } + + @Test + public void testSendMessageInTransactionByNullQueueSelector() { + TransactionMQProducer transactionProducer = new TransactionMQProducer(producerGroupPrefix + "_transaction_" + System.nanoTime()); + transactionProducer.setTransactionListener(new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }); + + try { + transactionProducer.sendMessageInTransaction(message, null, "selectorArg", "transactionArg"); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("MessageQueueSelector is null"); + } + } + + @Test + public void testSendMessageInTransactionBySelectorWithDifferentTopicQueue() throws Exception { + final String transactionGroup = producerGroupPrefix + "_transaction_" + System.nanoTime(); + TransactionMQProducer transactionProducer = new TransactionMQProducer(transactionGroup); + transactionProducer.setTransactionListener(new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }); + transactionProducer.setNamesrvAddr("127.0.0.1:9876"); + prepareTransactionProducer(transactionProducer); + + try { + transactionProducer.sendMessageInTransaction(message, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + return new MessageQueue("OtherTopic", mqs.get(0).getBrokerName(), mqs.get(0).getQueueId()); + } + }, "selectorArg", "transactionArg"); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("send message Exception"); + assertThat(e.getCause()).hasMessageContaining("message's topic not equal mq's topic"); + } finally { + transactionProducer.getDefaultMQProducerImpl().destroyTransactionEnv(); + } + } + + private void prepareTransactionProducer(TransactionMQProducer transactionProducer) throws Exception { + transactionProducer.getDefaultMQProducerImpl().initTransactionEnv(); + mQClientFactory.getClientConfig().setNamesrvAddr(transactionProducer.getNamesrvAddr()); + TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, createTopicRoute()); + topicPublishInfo.setHaveTopicRouterInfo(true); + transactionProducer.getDefaultMQProducerImpl().updateTopicPublishInfo(topic, topicPublishInfo); + doReturn("127.0.0.1:10911").when(mQClientFactory).findBrokerAddressInPublish(anyString()); + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(transactionProducer.getDefaultMQProducerImpl(), mQClientFactory); + transactionProducer.getDefaultMQProducerImpl().setServiceState(ServiceState.RUNNING); + } + @Test public void assertSearchOffset() throws MQClientException, NoSuchFieldException, IllegalAccessException { setDefaultMQProducerImpl(); diff --git a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema index d1111fb4572..1747a1be8de 100644 --- a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema +++ b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema @@ -122,6 +122,7 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void) Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult) Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult) +Method sendMessageInTransaction(java.lang.Object,java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult) Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult) Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void) Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void)