diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index eda80dfd44f..821b5fb552c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -72,6 +72,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -105,6 +106,7 @@ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE_KEY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY_KEY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE_KEY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE_KEY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_TYPE; @@ -164,6 +166,11 @@ public class BrokerMetricsManager { private LongCounter rollBackMessagesTotal = new NopLongCounter(); private LongHistogram transactionFinishLatency = new NopLongHistogram(); + private final ConcurrentHashMap topicAttributesCache = new ConcurrentHashMap<>(); + private volatile String lastTopicName; + private volatile String lastTopicMsgType; + private volatile Attributes lastTopicAttributes; + private final RemotingMetricsManager remotingMetricsManager; private final PopMetricsManager popMetricsManager; @@ -195,6 +202,25 @@ public AttributesBuilder newAttributesBuilder() { return attributesBuilder; } + public Attributes getOrBuildTopicAttributes(String topic, String messageType, boolean isSystem) { + Attributes lastAttrs = this.lastTopicAttributes; + if (lastAttrs != null && topic.equals(this.lastTopicName) && messageType.equals(this.lastTopicMsgType)) { + return lastAttrs; + } + String cacheKey = topic + '|' + messageType; + Attributes attrs = topicAttributesCache.computeIfAbsent(cacheKey, k -> + newAttributesBuilder() + .put(LABEL_TOPIC_KEY, topic) + .put(LABEL_MESSAGE_TYPE_KEY, messageType) + .put(LABEL_IS_SYSTEM_KEY, isSystem) + .build() + ); + this.lastTopicName = topic; + this.lastTopicMsgType = messageType; + this.lastTopicAttributes = attrs; + return attrs; + } + private Attributes buildLagAttributes(ConsumerLagCalculator.BaseCalculateResult result) { AttributesBuilder attributesBuilder = newAttributesBuilder(); attributesBuilder.put(LABEL_CONSUMER_GROUP_KEY, result.group); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java index c6a97fe441b..06192a50651 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java @@ -32,6 +32,10 @@ public class RemotingCodeDistributionHandler extends ChannelDuplexHandler { private final ConcurrentMap inboundDistribution; private final ConcurrentMap outboundDistribution; + private volatile int lastInCode = Integer.MIN_VALUE; + private volatile LongAdder lastInAdder; + private volatile int lastOutCode = Integer.MIN_VALUE; + private volatile LongAdder lastOutAdder; public RemotingCodeDistributionHandler() { inboundDistribution = new ConcurrentHashMap<>(); @@ -39,12 +43,24 @@ public RemotingCodeDistributionHandler() { } private void countInbound(int requestCode) { + if (requestCode == lastInCode) { + lastInAdder.increment(); + return; + } LongAdder item = inboundDistribution.computeIfAbsent(requestCode, k -> new LongAdder()); + lastInCode = requestCode; + lastInAdder = item; item.increment(); } private void countOutbound(int responseCode) { + if (responseCode == lastOutCode) { + lastOutAdder.increment(); + return; + } LongAdder item = outboundDistribution.computeIfAbsent(responseCode, k -> new LongAdder()); + lastOutCode = responseCode; + lastOutAdder = item; item.increment(); }