forked from rungalileo/sdk-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
263 lines (209 loc) · 11.1 KB
/
Copy pathmain.py
File metadata and controls
263 lines (209 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import os
from typing import TypedDict
# Load environment variables first (contains API keys and project settings)
import dotenv
dotenv.load_dotenv()
# ============================================================================
# OPENTELEMETRY & GALILEO IMPORTS
# ============================================================================
# OpenTelemetry (OTel) is an observability framework that helps you collect
# traces, metrics, and logs from your applications. Think of it as a way to
# "instrument" your code so you can see exactly what's happening during execution.
# Core OpenTelemetry imports
from opentelemetry.sdk import trace as trace_sdk # SDK for creating traces
from opentelemetry import trace as trace_api # API for interacting with traces
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
) # Efficiently batches spans before export
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
) # Sends traces via HTTP
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
) # Prints traces to console (for debugging)
# OpenInference is a specialized instrumentation library that understands AI frameworks
# It automatically creates meaningful spans for LangChain/LangGraph operations
from openinference.instrumentation.langchain import LangChainInstrumentor
from openinference.instrumentation.openai import OpenAIInstrumentor
# LangGraph imports - this is what we're actually instrumenting
from langgraph.graph import StateGraph, END
# OpenAI imports for LLM integration
import openai
# ============================================================================
# STEP 1: CONFIGURE API AUTHENTICATION
# ============================================================================
# Configure OpenAI API key
openai_api_key = os.environ.get("OPENAI_API_KEY")
if not openai_api_key:
raise ValueError("OPENAI_API_KEY environment variable is required")
# Initialize OpenAI client
client = openai.OpenAI(api_key=openai_api_key)
print("✓ OpenAI client configured")
# Galileo is an AI observability platform that helps you monitor and debug
# AI applications. It receives and visualizes the traces we'll generate.
# Set up authentication headers for Galileo
# These tell Galileo who you are and which project to store traces in
headers = {
"Galileo-API-Key": os.environ.get("GALILEO_API_KEY"), # Your unique API key
"project": os.environ.get("GALILEO_PROJECT"), # Which Galileo project to use
"logstream": os.environ.get("GALILEO_LOG_STREAM", "default"), # Organize traces within the project
}
# OpenTelemetry requires headers in a specific format: "key1=value1,key2=value2"
# This converts our dictionary to that format
os.environ["OTEL_EXPORTER_OTLP_TRACES_HEADERS"] = ",".join([f"{k}={v}" for k, v in headers.items()])
# Debug: Print the formatted headers to verify they're correct
print(f"OTEL Headers: {os.environ['OTEL_EXPORTER_OTLP_TRACES_HEADERS']}")
# ============================================================================
# STEP 2: CONFIGURE OPENTELEMETRY TRACING
# ============================================================================
# OpenTelemetry works by creating "spans" - units of work that represent operations
# in your application. Spans are organized into "traces" that show the full flow
# of a request through your system.
# Define where to send the traces - Galileo's OpenTelemetry endpoint.
# Galileo's OTel ingest lives on the `api.` subdomain (not the `console.`/`app.`
# one you log into). We derive it from GALILEO_CONSOLE_URL so custom deployments
# (e.g. https://console.demo-v2.galileocloud.io/) route to their own ingest
# (https://api.demo-v2.galileocloud.io/otel/traces) instead of app.galileo.ai.
console_url = os.environ.get("GALILEO_CONSOLE_URL", "https://app.galileo.ai").rstrip("/")
api_url = console_url.replace("://console.", "://api.").replace("://app.", "://api.")
endpoint = f"{api_url}/otel/traces"
print(f"OTEL endpoint: {endpoint}")
# Create a TracerProvider with descriptive resource information
# This helps identify these traces as coming from OpenTelemetry in Galileo
from opentelemetry.sdk.resources import Resource
resource = Resource.create(
{
"service.name": "LangGraph-OpenTelemetry-Demo",
"service.version": "1.0.0",
"deployment.environment": "development",
}
)
tracer_provider = trace_sdk.TracerProvider(resource=resource)
# Add a span processor that sends traces to Galileo
# BatchSpanProcessor is more efficient than SimpleSpanProcessor for production
# because it batches multiple spans together before sending
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint))) # OTLP = OpenTelemetry Protocol
# OPTIONAL: Console output disabled to reduce noise in Galileo
# Uncomment the next 3 lines if you want local console debugging:
# tracer_provider.add_span_processor(
# BatchSpanProcessor(ConsoleSpanExporter())
# )
# Register our tracer provider as the global one
# This means all OpenTelemetry operations will use our configuration
trace_api.set_tracer_provider(tracer_provider=tracer_provider)
# ============================================================================
# STEP 3: APPLY OPENINFERENCE INSTRUMENTATION
# ============================================================================
# OpenInference automatically instruments LangChain/LangGraph to create spans
# for AI operations. This gives us detailed visibility into:
# - LangGraph workflow execution
# - Individual node processing
# - State transitions
# - Input/output data
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
print("✓ LangGraph instrumentation applied - automatic spans will be created")
# Also instrument OpenAI calls to capture LLM input/output
OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)
print("✓ OpenAI instrumentation applied - LLM calls will be traced")
# Get a tracer for creating custom spans manually
# We'll use this in our node functions below
tracer = trace_api.get_tracer(__name__)
# ============================================================================
# STEP 4: DEFINE THE LANGGRAPH STATE AND NODES
# ============================================================================
# LangGraph uses a shared state object (a dict) that flows through nodes. Each
# node reads from the state and can write updates back to it.
class AgentState(TypedDict, total=False):
user_input: str # The user's input question
llm_response: str # The raw response from the LLM
parsed_answer: str # The processed/cleaned answer
# Node 1: Input Validation
# Validates and prepares the user input for processing
def validate_input(state: AgentState):
user_input = state.get("user_input", "")
print(f"📥 Validating input: '{user_input}'")
# Add span attributes for better observability
current_span = trace_api.get_current_span()
if current_span:
current_span.set_attribute("input.value", str(state))
current_span.set_attribute("output.value", user_input)
current_span.set_attribute("node.type", "validation")
return {"user_input": user_input}
# Node 2: Generate Response
# Calls OpenAI to generate a response to the user's question
# OpenAI instrumentation will automatically create detailed spans
def generate_response(state: AgentState):
user_input = state["user_input"]
try:
print(f"⚙️ Calling OpenAI with: '{user_input}'")
# Make the OpenAI API call - OpenAI instrumentation handles tracing
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": user_input}],
max_tokens=300,
temperature=0.7,
)
# Extract the response content
llm_response = response.choices[0].message.content
print(f"✓ Received response: '{llm_response[:100]}...'")
return {"llm_response": llm_response}
except Exception as e:
print(f"❌ Error calling OpenAI: {e}")
return {"llm_response": f"Error: {str(e)}"}
# Node 3: Format Answer
# Extracts and formats a clean answer from the raw LLM response
def format_answer(state: AgentState):
llm_response = state.get("llm_response", "")
# Simple parsing - extract first sentence for a concise answer
sentences = llm_response.split(". ")
parsed_answer = sentences[0] if sentences else llm_response
# Clean up the answer
parsed_answer = parsed_answer.strip()
if not parsed_answer.endswith(".") and parsed_answer:
parsed_answer += "."
print(f"✨ Parsed answer: '{parsed_answer}'")
# Add span attributes for better observability
current_span = trace_api.get_current_span()
if current_span:
current_span.set_attribute("input.value", llm_response)
current_span.set_attribute("output.value", parsed_answer)
current_span.set_attribute("node.type", "formatting")
return {"parsed_answer": parsed_answer}
# ============================================================================
# STEP 5: BUILD AND RUN THE LANGGRAPH WORKFLOW
# ============================================================================
workflow = StateGraph(AgentState)
workflow.add_node("validate_input", validate_input)
workflow.add_node("generate_response", generate_response)
workflow.add_node("format_answer", format_answer)
# Entry point and edges define the control flow of the graph
workflow.set_entry_point("validate_input")
workflow.add_edge("validate_input", "generate_response")
workflow.add_edge("generate_response", "format_answer")
workflow.add_edge("format_answer", END)
# Compile builds the runnable app
app = workflow.compile()
# Run the app and observe traces in both console and Galileo
if __name__ == "__main__":
# Create a session-level span to group all operations
with tracer.start_as_current_span("astronomy_qa_session") as session_span:
inputs = {"user_input": "what moons did galileo discover"}
# Add OpenInference-compatible attributes for proper input/output display
session_span.set_attribute("input.value", inputs["user_input"])
session_span.set_attribute("input.mime_type", "text/plain")
session_span.set_attribute("session.type", "question_answering")
session_span.set_attribute("session.domain", "astronomy")
result = app.invoke(inputs)
# Add result attributes with OpenInference-compatible format
if result.get("llm_response"):
final_answer = result.get("parsed_answer", result.get("llm_response"))
session_span.set_attribute("output.value", final_answer)
session_span.set_attribute("output.mime_type", "text/plain")
session_span.set_status(trace_api.Status(trace_api.StatusCode.OK))
else:
session_span.set_status(trace_api.Status(trace_api.StatusCode.ERROR, "No response generated"))
print(f"\n=== FINAL RESULT ===")
print(f"Question: {result.get('user_input', 'N/A')}")
print(f"LLM Response: {result.get('llm_response', 'N/A')}")
print(f"Parsed Answer: {result.get('parsed_answer', 'N/A')}")
print("✓ Execution complete - check Galileo for traces in your project/log stream")