From 8b7f5747aa321c8561eb2f53c7822b78324fd3a8 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Mon, 10 Nov 2025 15:08:51 +0100 Subject: [PATCH 01/17] fix(flagd): no retry for certain error codes, implement test steps Signed-off-by: lea konvalinka --- .../contrib/providers/flagd/FlagdOptions.java | 9 +++++++++ .../connector/sync/SyncStreamQueueSource.java | 18 +++++++++++++++--- .../flagd/e2e/steps/ProviderSteps.java | 14 ++++++++++++++ .../providers/flagd/e2e/steps/Utils.java | 3 +++ .../flagd/e2e/steps/config/ConfigSteps.java | 1 + providers/flagd/test-harness | 2 +- 6 files changed, 43 insertions(+), 4 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index a2463a946..9b5bb61a5 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -10,6 +10,7 @@ import io.grpc.ClientInterceptor; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; import lombok.Builder; @@ -122,6 +123,14 @@ public class FlagdOptions { @Builder.Default private int retryGracePeriod = fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD); + + /** + * List of grpc response status codes for which failed connections are not retried. + * Defaults to empty list + */ + @Builder.Default + private List nonRetryableStatusCodes = new ArrayList<>(); + /** * Selector to be used with flag sync gRPC contract. **/ diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 196ab77a6..0cc787db4 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -16,10 +16,12 @@ import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; import dev.openfeature.sdk.Awaitable; +import dev.openfeature.sdk.exceptions.FatalError; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -49,6 +51,7 @@ public class SyncStreamQueueSource implements QueueSource { private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final FlagSyncServiceStub flagSyncStub; private final FlagSyncServiceBlockingStub metadataStub; + private final List nonRetryableStatusCodes; /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. @@ -65,6 +68,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer Date: Fri, 14 Nov 2025 12:14:38 +0100 Subject: [PATCH 02/17] attempt to handle fatal error Signed-off-by: lea konvalinka --- .../contrib/providers/flagd/FlagdOptions.java | 2 +- .../providers/flagd/FlagdProvider.java | 27 +++++++----- .../resolver/process/InProcessResolver.java | 3 ++ .../resolver/process/storage/FlagStore.java | 5 +++ .../storage/connector/QueuePayloadType.java | 3 +- .../connector/sync/SyncStreamQueueSource.java | 42 ++++++++++++++----- .../flagd/e2e/steps/ProviderSteps.java | 2 +- .../flagd/e2e/steps/config/ConfigSteps.java | 1 - 8 files changed, 60 insertions(+), 25 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index 9b5bb61a5..993f55bdd 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -129,7 +129,7 @@ public class FlagdOptions { * Defaults to empty list */ @Builder.Default - private List nonRetryableStatusCodes = new ArrayList<>(); + private List fatalStatusCodes = new ArrayList<>(); /** * Selector to be used with flag sync gRPC contract. diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 4ce6e06ee..082f5a59e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -5,6 +5,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache; +import dev.openfeature.sdk.ErrorCode; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.EventProvider; import dev.openfeature.sdk.Hook; @@ -135,7 +136,7 @@ public void initialize(EvaluationContext evaluationContext) throws Exception { public void shutdown() { synchronized (syncResources) { try { - if (!syncResources.isInitialized() || syncResources.isShutDown()) { + if (syncResources.isShutDown()) { return; } @@ -193,7 +194,7 @@ EvaluationContext getEnrichedContext() { @SuppressWarnings("checkstyle:fallthrough") private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { - log.debug("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent()); + log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent()); synchronized (syncResources) { /* * We only use Error and Ready as previous states. @@ -222,20 +223,26 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { onReady(); syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY); break; - - case PROVIDER_ERROR: - if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) { - onError(); - syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR); + case PROVIDER_STALE: + if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) { + onStale(); + syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE); } break; - + case PROVIDER_ERROR: + onError(); + break; default: log.warn("Unknown event {}", flagdProviderEvent.getEvent()); } } } + private void onError() { + this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); + shutdown(); + } + private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { this.emitProviderConfigurationChanged(ProviderEventDetails.builder() .flagsChanged(flagdProviderEvent.getFlagsChanged()) @@ -255,7 +262,7 @@ private void onReady() { ProviderEventDetails.builder().message("connected to flagd").build()); } - private void onError() { + private void onStale() { log.debug( "Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.", gracePeriod); @@ -270,7 +277,7 @@ private void onError() { if (!errorExecutor.isShutdown()) { errorTask = errorExecutor.schedule( () -> { - if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) { + if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) { log.error( "Provider did not reconnect successfully within {}s. Emitting ERROR event...", gracePeriod); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index e54c938cf..f313d943b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -77,6 +77,9 @@ public void init() throws Exception { storageStateChange.getSyncMetadata())); log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); break; + case STALE: + onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + break; case ERROR: onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); break; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index eaa3dfa5f..a01f93c23 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc } break; case ERROR: + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { + log.warn("Failed to convey STALE status, queue is full"); + } + break; + case FATAL: if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) { log.warn("Failed to convey ERROR status, queue is full"); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java index 93675fb60..74e02912e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java @@ -3,5 +3,6 @@ /** Payload type emitted by {@link QueueSource}. */ public enum QueuePayloadType { DATA, - ERROR + ERROR, + FATAL } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 0cc787db4..915855b27 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -16,7 +16,6 @@ import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; import dev.openfeature.sdk.Awaitable; -import dev.openfeature.sdk.exceptions.FatalError; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -51,7 +50,7 @@ public class SyncStreamQueueSource implements QueueSource { private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final FlagSyncServiceStub flagSyncStub; private final FlagSyncServiceBlockingStub metadataStub; - private final List nonRetryableStatusCodes; + private final List fatalStatusCodes; /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. @@ -68,7 +67,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer queue, String message) { if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) { log.error("Failed to convey ERROR status, queue is full"); } } + private static void enqueueFatal(BlockingQueue queue, String message) { + if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) { + log.error("Failed to convey FATAL status, queue is full"); + } + } + private static class SyncStreamObserver implements StreamObserver { private final BlockingQueue outgoingQueue; private final AtomicBoolean shouldThrottle; private final Awaitable done = new Awaitable(); + private final List fatalStatusCodes; private Struct metadata; - public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle) { + public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle, List fatalStatusCodes) { this.outgoingQueue = outgoingQueue; this.shouldThrottle = shouldThrottle; + this.fatalStatusCodes = fatalStatusCodes; } @Override @@ -260,9 +275,14 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) { @Override public void onError(Throwable throwable) { try { + Status status = Status.fromThrowable(throwable); String message = throwable != null ? throwable.getMessage() : "unknown"; log.debug("Stream error: {}, will restart", message, throwable); - enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + if (fatalStatusCodes.contains(status.getCode())) { + enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message)); + } else { + enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + } // Set throttling flag to ensure backoff before retry this.shouldThrottle.set(true); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index e2c7eef1e..230446f88 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -202,6 +202,6 @@ public void the_flag_was_modded() { @Then("the client is in {} state") public void the_client_is_in_fatal_state(String clientState) { - assertThat(state.client.getProviderState()).isEqualTo(ProviderState.FATAL); + assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase())); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java index c1dad8dae..25a6cdc7d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/config/ConfigSteps.java @@ -121,7 +121,6 @@ private static String mapOptionNames(String option) { propertyMapper.put("keepAliveTime", "keepAlive"); propertyMapper.put("retryBackoffMaxMs", "keepAlive"); propertyMapper.put("cache", "cacheType"); - propertyMapper.put("fatalStatusCodes", "nonRetryableStatusCodes"); if (propertyMapper.get(option) != null) { option = propertyMapper.get(option); From 654c8dad658e891f61cd8af51ed7385e4a8cabe2 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Mon, 24 Nov 2025 10:49:57 +0100 Subject: [PATCH 03/17] fix(flagd): update testbed + step, fix event Signed-off-by: lea konvalinka --- .../providers/flagd/resolver/common/ChannelConnector.java | 2 +- .../contrib/providers/flagd/e2e/steps/ProviderSteps.java | 4 ++-- .../openfeature/contrib/providers/flagd/e2e/steps/Utils.java | 5 ++++- providers/flagd/test-harness | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java index 6261affe7..032b1766c 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java @@ -86,7 +86,7 @@ private void onStateChange() { log.debug("Channel state changed to: {}", currentState); if (currentState == ConnectivityState.TRANSIENT_FAILURE || currentState == ConnectivityState.SHUTDOWN) { this.onConnectionEvent.accept(new FlagdProviderEvent( - ProviderEvent.PROVIDER_ERROR, Collections.emptyList(), new ImmutableStructure())); + ProviderEvent.PROVIDER_STALE, Collections.emptyList(), new ImmutableStructure())); } if (currentState != ConnectivityState.SHUTDOWN) { log.debug("continuing to monitor the grpc channel"); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index 230446f88..0467c56e5 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -200,8 +200,8 @@ public void the_flag_was_modded() { .statusCode(200); } - @Then("the client is in {} state") - public void the_client_is_in_fatal_state(String clientState) { + @Then("the client should be in {} state") + public void the_client_should_be_in_fatal_state(String clientState) { assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase())); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java index c50c08397..a89f8560e 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java @@ -4,8 +4,10 @@ import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType; import dev.openfeature.sdk.Value; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; public final class Utils { @@ -39,7 +41,8 @@ public static Object convert(String value, String type) throws ClassNotFoundExce case "CacheType": return CacheType.valueOf(value.toUpperCase()).getValue(); case "StringList": - return List.of(value); + return value.isEmpty() ? List.of() : Arrays.stream(value.split(",")).map(String::trim).collect( + Collectors.toList()); case "Object": return Value.objectToValue(new ObjectMapper().readValue(value, Object.class)); } diff --git a/providers/flagd/test-harness b/providers/flagd/test-harness index bde8977a4..6948dcbab 160000 --- a/providers/flagd/test-harness +++ b/providers/flagd/test-harness @@ -1 +1 @@ -Subproject commit bde8977a4fa2b59ba4359bcf902e9adf4555d085 +Subproject commit 6948dcbabef284fae4a4c1d03ce5e0bd9ea34c17 From 07195a7180817c06c866210b6f46eddaf9f3ee28 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Fri, 12 Dec 2025 12:04:41 +0100 Subject: [PATCH 04/17] adjust rpc resolver Signed-off-by: lea konvalinka --- .../contrib/providers/flagd/FlagdProvider.java | 3 +-- .../connector/sync/SyncStreamQueueSource.java | 1 - .../flagd/resolver/rpc/RpcResolver.java | 18 ++++++++++++++++-- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 082f5a59e..ddd87f949 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -194,7 +194,7 @@ EvaluationContext getEnrichedContext() { @SuppressWarnings("checkstyle:fallthrough") private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { - log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent()); + log.debug("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent()); synchronized (syncResources) { /* * We only use Error and Ready as previous states. @@ -240,7 +240,6 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { private void onError() { this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); - shutdown(); } private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 915855b27..a0e66ee97 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -143,7 +143,6 @@ private void observeSyncStream() { if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) { //throw new FatalError("Failed to connect for metadata request, not retrying for error " + metaEx.getStatus()); enqueueFatal("Fatal: Failed to connect for metadata request, not retrying for error " + metaEx.getStatus().getCode()); - return; } // retry for other status codes String message = metaEx.getMessage(); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 1f3101d00..eebd7f2a0 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -68,6 +68,7 @@ public final class RpcResolver implements Resolver { private final Consumer onProviderEvent; private final ServiceStub stub; private final ServiceBlockingStub blockingStub; + private final List fatalStatusCodes; /** * Resolves flag values using @@ -89,6 +90,7 @@ public RpcResolver( this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady(); this.blockingStub = ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady(); + this.fatalStatusCodes = options.getFatalStatusCodes(); } // testing only @@ -107,6 +109,7 @@ protected RpcResolver( this.onProviderEvent = onProviderEvent; this.stub = mockStub; this.blockingStub = mockBlockingStub; + this.fatalStatusCodes = options.getFatalStatusCodes(); } /** @@ -353,7 +356,12 @@ private void observeEventStream() throws InterruptedException { log.debug( "Exception in event stream connection, streamException {}, will reconnect", streamException); - this.handleErrorOrComplete(); + if (streamException instanceof StatusRuntimeException && fatalStatusCodes.contains( + ((StatusRuntimeException) streamException).getStatus().getCode().name())) { + this.handleFatalError(); + } else { + this.handleErrorOrComplete(); + } break; } @@ -412,9 +420,15 @@ private void handleProviderReadyEvent() { * Handles provider error events by clearing the cache (if enabled) and notifying listeners of the error. */ private void handleErrorOrComplete() { - log.debug("Emitting provider error event"); + log.debug("Emitting provider stale event"); // complete is an error, logically...even if the server went down gracefully we need to reconnect. + onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + } + + private void handleFatalError() { + log.debug("Emitting provider error event"); + onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); } } From e6d40578c72c67f2f9366211b7f32585f5de5ab6 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Wed, 17 Dec 2025 10:15:55 +0100 Subject: [PATCH 05/17] fix e2e tests Signed-off-by: Konvalinka --- .../contrib/providers/flagd/Config.java | 14 +++++++++++ .../contrib/providers/flagd/FlagdOptions.java | 4 ++-- .../process/storage/StorageState.java | 2 +- .../connector/sync/SyncStreamQueueSource.java | 23 +++++++++---------- .../providers/flagd/e2e/RunInProcessTest.java | 2 +- providers/flagd/test-harness | 2 +- 6 files changed, 30 insertions(+), 17 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java index 417826437..a2ae3e9ea 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java @@ -1,7 +1,10 @@ package dev.openfeature.contrib.providers.flagd; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType; +import java.util.Arrays; +import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; /** Helper class to hold configuration default values. */ @@ -36,6 +39,7 @@ public final class Config { static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS"; static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS"; static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR"; + static final String FATAL_STATUS_CODES_ENV_VAR_NAME = "FLAGD_FATAL_STATUS_CODES"; /** * Environment variable to fetch Provider id. * @@ -91,6 +95,16 @@ static long fallBackToEnvOrDefault(String key, long defaultValue) { } } + static List fallBackToEnvOrDefaultList(String key, List defaultValue) { + try { + return System.getenv(key) != null ? Arrays.stream(System.getenv(key).split(",")) + .map(String::trim) + .collect(Collectors.toList()) : defaultValue; + } catch (Exception e) { + return defaultValue; + } + } + static Resolver fromValueProvider(Function provider) { final String resolverVar = provider.apply(RESOLVER_ENV_VAR); if (resolverVar == null) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index f537dfb25..1005c4a4c 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -1,6 +1,7 @@ package dev.openfeature.contrib.providers.flagd; import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault; +import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefaultList; import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; @@ -10,7 +11,6 @@ import io.grpc.ClientInterceptor; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; -import java.util.ArrayList; import java.util.List; import java.util.function.Function; import lombok.Builder; @@ -129,7 +129,7 @@ public class FlagdOptions { * Defaults to empty list */ @Builder.Default - private List fatalStatusCodes = new ArrayList<>(); + private List fatalStatusCodes = fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of()); /** * Selector to be used with flag sync gRPC contract. diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java index c47670b7d..d6b8b30c5 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java @@ -1,6 +1,6 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage; -/** Satus of the storage. */ +/** Status of the storage. */ public enum StorageState { /** Storage is upto date and working as expected. */ OK, diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 61ecca041..c49423c7d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -141,13 +141,13 @@ private void observeSyncStream() { observer.metadata = getMetadata(); } catch (StatusRuntimeException metaEx) { if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) { - //throw new FatalError("Failed to connect for metadata request, not retrying for error " + metaEx.getStatus()); - enqueueFatal("Fatal: Failed to connect for metadata request, not retrying for error " + metaEx.getStatus().getCode()); + enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", metaEx.getStatus().getCode())); + } else { + // retry for other status codes + String message = metaEx.getMessage(); + log.debug("Metadata request error: {}, will restart", message, metaEx); + enqueueError(String.format("Error in getMetadata request: %s", message)); } - // retry for other status codes - String message = metaEx.getMessage(); - log.debug("Metadata request error: {}, will restart", message, metaEx); - enqueueError(String.format("Error in getMetadata request: %s", message)); shouldThrottle.set(true); continue; } @@ -156,13 +156,12 @@ private void observeSyncStream() { syncFlags(observer); } catch (StatusRuntimeException ex) { if (fatalStatusCodes.contains(ex.getStatus().getCode().toString())) { - //throw new FatalError("Failed to connect for metadata request, not retrying for error " + ex.getStatus().getCode()); - enqueueFatal("Fatal: Failed to connect for metadata request, not retrying for error " + ex.getStatus().getCode()); - return; + enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", ex.getStatus().getCode())); + } else { + // retry for other status codes + log.error("Unexpected sync stream exception, will restart.", ex); + enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); } - // retry for other status codes - log.error("Unexpected sync stream exception, will restart.", ex); - enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); shouldThrottle.set(true); } } catch (InterruptedException ie) { diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java index c694aa9ef..3f859d984 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java @@ -28,7 +28,7 @@ @ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps") @ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory") @IncludeTags("in-process") -@ExcludeTags({"unixsocket"}) +@ExcludeTags({"unixsocket","sync-port"}) @Testcontainers public class RunInProcessTest { diff --git a/providers/flagd/test-harness b/providers/flagd/test-harness index 6948dcbab..9b73b3a95 160000 --- a/providers/flagd/test-harness +++ b/providers/flagd/test-harness @@ -1 +1 @@ -Subproject commit 6948dcbabef284fae4a4c1d03ce5e0bd9ea34c17 +Subproject commit 9b73b3a95cd9e0885937d244b118713b26374b1d From 95a880c68e488774e0eef03152c87d0a08524786 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Wed, 17 Dec 2025 10:44:26 +0100 Subject: [PATCH 06/17] clean up Signed-off-by: Konvalinka --- .../openfeature/contrib/providers/flagd/FlagdProvider.java | 2 +- .../process/storage/connector/sync/SyncStreamQueueSource.java | 2 ++ .../contrib/providers/flagd/resolver/rpc/RpcResolver.java | 4 ++++ .../contrib/providers/flagd/e2e/steps/ProviderSteps.java | 2 +- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index ddd87f949..88cd71fa7 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -136,7 +136,7 @@ public void initialize(EvaluationContext evaluationContext) throws Exception { public void shutdown() { synchronized (syncResources) { try { - if (syncResources.isShutDown()) { + if (!syncResources.isInitialized() || syncResources.isShutDown()) { return; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 5613f50cb..8545519d4 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -138,6 +138,7 @@ private void observeSyncStream() { observer.metadata = getMetadata(); } catch (StatusRuntimeException metaEx) { if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) { + log.debug("Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", metaEx.getStatus().getCode())); } else { // retry for other status codes @@ -153,6 +154,7 @@ private void observeSyncStream() { syncFlags(observer); } catch (StatusRuntimeException ex) { if (fatalStatusCodes.contains(ex.getStatus().getCode().toString())) { + log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", ex.getStatus().getCode())); } else { // retry for other status codes diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 5df2d7844..15fea898b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -424,6 +424,10 @@ private void handleErrorOrComplete() { onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); } + /** + * Handles fatal error events (i.e. error codes defined in fatalStatusCodes) by transitioning the provider into + * fatal state + */ private void handleFatalError() { log.debug("Emitting provider error event"); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index 489002f68..90d082292 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -53,7 +53,7 @@ public static void beforeAll() throws IOException { .withExposedService("flagd", 8015, Wait.forListeningPort()) .withExposedService("flagd", 8080, Wait.forListeningPort()) .withExposedService("envoy", 9211, Wait.forListeningPort()) - .withExposedService("envoy", 9212, Wait.forListeningPort()) + .withExposedService("envoy", FORBIDDEN_PORT, Wait.forListeningPort()) .withStartupTimeout(Duration.ofSeconds(45)); container.start(); } From 45a9822a8555ba9dcca3ba2dc753bd1308f018d1 Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Wed, 17 Dec 2025 11:27:58 +0100 Subject: [PATCH 07/17] fatal only on first connection Signed-off-by: Konvalinka --- .../storage/connector/sync/SyncStreamQueueSource.java | 6 ++++-- .../providers/flagd/resolver/rpc/RpcResolver.java | 11 +++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 8545519d4..1118dc7e0 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -38,6 +38,7 @@ public class SyncStreamQueueSource implements QueueSource { private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); + private final AtomicBoolean successfulSync = new AtomicBoolean(false); private final int streamDeadline; private final int deadline; private final int maxBackoffMs; @@ -137,7 +138,7 @@ private void observeSyncStream() { try { observer.metadata = getMetadata(); } catch (StatusRuntimeException metaEx) { - if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) { + if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) { log.debug("Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", metaEx.getStatus().getCode())); } else { @@ -152,8 +153,9 @@ private void observeSyncStream() { try { syncFlags(observer); + successfulSync.set(true); } catch (StatusRuntimeException ex) { - if (fatalStatusCodes.contains(ex.getStatus().getCode().toString())) { + if (fatalStatusCodes.contains(ex.getStatus().getCode().toString()) && !successfulSync.get()) { log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", ex.getStatus().getCode())); } else { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 15fea898b..1f601151e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -60,6 +60,7 @@ public final class RpcResolver implements Resolver { private static final int QUEUE_SIZE = 5; private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final AtomicBoolean successfulConnection = new AtomicBoolean(false); private final ChannelConnector connector; private final Cache cache; private final ResolveStrategy strategy; @@ -351,18 +352,20 @@ private void observeEventStream() throws InterruptedException { Throwable streamException = taken.getError(); if (streamException != null) { - log.debug( - "Exception in event stream connection, streamException {}, will reconnect", - streamException); if (streamException instanceof StatusRuntimeException && fatalStatusCodes.contains( - ((StatusRuntimeException) streamException).getStatus().getCode().name())) { + ((StatusRuntimeException) streamException).getStatus().getCode().name()) && !successfulConnection.get()) { + log.debug("Fatal error code received: {}", ((StatusRuntimeException) streamException).getStatus().getCode()); this.handleFatalError(); } else { + log.debug( + "Exception in event stream connection, streamException {}, will reconnect", + streamException); this.handleErrorOrComplete(); } break; } + successfulConnection.set(true); final EventStreamResponse response = taken.getResponse(); log.debug("Got stream response: {}", response); From e50aa7f69fe619d3407cf678abb91ba01b947b8b Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Wed, 17 Dec 2025 12:45:54 +0100 Subject: [PATCH 08/17] remove exclusion of sync e2e test tag Signed-off-by: Konvalinka --- .../contrib/providers/flagd/e2e/RunInProcessTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java index 3f859d984..c694aa9ef 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java @@ -28,7 +28,7 @@ @ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps") @ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory") @IncludeTags("in-process") -@ExcludeTags({"unixsocket","sync-port"}) +@ExcludeTags({"unixsocket"}) @Testcontainers public class RunInProcessTest { From a636257449bec6312447df0df5d7b95f32ac5b7f Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Fri, 19 Dec 2025 10:25:33 +0100 Subject: [PATCH 09/17] add shutdown after fatal, fix tests Signed-off-by: Konvalinka --- .../dev/openfeature/contrib/providers/flagd/FlagdOptions.java | 2 +- .../openfeature/contrib/providers/flagd/FlagdProvider.java | 1 + .../process/storage/connector/sync/SyncStreamQueueSource.java | 2 ++ .../flagd/resolver/process/storage/FlagStoreTest.java | 4 ++-- .../contrib/providers/flagd/resolver/rpc/RpcResolverTest.java | 4 ++-- 5 files changed, 8 insertions(+), 5 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index 1005c4a4c..0cd0d97f4 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -125,7 +125,7 @@ public class FlagdOptions { fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD); /** - * List of grpc response status codes for which failed connections are not retried. + * List of grpc response status codes for which the provider transitions into fatal state upon first connection. * Defaults to empty list */ @Builder.Default diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 88cd71fa7..f7e69f476 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -240,6 +240,7 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { private void onError() { this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); + this.shutdown(); } private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 1118dc7e0..e2db09ea0 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -141,6 +141,7 @@ private void observeSyncStream() { if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) { log.debug("Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", metaEx.getStatus().getCode())); + return; } else { // retry for other status codes String message = metaEx.getMessage(); @@ -158,6 +159,7 @@ private void observeSyncStream() { if (fatalStatusCodes.contains(ex.getStatus().getCode().toString()) && !successfulSync.get()) { log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", ex.getStatus().getCode())); + return; } else { // retry for other status codes log.error("Unexpected sync stream exception, will restart.", ex); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java index 86ca298e3..e58b4eb3f 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java @@ -64,14 +64,14 @@ void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.ERROR, states.take().getStorageState()); + assertEquals(StorageState.STALE, states.take().getStorageState()); }); // Shutdown handling store.shutdown(); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.ERROR, states.take().getStorageState()); + assertEquals(StorageState.STALE, states.take().getStorageState()); }); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java index 119f9e2e6..0ed2b8e3c 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java @@ -111,7 +111,7 @@ void onCompletedRerunsStreamWithError() throws Exception { // should run consumer with error await().untilAsserted(() -> - verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_ERROR))); + verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_STALE))); // should have restarted the stream (2 calls) await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); } @@ -128,7 +128,7 @@ void onErrorRunsConsumerWithError() throws Exception { // should run consumer with error await().untilAsserted(() -> - verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_ERROR))); + verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_STALE))); // should have restarted the stream (2 calls) await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); } From d27e4e9eb02c0b73521d17ba8c01a835c9c9706b Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Fri, 19 Dec 2025 11:00:18 +0100 Subject: [PATCH 10/17] remove shutdown Signed-off-by: Konvalinka --- .../dev/openfeature/contrib/providers/flagd/FlagdProvider.java | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 8690a2037..4bd250377 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -240,7 +240,6 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { private void onError() { this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); - this.shutdown(); } private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { From 94c76912df1848d3dc3858a34672d7e70255c4df Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Fri, 19 Dec 2025 11:16:06 +0100 Subject: [PATCH 11/17] fix lint issues Signed-off-by: Konvalinka --- .../contrib/providers/flagd/FlagdOptions.java | 3 ++- .../connector/sync/SyncStreamQueueSource.java | 24 ++++++++++++------- .../flagd/resolver/rpc/RpcResolver.java | 6 +++-- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index e92c3f0a6..79ac22a86 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -129,7 +129,8 @@ public class FlagdOptions { * Defaults to empty list */ @Builder.Default - private List fatalStatusCodes = fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of()); + private List fatalStatusCodes = + fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of()); /** * Selector to be used with flag sync gRPC contract. diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 8cc178580..3522a18b7 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -195,8 +195,11 @@ private void observeSyncStream() { observer.metadata = getMetadata(); } catch (StatusRuntimeException metaEx) { if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) { - log.debug("Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); - enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", metaEx.getStatus().getCode())); + log.debug("Fatal status code for metadata request: {}, not retrying", + metaEx.getStatus().getCode()); + enqueueFatal(String.format( + "Fatal: Failed to connect for metadata request, not retrying for error %s", + metaEx.getStatus().getCode())); return; } else { // retry for other status codes @@ -214,7 +217,9 @@ private void observeSyncStream() { } catch (StatusRuntimeException ex) { if (fatalStatusCodes.contains(ex.getStatus().getCode().toString()) && !successfulSync.get()) { log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); - enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", ex.getStatus().getCode())); + enqueueFatal(String.format( + "Fatal: Failed to connect for metadata request, not retrying for error %s", + ex.getStatus().getCode())); return; } else { // retry for other status codes @@ -289,16 +294,16 @@ private void enqueueError(String message) { enqueueError(outgoingQueue, message); } - private void enqueueFatal(String message) { - enqueueFatal(outgoingQueue, message); - } - private static void enqueueError(BlockingQueue queue, String message) { if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) { log.error("Failed to convey ERROR status, queue is full"); } } + private void enqueueFatal(String message) { + enqueueFatal(outgoingQueue, message); + } + private static void enqueueFatal(BlockingQueue queue, String message) { if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) { log.error("Failed to convey FATAL status, queue is full"); @@ -313,7 +318,8 @@ private static class SyncStreamObserver implements StreamObserver outgoingQueue, AtomicBoolean shouldThrottle, List fatalStatusCodes) { + public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle, + List fatalStatusCodes) { this.outgoingQueue = outgoingQueue; this.shouldThrottle = shouldThrottle; this.fatalStatusCodes = fatalStatusCodes; @@ -337,7 +343,7 @@ public void onError(Throwable throwable) { Status status = Status.fromThrowable(throwable); String message = throwable != null ? throwable.getMessage() : "unknown"; log.debug("Stream error: {}, will restart", message, throwable); - if (fatalStatusCodes.contains(status.getCode())) { + if (fatalStatusCodes.contains(status.getCode().name())) { enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message)); } else { enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 1f601151e..15b5c8e3d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -353,8 +353,10 @@ private void observeEventStream() throws InterruptedException { Throwable streamException = taken.getError(); if (streamException != null) { if (streamException instanceof StatusRuntimeException && fatalStatusCodes.contains( - ((StatusRuntimeException) streamException).getStatus().getCode().name()) && !successfulConnection.get()) { - log.debug("Fatal error code received: {}", ((StatusRuntimeException) streamException).getStatus().getCode()); + ((StatusRuntimeException) streamException).getStatus().getCode().name()) + && !successfulConnection.get()) { + log.debug("Fatal error code received: {}", + ((StatusRuntimeException) streamException).getStatus().getCode()); this.handleFatalError(); } else { log.debug( From ee424052eb5d5b1b77648a3897c84f11bb5537cb Mon Sep 17 00:00:00 2001 From: lea konvalinka Date: Mon, 22 Dec 2025 09:13:34 +0100 Subject: [PATCH 12/17] fix spotless Signed-off-by: Konvalinka --- .../contrib/providers/flagd/Config.java | 8 +++++--- .../contrib/providers/flagd/FlagdProvider.java | 4 +++- .../connector/sync/SyncStreamQueueSource.java | 11 ++++++++--- .../providers/flagd/resolver/rpc/RpcResolver.java | 14 ++++++++++---- .../contrib/providers/flagd/e2e/steps/Utils.java | 5 +++-- 5 files changed, 29 insertions(+), 13 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java index 3adc9fbdb..f88cfdd13 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java @@ -99,9 +99,11 @@ static long fallBackToEnvOrDefault(String key, long defaultValue) { static List fallBackToEnvOrDefaultList(String key, List defaultValue) { try { - return System.getenv(key) != null ? Arrays.stream(System.getenv(key).split(",")) - .map(String::trim) - .collect(Collectors.toList()) : defaultValue; + return System.getenv(key) != null + ? Arrays.stream(System.getenv(key).split(",")) + .map(String::trim) + .collect(Collectors.toList()) + : defaultValue; } catch (Exception e) { return defaultValue; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 4bd250377..7961c6cdc 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -239,7 +239,9 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { } private void onError() { - this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); + this.emitProviderError(ProviderEventDetails.builder() + .errorCode(ErrorCode.PROVIDER_FATAL) + .build()); } private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 3522a18b7..4dfab4394 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -195,7 +195,8 @@ private void observeSyncStream() { observer.metadata = getMetadata(); } catch (StatusRuntimeException metaEx) { if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) { - log.debug("Fatal status code for metadata request: {}, not retrying", + log.debug( + "Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); enqueueFatal(String.format( "Fatal: Failed to connect for metadata request, not retrying for error %s", @@ -216,7 +217,9 @@ private void observeSyncStream() { successfulSync.set(true); } catch (StatusRuntimeException ex) { if (fatalStatusCodes.contains(ex.getStatus().getCode().toString()) && !successfulSync.get()) { - log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); + log.debug( + "Fatal status code during sync stream: {}, not retrying", + ex.getStatus().getCode()); enqueueFatal(String.format( "Fatal: Failed to connect for metadata request, not retrying for error %s", ex.getStatus().getCode())); @@ -318,7 +321,9 @@ private static class SyncStreamObserver implements StreamObserver outgoingQueue, AtomicBoolean shouldThrottle, + public SyncStreamObserver( + BlockingQueue outgoingQueue, + AtomicBoolean shouldThrottle, List fatalStatusCodes) { this.outgoingQueue = outgoingQueue; this.shouldThrottle = shouldThrottle; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 15b5c8e3d..3376c06c9 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -352,11 +352,17 @@ private void observeEventStream() throws InterruptedException { Throwable streamException = taken.getError(); if (streamException != null) { - if (streamException instanceof StatusRuntimeException && fatalStatusCodes.contains( - ((StatusRuntimeException) streamException).getStatus().getCode().name()) + if (streamException instanceof StatusRuntimeException + && fatalStatusCodes.contains(((StatusRuntimeException) streamException) + .getStatus() + .getCode() + .name()) && !successfulConnection.get()) { - log.debug("Fatal error code received: {}", - ((StatusRuntimeException) streamException).getStatus().getCode()); + log.debug( + "Fatal error code received: {}", + ((StatusRuntimeException) streamException) + .getStatus() + .getCode()); this.handleFatalError(); } else { log.debug( diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java index a89f8560e..626105ce4 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java @@ -41,8 +41,9 @@ public static Object convert(String value, String type) throws ClassNotFoundExce case "CacheType": return CacheType.valueOf(value.toUpperCase()).getValue(); case "StringList": - return value.isEmpty() ? List.of() : Arrays.stream(value.split(",")).map(String::trim).collect( - Collectors.toList()); + return value.isEmpty() + ? List.of() + : Arrays.stream(value.split(",")).map(String::trim).collect(Collectors.toList()); case "Object": return Value.objectToValue(new ObjectMapper().readValue(value, Object.class)); } From 02539d07b2fdec5c89af624481ae0d1b91766322 Mon Sep 17 00:00:00 2001 From: Guido Breitenhuber Date: Mon, 22 Dec 2025 17:07:12 +0100 Subject: [PATCH 13/17] feat(flagd): Communicate Fatal and shutdown connectors Signed-off-by: Guido Breitenhuber --- .../providers/flagd/FlagdProvider.java | 60 +++++++------- .../resolver/process/InProcessResolver.java | 30 ++++--- .../resolver/process/storage/FlagStore.java | 15 ++-- .../process/storage/StorageState.java | 6 +- .../storage/connector/QueuePayload.java | 6 ++ .../storage/connector/QueuePayloadType.java | 2 +- .../connector/sync/SyncStreamQueueSource.java | 82 ++++++++----------- .../flagd/resolver/rpc/RpcResolver.java | 17 ++-- .../process/InProcessResolverTest.java | 4 +- .../process/storage/FlagStoreTest.java | 6 +- 10 files changed, 115 insertions(+), 113 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 7961c6cdc..8c11bb4dc 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -1,7 +1,6 @@ package dev.openfeature.contrib.providers.flagd; import dev.openfeature.contrib.providers.flagd.resolver.Resolver; -import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache; @@ -193,8 +192,8 @@ EvaluationContext getEnrichedContext() { } @SuppressWarnings("checkstyle:fallthrough") - private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { - log.debug("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent()); + private void onProviderEvent(ProviderEvent providerEvent, ProviderEventDetails providerEventDetails, Structure syncMetadata) { + log.debug("FlagdProviderEvent event {} ", providerEvent); synchronized (syncResources) { /* * We only use Error and Ready as previous states. @@ -205,10 +204,10 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { * forward a configuration changed to the ready, if we are not in the ready * state. */ - switch (flagdProviderEvent.getEvent()) { + switch (providerEvent) { case PROVIDER_CONFIGURATION_CHANGED: if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_READY) { - onConfigurationChanged(flagdProviderEvent); + emit(providerEvent, providerEventDetails); break; } // intentional fall through @@ -217,40 +216,29 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { * Sync metadata is used to enrich the context, and is immutable in flagd, * so we only need it to be fetched once at READY. */ - if (flagdProviderEvent.getSyncMetadata() != null) { - syncResources.setEnrichedContext(contextEnricher.apply(flagdProviderEvent.getSyncMetadata())); + if (syncMetadata != null) { + syncResources.setEnrichedContext(contextEnricher.apply(syncMetadata)); } onReady(); syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY); break; - case PROVIDER_STALE: - if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) { - onStale(); - syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE); - } - break; case PROVIDER_ERROR: - onError(); + if (providerEventDetails != null && providerEventDetails.getErrorCode() == ErrorCode.PROVIDER_FATAL) { + onFatal(); + break; + } + + if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) { + onError(); + syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR); + } break; default: - log.warn("Unknown event {}", flagdProviderEvent.getEvent()); + log.warn("Unknown event {}", providerEvent); } } } - private void onError() { - this.emitProviderError(ProviderEventDetails.builder() - .errorCode(ErrorCode.PROVIDER_FATAL) - .build()); - } - - private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { - this.emitProviderConfigurationChanged(ProviderEventDetails.builder() - .flagsChanged(flagdProviderEvent.getFlagsChanged()) - .message("configuration changed") - .build()); - } - private void onReady() { if (syncResources.initialize()) { log.info("Initialized FlagdProvider"); @@ -263,7 +251,7 @@ private void onReady() { ProviderEventDetails.builder().message("connected to flagd").build()); } - private void onStale() { + private void onError() { log.debug( "Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.", gracePeriod); @@ -278,7 +266,7 @@ private void onStale() { if (!errorExecutor.isShutdown()) { errorTask = errorExecutor.schedule( () -> { - if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) { + if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) { log.error( "Provider did not reconnect successfully within {}s. Emitting ERROR event...", gracePeriod); @@ -292,4 +280,16 @@ private void onStale() { TimeUnit.SECONDS); } } + + private void onFatal() { + if (errorTask != null && !errorTask.isCancelled()) { + errorTask.cancel(false); + } + + this.emitProviderError(ProviderEventDetails.builder() + .errorCode(ErrorCode.PROVIDER_FATAL) + .build()); + + shutdown(); + } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index 9eb84b0a4..4b15108ef 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -20,13 +20,17 @@ import dev.openfeature.sdk.ImmutableMetadata; import dev.openfeature.sdk.ProviderEvaluation; import dev.openfeature.sdk.ProviderEvent; +import dev.openfeature.sdk.ProviderEventDetails; import dev.openfeature.sdk.Reason; +import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; import dev.openfeature.sdk.exceptions.GeneralError; import dev.openfeature.sdk.exceptions.ParseError; import dev.openfeature.sdk.exceptions.TypeMismatchError; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Consumer; +import dev.openfeature.sdk.internal.TriConsumer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -38,7 +42,7 @@ @Slf4j public class InProcessResolver implements Resolver { private final Storage flagStore; - private final Consumer onConnectionEvent; + private final TriConsumer onConnectionEvent; private final Operator operator; private final String scope; private final QueueSource queueSource; @@ -52,7 +56,7 @@ public class InProcessResolver implements Resolver { * @param onConnectionEvent lambda which handles changes in the * connection/stream */ - public InProcessResolver(FlagdOptions options, Consumer onConnectionEvent) { + public InProcessResolver(FlagdOptions options, TriConsumer onConnectionEvent) { this.queueSource = getQueueSource(options); this.flagStore = new FlagStore(queueSource); this.onConnectionEvent = onConnectionEvent; @@ -73,17 +77,23 @@ public void init() throws Exception { switch (storageStateChange.getStorageState()) { case OK: log.debug("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); - onConnectionEvent.accept(new FlagdProviderEvent( - ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, - storageStateChange.getChangedFlagsKeys(), - storageStateChange.getSyncMetadata())); + + var eventDetails = ProviderEventDetails.builder() + .flagsChanged(storageStateChange.getChangedFlagsKeys()) + .message("configuration changed") + .build(); + + onConnectionEvent.accept(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, eventDetails, storageStateChange.getSyncMetadata()); + log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); break; - case STALE: - onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + case TRANSIENT_ERROR: + onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null); break; - case ERROR: - onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); + case FATAL_ERROR: + onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, ProviderEventDetails.builder() + .errorCode(ErrorCode.PROVIDER_FATAL) + .build(), null); break; default: log.warn(String.format( diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index a01f93c23..8e86d1e9e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -132,19 +132,20 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc } catch (Throwable e) { // catch all exceptions and avoid stream listener interruptions log.warn("Invalid flag sync payload from connector", e); - if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { - log.warn("Failed to convey STALE status, queue is full"); + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.TRANSIENT_ERROR))) { + log.warn("Failed to convey TRANSIENT_ERROR status, queue is full"); } } break; case ERROR: - if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { - log.warn("Failed to convey STALE status, queue is full"); + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.TRANSIENT_ERROR))) { + log.warn("Failed to convey TRANSIENT_ERROR status, queue is full"); } break; - case FATAL: - if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) { - log.warn("Failed to convey ERROR status, queue is full"); + case SHUTDOWN: + shutdown(); + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.FATAL_ERROR))) { + log.warn("Failed to convey FATAL_ERROR status, queue is full"); } break; default: diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java index d6b8b30c5..70d3a7277 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java @@ -4,8 +4,8 @@ public enum StorageState { /** Storage is upto date and working as expected. */ OK, - /** Storage has gone stale(most recent sync failed). May get to OK status with next sync. */ - STALE, + /** Storage has gone stale (most recent sync failed). May get to OK status with next sync. */ + TRANSIENT_ERROR, /** Storage is in an unrecoverable error stage. */ - ERROR, + FATAL_ERROR, } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java index 071e51085..e7e79e976 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java @@ -8,6 +8,9 @@ @AllArgsConstructor @Getter public class QueuePayload { + public static final QueuePayload ERROR = new QueuePayload(QueuePayloadType.ERROR); + public static final QueuePayload SHUTDOWN = new QueuePayload(QueuePayloadType.SHUTDOWN); + private final QueuePayloadType type; private final String flagData; private final Struct syncContext; @@ -15,4 +18,7 @@ public class QueuePayload { public QueuePayload(QueuePayloadType type, String flagData) { this(type, flagData, null); } + public QueuePayload(QueuePayloadType type) { + this(type, null, null); + } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java index 74e02912e..d9d1c5479 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java @@ -4,5 +4,5 @@ public enum QueuePayloadType { DATA, ERROR, - FATAL + SHUTDOWN } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 4dfab4394..aebd652b0 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -39,7 +39,6 @@ public class SyncStreamQueueSource implements QueueSource { private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); - private final AtomicBoolean successfulSync = new AtomicBoolean(false); private final int streamDeadline; private final int deadline; private final int maxBackoffMs; @@ -167,6 +166,8 @@ public void shutdown() throws InterruptedException { log.debug("Shutdown already in progress or completed"); return; } + + enqueue(QueuePayload.SHUTDOWN); grpcComponents.channelConnector.shutdown(); } @@ -190,23 +191,20 @@ private void observeSyncStream() { } log.debug("Initializing sync stream request"); - SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle, fatalStatusCodes); + SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue); try { observer.metadata = getMetadata(); } catch (StatusRuntimeException metaEx) { - if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) { + if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name())) { log.debug( "Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); - enqueueFatal(String.format( - "Fatal: Failed to connect for metadata request, not retrying for error %s", - metaEx.getStatus().getCode())); - return; + shutdown(); } else { // retry for other status codes String message = metaEx.getMessage(); log.debug("Metadata request error: {}, will restart", message, metaEx); - enqueueError(String.format("Error in getMetadata request: %s", message)); + enqueue(QueuePayload.ERROR); } shouldThrottle.set(true); continue; @@ -214,20 +212,17 @@ private void observeSyncStream() { try { syncFlags(observer); - successfulSync.set(true); + handleObserverError(observer); } catch (StatusRuntimeException ex) { - if (fatalStatusCodes.contains(ex.getStatus().getCode().toString()) && !successfulSync.get()) { + if (fatalStatusCodes.contains(ex.getStatus().getCode().toString())) { log.debug( "Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); - enqueueFatal(String.format( - "Fatal: Failed to connect for metadata request, not retrying for error %s", - ex.getStatus().getCode())); - return; + shutdown(); } else { // retry for other status codes log.error("Unexpected sync stream exception, will restart.", ex); - enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); + enqueue(QueuePayload.ERROR); } shouldThrottle.set(true); } @@ -293,41 +288,40 @@ private void syncFlags(SyncStreamObserver streamObserver) { streamObserver.done.await(); } - private void enqueueError(String message) { - enqueueError(outgoingQueue, message); - } + private void handleObserverError(SyncStreamObserver observer) throws InterruptedException { + if (observer.throwable == null) { + return; + } - private static void enqueueError(BlockingQueue queue, String message) { - if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) { - log.error("Failed to convey ERROR status, queue is full"); + Throwable throwable = observer.throwable; + Status status = Status.fromThrowable(throwable); + String message = throwable.getMessage(); + if (fatalStatusCodes.contains(status.getCode().name())) { + shutdown(); + } else { + log.debug("Stream error: {}, will restart", message, throwable); + enqueue(QueuePayload.ERROR); } - } - private void enqueueFatal(String message) { - enqueueFatal(outgoingQueue, message); + // Set throttling flag to ensure backoff before retry + this.shouldThrottle.set(true); } - private static void enqueueFatal(BlockingQueue queue, String message) { - if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) { - log.error("Failed to convey FATAL status, queue is full"); + private void enqueue(QueuePayload queuePayload) { + if (!outgoingQueue.offer(queuePayload)) { + log.error("Failed to convey {} status, queue is full", queuePayload.getType()); } } private static class SyncStreamObserver implements StreamObserver { private final BlockingQueue outgoingQueue; - private final AtomicBoolean shouldThrottle; private final Awaitable done = new Awaitable(); - private final List fatalStatusCodes; private Struct metadata; + private Throwable throwable; - public SyncStreamObserver( - BlockingQueue outgoingQueue, - AtomicBoolean shouldThrottle, - List fatalStatusCodes) { + public SyncStreamObserver(BlockingQueue outgoingQueue) { this.outgoingQueue = outgoingQueue; - this.shouldThrottle = shouldThrottle; - this.fatalStatusCodes = fatalStatusCodes; } @Override @@ -344,21 +338,9 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) { @Override public void onError(Throwable throwable) { - try { - Status status = Status.fromThrowable(throwable); - String message = throwable != null ? throwable.getMessage() : "unknown"; - log.debug("Stream error: {}, will restart", message, throwable); - if (fatalStatusCodes.contains(status.getCode().name())) { - enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message)); - } else { - enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); - } - - // Set throttling flag to ensure backoff before retry - this.shouldThrottle.set(true); - } finally { - done.wakeup(); - } + log.debug("Sync stream error received", throwable); + this.throwable = throwable; + done.wakeup(); } @Override diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 3376c06c9..a5121a93b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -31,12 +31,15 @@ import dev.openfeature.sdk.ImmutableMetadata; import dev.openfeature.sdk.ProviderEvaluation; import dev.openfeature.sdk.ProviderEvent; +import dev.openfeature.sdk.ProviderEventDetails; +import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; import dev.openfeature.sdk.exceptions.FlagNotFoundError; import dev.openfeature.sdk.exceptions.GeneralError; import dev.openfeature.sdk.exceptions.OpenFeatureError; import dev.openfeature.sdk.exceptions.ParseError; import dev.openfeature.sdk.exceptions.TypeMismatchError; +import dev.openfeature.sdk.internal.TriConsumer; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -66,7 +69,7 @@ public final class RpcResolver implements Resolver { private final ResolveStrategy strategy; private final FlagdOptions options; private final LinkedBlockingQueue> incomingQueue; - private final Consumer onProviderEvent; + private final TriConsumer onProviderEvent; private final ServiceStub stub; private final ServiceBlockingStub blockingStub; private final List fatalStatusCodes; @@ -81,7 +84,7 @@ public final class RpcResolver implements Resolver { * @param onProviderEvent lambda which handles changes in the connection/stream */ public RpcResolver( - final FlagdOptions options, final Cache cache, final Consumer onProviderEvent) { + final FlagdOptions options, final Cache cache, final TriConsumer onProviderEvent) { this.cache = cache; this.strategy = ResolveFactory.getStrategy(options); this.options = options; @@ -98,7 +101,7 @@ public RpcResolver( protected RpcResolver( final FlagdOptions options, final Cache cache, - final Consumer onProviderEvent, + final TriConsumer onProviderEvent, ServiceStub mockStub, ServiceBlockingStub mockBlockingStub, ChannelConnector connector) { @@ -414,7 +417,7 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) { changedFlags.forEach(this.cache::remove); } - onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, changedFlags)); + onProviderEvent.accept(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, ProviderEventDetails.builder().flagsChanged(changedFlags).build(), null); } /** @@ -422,7 +425,7 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) { */ private void handleProviderReadyEvent() { log.debug("Emitting provider ready event"); - onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_READY)); + onProviderEvent.accept(ProviderEvent.PROVIDER_READY, null, null); } /** @@ -432,7 +435,7 @@ private void handleErrorOrComplete() { log.debug("Emitting provider stale event"); // complete is an error, logically...even if the server went down gracefully we need to reconnect. - onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + onProviderEvent.accept(ProviderEvent.PROVIDER_STALE, null, null); } /** @@ -442,6 +445,6 @@ private void handleErrorOrComplete() { private void handleFatalError() { log.debug("Emitting provider error event"); - onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); + onProviderEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index 34c660702..37f101876 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -112,7 +112,7 @@ void eventHandling() throws Throwable { InProcessResolver inProcessResolver = getInProcessResolverWith( new MockStorage(new HashMap<>(), sender), connectionEvent -> receiver.offer(new StorageStateChange( - connectionEvent.isDisconnected() ? StorageState.ERROR : StorageState.OK, + connectionEvent.isDisconnected() ? StorageState.FATAL_ERROR : StorageState.OK, connectionEvent.getFlagsChanged(), connectionEvent.getSyncMetadata()))); @@ -130,7 +130,7 @@ void eventHandling() throws Throwable { TimeUnit.MILLISECONDS)) { Assertions.fail("failed to send the event"); } - if (!sender.offer(new StorageStateChange(StorageState.ERROR), 100, TimeUnit.MILLISECONDS)) { + if (!sender.offer(new StorageStateChange(StorageState.FATAL_ERROR), 100, TimeUnit.MILLISECONDS)) { Assertions.fail("failed to send the event"); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java index e58b4eb3f..eb609e4f3 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java @@ -46,7 +46,7 @@ void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.STALE, states.take().getStorageState()); + assertEquals(StorageState.TRANSIENT_ERROR, states.take().getStorageState()); }); // OK again for next payload @@ -64,14 +64,14 @@ void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.STALE, states.take().getStorageState()); + assertEquals(StorageState.TRANSIENT_ERROR, states.take().getStorageState()); }); // Shutdown handling store.shutdown(); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.STALE, states.take().getStorageState()); + assertEquals(StorageState.TRANSIENT_ERROR, states.take().getStorageState()); }); } From 4e321254729723ec862f6072a420e4b1f88d21cb Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 22 Dec 2025 13:41:16 -0500 Subject: [PATCH 14/17] fixup: update tests Signed-off-by: Todd Baert --- .../providers/flagd/FlagdProvider.java | 6 +- .../resolver/process/InProcessResolver.java | 22 +++--- .../storage/connector/QueuePayload.java | 1 + .../flagd/resolver/rpc/RpcResolver.java | 11 ++- .../providers/flagd/FlagdProviderTest.java | 29 ++++--- .../process/InProcessResolverTest.java | 78 ++++++++++--------- .../flagd/resolver/rpc/RpcResolverTest.java | 27 ++++--- 7 files changed, 95 insertions(+), 79 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 8c11bb4dc..150d2de0e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -192,7 +192,8 @@ EvaluationContext getEnrichedContext() { } @SuppressWarnings("checkstyle:fallthrough") - private void onProviderEvent(ProviderEvent providerEvent, ProviderEventDetails providerEventDetails, Structure syncMetadata) { + private void onProviderEvent( + ProviderEvent providerEvent, ProviderEventDetails providerEventDetails, Structure syncMetadata) { log.debug("FlagdProviderEvent event {} ", providerEvent); synchronized (syncResources) { /* @@ -223,7 +224,8 @@ private void onProviderEvent(ProviderEvent providerEvent, ProviderEventDetails p syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY); break; case PROVIDER_ERROR: - if (providerEventDetails != null && providerEventDetails.getErrorCode() == ErrorCode.PROVIDER_FATAL) { + if (providerEventDetails != null + && providerEventDetails.getErrorCode() == ErrorCode.PROVIDER_FATAL) { onFatal(); break; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index 4b15108ef..a57782c4f 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -4,7 +4,6 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.Resolver; -import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage; @@ -27,10 +26,8 @@ import dev.openfeature.sdk.exceptions.GeneralError; import dev.openfeature.sdk.exceptions.ParseError; import dev.openfeature.sdk.exceptions.TypeMismatchError; -import java.util.Map; -import java.util.function.BiConsumer; -import java.util.function.Consumer; import dev.openfeature.sdk.internal.TriConsumer; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -56,7 +53,8 @@ public class InProcessResolver implements Resolver { * @param onConnectionEvent lambda which handles changes in the * connection/stream */ - public InProcessResolver(FlagdOptions options, TriConsumer onConnectionEvent) { + public InProcessResolver( + FlagdOptions options, TriConsumer onConnectionEvent) { this.queueSource = getQueueSource(options); this.flagStore = new FlagStore(queueSource); this.onConnectionEvent = onConnectionEvent; @@ -83,7 +81,10 @@ public void init() throws Exception { .message("configuration changed") .build(); - onConnectionEvent.accept(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, eventDetails, storageStateChange.getSyncMetadata()); + onConnectionEvent.accept( + ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, + eventDetails, + storageStateChange.getSyncMetadata()); log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); break; @@ -91,9 +92,12 @@ public void init() throws Exception { onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null); break; case FATAL_ERROR: - onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, ProviderEventDetails.builder() - .errorCode(ErrorCode.PROVIDER_FATAL) - .build(), null); + onConnectionEvent.accept( + ProviderEvent.PROVIDER_ERROR, + ProviderEventDetails.builder() + .errorCode(ErrorCode.PROVIDER_FATAL) + .build(), + null); break; default: log.warn(String.format( diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java index e7e79e976..c31e5bd1b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java @@ -18,6 +18,7 @@ public class QueuePayload { public QueuePayload(QueuePayloadType type, String flagData) { this(type, flagData, null); } + public QueuePayload(QueuePayloadType type) { this(type, null, null); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index a5121a93b..46e196f1b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -11,7 +11,6 @@ import dev.openfeature.contrib.providers.flagd.resolver.Resolver; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; -import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver; import dev.openfeature.contrib.providers.flagd.resolver.common.StreamResponseModel; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache; @@ -49,7 +48,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @@ -84,7 +82,9 @@ public final class RpcResolver implements Resolver { * @param onProviderEvent lambda which handles changes in the connection/stream */ public RpcResolver( - final FlagdOptions options, final Cache cache, final TriConsumer onProviderEvent) { + final FlagdOptions options, + final Cache cache, + final TriConsumer onProviderEvent) { this.cache = cache; this.strategy = ResolveFactory.getStrategy(options); this.options = options; @@ -417,7 +417,10 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) { changedFlags.forEach(this.cache::remove); } - onProviderEvent.accept(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, ProviderEventDetails.builder().flagsChanged(changedFlags).build(), null); + onProviderEvent.accept( + ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, + ProviderEventDetails.builder().flagsChanged(changedFlags).build(), + null); } /** diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index 115887002..2af572fe5 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -19,7 +19,6 @@ import com.google.protobuf.Struct; import dev.openfeature.contrib.providers.flagd.resolver.Resolver; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; -import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; import dev.openfeature.contrib.providers.flagd.resolver.process.MockStorage; import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; @@ -45,9 +44,11 @@ import dev.openfeature.sdk.MutableStructure; import dev.openfeature.sdk.OpenFeatureAPI; import dev.openfeature.sdk.ProviderEvent; +import dev.openfeature.sdk.ProviderEventDetails; import dev.openfeature.sdk.Reason; import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; +import dev.openfeature.sdk.internal.TriConsumer; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; @@ -59,7 +60,6 @@ import java.util.Optional; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.function.Function; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -556,11 +556,12 @@ void initializationAndShutdown() throws Exception { flagResolver.setAccessible(true); flagResolver.set(provider, resolverMock); - Method onProviderEvent = FlagdProvider.class.getDeclaredMethod("onProviderEvent", FlagdProviderEvent.class); + Method onProviderEvent = FlagdProvider.class.getDeclaredMethod( + "onProviderEvent", ProviderEvent.class, ProviderEventDetails.class, Structure.class); onProviderEvent.setAccessible(true); doAnswer((i) -> { - onProviderEvent.invoke(provider, new FlagdProviderEvent(ProviderEvent.PROVIDER_READY)); + onProviderEvent.invoke(provider, ProviderEvent.PROVIDER_READY, null, null); return null; }) .when(resolverMock) @@ -596,17 +597,16 @@ void contextEnrichment() throws Exception { // mock a resolver try (MockedConstruction mockResolver = mockConstruction(InProcessResolver.class, (mock, context) -> { - Consumer onConnectionEvent; + TriConsumer onConnectionEvent; // get a reference to the onConnectionEvent callback - onConnectionEvent = - (Consumer) context.arguments().get(1); + onConnectionEvent = (TriConsumer) + context.arguments().get(1); // when our mock resolver initializes, it runs the passed onConnectionEvent // callback doAnswer(invocation -> { - onConnectionEvent.accept( - new FlagdProviderEvent(ProviderEvent.PROVIDER_READY, metadata)); + onConnectionEvent.accept(ProviderEvent.PROVIDER_READY, null, metadata); return null; }) .when(mock) @@ -637,17 +637,16 @@ void updatesSyncMetadataWithCallback() throws Exception { // mock a resolver try (MockedConstruction mockResolver = mockConstruction(InProcessResolver.class, (mock, context) -> { - Consumer onConnectionEvent; + TriConsumer onConnectionEvent; // get a reference to the onConnectionEvent callback - onConnectionEvent = - (Consumer) context.arguments().get(1); + onConnectionEvent = (TriConsumer) + context.arguments().get(1); // when our mock resolver initializes, it runs the passed onConnectionEvent // callback doAnswer(invocation -> { - onConnectionEvent.accept( - new FlagdProviderEvent(ProviderEvent.PROVIDER_READY, metadata)); + onConnectionEvent.accept(ProviderEvent.PROVIDER_READY, null, metadata); return null; }) .when(mock) @@ -690,7 +689,7 @@ private FlagdProvider createProvider(ChannelConnector connector, ServiceBlocking private FlagdProvider createProvider( ChannelConnector connector, Cache cache, ServiceStub mockStub, ServiceBlockingStub mockBlockingStub) { final FlagdOptions flagdOptions = FlagdOptions.builder().build(); - final RpcResolver grpcResolver = new RpcResolver(flagdOptions, cache, (connectionEvent) -> {}); + final RpcResolver grpcResolver = new RpcResolver(flagdOptions, cache, (event, details, metadata) -> {}); try { Field resolver = RpcResolver.class.getDeclaredField("connector"); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index 37f101876..e77bdfb2d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -23,7 +23,6 @@ import dev.openfeature.contrib.providers.flagd.Config; import dev.openfeature.contrib.providers.flagd.FlagdOptions; -import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.MockConnector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState; @@ -36,11 +35,15 @@ import dev.openfeature.sdk.MutableContext; import dev.openfeature.sdk.MutableStructure; import dev.openfeature.sdk.ProviderEvaluation; +import dev.openfeature.sdk.ProviderEvent; +import dev.openfeature.sdk.ProviderEventDetails; import dev.openfeature.sdk.Reason; +import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; import dev.openfeature.sdk.exceptions.GeneralError; import dev.openfeature.sdk.exceptions.ParseError; import dev.openfeature.sdk.exceptions.TypeMismatchError; +import dev.openfeature.sdk.internal.TriConsumer; import java.lang.reflect.Field; import java.time.Duration; import java.util.Collections; @@ -49,7 +52,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -59,7 +61,7 @@ void onError_delegatesToQueueSource() throws Exception { // given FlagdOptions options = FlagdOptions.builder().build(); // option value doesn't matter here SyncStreamQueueSource mockConnector = mock(SyncStreamQueueSource.class); - InProcessResolver resolver = new InProcessResolver(options, e -> {}); + InProcessResolver resolver = new InProcessResolver(options, (event, details, metadata) -> {}); // Inject mock connector java.lang.reflect.Field queueSourceField = InProcessResolver.class.getDeclaredField("queueSource"); @@ -109,12 +111,15 @@ void eventHandling() throws Throwable { final MutableStructure syncMetadata = new MutableStructure(); syncMetadata.add(key, val); - InProcessResolver inProcessResolver = getInProcessResolverWith( - new MockStorage(new HashMap<>(), sender), - connectionEvent -> receiver.offer(new StorageStateChange( - connectionEvent.isDisconnected() ? StorageState.FATAL_ERROR : StorageState.OK, - connectionEvent.getFlagsChanged(), - connectionEvent.getSyncMetadata()))); + InProcessResolver inProcessResolver = + getInProcessResolverWith(new MockStorage(new HashMap<>(), sender), (event, details, metadata) -> { + boolean isDisconnected = + event == ProviderEvent.PROVIDER_ERROR || event == ProviderEvent.PROVIDER_STALE; + receiver.offer(new StorageStateChange( + isDisconnected ? StorageState.FATAL_ERROR : StorageState.OK, + details != null ? details.getFlagsChanged() : Collections.emptyList(), + metadata)); + }); // when - init and emit events Thread initThread = new Thread(() -> { @@ -149,7 +154,7 @@ public void simpleBooleanResolving() throws Exception { flagMap.put("booleanFlag", BOOLEAN_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when ProviderEvaluation providerEvaluation = @@ -168,7 +173,7 @@ public void simpleDoubleResolving() throws Exception { flagMap.put("doubleFlag", DOUBLE_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when ProviderEvaluation providerEvaluation = @@ -187,7 +192,7 @@ public void fetchIntegerAsDouble() throws Exception { flagMap.put("doubleFlag", DOUBLE_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when ProviderEvaluation providerEvaluation = @@ -206,7 +211,7 @@ public void fetchDoubleAsInt() throws Exception { flagMap.put("integerFlag", INT_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when ProviderEvaluation providerEvaluation = @@ -225,7 +230,7 @@ public void simpleIntResolving() throws Exception { flagMap.put("integerFlag", INT_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when ProviderEvaluation providerEvaluation = @@ -244,7 +249,7 @@ public void simpleObjectResolving() throws Exception { flagMap.put("objectFlag", OBJECT_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); Map typeDefault = new HashMap<>(); typeDefault.put("key", "0164"); @@ -270,7 +275,7 @@ public void missingFlag() throws Exception { final Map flagMap = new HashMap<>(); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when/then ProviderEvaluation missingFlag = @@ -285,7 +290,7 @@ public void disabledFlag() throws Exception { flagMap.put("disabledFlag", DISABLED_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when/then ProviderEvaluation disabledFlag = @@ -300,7 +305,7 @@ public void variantMismatchFlag() throws Exception { flagMap.put("mismatchFlag", VARIANT_MISMATCH_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when/then assertThrows(GeneralError.class, () -> { @@ -315,7 +320,7 @@ public void typeMismatchEvaluation() throws Exception { flagMap.put("booleanFlag", BOOLEAN_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when/then assertThrows(TypeMismatchError.class, () -> { @@ -330,7 +335,7 @@ public void booleanShorthandEvaluation() throws Exception { flagMap.put("shorthand", FLAG_WIH_SHORTHAND_TARGETING); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("shorthand", false, new ImmutableContext()); @@ -348,7 +353,7 @@ public void targetingMatchedEvaluationFlag() throws Exception { flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation( @@ -367,7 +372,7 @@ public void targetingUnmatchedEvaluationFlag() throws Exception { flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation( @@ -386,7 +391,7 @@ public void explicitTargetingKeyHandling() throws NoSuchFieldException, IllegalA flagMap.put("stringFlag", FLAG_WITH_TARGETING_KEY); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when ProviderEvaluation providerEvaluation = @@ -405,7 +410,7 @@ public void targetingErrorEvaluationFlag() throws Exception { flagMap.put("targetingErrorFlag", FLAG_WIH_INVALID_TARGET); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), (connectionEvent) -> {}); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}); // when/then assertThrows(ParseError.class, () -> { @@ -440,7 +445,7 @@ void selectorIsAddedToFlagMetadata() throws Exception { flagMap.put("flag", INT_FLAG); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), connectionEvent -> {}, "selector"); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}, "selector"); // when ProviderEvaluation providerEvaluation = @@ -460,7 +465,7 @@ void selectorIsOverwrittenByFlagMetadata() throws Exception { flagMap.put("flag", new FeatureFlag("stage", "loop", stringVariants, "", flagMetadata)); InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap), connectionEvent -> {}, "selector"); + getInProcessResolverWith(new MockStorage(flagMap), (event, details, metadata) -> {}, "selector"); // when ProviderEvaluation providerEvaluation = @@ -481,8 +486,8 @@ void flagSetMetadataIsAddedToEvaluation() throws Exception { final Map flagSetMetadata = new HashMap<>(); flagSetMetadata.put("flagSetMetadata", "metadata"); - InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap, flagSetMetadata), connectionEvent -> {}, "selector"); + InProcessResolver inProcessResolver = getInProcessResolverWith( + new MockStorage(flagMap, flagSetMetadata), (event, details, metadata) -> {}, "selector"); // when ProviderEvaluation providerEvaluation = @@ -502,8 +507,8 @@ void flagSetMetadataIsAddedToFailingEvaluation() throws Exception { final Map flagSetMetadata = new HashMap<>(); flagSetMetadata.put("flagSetMetadata", "metadata"); - InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap, flagSetMetadata), connectionEvent -> {}, "selector"); + InProcessResolver inProcessResolver = getInProcessResolverWith( + new MockStorage(flagMap, flagSetMetadata), (event, details, metadata) -> {}, "selector"); // when ProviderEvaluation providerEvaluation = @@ -526,8 +531,8 @@ void flagSetMetadataIsOverwrittenByFlagMetadataToEvaluation() throws Exception { final Map flagSetMetadata = new HashMap<>(); flagSetMetadata.put("key", "unexpected"); - InProcessResolver inProcessResolver = - getInProcessResolverWith(new MockStorage(flagMap, flagSetMetadata), connectionEvent -> {}, "selector"); + InProcessResolver inProcessResolver = getInProcessResolverWith( + new MockStorage(flagMap, flagSetMetadata), (event, details, metadata) -> {}, "selector"); // when ProviderEvaluation providerEvaluation = @@ -541,12 +546,13 @@ void flagSetMetadataIsOverwrittenByFlagMetadataToEvaluation() throws Exception { private InProcessResolver getInProcessResolverWith(final FlagdOptions options, final MockStorage storage) throws NoSuchFieldException, IllegalAccessException { - final InProcessResolver resolver = new InProcessResolver(options, connectionEvent -> {}); + final InProcessResolver resolver = new InProcessResolver(options, (event, details, metadata) -> {}); return injectFlagStore(resolver, storage); } private InProcessResolver getInProcessResolverWith( - final MockStorage storage, final Consumer onConnectionEvent) + final MockStorage storage, + final TriConsumer onConnectionEvent) throws NoSuchFieldException, IllegalAccessException { final InProcessResolver resolver = @@ -555,7 +561,9 @@ private InProcessResolver getInProcessResolverWith( } private InProcessResolver getInProcessResolverWith( - final MockStorage storage, final Consumer onConnectionEvent, String selector) + final MockStorage storage, + final TriConsumer onConnectionEvent, + String selector) throws NoSuchFieldException, IllegalAccessException { final InProcessResolver resolver = new InProcessResolver( diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java index 0ed2b8e3c..188beac76 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java @@ -3,7 +3,7 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -13,14 +13,15 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; -import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver; import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceBlockingStub; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceStub; import dev.openfeature.sdk.ProviderEvent; +import dev.openfeature.sdk.ProviderEventDetails; +import dev.openfeature.sdk.Structure; +import dev.openfeature.sdk.internal.TriConsumer; import java.util.concurrent.CountDownLatch; -import java.util.function.Consumer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.invocation.InvocationOnMock; @@ -31,15 +32,16 @@ class RpcResolverTest { private ServiceBlockingStub blockingStub; private ServiceStub stub; private QueueingStreamObserver observer; - private Consumer consumer; + private TriConsumer consumer; private CountDownLatch latch; // used to wait for observer to be initialized + @SuppressWarnings("unchecked") @BeforeEach public void init() throws Exception { latch = new CountDownLatch(1); observer = null; - consumer = mock(Consumer.class); - doNothing().when(consumer).accept(any()); + consumer = mock(TriConsumer.class); + doNothing().when(consumer).accept(any(), any(), any()); blockingStub = mock(ServiceBlockingStub.class); @@ -74,8 +76,7 @@ void onNextWithReadyRunsConsumerWithReady() throws Exception { .build()); // should run consumer with payload - await().untilAsserted(() -> - verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_READY))); + await().untilAsserted(() -> verify(consumer).accept(eq(ProviderEvent.PROVIDER_READY), any(), any())); // should NOT have restarted the stream (1 call) verify(stub, times(1)).eventStream(any(), any()); } @@ -95,8 +96,8 @@ void onNextWithChangedRunsConsumerWithChanged() throws Exception { // should run consumer with payload verify(stub, times(1)).eventStream(any(), any()); // should have restarted the stream (2 calls) - await().untilAsserted(() -> verify(consumer) - .accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED))); + await().untilAsserted( + () -> verify(consumer).accept(eq(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED), any(), any())); } @Test @@ -110,8 +111,7 @@ void onCompletedRerunsStreamWithError() throws Exception { observer.onCompleted(); // should run consumer with error - await().untilAsserted(() -> - verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_STALE))); + await().untilAsserted(() -> verify(consumer).accept(eq(ProviderEvent.PROVIDER_STALE), any(), any())); // should have restarted the stream (2 calls) await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); } @@ -127,8 +127,7 @@ void onErrorRunsConsumerWithError() throws Exception { observer.onError(new Exception("fake error")); // should run consumer with error - await().untilAsserted(() -> - verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_STALE))); + await().untilAsserted(() -> verify(consumer).accept(eq(ProviderEvent.PROVIDER_STALE), any(), any())); // should have restarted the stream (2 calls) await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); } From 701069ac373f64771adca9c119d89ac1fb5d6d0a Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 22 Dec 2025 14:05:34 -0500 Subject: [PATCH 15/17] fixup: revert rpc test expectations Signed-off-by: Todd Baert --- .../flagd/resolver/rpc/RpcResolver.java | 25 +++++++------------ .../flagd/resolver/rpc/RpcResolverTest.java | 4 +-- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index 46e196f1b..b09634088 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -26,6 +26,7 @@ import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceBlockingStub; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceStub; +import dev.openfeature.sdk.ErrorCode; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.ImmutableMetadata; import dev.openfeature.sdk.ProviderEvaluation; @@ -348,7 +349,7 @@ private void observeEventStream() throws InterruptedException { final StreamResponseModel taken = incomingQueue.take(); if (taken.isComplete()) { log.debug("Event stream completed, will reconnect"); - this.handleErrorOrComplete(); + this.handleErrorOrComplete(false); // The stream is complete, we still try to reconnect break; } @@ -366,12 +367,12 @@ private void observeEventStream() throws InterruptedException { ((StatusRuntimeException) streamException) .getStatus() .getCode()); - this.handleFatalError(); + this.handleErrorOrComplete(true); } else { log.debug( "Exception in event stream connection, streamException {}, will reconnect", streamException); - this.handleErrorOrComplete(); + this.handleErrorOrComplete(false); } break; } @@ -434,20 +435,12 @@ private void handleProviderReadyEvent() { /** * Handles provider error events by clearing the cache (if enabled) and notifying listeners of the error. */ - private void handleErrorOrComplete() { - log.debug("Emitting provider stale event"); - - // complete is an error, logically...even if the server went down gracefully we need to reconnect. - onProviderEvent.accept(ProviderEvent.PROVIDER_STALE, null, null); - } - - /** - * Handles fatal error events (i.e. error codes defined in fatalStatusCodes) by transitioning the provider into - * fatal state - */ - private void handleFatalError() { + private void handleErrorOrComplete(boolean fatal) { log.debug("Emitting provider error event"); + ErrorCode errorCode = fatal ? ErrorCode.PROVIDER_FATAL : ErrorCode.GENERAL; + var details = ProviderEventDetails.builder().errorCode(errorCode).build(); - onProviderEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null); + // complete is an error, logically...even if the server went down gracefully we need to reconnect. + onProviderEvent.accept(ProviderEvent.PROVIDER_ERROR, details, null); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java index 188beac76..955d0fa2b 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java @@ -111,7 +111,7 @@ void onCompletedRerunsStreamWithError() throws Exception { observer.onCompleted(); // should run consumer with error - await().untilAsserted(() -> verify(consumer).accept(eq(ProviderEvent.PROVIDER_STALE), any(), any())); + await().untilAsserted(() -> verify(consumer).accept(eq(ProviderEvent.PROVIDER_ERROR), any(), any())); // should have restarted the stream (2 calls) await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); } @@ -127,7 +127,7 @@ void onErrorRunsConsumerWithError() throws Exception { observer.onError(new Exception("fake error")); // should run consumer with error - await().untilAsserted(() -> verify(consumer).accept(eq(ProviderEvent.PROVIDER_STALE), any(), any())); + await().untilAsserted(() -> verify(consumer).accept(eq(ProviderEvent.PROVIDER_ERROR), any(), any())); // should have restarted the stream (2 calls) await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); } From bdc3e681f929cb9b9040691365c6c1ef7629b60c Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 22 Dec 2025 14:13:49 -0500 Subject: [PATCH 16/17] fixup: revert enum change Signed-off-by: Todd Baert --- .../providers/flagd/resolver/process/InProcessResolver.java | 4 ++-- .../providers/flagd/resolver/process/storage/FlagStore.java | 6 +++--- .../flagd/resolver/process/storage/StorageState.java | 4 ++-- .../flagd/resolver/process/InProcessResolverTest.java | 4 ++-- .../flagd/resolver/process/storage/FlagStoreTest.java | 6 +++--- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index a57782c4f..a8d0b901f 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -88,10 +88,10 @@ public void init() throws Exception { log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); break; - case TRANSIENT_ERROR: + case STALE: onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null); break; - case FATAL_ERROR: + case ERROR: onConnectionEvent.accept( ProviderEvent.PROVIDER_ERROR, ProviderEventDetails.builder() diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index 8e86d1e9e..41284ad5d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -132,19 +132,19 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc } catch (Throwable e) { // catch all exceptions and avoid stream listener interruptions log.warn("Invalid flag sync payload from connector", e); - if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.TRANSIENT_ERROR))) { + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { log.warn("Failed to convey TRANSIENT_ERROR status, queue is full"); } } break; case ERROR: - if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.TRANSIENT_ERROR))) { + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { log.warn("Failed to convey TRANSIENT_ERROR status, queue is full"); } break; case SHUTDOWN: shutdown(); - if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.FATAL_ERROR))) { + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) { log.warn("Failed to convey FATAL_ERROR status, queue is full"); } break; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java index 70d3a7277..55b22dab4 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java @@ -5,7 +5,7 @@ public enum StorageState { /** Storage is upto date and working as expected. */ OK, /** Storage has gone stale (most recent sync failed). May get to OK status with next sync. */ - TRANSIENT_ERROR, + STALE, /** Storage is in an unrecoverable error stage. */ - FATAL_ERROR, + ERROR, } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index e77bdfb2d..04670b397 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -116,7 +116,7 @@ void eventHandling() throws Throwable { boolean isDisconnected = event == ProviderEvent.PROVIDER_ERROR || event == ProviderEvent.PROVIDER_STALE; receiver.offer(new StorageStateChange( - isDisconnected ? StorageState.FATAL_ERROR : StorageState.OK, + isDisconnected ? StorageState.ERROR : StorageState.OK, details != null ? details.getFlagsChanged() : Collections.emptyList(), metadata)); }); @@ -135,7 +135,7 @@ void eventHandling() throws Throwable { TimeUnit.MILLISECONDS)) { Assertions.fail("failed to send the event"); } - if (!sender.offer(new StorageStateChange(StorageState.FATAL_ERROR), 100, TimeUnit.MILLISECONDS)) { + if (!sender.offer(new StorageStateChange(StorageState.ERROR), 100, TimeUnit.MILLISECONDS)) { Assertions.fail("failed to send the event"); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java index eb609e4f3..e58b4eb3f 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java @@ -46,7 +46,7 @@ void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.TRANSIENT_ERROR, states.take().getStorageState()); + assertEquals(StorageState.STALE, states.take().getStorageState()); }); // OK again for next payload @@ -64,14 +64,14 @@ void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.TRANSIENT_ERROR, states.take().getStorageState()); + assertEquals(StorageState.STALE, states.take().getStorageState()); }); // Shutdown handling store.shutdown(); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.TRANSIENT_ERROR, states.take().getStorageState()); + assertEquals(StorageState.STALE, states.take().getStorageState()); }); } From a9694881d13f5966c6f8c0fd3e2f275ff02011b8 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 22 Dec 2025 14:31:45 -0500 Subject: [PATCH 17/17] fixup: test timeout Signed-off-by: Todd Baert --- .../contrib/providers/flagd/e2e/steps/ProviderSteps.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index 90d082292..2e8053d1d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -2,6 +2,7 @@ import static io.restassured.RestAssured.when; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import dev.openfeature.contrib.providers.flagd.Config; import dev.openfeature.contrib.providers.flagd.FlagdOptions; @@ -23,6 +24,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; @@ -200,6 +202,9 @@ public void the_flag_was_modded() { @Then("the client should be in {} state") public void the_client_should_be_in_fatal_state(String clientState) { - assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase())); + await().pollDelay(100, TimeUnit.MILLISECONDS) + .atMost(1000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertThat(state.client.getProviderState()) + .isEqualTo(ProviderState.valueOf(clientState.toUpperCase()))); } }