From bf4b5ff4f54e3c408d0e5c2b493e7005ece5789a Mon Sep 17 00:00:00 2001 From: Quan Date: Fri, 26 Jun 2026 18:41:30 +0800 Subject: [PATCH 1/2] [ISSUE #10549] Fix lite topic reset offset: memory leak, FIFO block bypass, and offset-0 reset failure - Fix memory leak in removeResetOffset: clean up empty inner map entries from resetOffsetTable - Add eraseResetOffset for precise cleanup on lite topic removal - Skip FIFO block check in isFifoBlocked when server-side reset offset is pending - Fix ResetOffsetByTimeCommand: change resetOffset > 0 to >= 0 to allow resetting to offset 0 - Add unit tests for eraseResetOffset and isFifoBlocked reset bypass --- .../lite/AbstractLiteLifecycleManager.java | 1 + .../broker/offset/ConsumerOffsetManager.java | 17 +++++++++++++-- .../processor/PopLiteMessageProcessor.java | 4 ++++ .../offset/ConsumerOffsetManagerTest.java | 21 +++++++++++++++++++ .../PopLiteMessageProcessorTest.java | 11 ++++++++++ .../offset/ResetOffsetByTimeCommand.java | 2 +- 6 files changed, 53 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java b/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java index b038a692d30..f7f522b8332 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java @@ -205,6 +205,7 @@ public void deleteLmq(String parentTopic, String lmqName) { groups.forEach(group -> { String topicAtGroup = lmqName + TOPIC_GROUP_SEPARATOR + group; brokerController.getConsumerOffsetManager().getOffsetTable().remove(topicAtGroup); + brokerController.getConsumerOffsetManager().eraseResetOffset(lmqName, group, 0); brokerController.getConsumerOffsetManager().removeConsumerOffset(topicAtGroup); // no iteration brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().remove(lmqName, group); }); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index f9debf38579..1d3bf7bed09 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -466,8 +466,21 @@ public Long queryThenEraseResetOffset(String topic, String group, Integer queueI ConcurrentMap map = resetOffsetTable.get(key); if (null == map) { return null; - } else { - return map.remove(queueId); } + Long offset = map.remove(queueId); + if (map.isEmpty()) { + resetOffsetTable.computeIfPresent(key, (k, _map) -> + _map.isEmpty() ? null : _map + ); + } + return offset; + } + + public void eraseResetOffset(String topic, String group, int queueId) { + String key = topic + TOPIC_GROUP_SEPARATOR + group; + resetOffsetTable.computeIfPresent(key, (k, map) -> { + map.remove(queueId); + return map.isEmpty() ? null : map; + }); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java index a1fa4171520..4da72ef4bdf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java @@ -311,6 +311,10 @@ public Pair popLiteTopic(String parentTopic, St } public boolean isFifoBlocked(String attemptId, String group, String lmqName, long invisibleTime) { + if (brokerController.getBrokerConfig().isUseServerSideResetOffset() && + this.brokerController.getConsumerOffsetManager().hasOffsetReset(lmqName, group, 0)) { + return false; + } return consumerOrderInfoManager.checkBlock(attemptId, lmqName, group, 0, invisibleTime); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java index 3ddd369c7fb..7e4faa4e42f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -100,4 +100,25 @@ public void testOffsetPersistInMemory() { ConcurrentMap offsetTableLoaded = manager.getOffsetTable().get(group); Assert.assertEquals(table, offsetTableLoaded); } + + @Test + public void testEraseResetOffset() { + String topic = "Topic"; + String group = "Group"; + String key = topic + TOPIC_GROUP_SEPARATOR + group; + consumerOffsetManager.assignResetOffset(topic, group, 0, 100L); + consumerOffsetManager.assignResetOffset(topic, group, 1, 200L); + + Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 0)); + Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 1)); + + consumerOffsetManager.eraseResetOffset(topic, group, 0); + Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group, 0)); + Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 1)); + Assert.assertTrue(consumerOffsetManager.resetOffsetTable.containsKey(key)); + + consumerOffsetManager.eraseResetOffset(topic, group, 1); + Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group, 1)); + Assert.assertFalse(consumerOffsetManager.resetOffsetTable.containsKey(key)); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java index 9705ab4f5af..957386b166b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java @@ -56,6 +56,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -149,6 +150,16 @@ public void testIsFifoBlocked() { verify(consumerOrderInfoManager).checkBlock("attemptId", "lmqName", "group", 0, 1000L); } + @Test + public void testIsFifoBlocked_hasResetOffset() { + brokerConfig.setUseServerSideResetOffset(true); + when(consumerOffsetManager.hasOffsetReset("lmqName", "group", 0)).thenReturn(true); + + assertFalse(popLiteMessageProcessor.isFifoBlocked("attemptId", "group", "lmqName", 1000L)); + verify(consumerOffsetManager).hasOffsetReset("lmqName", "group", 0); + verify(consumerOrderInfoManager, never()).checkBlock(anyString(), anyString(), anyString(), anyInt(), anyLong()); + } + @Test public void testGetPopOffset_normal() throws ConsumeQueueException { String group = "group"; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java index 84a301bd60c..f0326fdb758 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java @@ -140,7 +140,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, timestamp, 3000); System.out.printf("reset consumer offset to %d%n", resetOffset); - if (resetOffset > 0) { + if (resetOffset >= 0) { defaultMQAdminExt.resetOffsetByQueueId(brokerAddr, group, topic, queueId, resetOffset); } return; From 015a2cbfbbc427850fd34227eecda983aaa60ef3 Mon Sep 17 00:00:00 2001 From: Quan Date: Mon, 29 Jun 2026 14:16:03 +0800 Subject: [PATCH 2/2] chore: empty commit to trigger CI pipeline