Skip to content

fix: support nested schemas and handle client disconnects in responses telemetry#2532

Open
abn wants to merge 7 commits into
lemonade-sdk:mainfrom
abn:fix/responses-api-telemetry-parsing
Open

fix: support nested schemas and handle client disconnects in responses telemetry#2532
abn wants to merge 7 commits into
lemonade-sdk:mainfrom
abn:fix/responses-api-telemetry-parsing

Conversation

@abn

@abn abn commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Problem

  1. Empty Output & Zero Token Counts: Telemetry traces for requests using the OpenAI-compatible /v1/responses endpoint showed empty generation outputs and 0 token usage statistics on Arize Phoenix. This occurred because:
    • Streaming text chunks were formatted with a top-level delta object/string instead of a choices array.
    • Usage statistics and timings were nested inside a top-level response object instead of being placed at the JSON root.
    • The Responses API chunk usage payload specifies "input_tokens" and "output_tokens" instead of standard OpenAI "prompt_tokens" and "completion_tokens".
  2. Aborted Streams Reported as OK: When a client disconnected prematurely (e.g. due to client-side timeouts during long parallel prompt processing), the write callback failed (CURLE_WRITE_ERROR). The proxy code didn't flag this curl error as a stream failure, resulting in the server synthesizing [DONE] and completing the span with an OK status despite the output being truncated.

Solution

  • Top-Level Delta Parsing: Updated Router::responses_stream to parse top-level delta strings and objects, extracting text changes correctly.
  • Nested Telemetry Parsing: Updated StreamingProxy::forward_sse_stream and StreamingProxy::parse_telemetry to look for usage and timings both at the root of the chunk and nested inside the response object.
  • Key Fallbacks: Parsed both "prompt_tokens"/"completion_tokens" (OpenAI) and "input_tokens"/"output_tokens" (Responses API) key schemas in both streaming (StreamingProxy) and non-streaming (Router::chat_completion, Router::completion, Router::embeddings, Router::reranking) handlers.
  • Client Disconnect Capture: Updated the SSE and byte stream forwarders to explicitly check if result.curl_code != CURLE_OK.
    • If the client disconnects (CURLE_WRITE_ERROR), the telemetry span is ended with an ERROR status indicating a client disconnect, preventing cut-off spans from appearing as successful OK runs.
    • We avoid throwing a C++ exception for CURLE_WRITE_ERROR so we don't trigger false-positive model resets or evictions on the server when a client simply closes their socket.

Testing

  • Added C++ unit tests to test_telemetry_helpers.cpp validating StreamingProxy::parse_telemetry against root-level and nested telemetry schemas with standard and fallback key names.
  • Built and verified that all C++ unit tests pass cleanly:
    cmake --build --preset default
    ./build/test_telemetry_helpers

@github-actions github-actions Bot added bug Something isn't working area::api HTTP REST API surface and route handlers cpp labels Jul 2, 2026

@fl0rianr fl0rianr left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this looks directionally right and fixes the important telemetry schema mismatch for Responses API usage (input_tokens / output_tokens) and nested response.usage.

One thing I’d like to tighten before merging: in Router::responses_stream, we now append any top-level string delta to the accumulated telemetry output. For the normal Responses streaming text event this is correct (type == "response.output_text.delta"), but the Responses stream can contain multiple event types. If a backend emits non-text delta events later, such as tool/function/reasoning-related deltas, those could accidentally be recorded as model output in telemetry.

Could we gate this on the event type, e.g. only append top-level string delta when type == "response.output_text.delta"? The delta.text object fallback can stay if it is needed for a Lemonade-specific backend format, but it should ideally be similarly constrained or documented.

I’d also like to see a small test for the output accumulator path:

  1. response.output_text.delta contributes to accumulated_text.
  2. A non-text delta event does not contribute to accumulated_text.

Optional but useful: add coverage for the CURLE_WRITE_ERROR path so we know client disconnects end the telemetry span as an error without triggering backend reset behavior.

