From 123d79ea36b857f04c6fa1db0161c8cbc4b37bc2 Mon Sep 17 00:00:00 2001 From: Youssef Shoaib Date: Sat, 29 Nov 2025 22:34:10 +0000 Subject: [PATCH] Use guaranteeCase in CircuitBreaker instead of addSuppressed --- .../arrow-resilience/build.gradle.kts | 4 +- .../kotlin/arrow/resilience/CircuitBreaker.kt | 37 ++++++++----------- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/arrow-libs/resilience/arrow-resilience/build.gradle.kts b/arrow-libs/resilience/arrow-resilience/build.gradle.kts index e380ad13a49..426173ad74c 100644 --- a/arrow-libs/resilience/arrow-resilience/build.gradle.kts +++ b/arrow-libs/resilience/arrow-resilience/build.gradle.kts @@ -6,9 +6,7 @@ kotlin { sourceSets { commonMain { dependencies { - api(projects.arrowCore) - implementation(libs.coroutines.core) - implementation(projects.arrowExceptionUtils) + implementation(projects.arrowFxCoroutines) } } commonTest { diff --git a/arrow-libs/resilience/arrow-resilience/src/commonMain/kotlin/arrow/resilience/CircuitBreaker.kt b/arrow-libs/resilience/arrow-resilience/src/commonMain/kotlin/arrow/resilience/CircuitBreaker.kt index 8c67cb69da0..590451abf45 100644 --- a/arrow-libs/resilience/arrow-resilience/src/commonMain/kotlin/arrow/resilience/CircuitBreaker.kt +++ b/arrow-libs/resilience/arrow-resilience/src/commonMain/kotlin/arrow/resilience/CircuitBreaker.kt @@ -7,11 +7,10 @@ import arrow.atomic.update import arrow.atomic.updateAndGet import arrow.core.Either import arrow.core.raise.catch +import arrow.fx.coroutines.ExitCase +import arrow.fx.coroutines.guaranteeCase import arrow.resilience.CircuitBreaker.State.* -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.withContext import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract @@ -263,19 +262,22 @@ private constructor( contract { callsInPlace(task, InvocationKind.EXACTLY_ONCE) } - return try { + return guaranteeCase({ onHalfOpen.invoke() task.invoke() - } catch (e: CancellationException) { - // We need to return to Open state, otherwise we get stuck in Half-Open (see https://github.com/monix/monix/issues/1080 ) - state.set(Open(state.get().openingStrategy, lastStartedAt, resetTimeout, awaitClose)) - onOpenAndThrow(e) - } catch (e: Throwable) { - // Failed reset, which means we go back in the Open state with new expiry val nextTimeout - val value: Duration = (resetTimeout * exponentialBackoffFactor) - val nextTimeout = if (maxResetTimeout.isFinite() && value > maxResetTimeout) maxResetTimeout else value - state.set(Open(state.get().openingStrategy, timeSource.markNow(), nextTimeout, awaitClose)) - onOpenAndThrow(e) + }) { + val (value, timeout) = when (it) { + is ExitCase.Completed -> return@guaranteeCase + // We need to return to Open state, otherwise we get stuck in Half-Open (see https://github.com/monix/monix/issues/1080 ) + is ExitCase.Cancelled -> lastStartedAt to resetTimeout + is ExitCase.Failure -> { + val value = resetTimeout * exponentialBackoffFactor + val nextTimeout = if (maxResetTimeout.isFinite() && value > maxResetTimeout) maxResetTimeout else value + timeSource.markNow() to nextTimeout + } + } + state.set(Open(state.get().openingStrategy, value, timeout, awaitClose)) + onOpen.invoke() }.also { // While in HalfOpen only a reset attempt is allowed to update the state, so setting this directly is safe state.set(Closed(state.get().openingStrategy.resetFailuresCount())) @@ -284,13 +286,6 @@ private constructor( } } - private suspend fun onOpenAndThrow(original: Throwable): Nothing { - runCatching { - withContext(NonCancellable) { onOpen.invoke() } - }.exceptionOrNull()?.let { original.addSuppressed(it) } - throw original - } - /** Returns a new circuit breaker that wraps the state of the source * and that upon a task being rejected will execute the given [callback]. *