Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,21 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener localTransactionListener, final Object arg)
throws MQClientException {
return sendMessageInTransaction(msg, localTransactionListener, null, null, arg);
}

public TransactionSendResult sendMessageInTransaction(final Message msg,
final MessageQueueSelector selector, final Object selectorArg, final Object arg)
throws MQClientException {
if (selector == null) {
throw new MQClientException("MessageQueueSelector is null", null);
}
return sendMessageInTransaction(msg, null, selector, selectorArg, arg);
}

private TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener localTransactionListener, final MessageQueueSelector selector,
final Object selectorArg, final Object arg) throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionListener && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
Expand All @@ -1445,7 +1460,13 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
if (selector == null) {
sendResult = this.send(msg);
} else {
MessageQueue mq = this.invokeMessageQueueSelector(msg, selector, selectorArg,
this.defaultMQProducer.getSendMsgTimeout());
sendResult = this.send(msg, mq);
}
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,27 @@ public TransactionSendResult sendMessageInTransaction(Message msg,
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}

/**
* This method is used to send transactional messages with a message queue selector.
*
* <p>The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}.
* The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}.
* This method follows the existing selector send semantics and does not perform the broker or queue reselection
* retry used by the default send path.</p>
*
* @param msg Transactional message to send.
* @param selector Message queue selector.
* @param selectorArg Argument used by the selector.
* @param transactionArg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException if there is any client error.
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this method already has the same default implementation in MQProducer, do we still need to duplicate it in DefaultMQProducer?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Runtime-wise, the override is not strictly required because MQProducer already provides the same default implementation.

I kept it in DefaultMQProducer for two reasons:

  1. To keep the transactional-send rejection explicit and consistent with the existing sendMessageInTransaction(Message, Object) method in DefaultMQProducer.
  2. To make the public API change visible in DefaultMQProducer.schema. The current schema tool collects declared methods from the class/superclasses, but does not include interface default methods.

If you prefer to avoid the duplicated override, I can remove it and adjust the schema expectation accordingly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. I think we should keep the style consistent. If we prefer to explicitly implement this method in DefaultMQProducer, should we avoid making it a default method in MQProducer and keep it consistent with the other interface methods?

MessageQueueSelector selector, Object selectorArg, Object transactionArg) throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}

/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,26 @@ void sendOneway(final Message msg, final MessageQueueSelector selector, final Ob
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;

/**
* Send a transactional message with a message queue selector.
*
* <p>The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}.
* The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}.
* This method follows the existing selector send semantics and does not perform the broker or queue reselection
* retry used by the default send path.</p>
*
* @param msg transactional message to send.
* @param selector message queue selector.
* @param selectorArg argument used by the selector.
* @param transactionArg argument used by the local transaction executor.
* @return transaction result.
* @throws MQClientException if the message cannot be sent.
*/
default TransactionSendResult sendMessageInTransaction(final Message msg,
final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}

//for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,35 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

/**
* Send a transactional message with a message queue selector.
*
* <p>The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}.
* The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}.
* This method follows the existing selector send semantics and does not perform the broker or queue reselection
* retry used by the default send path.</p>
*
* @param msg transactional message to send.
* @param selector message queue selector.
* @param selectorArg argument used by the selector.
* @param transactionArg argument used by the local transaction executor.
* @return transaction result.
* @throws MQClientException if the message cannot be sent.
*/
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
if (null == selector) {
throw new MQClientException("MessageQueueSelector is null", null);
}

msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, selector, selectorArg, transactionArg);
}

public TransactionCheckListener getTransactionCheckListener() {
return transactionCheckListener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
Expand All @@ -50,6 +53,7 @@
import org.mockito.junit.MockitoJUnitRunner;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -63,6 +67,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
Expand All @@ -77,6 +82,8 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -777,6 +784,145 @@ public void assertSendMessageInTransaction() throws MQClientException {
assertNull(result);
}

@Test(expected = RuntimeException.class)
public void assertSendMessageInTransactionByQueueSelector() throws MQClientException {
MessageQueueSelector selector = mock(MessageQueueSelector.class);
TransactionSendResult result = producer.sendMessageInTransaction(message, selector, 1, 2);
assertNull(result);
}

@Test
public void testSendMessageInTransactionByQueueSelector() throws Exception {
final String transactionGroup = producerGroupPrefix + "_transaction_" + System.nanoTime();
TransactionMQProducer transactionProducer = new TransactionMQProducer(transactionGroup);
final AtomicReference<Object> actualTransactionArg = new AtomicReference<>();
transactionProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
actualTransactionArg.set(arg);
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionProducer.setNamesrvAddr("127.0.0.1:9876");
prepareTransactionProducer(transactionProducer);

try {
final AtomicReference<SendMessageRequestHeader> requestHeaderRef = new AtomicReference<>();
doAnswer(invocation -> {
SendMessageRequestHeader requestHeader = invocation.getArgument(3);
requestHeaderRef.set(requestHeader);
SendResult sendResult = createSendResult(SendStatus.SEND_OK);
sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1));
sendResult.setMessageQueue(new MessageQueue(topic, "BrokerA", requestHeader.getQueueId()));
return sendResult;
}).when(mQClientAPIImpl).sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class),
nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class));

