Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,24 @@ import arrow.continuations.SuspendApp
import arrow.continuations.SuspendAppScope
import arrow.continuations.exitApp
import arrow.fx.coroutines.resourceScope
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

fun interface Work {
suspend fun SuspendAppScope.work()
}

sealed interface Mode : Work
data class Delay(val duration: Duration = 3.seconds) : Mode, Work by Work({ delay(duration) })
data object Wait : Mode, Work by Work({
while (isActive) {
delay(1000)
}
})

data object Wait : Mode, Work by Work({ awaitCancellation() })
data object Fail : Mode, Work by Work({ error("BOOM!") })
data object ChildFail : Mode, Work by Work({ launch { error("boom.") } })
data object ExitApp : Mode, Work by Work({ exitApp(42) })
Expand All @@ -30,6 +28,17 @@ data object ChildLaunchExitApp : Mode, Work by Work({
launch { exitApp(24) }
awaitCancellation()
})
data object ExitProcess : Mode, Work by Work({
launch { exitProcess(42) }
awaitCancellation()
})

data object Timeout : Mode, Work by Work({
resourceScope {
onRelease { delay(6.seconds) }
awaitCancellation()
}
})

fun app(mode: String?) = app(
when (mode) {
Expand All @@ -40,13 +49,15 @@ fun app(mode: String?) = app(
"exitapp" -> ExitApp
"childexitapp" -> ChildExitApp
"childlaunchexitapp" -> ChildLaunchExitApp
"exit" -> ExitProcess
"timeout" -> Timeout
else -> Delay()
}
)

fun app(work: Work) {
println("pre-suspendapp")
SuspendApp(timeout = 10.seconds) {
SuspendApp(timeout = 5.seconds) {
println("Running $work")
resourceScope {
onRelease {
Expand All @@ -59,3 +70,5 @@ fun app(work: Work) {
}
println("post-suspendapp")
}

expect fun exitProcess(code: Int): Nothing
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

actual fun exitProcess(code: Int): Nothing {
js("process.exit(code);")
error("non-exiting process.exit")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import kotlin.system.exitProcess

actual fun exitProcess(code: Int): Nothing = exitProcess(code)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import kotlin.system.exitProcess

actual fun exitProcess(code: Int): Nothing = exitProcess(code)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
@file:OptIn(ExperimentalWasmJsInterop::class)

actual fun exitProcess(code: Int): Nothing {
jsExit(code)
error("did not exit")
}

@Suppress("unused")
private fun jsExit(code: Int) {
js("process.exit(code);")
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ open class JsSpec : SuspendAppTest() {
.apply { environment()["TASK"] = mode }

companion object {
val config = JsTestConfig("jsNodeRun")
val config = JsTestConfig("jsNodeProductionRun")

@JvmStatic
fun enabled() = config.validConfig()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import io.kotest.inspectors.shouldForAtLeastOne
import io.kotest.inspectors.shouldForNone
import io.kotest.inspectors.shouldForOne
import io.kotest.matchers.collections.shouldBeIn
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.string.shouldContain
import io.kotest.matchers.string.shouldEndWith
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
Expand All @@ -19,15 +24,25 @@ abstract class SuspendAppTest : ProcessProvider {
val (process, output) = execute("fail")
process.exitValue() shouldBe 255
output.shouldForOne { it.line shouldBe "resource clean complete" }
.shouldForOne { it.line shouldEndWith "IllegalStateException: BOOM!" }
.shouldForAtLeastOne { it.line shouldEndWith "IllegalStateException: BOOM!" }
}

@Test
fun exit() = runTest {
val (process, output) = execute("exit")
process.exitValue() shouldBe 42
output
.filter { it.source == "stdout" }
.shouldForNone { it.line shouldBe "resource clean complete" }
.last().line shouldBe "Running ExitProcess"
}

@Test
fun childFailure() = runTest {
val (process, output) = execute("childfail")
process.exitValue() shouldBe 255
output.shouldForOne { it.line shouldBe "resource clean complete" }
.shouldForOne { it.line shouldEndWith "IllegalStateException: boom." }
.shouldForAtLeastOne { it.line shouldEndWith "IllegalStateException: boom." }
}

@Test
Expand Down Expand Up @@ -57,9 +72,21 @@ abstract class SuspendAppTest : ProcessProvider {
@Test
fun waitAndSignalSigint() = waitAndSignal(Signal.SIGINT)

private fun waitAndSignal(signal: Signal) = runTest {
val (process, output) = execute("wait") {
delay(1.seconds)
@Test
fun waitAndTimeout() = runTest {
val (process, output) = execute("timeout") {
delay(0.5.seconds)
sendSignal(Signal.SIGTERM)
}
// TODO: inconsistent exit codes across platforms, for now just check it's not a success
process.exitValue() shouldNotBe 0
output.shouldForAtLeastOne { it.line shouldContain "Timed out waiting for 5000 ms" }
.shouldForNone { it.line shouldContain "resource clean complete" }
}

private fun waitAndSignal(signal: Signal, mode: String = "wait") = runTest {
val (process, output) = execute(mode) {
delay(0.5.seconds)
sendSignal(signal)
}
process.exitValue() shouldBe signal.code + 128
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class WasmJsSpec : JsSpec() {
override val config: JsTestConfig get() = Companion.config

companion object {
val config = JsTestConfig("wasmJsNodeRun")
val config = JsTestConfig("wasmJsNodeProductionRun")

@JvmStatic
fun enabled() = config.validConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package arrow.continuations

import arrow.AutoCloseScope
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.system.exitProcess
import kotlinx.coroutines.CoroutineScope
Expand All @@ -14,42 +15,85 @@ internal actual fun AutoCloseScope.process(): Process = JvmProcess
private object JvmProcess : Process {
override fun onShutdown(block: suspend () -> Unit): () -> Unit {
val isShutdown = AtomicBoolean(false)
val lastSignal = AtomicInteger(-1)
onSigInt(lastSignal::set)
onSigTerm(lastSignal::set)

fun shutdown() {
if (!isShutdown.getAndSet(true))
runBlocking {
// We don't call exit from ShutdownHook on JVM
try {
block()
} catch (e: Throwable) {
e.printStackTrace()
if (!isShutdown.getAndSet(true)) {
/*
`SuspendApp` is intended to be invoked from the main entrypoint (a non-daemon thread) and, in that normal usage,
will unregister this hook before main completes.

This hook can run when:
1) JVM receives SIGINT/SIGTERM
2) last non-daemon thread exits
3) `System.exit` is called

We run graceful shutdown only for (1), detected by a captured signal.

Regarding (2):
- In the expected usage, a non-daemon caller thread (usually main) remains alive while
`SuspendApp` is running.
- So if this hook is invoked because the last non-daemon thread exited, that likely reflects an
unexpected invocation pattern (for example, daemon-thread-driven usage).
- We choose not to provide graceful-shutdown guarantees for that edge case.

Regarding (3) and deadlock risk:
- `System.exit` may be called from code running in the `SuspendApp` coroutine scope.
- `System.exit` will never return to that caller.
- If this hook attempts graceful termination (`block()`), it can end up waiting for the
`SuspendApp` scope to complete (join/drain), while that scope is waiting on a call path that
includes the non-returning `System.exit`.
- Since that wait cannot resolve, the shutdown hook will deadlock.

Other platform behaviour:
- Other targets do not have a JVM-style shutdown hook with equivalent process-exit semantics,
and currently do not attempt graceful shutdown either, except when signalled.

So, graceful shutdown is gated solely on SIGINT/SIGTERM capture, both to align behaviour with
other platforms and to avoid the potential deadlock of an explicit `System.exit`.
*/
if (lastSignal.get() != -1) {
runBlocking {
// We don't call exit from ShutdownHook on JVM
try {
block()
} catch (e: Throwable) {
e.printStackTrace()
}
}
}
}
}

val hook = Thread(::shutdown, "Arrow-kt SuspendApp JVM ShutdownHook")
Runtime.getRuntime().addShutdownHook(hook)
return {
if (!isShutdown.get()) {
Runtime.getRuntime().removeShutdownHook(hook)
try {
Runtime.getRuntime().removeShutdownHook(hook)
} catch (_: IllegalStateException) {
// Shutdown hook already running, ignore
}
}
}
}

override fun onSigTerm(block: suspend (code: Int) -> Unit): Unit =
addSignalHandler("SIGTERM") { block(15) }
addSignalHandler("TERM", block)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the change from SIGTERM to TERM needed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sun.misc.Signal takes the signal name sans SIG prefix.
The example from the JavaDoc is:

Signal objects are created based on their names. For example:

new Signal("INT");

Previously these were throwing IllegalArgumentException on construction that were being caught and ignored.


override fun onSigInt(block: suspend (code: Int) -> Unit): Unit =
addSignalHandler("SIGINT") { block(2) }
addSignalHandler("INT", block)

private fun addSignalHandler(signal: String, action: suspend () -> Unit): Unit =
private fun addSignalHandler(signal: String, action: suspend (code: Int) -> Unit): Unit =
try {
var handle: SignalHandler? = null
@Suppress("ASSIGNED_VALUE_IS_NEVER_READ")
handle =
Signal.handle(Signal(signal)) { prev ->
runBlocking { action() }
Signal.handle(Signal(signal)) { sig ->
runBlocking { action(sig.number) }
if (handle != SignalHandler.SIG_DFL && handle != SignalHandler.SIG_IGN) {
handle?.handle(prev)
handle?.handle(sig)
}
}
} catch (_: Throwable) {
Expand Down
Loading