fix: support nested schemas and handle client disconnects in responses telemetry #2532
abn wants to merge 7 commits into
fl0rianr
left a comment
Thanks, this looks directionally right and fixes the important telemetry schema mismatch for Responses API usage (input_tokens / output_tokens) and nested response.usage.
One thing I’d like to tighten before merging: in Router::responses_stream, we now append any top-level string delta to the accumulated telemetry output. For the normal Responses streaming text event this is correct (type == "response.output_text.delta"), but the Responses stream can contain multiple event types. If a backend emits non-text delta events later, such as tool/function/reasoning-related deltas, those could accidentally be recorded as model output in telemetry.
Could we gate this on the event type, e.g. only append top-level string delta when type == "response.output_text.delta"? The delta.text object fallback can stay if it is needed for a Lemonade-specific backend format, but it should ideally be similarly constrained or documented.
I’d also like to see a small test for the output accumulator path:
- response.output_text.delta contributes to accumulated_text.
- A non-text delta event does not contribute to accumulated_text.
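A minimal sketch of the gating and the two requested tests. It assumes a simplified `StreamEvent` struct in place of the real JSON chunk; `StreamEvent`, `accumulate_output_text`, and `check_accumulator` are hypothetical names, not code from this PR.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical, simplified view of one parsed SSE event. The real code
// inspects a JSON chunk, which is not modeled here.
struct StreamEvent {
    std::string type;                  // e.g. "response.output_text.delta"
    std::optional<std::string> delta;  // top-level string delta, if present
};

// Append the delta to the telemetry buffer only for output-text events.
// Returns true when the event contributed text.
inline bool accumulate_output_text(const StreamEvent& ev, std::string& accumulated_text) {
    if (ev.type != "response.output_text.delta" || !ev.delta) {
        return false;  // tool/function/reasoning deltas are ignored
    }
    accumulated_text += *ev.delta;
    return true;
}

// The two cases from the list above, written as plain assertions.
inline void check_accumulator() {
    std::string accumulated_text;
    const StreamEvent text{"response.output_text.delta", std::string("Hello")};
    const StreamEvent tool{"response.function_call_arguments.delta", std::string("{}")};
    assert(accumulate_output_text(text, accumulated_text));
    assert(!accumulate_output_text(tool, accumulated_text));
    assert(accumulated_text == "Hello");
}
```

Matching on the exact event type, rather than on the presence of a string `delta`, means new delta-carrying event types stay out of telemetry until they are added deliberately.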
Optional but useful: add coverage for the CURLE_WRITE_ERROR path so we know client disconnects end the telemetry span as an error without triggering backend reset behavior.
Parse usage/timings nested inside 'response' objects in SSE chunks. Correctly set stream_error and error_message when client disconnects (CURLE_WRITE_ERROR) or other curl errors occur, ending the span with an error status in telemetry.
Export parse_telemetry as public to enable unit testing and add test cases covering root-level and nested usage/timings schemas in SSE stream parsing.
Support both standard prompt_tokens/completion_tokens keys and the newer input_tokens/output_tokens keys in the SSE chunk usage metrics parser, ensuring complete token metrics collection for the Responses API.
Add fallback for input_tokens and output_tokens usage keys in non-streaming chat_completion, completion, embeddings, and reranking handlers.
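The lookup order described in these commits could look roughly like the sketch below. It assumes the parsed JSON is represented by plain standard-library containers; `Usage`, `Chunk`, `find_usage`, `first_present`, and `read_token_counts` are hypothetical names, and the root-before-nested and standard-before-fallback precedence is an assumption rather than something the commits state.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

// Hypothetical stand-in for a parsed "usage" JSON object: key -> token count.
using Usage = std::map<std::string, long>;

// Hypothetical chunk shape: usage may sit at the root or under "response".
struct Chunk {
    std::optional<Usage> root_usage;
    std::optional<Usage> response_usage;
};

// Assumed precedence: root-level usage first, then the nested one.
inline const Usage* find_usage(const Chunk& chunk) {
    if (chunk.root_usage) return &*chunk.root_usage;
    if (chunk.response_usage) return &*chunk.response_usage;
    return nullptr;
}

// Return the value under `primary`, or under `fallback` when `primary` is absent.
inline std::optional<long> first_present(const Usage& usage,
                                         const std::string& primary,
                                         const std::string& fallback) {
    if (auto it = usage.find(primary); it != usage.end()) return it->second;
    if (auto it = usage.find(fallback); it != usage.end()) return it->second;
    return std::nullopt;
}

struct TokenCounts {
    std::optional<long> input;
    std::optional<long> output;
};

// Standard OpenAI keys are tried first, Responses API keys second.
inline TokenCounts read_token_counts(const Usage& usage) {
    return {first_present(usage, "prompt_tokens", "input_tokens"),
            first_present(usage, "completion_tokens", "output_tokens")};
}

inline void check_token_counts() {
    const Usage chat{{"prompt_tokens", 12}, {"completion_tokens", 34}};
    const Usage responses{{"input_tokens", 5}, {"output_tokens", 7}};
    assert(*read_token_counts(chat).input == 12);
    assert(*read_token_counts(responses).output == 7);
    Chunk nested;
    nested.response_usage = responses;
    assert(find_usage(nested) == &*nested.response_usage);
    assert(find_usage(Chunk{}) == nullptr);
}
```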
74c6d1b to c3a25a0
Hey @fl0rianr. Thanks for the feedback! I've addressed your recommendations and pushed the updates to the branch:
Let me know if something else needs fixing.
fl0rianr
left a comment
Thanks for the updates. The Responses delta accumulator looks good now.
I still think the streaming error handling needs one more fix before merge.
The new broad try/catch around HttpClient::post_stream() in StreamingProxy::forward_sse_stream() changes the ownership of backend connection failures. HttpClient::post_stream() still throws for real cURL errors except CURLE_PARTIAL_FILE and CURLE_RECV_ERROR. With the new catch, connection failures such as “failed to connect” are converted into a telemetry error and are not rethrown to WrappedServer::forward_streaming_request().
That bypasses the existing retry/watchdog path in WrappedServer, which is specifically responsible for detecting backend connection failures before any stream bytes were delivered and replaying/reloading when appropriate. In the current shape, a backend connection failure can end up as an empty/done stream with only the telemetry span marked as error, rather than a proper retry/reset or client-visible error.
There is also a related issue with CURLE_WRITE_ERROR: forward_sse_stream() now has a branch for result.curl_code == CURLE_WRITE_ERROR, but HttpClient::post_stream() currently throws for that code before returning the HttpResponse, because only CURLE_PARTIAL_FILE and CURLE_RECV_ERROR are exempted. So the intended client-disconnect branch is not reliably reached.
I’d suggest this structure instead:
- Let HttpClient::post_stream() return CURLE_WRITE_ERROR to the streaming layer, like it already does for CURLE_PARTIAL_FILE / CURLE_RECV_ERROR.
- In StreamingProxy, classify only CURLE_WRITE_ERROR as a client disconnect and end telemetry with a client-disconnect error.
- For CURLE_PARTIAL_FILE / CURLE_RECV_ERROR, keep the existing protocol-aware behavior:
  - before [DONE]: throw backend stream failure;
  - after [DONE]: treat as clean completion.
- Do not broadly swallow other post_stream() exceptions in StreamingProxy; let them propagate to WrappedServer so backend retry/watchdog behavior remains intact.
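The classification in the list above can be sketched as one small function. This is an illustration under stated assumptions: `TransferResult` is a stand-in for the three libcurl codes that post_stream() would hand back (it is not the real `CURLcode` enum), and `StreamOutcome` and `classify_stream_result` are hypothetical names. Every other curl failure is assumed to keep throwing from post_stream(), so it never reaches this function.

```cpp
#include <cassert>

// Stand-in for the result codes post_stream() would return instead of
// throwing: CURLE_OK, CURLE_WRITE_ERROR, CURLE_PARTIAL_FILE, CURLE_RECV_ERROR.
enum class TransferResult { Ok, WriteError, PartialFile, RecvError };

enum class StreamOutcome {
    Completed,         // end the telemetry span as OK
    ClientDisconnect,  // end the span as an error, no backend reset
    BackendFailure     // throw so the retry/watchdog path can react
};

// `saw_done` records whether the [DONE] sentinel was already forwarded.
inline StreamOutcome classify_stream_result(TransferResult result, bool saw_done) {
    switch (result) {
        case TransferResult::Ok:
            return StreamOutcome::Completed;
        case TransferResult::WriteError:
            return StreamOutcome::ClientDisconnect;
        case TransferResult::PartialFile:
        case TransferResult::RecvError:
            // Protocol-aware: a truncated transfer is only a failure
            // when the stream had not finished yet.
            return saw_done ? StreamOutcome::Completed : StreamOutcome::BackendFailure;
    }
    return StreamOutcome::BackendFailure;  // unreachable for valid enum values
}

inline void check_classification() {
    assert(classify_stream_result(TransferResult::WriteError, false) ==
           StreamOutcome::ClientDisconnect);
    assert(classify_stream_result(TransferResult::RecvError, false) ==
           StreamOutcome::BackendFailure);
    assert(classify_stream_result(TransferResult::PartialFile, true) ==
           StreamOutcome::Completed);
}
```

Keeping the decision in a pure function like this would also make it straightforward to unit test without a live backend.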
The new closed-port test currently exercises backend connection failure, not client disconnect. I would remove or replace it with a test where a stream starts successfully and DataSink.write returns false, so the test specifically validates the CURLE_WRITE_ERROR client-disconnect path without changing backend-failure semantics.
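For reference, the mechanism such a test would rely on: when a libcurl write callback returns a byte count different from the one it was given, libcurl aborts the transfer with CURLE_WRITE_ERROR. A rough sketch of a forwarding callback with that shape follows. `SinkWrite`, `ForwardState`, and `forward_chunk` are hypothetical names, and the `std::function` sink only imitates a DataSink.write that reports failure by returning false.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical sink: returns false once the client has gone away.
using SinkWrite = std::function<bool(const char*, std::size_t)>;

struct ForwardState {
    SinkWrite write;
    bool client_disconnected = false;
};

// Same signature as a libcurl CURLOPT_WRITEFUNCTION callback.
inline std::size_t forward_chunk(char* data, std::size_t size, std::size_t nmemb,
                                 void* userdata) {
    auto* state = static_cast<ForwardState*>(userdata);
    const std::size_t total = size * nmemb;
    if (!state->write(data, total)) {
        state->client_disconnected = true;
        return 0;  // short count: libcurl aborts with CURLE_WRITE_ERROR
    }
    return total;  // full count: transfer continues
}

inline void check_forward_chunk() {
    std::string chunk = "data: hi\n\n";

    ForwardState open_client{[](const char*, std::size_t) { return true; }};
    assert(forward_chunk(chunk.data(), 1, chunk.size(), &open_client) == chunk.size());
    assert(!open_client.client_disconnected);

    ForwardState closed_client{[](const char*, std::size_t) { return false; }};
    assert(forward_chunk(chunk.data(), 1, chunk.size(), &closed_client) == 0);
    assert(closed_client.client_disconnected);
}
```

A test built this way starts from a stream that is already delivering bytes, so it does not touch the backend connection-failure path.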
Problem
The /v1/responses endpoint showed empty generation outputs and 0 token usage statistics on Arize Phoenix. This occurred because:
- Streamed text arrives as a top-level delta object/string instead of a choices array.
- usage and timings are nested inside the response object instead of being placed at the JSON root.
- Token counts are reported as "input_tokens" and "output_tokens" instead of standard OpenAI "prompt_tokens" and "completion_tokens".
- A client disconnect mid-stream aborts the transfer (CURLE_WRITE_ERROR). The proxy code didn't flag this curl error as a stream failure, resulting in the server synthesizing [DONE] and completing the span with an OK status despite the output being truncated.
Solution
- Updated Router::responses_stream to parse top-level delta strings and objects, extracting text changes correctly.
- Updated StreamingProxy::forward_sse_stream and StreamingProxy::parse_telemetry to look for usage and timings both at the root of the chunk and nested inside the response object.
- Supported both "prompt_tokens"/"completion_tokens" (OpenAI) and "input_tokens"/"output_tokens" (Responses API) key schemas in both streaming (StreamingProxy) and non-streaming (Router::chat_completion, Router::completion, Router::embeddings, Router::reranking) handlers.
- Flagged a stream error whenever result.curl_code != CURLE_OK.
- On a client disconnect (CURLE_WRITE_ERROR), the telemetry span is ended with an ERROR status indicating a client disconnect, preventing cut-off spans from appearing as successful OK runs.
- Skipped the backend reset for CURLE_WRITE_ERROR so we don't trigger false-positive model resets or evictions on the server when a client simply closes their socket.
Testing
- Added test_telemetry_helpers.cpp validating StreamingProxy::parse_telemetry against root-level and nested telemetry schemas with standard and fallback key names.