final AtomicReference<String> endTransactionBrokerAddr = new AtomicReference<>();
final AtomicReference<EndTransactionRequestHeader> endTransactionRequestHeader = new AtomicReference<>();
doAnswer(invocation -> {
endTransactionBrokerAddr.set(invocation.getArgument(0));
endTransactionRequestHeader.set(invocation.getArgument(1));
return null;
}).when(mQClientAPIImpl).endTransactionOneway(anyString(), any(EndTransactionRequestHeader.class),
nullable(String.class), anyLong());

final AtomicReference<Object> actualSelectorArg = new AtomicReference<>();
TransactionSendResult result = transactionProducer.sendMessageInTransaction(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
actualSelectorArg.set(arg);
return mqs.get(2);
}
}, "selectorArg", "transactionArg");

assertThat(actualSelectorArg.get()).isEqualTo("selectorArg");
assertThat(actualTransactionArg.get()).isEqualTo("transactionArg");
assertThat(requestHeaderRef.get().getQueueId()).isEqualTo(2);
assertThat(result.getMessageQueue()).isEqualTo(new MessageQueue(topic, "BrokerA", 2));
assertThat(endTransactionBrokerAddr.get()).isEqualTo("127.0.0.1:10911");
assertThat(endTransactionRequestHeader.get().getBrokerName()).isEqualTo("BrokerA");
} finally {
transactionProducer.getDefaultMQProducerImpl().destroyTransactionEnv();
}
}

@Test
public void testSendMessageInTransactionByNullQueueSelector() {
TransactionMQProducer transactionProducer = new TransactionMQProducer(producerGroupPrefix + "_transaction_" + System.nanoTime());
transactionProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
});

try {
transactionProducer.sendMessageInTransaction(message, null, "selectorArg", "transactionArg");
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("MessageQueueSelector is null");
}
}

@Test
public void testSendMessageInTransactionBySelectorWithDifferentTopicQueue() throws Exception {
final String transactionGroup = producerGroupPrefix + "_transaction_" + System.nanoTime();
TransactionMQProducer transactionProducer = new TransactionMQProducer(transactionGroup);
transactionProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionProducer.setNamesrvAddr("127.0.0.1:9876");
prepareTransactionProducer(transactionProducer);

try {
transactionProducer.sendMessageInTransaction(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return new MessageQueue("OtherTopic", mqs.get(0).getBrokerName(), mqs.get(0).getQueueId());
}
}, "selectorArg", "transactionArg");
failBecauseExceptionWasNotThrown(MQClientException.class);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("send message Exception");
assertThat(e.getCause()).hasMessageContaining("message's topic not equal mq's topic");
} finally {
transactionProducer.getDefaultMQProducerImpl().destroyTransactionEnv();
}
}

private void prepareTransactionProducer(TransactionMQProducer transactionProducer) throws Exception {
transactionProducer.getDefaultMQProducerImpl().initTransactionEnv();
mQClientFactory.getClientConfig().setNamesrvAddr(transactionProducer.getNamesrvAddr());
TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, createTopicRoute());
topicPublishInfo.setHaveTopicRouterInfo(true);
transactionProducer.getDefaultMQProducerImpl().updateTopicPublishInfo(topic, topicPublishInfo);
doReturn("127.0.0.1:10911").when(mQClientFactory).findBrokerAddressInPublish(anyString());
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(transactionProducer.getDefaultMQProducerImpl(), mQClientFactory);
transactionProducer.getDefaultMQProducerImpl().setServiceState(ServiceState.RUNNING);
}

@Test
public void assertSearchOffset() throws MQClientException, NoSuchFieldException, IllegalAccessException {
setDefaultMQProducerImpl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq
Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void)
Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult)
Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult)
Method sendMessageInTransaction(java.lang.Object,java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult)
Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void)
Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void)
Expand Down