abn added 7 commits July 3, 2026 00:50
Parse usage/timings nested inside 'response' objects in SSE chunks.
Correctly set stream_error and error_message when client disconnects
(CURLE_WRITE_ERROR) or other curl errors occur, ending the span with
an error status in telemetry.
Export parse_telemetry as public to enable unit testing and add test cases
covering root-level and nested usage/timings schemas in SSE stream parsing.
Support both standard prompt_tokens/completion_tokens keys and the newer
input_tokens/output_tokens keys in the SSE chunk usage metrics parser,
ensuring complete token metrics collection for the Responses API.
Add fallback for input_tokens and output_tokens usage keys in non-streaming
chat_completion, completion, embeddings, and reranking handlers.
@abn abn force-pushed the fix/responses-api-telemetry-parsing branch from 74c6d1b to c3a25a0 Compare July 2, 2026 22:51
@abn

abn commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

Hey @fl0rianr. Thanks for the feedback! I've addressed your recommendations and pushed the updates to the branch:

  1. Constrained Responses Delta Parsing: Extracted the text accumulation logic into a shared helper StreamingProxy::accumulate_responses_delta. It now checks the event type and only appends the top-level string delta when type == "response.output_text.delta". Chunks without a type field fall back to direct parsing for backward compatibility.
  2. Added Unit Tests: Added 7 new test cases in test_telemetry_helpers.cpp verifying the accumulator path:
    • response.output_text.delta events (both string and text-object format) correctly contribute to the accumulated output.
    • Non-text events (such as response.audio.delta or response.tool.delta) are ignored and do not contribute to the text output.
    • Legacy chunks lacking the type field correctly fall back and contribute to the text.
  3. Transport Error Coverage: Added a unit test validating that connection errors (simulated via closed ports) correctly populate telemetry.error_message. Also wrapped the curl stream call in a try-catch block to safely catch stream-level exceptions.

Let me know if something else needs fixing.

@abn abn requested a review from fl0rianr July 2, 2026 22:52

@fl0rianr fl0rianr left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. The Responses delta accumulator looks good now.

I still think the streaming error handling needs one more fix before merge.

The new broad try/catch around HttpClient::post_stream(` in StreamingProxy::forward_sse_stream() changes the ownership of backend connection failures. HttpClient::post_stream() still throws for real cURL errors except CURLE_PARTIAL_FILE and CURLE_RECV_ERROR. With the new catch, connection failures such as “failed to connect” are converted into a telemetry error and are not rethrown to WrappedServer::forward_streaming_request().

That bypasses the existing retry/watchdog path in WrappedServer, which is specifically responsible for detecting backend connection failures before any stream bytes were delivered and replaying/reloading when appropriate. In the current shape, a backend connection failure can end up as an empty/done stream with only the telemetry span marked as error, rather than a proper retry/reset or client-visible error.

There is also a related issue with CURLE_WRITE_ERROR: forward_sse_stream() now has a branch for result.curl_code == CURLE_WRITE_ERROR, but HttpClient::post_stream() currently throws for that code before returning the HttpResponse, because only CURLE_PARTIAL_FILE and CURLE_RECV_ERROR are exempted. So the intended client-disconnect branch is not reliably reached.

I’d suggest this structure instead:

  1. Let HttpClient::post_stream() return CURLE_WRITE_ERROR to the streaming layer, like it already does for CURLE_PARTIAL_FILE / CURLE_RECV_ERROR.

  2. In StreamingProxy, classify only CURLE_WRITE_ERROR as a client disconnect and end telemetry with a client-disconnect error.

  3. For CURLE_PARTIAL_FILE / CURLE_RECV_ERROR, keep the existing protocol-aware behavior:

    • before [DONE]: throw backend stream failure;
    • after [DONE]: treat as clean completion.
  4. Do not broadly swallow other post_stream() exceptions in StreamingProxy; let them propagate to WrappedServer so backend retry/watchdog behavior remains intact.

The new closed-port test currently exercises backend connection failure, not client disconnect. I would remove or replace it with a test where a stream starts successfully and DataSink.write returns false, so the test specifically validates the CURLE_WRITE_ERROR client-disconnect path without changing backend-failure semantics.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area::api HTTP REST API surface and route handlers bug Something isn't working cpp

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants