Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ interface Config {

ConfigProperty<List<String>> TOPIC = ConfigProperty.ofList("topic")
.displayName("Topic")
.description("The topic whose partitions must be included in the reset-offset action.");
.description("Topics to include in the reset-offset action. Each entry can be either 'topic' (all partitions) or 'topic:partition' (a specific partition).");

ConfigProperty<List<String>> INCLUDES = ConfigProperty
.ofList("includes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ public V1KafkaConsumerGroup resetConsumerGroupOffsets(final @NotNull String grou
resetConsumerGroupOffsets(groupId, topics, OffsetSpec.forTimestamp(spec.timestamp()), dryRun);
// TO_OFFSETS
case ToOffset spec -> {
// Get the partitions for the given topics.
CompletableFuture<List<TopicPartition>> future = listTopicPartitions(topics);
// Resolve the topic selectors to the target partitions.
CompletableFuture<List<TopicPartition>> future =
resolveTopicPartitions(parseSelectors(topics));
Map<TopicPartition, OffsetAndMetadata> offsets = AsyncUtils.getValueOrThrowException(future, JikkouRuntimeException::new)
.stream()
.collect(Collectors.toMap(Function.identity(), unused -> new OffsetAndMetadata(spec.offset())));
Expand Down Expand Up @@ -166,16 +167,15 @@ private V1KafkaConsumerGroup alterConsumerGroupOffsets(@NotNull String groupId,

public CompletableFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> listOffsets(@NotNull final List<String> topics,
@NotNull final OffsetSpec offsetSpec) {
// Get the partitions for the given topics.
CompletableFuture<List<TopicPartition>> future = listTopicPartitions(topics);
// Resolve the topic selectors to the target partitions.
CompletableFuture<List<TopicPartition>> future = resolveTopicPartitions(parseSelectors(topics));

// Gets EARLIEST offsets for each topic partitions.
// Gets offsets for each resolved topic-partition.
return future.thenCompose(partitions -> {
var partitionOffsets = partitions.stream()
.collect(Collectors.toMap(Function.identity(), it -> offsetSpec));
return client.listOffsets(partitionOffsets).all().toCompletionStage();
});

}

public CompletableFuture<List<TopicPartition>> listTopicPartitions(@NotNull final List<String> topics) {
Expand All @@ -192,6 +192,40 @@ public CompletableFuture<List<TopicPartition>> listTopicPartitions(@NotNull fina
.toCompletableFuture();
}

/**
* Resolves a list of {@link TopicPartitionSelector} to concrete {@link TopicPartition}s.
* Bare selectors (no partition) are expanded via a single {@code describeTopics} call;
* fully-qualified selectors are used directly without an admin round-trip. Duplicates
* are removed, preserving first-seen order for deterministic logging.
*/
CompletableFuture<List<TopicPartition>> resolveTopicPartitions(@NotNull List<TopicPartitionSelector> selectors) {
List<String> bareTopics = selectors.stream()
.filter(TopicPartitionSelector::isAllPartitions)
.map(TopicPartitionSelector::topic)
.distinct()
.toList();

List<TopicPartition> explicit = selectors.stream()
.filter(s -> !s.isAllPartitions())
.map(s -> new TopicPartition(s.topic(), s.partition().getAsInt()))
.toList();

CompletableFuture<List<TopicPartition>> expanded = bareTopics.isEmpty()
? CompletableFuture.completedFuture(List.of())
: listTopicPartitions(bareTopics);

return expanded.thenApply(expandedPartitions -> {
// LinkedHashSet preserves insertion order and removes duplicates.
LinkedHashSet<TopicPartition> union = new LinkedHashSet<>(expandedPartitions);
union.addAll(explicit);
return List.copyOf(union);
});
}

private static List<TopicPartitionSelector> parseSelectors(@NotNull List<String> rawTopics) {
return rawTopics.stream().map(TopicPartitionSelector::parse).toList();
}

/**
* Lists all consumer groups for the specified states.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) The original authors
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.jikkou.kafka.reconciler.service;

import java.util.Objects;
import java.util.OptionalInt;
import org.jetbrains.annotations.NotNull;

/**
* A selector for either a whole topic (all partitions) or a single topic-partition.
* Parses the {@code topic} or {@code topic:partition} syntax used by
* {@code kafka-consumer-groups.sh}.
*
* @param topic the topic name, never {@code null} or empty.
* @param partition the partition; {@link OptionalInt#empty()} means "all partitions of the topic".
*/
public record TopicPartitionSelector(@NotNull String topic, @NotNull OptionalInt partition) {

public TopicPartitionSelector {
Objects.requireNonNull(topic, "topic cannot be null");
Objects.requireNonNull(partition, "partition cannot be null");
if (topic.isEmpty()) {
throw new IllegalArgumentException("topic cannot be empty");
}
}

/**
* Parses a {@code topic} or {@code topic:partition} string.
*
* @param raw the raw input.
* @return the parsed selector.
* @throws IllegalArgumentException if the input is null, empty, or malformed.
*/
public static TopicPartitionSelector parse(String raw) {
if (raw == null || raw.isEmpty()) {
throw new IllegalArgumentException("topic selector cannot be null or empty");
}
int idx = raw.indexOf(':');
if (idx < 0) {
return new TopicPartitionSelector(raw, OptionalInt.empty());
}
// Reject more than one ':' (kafka topic names never contain ':').
if (raw.indexOf(':', idx + 1) >= 0) {
throw new IllegalArgumentException(
"Invalid topic selector '" + raw + "': expected 'topic' or 'topic:partition'");
}
String topic = raw.substring(0, idx);
String partitionPart = raw.substring(idx + 1);
if (topic.isEmpty()) {
throw new IllegalArgumentException(
"Invalid topic selector '" + raw + "': topic name cannot be empty");
}
if (partitionPart.isEmpty()) {
throw new IllegalArgumentException(
"Invalid topic selector '" + raw + "': partition cannot be empty");
}
final int partition;
try {
partition = Integer.parseInt(partitionPart);
} catch (NumberFormatException e) {

Check warning on line 64 in providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/TopicPartitionSelector.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace "e" with an unnamed pattern.

See more on https://sonarcloud.io/project/issues?id=streamthoughts_jikkou&issues=AZ5jyQc6sSTOk-vzgula&open=AZ5jyQc6sSTOk-vzgula&pullRequest=772
throw new IllegalArgumentException(
"Invalid topic selector '" + raw + "': partition '" + partitionPart + "' is not an integer");
}
if (partition < 0) {
throw new IllegalArgumentException(
"Invalid topic selector '" + raw + "': partition must be non-negative");
}
return new TopicPartitionSelector(topic, OptionalInt.of(partition));
}

public boolean isAllPartitions() {
return partition.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,31 @@
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.jikkou.core.exceptions.JikkouRuntimeException;
import io.jikkou.kafka.collections.V1KafkaConsumerGroupList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Mono;

class KafkaAdminServiceTest {
Expand Down Expand Up @@ -127,4 +141,102 @@
// Then
assertEquals(sync.getItems(), async.getItems());
}

@Test
void shouldExpandBareTopicsViaDescribeTopics() throws Exception {
AdminClient admin = mock(AdminClient.class);
DescribeTopicsResult result = mock(DescribeTopicsResult.class);

Node node = new Node(0, "localhost", 9092);
TopicDescription description = new TopicDescription(
"my-topic",
false,
List.of(
new TopicPartitionInfo(0, node, List.of(node), List.of(node)),
new TopicPartitionInfo(1, node, List.of(node), List.of(node))
)
);
Map<String, TopicDescription> byName = Map.of("my-topic", description);
when(result.allTopicNames()).thenReturn(KafkaFuture.completedFuture(byName));
when(admin.describeTopics(anyCollection())).thenReturn(result);

KafkaAdminService service = new KafkaAdminService(admin);

Check warning on line 163 in providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/KafkaAdminServiceTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename "service" which hides the field declared at line 44.

See more on https://sonarcloud.io/project/issues?id=streamthoughts_jikkou&issues=AZ545OWJ47GWV_L6fWen&open=AZ545OWJ47GWV_L6fWen&pullRequest=772

List<TopicPartition> resolved = service.resolveTopicPartitions(
List.of(new TopicPartitionSelector("my-topic", OptionalInt.empty()))
).get();

assertEquals(
Set.of(new TopicPartition("my-topic", 0), new TopicPartition("my-topic", 1)),
Set.copyOf(resolved)
);
verify(admin, times(1)).describeTopics(anyCollection());
}

@Test
void shouldUseSpecificPartitionsDirectlyWithoutDescribingTopics() throws Exception {
AdminClient admin = mock(AdminClient.class);
KafkaAdminService service = new KafkaAdminService(admin);

Check warning on line 179 in providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/KafkaAdminServiceTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename "service" which hides the field declared at line 44.

See more on https://sonarcloud.io/project/issues?id=streamthoughts_jikkou&issues=AZ545OWJ47GWV_L6fWeo&open=AZ545OWJ47GWV_L6fWeo&pullRequest=772

List<TopicPartition> resolved = service.resolveTopicPartitions(
List.of(
new TopicPartitionSelector("other-topic", OptionalInt.of(5)),
new TopicPartitionSelector("other-topic", OptionalInt.of(7))
)
).get();

assertEquals(
Set.of(new TopicPartition("other-topic", 5), new TopicPartition("other-topic", 7)),
Set.copyOf(resolved)
);
verify(admin, never()).describeTopics(anyCollection());
}

@Test
@SuppressWarnings("unchecked")
void shouldUnionBareAndSpecificEntriesAndDedupe() throws Exception {
AdminClient admin = mock(AdminClient.class);
DescribeTopicsResult result = mock(DescribeTopicsResult.class);

Node node = new Node(0, "localhost", 9092);
TopicDescription bareDescription = new TopicDescription(
"bare-topic",
false,
List.of(
new TopicPartitionInfo(0, node, List.of(node), List.of(node)),
new TopicPartitionInfo(1, node, List.of(node), List.of(node))
)
);
when(result.allTopicNames()).thenReturn(
KafkaFuture.completedFuture(Map.of("bare-topic", bareDescription))
);
when(admin.describeTopics(anyCollection())).thenReturn(result);

KafkaAdminService service = new KafkaAdminService(admin);

Check warning on line 215 in providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/KafkaAdminServiceTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename "service" which hides the field declared at line 44.

See more on https://sonarcloud.io/project/issues?id=streamthoughts_jikkou&issues=AZ545OWJ47GWV_L6fWep&open=AZ545OWJ47GWV_L6fWep&pullRequest=772

// bare-topic expands to (bare-topic, 0), (bare-topic, 1)
// typed entry adds (bare-topic, 1) — already present, must dedupe to one
// typed entry adds (other-topic, 3)
List<TopicPartition> resolved = service.resolveTopicPartitions(
List.of(
new TopicPartitionSelector("bare-topic", OptionalInt.empty()),
new TopicPartitionSelector("bare-topic", OptionalInt.of(1)),
new TopicPartitionSelector("other-topic", OptionalInt.of(3))
)
).get();

assertEquals(
Set.of(
new TopicPartition("bare-topic", 0),
new TopicPartition("bare-topic", 1),
new TopicPartition("other-topic", 3)
),
Set.copyOf(resolved)
);

// exactly one describeTopics call, only for the bare topic
ArgumentCaptor<Collection<String>> captor = ArgumentCaptor.forClass(Collection.class);
verify(admin, times(1)).describeTopics(captor.capture());
assertEquals(List.of("bare-topic"), List.copyOf(captor.getValue()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) The original authors
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.jikkou.kafka.reconciler.service;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.OptionalInt;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TopicPartitionSelectorTest {

@Test
void shouldParseBareTopicAsAllPartitions() {
TopicPartitionSelector selector = TopicPartitionSelector.parse("my-topic");

assertEquals("my-topic", selector.topic());
assertEquals(OptionalInt.empty(), selector.partition());
assertTrue(selector.isAllPartitions());
}

@Test
void shouldParseTopicWithPartitionZero() {
TopicPartitionSelector selector = TopicPartitionSelector.parse("my-topic:0");

assertEquals("my-topic", selector.topic());
assertEquals(OptionalInt.of(0), selector.partition());
assertFalse(selector.isAllPartitions());
}

@Test
void shouldParseTopicWithPositivePartition() {
TopicPartitionSelector selector = TopicPartitionSelector.parse("my-topic:42");

assertEquals("my-topic", selector.topic());
assertEquals(OptionalInt.of(42), selector.partition());
assertFalse(selector.isAllPartitions());
}

@ParameterizedTest
@ValueSource(strings = {
"", // empty
":3", // empty topic
"my-topic:", // empty partition
"my-topic:abc",// non-integer partition
"my-topic:-1", // negative partition
"my-topic:1:2" // multiple colons
})
void shouldThrowForMalformedInput(String raw) {
assertThrows(IllegalArgumentException.class, () -> TopicPartitionSelector.parse(raw));
}

@Test
void shouldThrowForNullInput() {
assertThrows(IllegalArgumentException.class, () -> TopicPartitionSelector.parse(null));
}
}
Loading