src/mistralai/extra/observability/{serialization.py → formatting.py}
@@ -1,15 +1,16 @@
"""Serialization helpers for converting Mistral API payloads to OTEL GenAI convention formats.
"""Formatting helpers for converting Mistral API payloads to OTEL GenAI convention formats.

These are pure functions with no OTEL dependencies — they transform dicts to JSON strings
These are pure functions with no OTEL dependencies — they transform dicts to dicts
matching the GenAI semantic convention schemas for input/output messages and tool definitions.
The caller is responsible for the final JSON serialization (single json.dumps on the whole
collection) before setting span attributes.

Schemas:
- Input messages: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-input-messages.json
- Output messages: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-output-messages.json
- Tool definitions: https://github.com/Cirilla-zmh/semantic-conventions/blob/cc4d07e7e56b80e9aa5904a3d524c134699da37f/docs/gen-ai/gen-ai-tool-definitions.json
"""

import json
from typing import Any


@@ -72,8 +73,8 @@ def _tool_calls_to_parts(tool_calls: list[dict] | None) -> list[dict]:
return parts


def serialize_input_message(message: dict[str, Any]) -> str:
"""Serialize a single input message per the OTEL GenAI convention.
def format_input_message(message: dict[str, Any]) -> dict[str, Any]:
"""Format a single input message per the OTEL GenAI convention.

Schema: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-input-messages.json
ChatMessage: {role (required), parts (required), name?}
@@ -89,7 +90,7 @@ def serialize_input_message(message: dict[str, Any]) -> str:
part: dict = {"type": "tool_call_response", "response": message.get("result")}
if (tool_call_id := message.get("tool_call_id")) is not None:
part["id"] = tool_call_id
return json.dumps({"role": "tool", "parts": [part]})
return {"role": "tool", "parts": [part]}

# TODO: may need to handle other types for conversations (e.g. agent handoff)
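As an aside, a sketch of what the tool-result branch above produces, assuming the dispatch (not shown in this hunk) routes role == "tool" messages here; the payload values are made up.

```python
tool_message = {
    "role": "tool",
    "tool_call_id": "call_abc123",   # optional; copied into the part's "id"
    "result": {"temperature": "21C"},
}

format_input_message(tool_message)
# -> {"role": "tool",
#     "parts": [{"type": "tool_call_response",
#                "response": {"temperature": "21C"},
#                "id": "call_abc123"}]}
```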

@@ -109,11 +110,11 @@ def serialize_input_message(message: dict[str, Any]) -> str:
parts.extend(_content_to_parts(message.get("content")))
parts.extend(_tool_calls_to_parts(message.get("tool_calls")))

return json.dumps({"role": role, "parts": parts})
return {"role": role, "parts": parts}


def serialize_output_message(choice: dict[str, Any]) -> str:
"""Serialize a single output choice/message per the OTEL GenAI convention.
def format_output_message(choice: dict[str, Any]) -> dict[str, Any]:
"""Format a single output choice/message per the OTEL GenAI convention.

Schema: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-output-messages.json
OutputMessage: {role (required), parts (required), finish_reason (required), name?}
@@ -123,16 +124,14 @@ def serialize_output_message(choice: dict[str, Any]) -> str:
parts.extend(_content_to_parts(message.get("content")))
parts.extend(_tool_calls_to_parts(message.get("tool_calls")))

return json.dumps(
{
"role": message.get("role", "assistant"),
"parts": parts,
"finish_reason": choice.get("finish_reason", ""),
}
)
return {
"role": message.get("role", "assistant"),
"parts": parts,
"finish_reason": choice.get("finish_reason", ""),
}


def serialize_tool_definition(tool: dict[str, Any]) -> str | None:
def format_tool_definition(tool: dict[str, Any]) -> dict[str, Any] | None:
"""Flatten a Mistral tool definition to the OTEL GenAI convention schema.

Mistral format: {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}
@@ -148,9 +147,9 @@ def serialize_tool_definition(tool: dict[str, Any]) -> str | None:
name = func.get("name")
if not name:
return None
serialized: dict = {"type": type, "name": name}
formatted: dict = {"type": type, "name": name}
if (description := func.get("description")) is not None:
serialized["description"] = description
formatted["description"] = description
if (parameters := func.get("parameters")) is not None:
serialized["parameters"] = parameters
return json.dumps(serialized)
formatted["parameters"] = parameters
return formatted
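For reference, a sketch of the flattening this function performs, with an illustrative tool definition (the name and parameters are made up):

```python
mistral_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Look up the current weather for a city.",
        "parameters": {"type": "object", "properties": {"city": {"type": "string"}}},
    },
}

format_tool_definition(mistral_tool)
# -> {"type": "function",
#     "name": "get_weather",
#     "description": "Look up the current weather for a city.",
#     "parameters": {"type": "object", "properties": {"city": {"type": "string"}}}}
```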
39 changes: 21 additions & 18 deletions src/mistralai/extra/observability/otel.py
@@ -23,10 +23,10 @@
from opentelemetry.baggage import get_baggage
from opentelemetry.trace import Span, Status, StatusCode, Tracer, set_span_in_context

from .serialization import (
serialize_input_message,
serialize_output_message,
serialize_tool_definition,
from .formatting import (
format_input_message,
format_output_message,
format_tool_definition,
)
from .streaming import accumulate_chunks_to_response_dict, parse_sse_chunks

@@ -185,18 +185,20 @@ def _enrich_request_genai_attrs(
# Chat/agent completion API uses messages in request body; conversation API uses inputs
input_messages = request_body.get("messages") or request_body.get("inputs")
if isinstance(input_messages, str):
attributes[gen_ai_attributes.GEN_AI_INPUT_MESSAGES] = [
serialize_input_message({"role": "user", "content": input_messages})
]
attributes[gen_ai_attributes.GEN_AI_INPUT_MESSAGES] = json.dumps(
[format_input_message({"role": "user", "content": input_messages})]
)
elif isinstance(input_messages, list):
attributes[gen_ai_attributes.GEN_AI_INPUT_MESSAGES] = list(
map(serialize_input_message, input_messages)
attributes[gen_ai_attributes.GEN_AI_INPUT_MESSAGES] = json.dumps(
list(map(format_input_message, input_messages))
)
# Tool definitions
if tools := request_body.get("tools"):
attributes[gen_ai_attributes.GEN_AI_TOOL_DEFINITIONS] = list(
filter(None, map(serialize_tool_definition, tools))
)
formatted_tools = list(filter(None, map(format_tool_definition, tools)))
if formatted_tools:
attributes[gen_ai_attributes.GEN_AI_TOOL_DEFINITIONS] = json.dumps(
formatted_tools
)
# TODO: For agent start conversation, add agent id and version attributes here ?

set_available_attributes(span, attributes)
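The added guard means the tool-definitions attribute is only produced when at least one tool survives the None filter; a small sketch of that behavior (the tool payloads are illustrative, and the attribute key is left abstract):

```python
import json

from mistralai.extra.observability.formatting import format_tool_definition

tools = [
    {"type": "function", "function": {"name": "get_weather", "parameters": {}}},
    {"type": "function", "function": {}},  # no name -> format_tool_definition returns None
]

formatted_tools = list(filter(None, map(format_tool_definition, tools)))
# Only build an attribute value when something survived the filter.
tool_definitions_value = json.dumps(formatted_tools) if formatted_tools else None
```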
@@ -244,8 +246,8 @@ def _enrich_response_genai_attrs(
if finish_reasons:
attributes[gen_ai_attributes.GEN_AI_RESPONSE_FINISH_REASONS] = finish_reasons
if choices:
attributes[gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES] = list(
map(serialize_output_message, choices)
attributes[gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES] = json.dumps(
list(map(format_output_message, choices))
)

# Usage
@@ -305,7 +307,8 @@ def _create_tool_execution_child_span(
if isinstance(tool_arguments, str)
else (json.dumps(tool_arguments) if tool_arguments else None),
gen_ai_attributes.GEN_AI_TOOL_CALL_RESULT: tool_result
and json.dumps(tool_result),
if isinstance(tool_result, str)
else (json.dumps(tool_result) if tool_result else None),
gen_ai_attributes.GEN_AI_TOOL_NAME: output.get("name"),
gen_ai_attributes.GEN_AI_TOOL_TYPE: "extension",
}
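The isinstance check avoids double-encoding tool results that are already strings; a quick illustration of the difference (the result value is made up):

```python
import json

# A result that is already a JSON string, e.g. produced by the tool runtime.
tool_result = '{"temperature": "21C"}'

# Old expression: re-encodes the string, escaping the inner quotes and
# wrapping the whole value in another pair of quotes.
old_value = tool_result and json.dumps(tool_result)

# New expression: strings pass through unchanged; other truthy values are
# encoded once; falsy values become None.
new_value = (
    tool_result
    if isinstance(tool_result, str)
    else (json.dumps(tool_result) if tool_result else None)
)
```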
@@ -338,9 +341,9 @@ def _create_message_output_child_span(
gen_ai_attributes.GEN_AI_RESPONSE_ID: output.get("id"),
gen_ai_attributes.GEN_AI_AGENT_ID: output.get("agent_id"),
gen_ai_attributes.GEN_AI_RESPONSE_MODEL: output.get("model"),
gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES: [
serialize_output_message(choice_wrapper)
],
gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES: json.dumps(
[format_output_message(choice_wrapper)]
),
}
set_available_attributes(child_span, message_attributes)
child_span.end(end_time=end_ns)
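For completeness, a sketch of what format_output_message receives and returns here (the choice payload is illustrative; the exact parts depend on _content_to_parts and _tool_calls_to_parts, which this diff does not change):

```python
choice_wrapper = {
    "finish_reason": "stop",
    "message": {"role": "assistant", "content": "It is sunny in Paris."},
}

format_output_message(choice_wrapper)
# -> {"role": "assistant",
#     "parts": [...],          # built from the message content / tool calls
#     "finish_reason": "stop"}
```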