fix(inference): SSE inactivity watchdog to stop research-agent RESPONSE hang (#4269) #4393
📝 Walkthrough
Adds a configurable per-chunk streaming idle timeout, wires it into OpenAI-compatible streaming, applies watchdogs to SSE reads and delta forwarding, and adds coverage for stalled reads, backpressure, dropped receivers, and terminal completion.
Changes: Stream idle watchdog
Estimated code review effort: 4 (Complex) | ~50 minutes
Sequence Diagram(s)
sequenceDiagram
participant stream_native_chat
participant bytes_stream
participant forward_delta
participant delta_tx
loop per chunk
stream_native_chat->>bytes_stream: next()
alt chunk arrives before idle timeout
bytes_stream-->>stream_native_chat: SSE chunk
stream_native_chat->>forward_delta: send(delta)
forward_delta->>delta_tx: send(delta)
alt receiver closed
delta_tx-->>forward_delta: dropped
forward_delta-->>stream_native_chat: Ok(())
else send succeeds
delta_tx-->>forward_delta: sent
forward_delta-->>stream_native_chat: Ok(())
else backpressure timeout
forward_delta-->>stream_native_chat: retryable error
end
else no chunk before idle timeout
bytes_stream-->>stream_native_chat: timeout
stream_native_chat-->>stream_native_chat: abort retryable watchdog error
end
end
stream_native_chat->>stream_native_chat: stop on [DONE]
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/inference/provider/compatible_stream_native.rs`:
- Around line 263-293: The SSE loop in compatible_stream_native.rs is still
treating the terminal `[DONE]` event as non-final, so it re-arms the idle
watchdog and can incorrectly fail a completed response if the socket stays open.
Update the stream-reading logic around the `bytes_stream.next()` loop and
`[DONE]` handling so the loop exits immediately on the terminal sentinel instead
of continuing to another read. Add a regression test in the same provider path
that emits `[DONE]` and then keeps the connection open to verify the stream
completes without tripping `stream_idle_timeout`.
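The terminal-sentinel handling requested above can be sketched without any async machinery. This is a minimal illustration, assuming one SSE field per line; `SseEvent`, `classify_sse_line`, and `read_until_done` are hypothetical names and are not taken from `compatible_stream_native.rs`.

```rust
/// Outcome of handling one SSE line (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum SseEvent {
    /// A `data:` payload to forward downstream.
    Data(String),
    /// The terminal `[DONE]` sentinel: stop reading.
    Done,
    /// Blank lines, comments, or other fields: nothing to forward.
    Ignored,
}

fn classify_sse_line(line: &str) -> SseEvent {
    match line.trim_end().strip_prefix("data:") {
        Some(payload) if payload.trim() == "[DONE]" => SseEvent::Done,
        Some(payload) => SseEvent::Data(payload.trim().to_string()),
        None => SseEvent::Ignored,
    }
}

/// Collects payloads until the terminal sentinel.
/// The source is not polled again once `[DONE]` has been seen.
fn read_until_done<'a, I>(lines: I) -> Vec<String>
where
    I: IntoIterator<Item = &'a str>,
{
    let mut payloads = Vec::new();
    for line in lines {
        match classify_sse_line(line) {
            SseEvent::Data(payload) => payloads.push(payload),
            // `continue` here would go back for another read and re-arm
            // any idle timer, even though the response is complete.
            SseEvent::Done => break,
            SseEvent::Ignored => {}
        }
    }
    payloads
}
```

Because the loop exits on the sentinel, a source that stays open after `[DONE]` is never read again, which is the property the requested regression test checks.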
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 82583067-bccd-4429-9c9a-93da830c0165
📒 Files selected for processing (5)
- .env.example
- src/openhuman/inference/provider/compatible.rs
- src/openhuman/inference/provider/compatible_stream_native.rs
- src/openhuman/inference/provider/compatible_tests.rs
- src/openhuman/inference/provider/compatible_timeout.rs
…e watchdog
CodeRabbit review on tinyhumansai#4393: after the terminal [DONE] sentinel the loop re-armed the idle watchdog, so a provider that sends [DONE] but holds the socket open would fail an already-complete response as a retryable stall. Break on [DONE]; add a regression that sends [DONE] then keeps the connection open.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
e0e5652 to 35be03b
…SE hang (tinyhumansai#4269)
The native streaming path parked on `bytes_stream.next().await` when an upstream flushed 200 then went silent mid-response. The only thing that cut it was the blunt whole-request timeout, which tinyhumansai#3856 tells operators to raise up to 1h, turning the stall into an indefinite hang.
Add a per-chunk inactivity watchdog that resets on every token and aborts a stalled stream with a retryable error (ReliableProvider replays it); also bound the delta send so a wedged consumer can't hang the turn on a full channel. New env knob OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS (default 90s, range 1-3600).
Reproduced against staging via CDP: deep-research streams stalled the full 120s request-timeout window, surfacing as "error decoding response body".
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
35be03b to 17f4eb8
M3gA-Mind
left a comment
PR #4393 — SSE inactivity watchdog to stop research-agent RESPONSE hang (#4269)
Walkthrough
Wraps each SSE read in stream_native_chat with a per-chunk inactivity tokio::time::timeout (default 90s, env-overridable, resets on every chunk), and applies the same bound to each downstream delta send via a new forward_delta helper. A stall aborts as a retryable error so ReliableProvider replays the turn (degrading to non-streaming), instead of parking on next().await until the raised whole-request ceiling. Approach is sound, correctly scoped to the reproduced native path, and unusually well tested (stall, per-chunk reset, wedged consumer, dropped receiver, and the [DONE]-lingers regression). Solid work — no blockers from me.
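The per-chunk idle window described in this walkthrough can be illustrated with a standard-library stand-in. The PR itself wraps an async read in `tokio::time::timeout`; the sketch below instead uses `std::sync::mpsc::Receiver::recv_timeout` so it compiles without external crates. The function name `read_with_idle_watchdog` and the error text are assumptions made for illustration, not the PR's code.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Reads chunks until `[DONE]` or until the sender hangs up.
/// Fails if no chunk arrives within `idle_window`; the window starts
/// over on every received chunk, so a long but active stream is never cut.
fn read_with_idle_watchdog(
    chunks: &Receiver<String>,
    idle_window: Duration,
) -> Result<Vec<String>, String> {
    let mut received = Vec::new();
    loop {
        // Each call arms a fresh timer: this is the per-chunk reset.
        match chunks.recv_timeout(idle_window) {
            Ok(chunk) if chunk == "[DONE]" => break,
            Ok(chunk) => received.push(chunk),
            // Sender dropped: the stream ended without a sentinel.
            Err(RecvTimeoutError::Disconnected) => break,
            // Sender alive but silent: treat as a stall the caller may retry.
            Err(RecvTimeoutError::Timeout) => {
                // Hypothetical message, not the PR's wording.
                return Err(format!(
                    "stream idle for {} ms, aborting",
                    idle_window.as_millis()
                ));
            }
        }
    }
    Ok(received)
}
```

The design point is that the bound applies to the gap between chunks rather than to the whole response, so raising the whole-request timeout does not lengthen how long a silent stream can hang.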
Changes
| File | Summary |
|---|---|
| `.env.example` | Documents new `OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS` knob. |
| `compatible.rs` | Adds `stream_idle_timeout` field + test-only `with_stream_idle_timeout` setter. |
| `compatible_stream_native.rs` | Read loop → watchdog-armed loop; `[DONE]` now `break` not `continue`; delta sends routed through new `forward_delta`. |
| `compatible_timeout.rs` | `stream_idle_timeout()` resolver (default 90, 1–3600) + unit tests. |
| `compatible_tests.rs` | 5 integration tests incl. a raw scripted-SSE TCP server for deterministic stalls. |
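The bounds behaviour of the resolver (default 90, accepted range 1–3600, anything else falls back to the default) might look roughly like the sketch below. It is a standalone approximation: the function names are hypothetical, it does not reuse the project's existing resolver, and it omits the `OnceLock` caching that the PR description mentions.

```rust
use std::time::Duration;

const DEFAULT_STREAM_IDLE_SECS: u64 = 90;
const MIN_STREAM_IDLE_SECS: u64 = 1;
const MAX_STREAM_IDLE_SECS: u64 = 3600;

/// Parses a raw env value. Unset, unparsable, or out-of-range input all
/// yield the default, so a typo or `0` cannot disable the guard.
fn parse_stream_idle_secs(raw: Option<&str>) -> u64 {
    raw.and_then(|s| s.trim().parse::<u64>().ok())
        .filter(|secs| (MIN_STREAM_IDLE_SECS..=MAX_STREAM_IDLE_SECS).contains(secs))
        .unwrap_or(DEFAULT_STREAM_IDLE_SECS)
}

/// Reads the knob from the environment on every call (no caching here).
fn stream_idle_timeout_from_env() -> Duration {
    let raw = std::env::var("OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS").ok();
    Duration::from_secs(parse_stream_idle_secs(raw.as_deref()))
}
```

Keeping the parsing in a pure function that takes `Option<&str>` makes the boundary cases testable without mutating process environment variables.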
Actionable comments (2)
⚠️ Major / Question
1. src/openhuman/inference/provider/compatible_stream.rs:32 — sibling SSE reader has the identical unguarded park
The other streaming path, sse_bytes_to_chunks, still does a bare while let Some(item) = bytes_stream.next().await with no inactivity bound. It's live — invoked from compatible_provider_impl.rs:1159 (stream_chat_with_system) and :1419 (stream_chat_with_history) — so the same upstream "flush 200 then go silent" stall that #4269 fixes on the native path can still hang a turn here until the whole-request ceiling.
Not necessarily blocking (the research-agent repro goes through stream_native_chat, which this PR covers), but worth confirming: are those two entrypoints reachable for the flows #4269 cares about? If yes, this fix is incomplete without the same watchdog there; if they're legacy/unused, a one-line note on the PR would close the scope question. A follow-up issue is a fine resolution either way.
💡 Consideration
2. src/openhuman/inference/provider/compatible_timeout.rs:33 — 90s idle vs providers that buffer reasoning silently
The comment argues reasoning pauses are safe because thinking streams as reasoning_content deltas that reset the window. That holds for managed/OpenAI-style reasoning, but some providers emit no deltas during a long think and then flush the answer. Against those, a >90s silent think would trip the watchdog and force an (unnecessary) retry. It degrades gracefully — retryable → non-streaming replay still returns an answer — and it's env-tunable, so this is a heads-up, not a defect. Consider a one-line caveat in the rustdoc so an operator debugging "why did my slow local reasoning model retry" finds the knob quickly.
Nitpicks (1)
- `compatible_stream_native.rs`: the read loop copies `let idle_window = self.stream_idle_timeout;` while `forward_delta` reads `self.stream_idle_timeout` directly. Harmless, but using one form in both spots reads cleaner.
Verified / looks good
- Retryable classification is correct. Neither watchdog message ("no response data…" / "delta channel stalled…") matches any `is_non_retryable` hint (no structured 4xx, no auth/quota/session-expired substrings), and both `stream_watchdog_trips_*` tests assert `!is_non_retryable`. So `ReliableProvider` genuinely replays.
- `[DONE]` → `break 'stream` is right, and the `stream_stops_on_done_even_if_socket_lingers` test nails the exact watchdog×terminal-sentinel interaction that would otherwise fail a completed response as a stall.
- `forward_delta` preserves prior semantics for the two benign cases (success; dropped receiver → `Ok`), adding only the wedge timeout. The old `let _ = send().await` also awaited, so healthy consumers behave identically.
- Bounds/env resolution reuse the existing `resolve`/`parse_timeout_secs` path; `0`/out-of-range/typo all fall back to the default (can't accidentally disable the guard), covered by `stream_idle_parse_respects_bounds`.
- Timeout-cancelled `bytes_stream.next()` is dropped on the bail path, so there's no partial-read/data-loss concern.
Posted as a comment (not an approval) — leaving the approve/request-changes call to the maintainer.
Summary
- […] `stream_native_chat`) that aborts a stalled stream after a configurable idle window (default 90s) instead of parking indefinitely on `bytes_stream.next().await`.
- […] `ReliableProvider` replays the turn (degrading to non-streaming on retry), matching existing recovery behaviour.
- […] `OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS` (default 90, range 1–3600).
Problem
#4269 — the research agent intermittently hangs in the RESPONSE phase after tool calls complete.
Reproduced end-to-end against a staging build by driving the research agent via CDP and looping deep-research queries (arxiv paper + web search — the issue's own repro shape). The failure is an upstream SSE stall on the read side:
Exactly 120s apart: the SSE body goes silent mid-response and the reader parks on `bytes_stream.next().await` (`compatible_stream_native.rs`) until the whole-request timeout cuts it. On default config that is a ~2-minute freeze ("cursor blinks, no output"), but #3856 advises operators to raise `OPENHUMAN_INFERENCE_TIMEOUT_SECS` up to 3600s for long research turns. With that raised, this exact stall hangs for up to an hour, which is the reported indefinite hang. The stall class appeared on ~2 of every 3 research runs; two runs blew past a 10-minute cap.
The whole-request timeout is the wrong instrument: it cannot distinguish "stalled, zero tokens" from "valid long response still streaming", and operators are told to raise it. A per-token inactivity watchdog bounds the stall independent of that knob.
Solution
- `compatible_timeout.rs`: new `stream_idle_timeout()` (env `OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS`, default 90s, range 1..=3600), reusing the existing resolver + `OnceLock` cache pattern.
- `compatible.rs`: `OpenAiCompatibleProvider` carries the idle window (defaulted from config); a `#[cfg(test)]` `with_stream_idle_timeout` injects a small value in tests.
- `compatible_stream_native.rs`: wrap the SSE read in a per-chunk `tokio::time::timeout` that resets each iteration; route every delta through a new `forward_delta` helper that applies the same idle bound to the send (dropped receiver = benign `Ok`; idle timeout = retryable bail). Both bail messages are crafted so `reliable::is_non_retryable` classifies them retryable.
- `.env.example`: document the knob.
Scope note: the observed failure is the silent-stall case (no `finish_reason`/`[DONE]` arrives; the body just dies). Honoring the in-band terminal marker to end instantly when an upstream does send `[DONE]` but lingers the socket is an orthogonal correctness improvement, deferred as a follow-up.
Tests
- `stream_watchdog_trips_on_stalled_read`: raw-TCP server flushes `200` SSE headers then goes silent; asserts a retryable watchdog abort (well before the whole-request timeout).
- `stream_watchdog_resets_on_each_chunk`: chunks arriving under the idle window stream to completion (no false cut; this is the no-regression guarantee).
- `stream_watchdog_trips_on_wedged_delta_consumer`: a capacity-1, never-drained delta channel trips the send-side watchdog.
- `compatible_timeout` resolver tests for the new bound + boundaries.
Submission Checklist
- […] `cargo-llvm-cov` + `diff-cover`).
- […] `## Related`: N/A, no coverage-matrix feature ID applies.
- […] `Closes #NNN` in `## Related`.
Impact
- […] `tokio::time::timeout` per SSE chunk (negligible).
Related
Summary by CodeRabbit
New Features
- […] `OPENHUMAN_INFERENCE_STREAM_IDLE_TIMEOUT_SECS` (documented default and allowed range).
Bug Fixes
- […] `[DONE]` sentinel is received.
Tests
- […] `[DONE]` completion behavior.