Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ GenTestRules(
"src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest",
"src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest",
],
medium_tests = [
"src/test/java/org/apache/rocketmq/broker/BrokerShutdownTest",
],
deps = [
":tests",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,8 @@ public void registerProcessor() {
*/
remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
remotingServer.registerProcessor(RequestCode.BATCH_CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
fastRemotingServer.registerProcessor(RequestCode.BATCH_CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
/**
* notificationProcessor
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@ public List<PopConsumerRecord> deleteRecords(List<PopConsumerRecord> consumerRec
return remain;
}

public void writeAndDeleteRecords(List<PopConsumerRecord> writeRecordList,
List<PopConsumerRecord> deleteRecordList) {
if (deleteRecordList.isEmpty()) {
consumerRecordStore.writeRecords(writeRecordList);
return;
}

List<PopConsumerRecord> storeDeleteRecords = new ArrayList<>(deleteRecordList.size());
List<PopConsumerRecord> bufferDeleteRecords = new ArrayList<>(deleteRecordList.size());
deleteRecordList.forEach(consumerRecord -> {
ConsumerRecords consumerRecords = consumerRecordTable.get(this.getKey(consumerRecord));
if (consumerRecords != null && consumerRecords.contains(consumerRecord)) {
bufferDeleteRecords.add(consumerRecord);
} else {
storeDeleteRecords.add(consumerRecord);
}
});

consumerRecordStore.writeAndDeleteRecords(writeRecordList, storeDeleteRecords);
deleteRecords(bufferDeleteRecords);
}

public int cleanupRecords(Consumer<PopConsumerRecord> consumer) {
int remain = 0;
Iterator<Map.Entry<String, ConsumerRecords>> iterator = consumerRecordTable.entrySet().iterator();
Expand Down Expand Up @@ -231,6 +253,10 @@ public boolean delete(PopConsumerRecord record) {
return recordTreeMap.remove(record.getOffset()) != null;
}

public boolean contains(PopConsumerRecord record) {
return recordTreeMap.containsKey(record.getOffset());
}

public long getMinOffsetInBuffer() {
Map.Entry<Long, PopConsumerRecord> entry = removeTreeMap.firstEntry();
if (entry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface PopConsumerKVStore {
*/
void deleteRecords(List<PopConsumerRecord> consumerRecordList);

/**
* Writes and deletes consumer records from the storage in a single batch.
* @param writeRecordList The list of consumer records to be written.
* @param deleteRecordList The list of consumer records to be deleted.
*/
void writeAndDeleteRecords(List<PopConsumerRecord> writeRecordList, List<PopConsumerRecord> deleteRecordList);

/**
* Scans and returns a list of expired consumer records within the specified time range.
* @param lowerTime The start time (inclusive) of the time range to search, in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,24 @@ public void deleteRecords(List<PopConsumerRecord> consumerRecordList) {
}
}

@Override
public void writeAndDeleteRecords(List<PopConsumerRecord> writeRecordList,
List<PopConsumerRecord> deleteRecordList) {
if (!writeRecordList.isEmpty() || !deleteRecordList.isEmpty()) {
try (WriteBatch writeBatch = new WriteBatch()) {
for (PopConsumerRecord record : deleteRecordList) {
writeBatch.delete(columnFamilyHandle, record.getKeyBytes());
}
for (PopConsumerRecord record : writeRecordList) {
writeBatch.put(columnFamilyHandle, record.getKeyBytes(), record.getValueBytes());
}
this.db.write(writeOptions, writeBatch);
} catch (RocksDBException e) {
throw new RuntimeException("Write and delete record error", e);
}
}
}

@Override
// https://github.com/facebook/rocksdb/issues/10300
public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper, int maxCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.remoting.protocol.body.ChangeInvisibleTimeRequestEntry;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.AppendMessageStatus;
Expand Down Expand Up @@ -496,7 +497,6 @@ public CompletableFuture<Boolean> ackAsync(
public void changeInvisibilityDuration(long popTime, long invisibleTime, long changedPopTime,
long changedInvisibleTime, String groupId, String topicId,
int queueId, long offset, boolean suspend) {

if (brokerConfig.isPopConsumerKVServiceLog()) {
log.info("PopConsumerService change, time={}, invisible={}, " +
"groupId={}, topic={}, queueId={}, offset={}, new time={}, new invisible={}",
Expand All @@ -517,21 +517,67 @@ public void changeInvisibilityDuration(long popTime, long invisibleTime, long ch
if (skipWrite) {
log.info("PopConsumerService change invisibility skip, time={}, " +
"groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset);
} else {
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));
}

List<PopConsumerRecord> ckRecords = skipWrite ? Collections.emptyList() : Collections.singletonList(ckRecord);
List<PopConsumerRecord> ackRecords = Collections.singletonList(ackRecord);
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
if (popConsumerCache.deleteRecords(Collections.singletonList(ackRecord)).isEmpty()) {
return;
popConsumerCache.writeAndDeleteRecords(ckRecords, ackRecords);
} else {
this.popConsumerStore.writeAndDeleteRecords(ckRecords, ackRecords);
}
}

public void batchChangeInvisibilityDuration(List<ChangeInvisibleTimeRequestEntry> changeRecords) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changeInvisibilityDuration can call batchChangeInvisibilityDuration

if (changeRecords == null || changeRecords.isEmpty()) {
return;
}

List<PopConsumerRecord> ckRecords = new ArrayList<>(changeRecords.size());
List<PopConsumerRecord> ackRecords = new ArrayList<>(changeRecords.size());
List<PopConsumerRecord> storeAckRecords = new ArrayList<>(changeRecords.size());

for (ChangeInvisibleTimeRequestEntry changeRecord : changeRecords) {
if (brokerConfig.isPopConsumerKVServiceLog()) {
log.info("PopConsumerService batch change, time={}, invisible={}, " +
"groupId={}, topic={}, queueId={}, offset={}, new time={}, new invisible={}",
changeRecord.getPopTime(), changeRecord.getOldInvisibleTime(), changeRecord.getConsumerGroup(),
changeRecord.getTopic(), changeRecord.getQueueId(), changeRecord.getOffset(),
changeRecord.getChangedPopTime(), changeRecord.getChangedInvisibleTime());
}

PopConsumerRecord ckRecord = new PopConsumerRecord(
changeRecord.getChangedPopTime(), changeRecord.getConsumerGroup(), changeRecord.getTopic(),
changeRecord.getQueueId(), 0, changeRecord.getChangedInvisibleTime(), changeRecord.getOffset(),
null, changeRecord.isSuspend());

PopConsumerRecord ackRecord = new PopConsumerRecord(
changeRecord.getPopTime(), changeRecord.getConsumerGroup(), changeRecord.getTopic(),
changeRecord.getQueueId(), 0, changeRecord.getOldInvisibleTime(), changeRecord.getOffset(),
null, changeRecord.isSuspend());

boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() &&
!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(changeRecord.getConsumerGroup());

if (skipWrite) {
log.info("PopConsumerService batch change invisibility skip, time={}, " +
"groupId={}, topicId={}, queueId={}, offset={}", changeRecord.getPopTime(),
changeRecord.getConsumerGroup(), changeRecord.getTopic(), changeRecord.getQueueId(),
changeRecord.getOffset());
} else {
ckRecords.add(ckRecord);
}

ackRecords.add(ackRecord);
if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
storeAckRecords.add(ackRecord);
}
}

// If the new CK has the same key as the old CK (same visibilityTimeout),
// the write already overwrites the old record in RocksDB, skip delete
// to avoid removing the newly written record.
if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
popConsumerCache.writeAndDeleteRecords(ckRecords, ackRecords);
} else {
this.popConsumerStore.writeAndDeleteRecords(ckRecords, storeAckRecords);
}
}

Expand Down
Loading
Loading