diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 8294ffd422f..235f2d7d954 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -230,7 +230,6 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.TriggerLiteDispatchRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader; @@ -548,16 +547,12 @@ public SendResult sendMessage( String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG); if (isReply) { - if (sendSmartMsg) { - SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2); - } else { - request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader); - } + request = RemotingCommand.createRequestCommand( + sendSmartMsg ? RequestCode.SEND_REPLY_MESSAGE_V2 : RequestCode.SEND_REPLY_MESSAGE, requestHeader); } else { if (sendSmartMsg || msg instanceof MessageBatch) { - SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + request = RemotingCommand.createRequestCommand( + msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeader); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index f577ea2043d..fd672ba548c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -89,7 +89,6 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; @@ -176,8 +175,7 @@ public CompletableFuture sendMessageAsync( SendMessageRequestHeader requestHeader, long timeoutMillis ) { - SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeader); request.setBody(msg.getBody()); return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { @@ -198,8 +196,7 @@ public CompletableFuture sendMessageAsync( SendMessageRequestHeader requestHeader, long timeoutMillis ) { - SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, requestHeaderV2); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, requestHeader); CompletableFuture future = new CompletableFuture<>(); try { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeader.java index 2857fb516a6..f8335a03423 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeader.java @@ -21,6 +21,8 @@ package org.apache.rocketmq.remoting.protocol.header; import com.google.common.base.MoreObjects; +import io.netty.buffer.ByteBuf; +import java.util.HashMap; import org.apache.rocketmq.common.action.Action; import org.apache.rocketmq.common.action.RocketMQAction; import org.apache.rocketmq.common.resource.ResourceType; @@ -28,12 +30,13 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.FastCodesHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; @RocketMQAction(value = RequestCode.SEND_MESSAGE, action = Action.PUB) -public class SendMessageRequestHeader extends TopicQueueRequestHeader { +public class SendMessageRequestHeader extends TopicQueueRequestHeader implements FastCodesHeader { @CFNotNull private String producerGroup; @CFNotNull @@ -121,10 +124,14 @@ public Long getBornTimestamp() { return bornTimestamp; } - public void setBornTimestamp(Long bornTimestamp) { + public void setBornTimestamp(long bornTimestamp) { this.bornTimestamp = bornTimestamp; } + public void setBornTimestamp(Long bornTimestamp) { + this.bornTimestamp = bornTimestamp != null ? bornTimestamp : 0L; + } + public Integer getFlag() { return flag; } @@ -183,22 +190,140 @@ public void setBatch(Boolean batch) { } public static SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException { - SendMessageRequestHeaderV2 requestHeaderV2 = null; - SendMessageRequestHeader requestHeader = null; - switch (request.getCode()) { - case RequestCode.SEND_BATCH_MESSAGE: - case RequestCode.SEND_MESSAGE_V2: - requestHeaderV2 = request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); - case RequestCode.SEND_MESSAGE: - if (null == requestHeaderV2) { - requestHeader = request.decodeCommandCustomHeader(SendMessageRequestHeader.class); - } else { - requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); - } - default: - break; - } - return requestHeader; + return request.decodeCommandCustomHeader(SendMessageRequestHeader.class); + } + + @Override + public void encode(ByteBuf out) { + writeIfNotNull(out, "a", producerGroup); + writeIfNotNull(out, "b", topic); + writeIfNotNull(out, "c", defaultTopic); + writeIfNotNull(out, "d", defaultTopicQueueNums); + writeIfNotNull(out, "e", queueId); + writeIfNotNull(out, "f", sysFlag); + writeIfNotNull(out, "g", bornTimestamp); + writeIfNotNull(out, "h", flag); + writeIfNotNull(out, "i", properties); + writeIfNotNull(out, "j", reconsumeTimes); + writeIfNotNull(out, "k", unitMode); + writeIfNotNull(out, "l", maxReconsumeTimes); + writeIfNotNull(out, "m", batch); + writeIfNotNull(out, "n", getBname()); + } + + @Override + public void decode(HashMap fields) throws RemotingCommandException { + String str = fields.get("a"); + if (str == null) { + str = getAndCheckNotNull(fields, "producerGroup"); + } + if (str != null) { + this.producerGroup = str; + } + + str = fields.get("b"); + if (str == null) { + str = getAndCheckNotNull(fields, "topic"); + } + if (str != null) { + this.topic = str; + } + + str = fields.get("c"); + if (str == null) { + str = getAndCheckNotNull(fields, "defaultTopic"); + } + if (str != null) { + this.defaultTopic = str; + } + + str = fields.get("d"); + if (str == null) { + str = getAndCheckNotNull(fields, "defaultTopicQueueNums"); + } + if (str != null) { + this.defaultTopicQueueNums = Integer.parseInt(str); + } + + str = fields.get("e"); + if (str == null) { + str = getAndCheckNotNull(fields, "queueId"); + } + if (str != null) { + this.queueId = Integer.parseInt(str); + } + + str = fields.get("f"); + if (str == null) { + str = getAndCheckNotNull(fields, "sysFlag"); + } + if (str != null) { + this.sysFlag = Integer.parseInt(str); + } + + str = fields.get("g"); + if (str == null) { + str = getAndCheckNotNull(fields, "bornTimestamp"); + } + if (str != null) { + this.bornTimestamp = Long.parseLong(str); + } + + str = fields.get("h"); + if (str == null) { + str = getAndCheckNotNull(fields, "flag"); + } + if (str != null) { + this.flag = Integer.parseInt(str); + } + + str = fields.get("i"); + if (str == null) { + str = fields.get("properties"); + } + if (str != null) { + this.properties = str; + } + + str = fields.get("j"); + if (str == null) { + str = fields.get("reconsumeTimes"); + } + if (str != null) { + this.reconsumeTimes = Integer.parseInt(str); + } + + str = fields.get("k"); + if (str == null) { + str = fields.get("unitMode"); + } + if (str != null) { + this.unitMode = Boolean.parseBoolean(str); + } + + str = fields.get("l"); + if (str == null) { + str = fields.get("maxReconsumeTimes"); + } + if (str != null) { + this.maxReconsumeTimes = Integer.parseInt(str); + } + + str = fields.get("m"); + if (str == null) { + str = fields.get("batch"); + } + if (str != null) { + this.batch = Boolean.parseBoolean(str); + } + + str = fields.get("n"); + if (str == null) { + str = fields.get("brokerName"); + } + if (str != null) { + this.setBrokerName(str); + } } @Override