diff --git a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/action/KafkaConsumerGroupsResetOffsets.java b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/action/KafkaConsumerGroupsResetOffsets.java index c7fa60e5e..2b915ee41 100644 --- a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/action/KafkaConsumerGroupsResetOffsets.java +++ b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/action/KafkaConsumerGroupsResetOffsets.java @@ -73,7 +73,7 @@ interface Config { ConfigProperty> TOPIC = ConfigProperty.ofList("topic") .displayName("Topic") - .description("The topic whose partitions must be included in the reset-offset action."); + .description("Topics to include in the reset-offset action. Each entry can be either 'topic' (all partitions) or 'topic:partition' (a specific partition)."); ConfigProperty> INCLUDES = ConfigProperty .ofList("includes") diff --git a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java index da67e1bea..62c5e0f8a 100644 --- a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java +++ b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java @@ -77,8 +77,9 @@ public V1KafkaConsumerGroup resetConsumerGroupOffsets(final @NotNull String grou resetConsumerGroupOffsets(groupId, topics, OffsetSpec.forTimestamp(spec.timestamp()), dryRun); // TO_OFFSETS case ToOffset spec -> { - // Get the partitions for the given topics. - CompletableFuture> future = listTopicPartitions(topics); + // Resolve the topic selectors to the target partitions. + CompletableFuture> future = + resolveTopicPartitions(parseSelectors(topics)); Map offsets = AsyncUtils.getValueOrThrowException(future, JikkouRuntimeException::new) .stream() .collect(Collectors.toMap(Function.identity(), unused -> new OffsetAndMetadata(spec.offset()))); @@ -166,16 +167,15 @@ private V1KafkaConsumerGroup alterConsumerGroupOffsets(@NotNull String groupId, public CompletableFuture> listOffsets(@NotNull final List topics, @NotNull final OffsetSpec offsetSpec) { - // Get the partitions for the given topics. - CompletableFuture> future = listTopicPartitions(topics); + // Resolve the topic selectors to the target partitions. + CompletableFuture> future = resolveTopicPartitions(parseSelectors(topics)); - // Gets EARLIEST offsets for each topic partitions. + // Gets offsets for each resolved topic-partition. return future.thenCompose(partitions -> { var partitionOffsets = partitions.stream() .collect(Collectors.toMap(Function.identity(), it -> offsetSpec)); return client.listOffsets(partitionOffsets).all().toCompletionStage(); }); - } public CompletableFuture> listTopicPartitions(@NotNull final List topics) { @@ -192,6 +192,40 @@ public CompletableFuture> listTopicPartitions(@NotNull fina .toCompletableFuture(); } + /** + * Resolves a list of {@link TopicPartitionSelector} to concrete {@link TopicPartition}s. + * Bare selectors (no partition) are expanded via a single {@code describeTopics} call; + * fully-qualified selectors are used directly without an admin round-trip. Duplicates + * are removed, preserving first-seen order for deterministic logging. + */ + CompletableFuture> resolveTopicPartitions(@NotNull List selectors) { + List bareTopics = selectors.stream() + .filter(TopicPartitionSelector::isAllPartitions) + .map(TopicPartitionSelector::topic) + .distinct() + .toList(); + + List explicit = selectors.stream() + .filter(s -> !s.isAllPartitions()) + .map(s -> new TopicPartition(s.topic(), s.partition().getAsInt())) + .toList(); + + CompletableFuture> expanded = bareTopics.isEmpty() + ? CompletableFuture.completedFuture(List.of()) + : listTopicPartitions(bareTopics); + + return expanded.thenApply(expandedPartitions -> { + // LinkedHashSet preserves insertion order and removes duplicates. + LinkedHashSet union = new LinkedHashSet<>(expandedPartitions); + union.addAll(explicit); + return List.copyOf(union); + }); + } + + private static List parseSelectors(@NotNull List rawTopics) { + return rawTopics.stream().map(TopicPartitionSelector::parse).toList(); + } + /** * Lists all consumer groups for the specified states. * diff --git a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/TopicPartitionSelector.java b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/TopicPartitionSelector.java new file mode 100644 index 000000000..a385326d9 --- /dev/null +++ b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/TopicPartitionSelector.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.jikkou.kafka.reconciler.service; + +import java.util.Objects; +import java.util.OptionalInt; +import org.jetbrains.annotations.NotNull; + +/** + * A selector for either a whole topic (all partitions) or a single topic-partition. + * Parses the {@code topic} or {@code topic:partition} syntax used by + * {@code kafka-consumer-groups.sh}. + * + * @param topic the topic name, never {@code null} or empty. + * @param partition the partition; {@link OptionalInt#empty()} means "all partitions of the topic". + */ +public record TopicPartitionSelector(@NotNull String topic, @NotNull OptionalInt partition) { + + public TopicPartitionSelector { + Objects.requireNonNull(topic, "topic cannot be null"); + Objects.requireNonNull(partition, "partition cannot be null"); + if (topic.isEmpty()) { + throw new IllegalArgumentException("topic cannot be empty"); + } + } + + /** + * Parses a {@code topic} or {@code topic:partition} string. + * + * @param raw the raw input. + * @return the parsed selector. + * @throws IllegalArgumentException if the input is null, empty, or malformed. + */ + public static TopicPartitionSelector parse(String raw) { + if (raw == null || raw.isEmpty()) { + throw new IllegalArgumentException("topic selector cannot be null or empty"); + } + int idx = raw.indexOf(':'); + if (idx < 0) { + return new TopicPartitionSelector(raw, OptionalInt.empty()); + } + // Reject more than one ':' (kafka topic names never contain ':'). + if (raw.indexOf(':', idx + 1) >= 0) { + throw new IllegalArgumentException( + "Invalid topic selector '" + raw + "': expected 'topic' or 'topic:partition'"); + } + String topic = raw.substring(0, idx); + String partitionPart = raw.substring(idx + 1); + if (topic.isEmpty()) { + throw new IllegalArgumentException( + "Invalid topic selector '" + raw + "': topic name cannot be empty"); + } + if (partitionPart.isEmpty()) { + throw new IllegalArgumentException( + "Invalid topic selector '" + raw + "': partition cannot be empty"); + } + final int partition; + try { + partition = Integer.parseInt(partitionPart); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Invalid topic selector '" + raw + "': partition '" + partitionPart + "' is not an integer"); + } + if (partition < 0) { + throw new IllegalArgumentException( + "Invalid topic selector '" + raw + "': partition must be non-negative"); + } + return new TopicPartitionSelector(topic, OptionalInt.of(partition)); + } + + public boolean isAllPartitions() { + return partition.isEmpty(); + } +} diff --git a/providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/KafkaAdminServiceTest.java b/providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/KafkaAdminServiceTest.java index e27d86378..602eaeb7c 100644 --- a/providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/KafkaAdminServiceTest.java +++ b/providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/KafkaAdminServiceTest.java @@ -11,17 +11,31 @@ import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.jikkou.core.exceptions.JikkouRuntimeException; import io.jikkou.kafka.collections.V1KafkaConsumerGroupList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import reactor.core.publisher.Mono; class KafkaAdminServiceTest { @@ -127,4 +141,102 @@ void shouldReturnSameResultFromSyncAndAsyncVariants() { // Then assertEquals(sync.getItems(), async.getItems()); } + + @Test + void shouldExpandBareTopicsViaDescribeTopics() throws Exception { + AdminClient admin = mock(AdminClient.class); + DescribeTopicsResult result = mock(DescribeTopicsResult.class); + + Node node = new Node(0, "localhost", 9092); + TopicDescription description = new TopicDescription( + "my-topic", + false, + List.of( + new TopicPartitionInfo(0, node, List.of(node), List.of(node)), + new TopicPartitionInfo(1, node, List.of(node), List.of(node)) + ) + ); + Map byName = Map.of("my-topic", description); + when(result.allTopicNames()).thenReturn(KafkaFuture.completedFuture(byName)); + when(admin.describeTopics(anyCollection())).thenReturn(result); + + KafkaAdminService service = new KafkaAdminService(admin); + + List resolved = service.resolveTopicPartitions( + List.of(new TopicPartitionSelector("my-topic", OptionalInt.empty())) + ).get(); + + assertEquals( + Set.of(new TopicPartition("my-topic", 0), new TopicPartition("my-topic", 1)), + Set.copyOf(resolved) + ); + verify(admin, times(1)).describeTopics(anyCollection()); + } + + @Test + void shouldUseSpecificPartitionsDirectlyWithoutDescribingTopics() throws Exception { + AdminClient admin = mock(AdminClient.class); + KafkaAdminService service = new KafkaAdminService(admin); + + List resolved = service.resolveTopicPartitions( + List.of( + new TopicPartitionSelector("other-topic", OptionalInt.of(5)), + new TopicPartitionSelector("other-topic", OptionalInt.of(7)) + ) + ).get(); + + assertEquals( + Set.of(new TopicPartition("other-topic", 5), new TopicPartition("other-topic", 7)), + Set.copyOf(resolved) + ); + verify(admin, never()).describeTopics(anyCollection()); + } + + @Test + @SuppressWarnings("unchecked") + void shouldUnionBareAndSpecificEntriesAndDedupe() throws Exception { + AdminClient admin = mock(AdminClient.class); + DescribeTopicsResult result = mock(DescribeTopicsResult.class); + + Node node = new Node(0, "localhost", 9092); + TopicDescription bareDescription = new TopicDescription( + "bare-topic", + false, + List.of( + new TopicPartitionInfo(0, node, List.of(node), List.of(node)), + new TopicPartitionInfo(1, node, List.of(node), List.of(node)) + ) + ); + when(result.allTopicNames()).thenReturn( + KafkaFuture.completedFuture(Map.of("bare-topic", bareDescription)) + ); + when(admin.describeTopics(anyCollection())).thenReturn(result); + + KafkaAdminService service = new KafkaAdminService(admin); + + // bare-topic expands to (bare-topic, 0), (bare-topic, 1) + // typed entry adds (bare-topic, 1) — already present, must dedupe to one + // typed entry adds (other-topic, 3) + List resolved = service.resolveTopicPartitions( + List.of( + new TopicPartitionSelector("bare-topic", OptionalInt.empty()), + new TopicPartitionSelector("bare-topic", OptionalInt.of(1)), + new TopicPartitionSelector("other-topic", OptionalInt.of(3)) + ) + ).get(); + + assertEquals( + Set.of( + new TopicPartition("bare-topic", 0), + new TopicPartition("bare-topic", 1), + new TopicPartition("other-topic", 3) + ), + Set.copyOf(resolved) + ); + + // exactly one describeTopics call, only for the bare topic + ArgumentCaptor> captor = ArgumentCaptor.forClass(Collection.class); + verify(admin, times(1)).describeTopics(captor.capture()); + assertEquals(List.of("bare-topic"), List.copyOf(captor.getValue())); + } } diff --git a/providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/TopicPartitionSelectorTest.java b/providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/TopicPartitionSelectorTest.java new file mode 100644 index 000000000..656d33e24 --- /dev/null +++ b/providers/jikkou-provider-kafka/src/test/java/io/jikkou/kafka/reconciler/service/TopicPartitionSelectorTest.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.jikkou.kafka.reconciler.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.OptionalInt; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class TopicPartitionSelectorTest { + + @Test + void shouldParseBareTopicAsAllPartitions() { + TopicPartitionSelector selector = TopicPartitionSelector.parse("my-topic"); + + assertEquals("my-topic", selector.topic()); + assertEquals(OptionalInt.empty(), selector.partition()); + assertTrue(selector.isAllPartitions()); + } + + @Test + void shouldParseTopicWithPartitionZero() { + TopicPartitionSelector selector = TopicPartitionSelector.parse("my-topic:0"); + + assertEquals("my-topic", selector.topic()); + assertEquals(OptionalInt.of(0), selector.partition()); + assertFalse(selector.isAllPartitions()); + } + + @Test + void shouldParseTopicWithPositivePartition() { + TopicPartitionSelector selector = TopicPartitionSelector.parse("my-topic:42"); + + assertEquals("my-topic", selector.topic()); + assertEquals(OptionalInt.of(42), selector.partition()); + assertFalse(selector.isAllPartitions()); + } + + @ParameterizedTest + @ValueSource(strings = { + "", // empty + ":3", // empty topic + "my-topic:", // empty partition + "my-topic:abc",// non-integer partition + "my-topic:-1", // negative partition + "my-topic:1:2" // multiple colons + }) + void shouldThrowForMalformedInput(String raw) { + assertThrows(IllegalArgumentException.class, () -> TopicPartitionSelector.parse(raw)); + } + + @Test + void shouldThrowForNullInput() { + assertThrows(IllegalArgumentException.class, () -> TopicPartitionSelector.parse(null)); + } +}