Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public void deleteLmq(String parentTopic, String lmqName) {
groups.forEach(group -> {
String topicAtGroup = lmqName + TOPIC_GROUP_SEPARATOR + group;
brokerController.getConsumerOffsetManager().getOffsetTable().remove(topicAtGroup);
brokerController.getConsumerOffsetManager().eraseResetOffset(lmqName, group, 0);
brokerController.getConsumerOffsetManager().removeConsumerOffset(topicAtGroup); // no iteration
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().remove(lmqName, group);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,21 @@ public Long queryThenEraseResetOffset(String topic, String group, Integer queueI
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
if (null == map) {
return null;
} else {
return map.remove(queueId);
}
Long offset = map.remove(queueId);
if (map.isEmpty()) {
resetOffsetTable.computeIfPresent(key, (k, _map) ->
_map.isEmpty() ? null : _map
);
}
return offset;
}

public void eraseResetOffset(String topic, String group, int queueId) {
String key = topic + TOPIC_GROUP_SEPARATOR + group;
resetOffsetTable.computeIfPresent(key, (k, map) -> {
map.remove(queueId);
return map.isEmpty() ? null : map;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ public Pair<StringBuilder, GetMessageResult> popLiteTopic(String parentTopic, St
}

public boolean isFifoBlocked(String attemptId, String group, String lmqName, long invisibleTime) {
if (brokerController.getBrokerConfig().isUseServerSideResetOffset() &&
this.brokerController.getConsumerOffsetManager().hasOffsetReset(lmqName, group, 0)) {
return false;
}
return consumerOrderInfoManager.checkBlock(attemptId, lmqName, group, 0, invisibleTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,25 @@ public void testOffsetPersistInMemory() {
ConcurrentMap<Integer, Long> offsetTableLoaded = manager.getOffsetTable().get(group);
Assert.assertEquals(table, offsetTableLoaded);
}

@Test
public void testEraseResetOffset() {
String topic = "Topic";
String group = "Group";
String key = topic + TOPIC_GROUP_SEPARATOR + group;
consumerOffsetManager.assignResetOffset(topic, group, 0, 100L);
consumerOffsetManager.assignResetOffset(topic, group, 1, 200L);

Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 0));
Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 1));

consumerOffsetManager.eraseResetOffset(topic, group, 0);
Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group, 0));
Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 1));
Assert.assertTrue(consumerOffsetManager.resetOffsetTable.containsKey(key));

consumerOffsetManager.eraseResetOffset(topic, group, 1);
Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group, 1));
Assert.assertFalse(consumerOffsetManager.resetOffsetTable.containsKey(key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -149,6 +150,16 @@ public void testIsFifoBlocked() {
verify(consumerOrderInfoManager).checkBlock("attemptId", "lmqName", "group", 0, 1000L);
}

@Test
public void testIsFifoBlocked_hasResetOffset() {
brokerConfig.setUseServerSideResetOffset(true);
when(consumerOffsetManager.hasOffsetReset("lmqName", "group", 0)).thenReturn(true);

assertFalse(popLiteMessageProcessor.isFifoBlocked("attemptId", "group", "lmqName", 1000L));
verify(consumerOffsetManager).hasOffsetReset("lmqName", "group", 0);
verify(consumerOrderInfoManager, never()).checkBlock(anyString(), anyString(), anyString(), anyInt(), anyLong());
}

@Test
public void testGetPopOffset_normal() throws ConsumeQueueException {
String group = "group";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t
defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, timestamp, 3000);

System.out.printf("reset consumer offset to %d%n", resetOffset);
if (resetOffset > 0) {
if (resetOffset >= 0) {
defaultMQAdminExt.resetOffsetByQueueId(brokerAddr, group, topic, queueId, resetOffset);
}
return;
Expand Down
Loading