Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerChangeListener;
Expand Down Expand Up @@ -440,7 +441,7 @@ protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGr
this.buildConsumeType(clientType),
this.buildMessageModel(clientType),
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
this.buildSubscriptionDataSet(subscriptionEntryList),
this.buildSubscriptionDataSet(ctx, consumerGroup, subscriptionEntryList),
updateSubscription
);
return channel;
Expand Down Expand Up @@ -532,16 +533,38 @@ protected MessageModel buildMessageModel(ClientType clientType) {
return MessageModel.CLUSTERING;
}

protected Set<SubscriptionData> buildSubscriptionDataSet(List<SubscriptionEntry> subscriptionEntryList) {
protected Set<SubscriptionData> buildSubscriptionDataSet(ProxyContext ctx, String consumerGroup,
List<SubscriptionEntry> subscriptionEntryList) {
Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(ctx, consumerGroup);
for (SubscriptionEntry sub : subscriptionEntryList) {
String topicName = sub.getTopic().getName();
FilterExpression filterExpression = sub.getExpression();
subscriptionDataSet.add(buildSubscriptionData(topicName, filterExpression));
SubscriptionData subscriptionData = buildSubscriptionData(topicName, filterExpression);
reuseSubscriptionVersion(consumerGroupInfo, subscriptionData);
subscriptionDataSet.add(subscriptionData);
}
return subscriptionDataSet;
}

protected void reuseSubscriptionVersion(ConsumerGroupInfo consumerGroupInfo, SubscriptionData subscriptionData) {
if (consumerGroupInfo == null) {
return;
}
SubscriptionData oldSubscriptionData = consumerGroupInfo.findSubscriptionData(subscriptionData.getTopic());
if (oldSubscriptionData == null) {
return;
}
// FilterAPI.build creates a fresh subVersion every time gRPC settings rebuild the
// subscription. Normalize the version before equals so unchanged subscriptions do
// not look like real updates; restore it below when the subscription content differs.
long subVersion = subscriptionData.getSubVersion();
subscriptionData.setSubVersion(oldSubscriptionData.getSubVersion());
if (!oldSubscriptionData.equals(subscriptionData)) {
subscriptionData.setSubVersion(subVersion);
}
}

protected SubscriptionData buildSubscriptionData(String topicName, FilterExpression filterExpression) {
String expression = filterExpression.getExpression();
String expressionType = GrpcConverter.getInstance().buildExpressionType(filterExpression.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.lite.LiteSubscriptionDTO;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
Expand All @@ -58,6 +60,8 @@
import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.assertj.core.util.Lists;
import org.junit.Before;
Expand Down Expand Up @@ -212,6 +216,72 @@ public void testConsumerHeartbeat() throws Throwable {
assertEquals("tag", data.getSubString());
}

@Test
public void testConsumerHeartbeatReuseSubVersionForSameSubscription() throws Throwable {
ProxyContext context = createContext();
when(grpcClientSettingsManager.getClientSettings(any())).thenReturn(buildConsumerSettings("tag"));
mockConsumerGroupInfo(buildTagSubscriptionData("tag", 123L));

ArgumentCaptor<Set<SubscriptionData>> subscriptionDatasArgumentCaptor = ArgumentCaptor.forClass(Set.class);
doNothing().when(this.messagingProcessor).registerConsumer(any(), anyString(), any(), any(), any(), any(),
subscriptionDatasArgumentCaptor.capture(), anyBoolean());

HeartbeatResponse response = this.sendConsumerHeartbeat(context);

assertEquals(Code.OK, response.getStatus().getCode());
SubscriptionData data = subscriptionDatasArgumentCaptor.getValue().stream().findAny().get();
assertThat(data.getSubVersion()).isEqualTo(123L);
}

@Test
public void testConsumerHeartbeatUseNewSubVersionWhenSubscriptionChanged() throws Throwable {
ProxyContext context = createContext();
when(grpcClientSettingsManager.getClientSettings(any())).thenReturn(buildConsumerSettings("tagB"));
mockConsumerGroupInfo(buildTagSubscriptionData("tagA", 123L));

ArgumentCaptor<Set<SubscriptionData>> subscriptionDatasArgumentCaptor = ArgumentCaptor.forClass(Set.class);
doNothing().when(this.messagingProcessor).registerConsumer(any(), anyString(), any(), any(), any(), any(),
subscriptionDatasArgumentCaptor.capture(), anyBoolean());

HeartbeatResponse response = this.sendConsumerHeartbeat(context);

assertEquals(Code.OK, response.getStatus().getCode());
SubscriptionData data = subscriptionDatasArgumentCaptor.getValue().stream().findAny().get();
assertThat(data.getSubVersion()).isGreaterThan(123L);
assertThat(data.getSubString()).isEqualTo("tagB");
}

private Settings buildConsumerSettings(String tag) {
return Settings.newBuilder()
.setClientType(ClientType.PUSH_CONSUMER)
.setSubscription(Subscription.newBuilder()
.setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
.addSubscriptions(SubscriptionEntry.newBuilder()
.setExpression(FilterExpression.newBuilder()
.setExpression(tag)
.setType(FilterType.TAG)
.build())
.setTopic(Resource.newBuilder().setName(TOPIC).build())
.build())
.build())
.build();
}

private void mockConsumerGroupInfo(SubscriptionData subscriptionData) {
ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo(CONSUMER_GROUP, ConsumeType.CONSUME_PASSIVELY,
MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumerGroupInfo.getSubscriptionTable().put(TOPIC, subscriptionData);
when(this.messagingProcessor.getConsumerGroupInfo(any(), anyString())).thenReturn(consumerGroupInfo);
}

private SubscriptionData buildTagSubscriptionData(String tag, long subVersion) {
SubscriptionData subscriptionData = new SubscriptionData(TOPIC, tag);
subscriptionData.getTagsSet().add(tag);
subscriptionData.getCodeSet().add(tag.hashCode());
subscriptionData.setSubVersion(subVersion);
return subscriptionData;
}

protected void assertClientChannelInfo(ClientChannelInfo clientChannelInfo, String group) {
assertEquals(LanguageCode.JAVA, clientChannelInfo.getLanguage());
assertEquals(CLIENT_ID, clientChannelInfo.getClientId());
Expand Down
Loading