diff --git a/.gitignore b/.gitignore index cc15720..8813c10 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ __pycache__ .adk/ -.env \ No newline at end of file +.env +.venv/ \ No newline at end of file diff --git a/agents/analyze_agent_v2/agent.py b/agents/analyze_agent_v2/agent.py index 8d57b44..212d168 100644 --- a/agents/analyze_agent_v2/agent.py +++ b/agents/analyze_agent_v2/agent.py @@ -1,15 +1,16 @@ """ -Analyze Agent v2 — Analysis agent designed for search_agent_v2 output. +Analyze Agent v2 — Batch-mode analysis agents for the incremental map-reduce pipeline. -Consumes the state keys set by ExhaustiveSearchAgent: - - mobius_logs (JSON string: list of _source dicts) - - sse_mse_logs (JSON string: list of _source dicts) - - wxcas_logs (JSON string: list of _source dicts) - - search_summary (JSON string: {total_mobius_logs, total_sse_mse_logs, - total_wxcas_logs, max_depth_reached, total_ids_searched, - search_history}) +Invoked programmatically via ADK Runner from incremental.py's run_analysis_consumer(). +Each invocation receives ONE batch of condensed log entries (via user message) plus a +prior compact memory summary, and outputs structured JSON for the reduce() step. -Routes to calling_agent or contact_center_agent based on serviceIndicator. +Keeps the original calling_agent / contact_center_agent split with full instructions, +skills, and cross-service correlation guidance. Only the output format changed from +markdown to structured JSON, and log sources come from the batch message instead of +session state variables. + +Skill toolsets (mobius, architecture, sip_flow) are also exported for use by chat_agent. """ import os @@ -44,23 +45,22 @@ def _make_model() -> SessionLiteLlm: # ═══════════════════════════════════════════════════════════════════════════════ _SEARCH_CONTEXT_PREAMBLE = """ -**Search Context (from exhaustive BFS search):** -The logs below were collected by an exhaustive graph-traversal search agent that: +**Batch Analysis Context (from exhaustive BFS search):** +You will receive ONE BATCH of condensed log entries from a Webex Calling / Contact Center \ +platform, along with a PRIOR ANALYSIS SUMMARY from earlier batches (compact memory). + +These logs were collected by an exhaustive graph-traversal search agent that: - Started from user-provided identifiers and searched OpenSearch indexes - Extracted ALL related IDs (session IDs, call IDs, tracking IDs, etc.) - Recursively searched for those IDs across multiple indexes and services - Ran searches in parallel for speed -Search summary: {search_summary} - -This means you may have logs spanning MULTIPLE call legs, forwarded sessions, +This means the batch may contain logs spanning MULTIPLE call legs, forwarded sessions, retries, or related interactions that a single-ID search would have missed. -Use the search_summary to understand the scope: how many IDs were searched, -what depth the BFS reached, and what indexes were queried. -**IMPORTANT: You must analyze EVERY log entry. Do NOT skip or summarize groups of logs. -Read each log line, extract its meaning, and incorporate it into the analysis. -If there are hundreds of logs, produce a correspondingly detailed analysis.** +**IMPORTANT: You must analyze EVERY log entry in this batch. Do NOT skip or summarize \ +groups of logs. Read each log line, extract its meaning, and incorporate it into the analysis. +If there is a prior analysis summary, build upon it — focus on what is NEW in this batch.** """ _ANALYSIS_POINTS = """ @@ -117,103 +117,70 @@ def _make_model() -> SessionLiteLlm: - Track the same transaction across service boundaries """ -_OUTPUT_STRUCTURE = """ -**Output Detail Level:** {detailed_analysis} -If detailed_analysis is false, empty, or not set: -- COMPLETELY OMIT the "HTTP Communication Flow (Detailed)" section — do not print its header, do not print a placeholder, do not mention it at all -- COMPLETELY OMIT the "SIP Communication Flow (Detailed)" section — do not print its header, do not print a placeholder, do not mention it at all -- Your output must end after the "Final Outcome" section. Nothing after it. - -If detailed_analysis is true: -- Include ALL sections below, including the detailed HTTP and SIP Communication Flow - -**Output structure (follow this EXACTLY):** - ---- -### ❗ Root Cause Analysis -(Place this section at the VERY TOP of your analysis—first section. If no errors/issues were found, state "No errors or issues detected" and briefly confirm the flow succeeded.) - -For EACH issue found: -→ **[Timestamp]**: ErrorType (ErrorCode) -→ **Service**: Which service generated the error -→ **Context**: What was happening when this error occurred -→ **Description**: Detailed explanation of what went wrong -→ **Potential Root Causes**: List all possible causes, ranked by likelihood -→ **Suggested Fix**: Clear, actionable steps to resolve -→ **Impact**: How did this error affect the call/session? -→ **Notes**: Documentation references, escalation contacts, related issues - ---- -### 🔍 Extracted Identifiers -List ALL unique identifiers found across all log sources: -- **User ID**: -- **Device ID**: -- **Tracking ID**: (print baseTrackingID_* to represent multiple suffixes. Don't print all suffixes) -- **Call ID** (Mobius): -- **Call ID** (SSE/SIP): -- **Session ID (local)**: -- **Session ID (remote)**: -- **Meeting ID** (if any): -- **Trace ID** (if any): - ---- -### 📊 Search Scope -- **IDs searched**: (from search_summary.total_ids_searched) -- **Indexes queried**: (list unique indexes from search_summary.search_history) -- **Total logs analyzed**: Mobius: X, SSE/MSE: Y, WxCAS: Z - ---- -### 🔗 Cross-Service Correlation -Map how the same transaction flows across services: -- Tracking ID X in Mobius → corresponds to Call-ID Y in SSE → routed via WxCAS as Z -- Note any missing correlations or gaps in the flow - ---- -### ⏱️ Timing Analysis -- **Call setup time**: (INVITE to 200 OK) -- **Total duration**: (first log to last log, or INVITE to BYE) -- **Notable delays**: List any gaps > 2 seconds between expected sequential events -- **Retries/Retransmissions**: Count and note if any - ---- -### ✅ Final Outcome -Provide a comprehensive summary of the entire flow: -- What type of call was this? (WebRTC-to-WebRTC, WebRTC-to-PSTN, etc.) -- Did the call succeed or fail? -- Complete signaling path taken -- Media path established (or not) -- Any degradation or issues even if the call succeeded - ---- -### 📡 HTTP Communication Flow (Detailed) -Place this section at the BOTTOM of your analysis, after Root Cause Analysis and all summaries. -List ALL HTTP requests and responses in strict chronological order. -Each entry should be ONE concise line with the format: - -→ **[Timestamp]** Source → Destination: METHOD /path - StatusCode (Brief description) - -Example: -→ **[2026-02-13T10:00:00Z]** Client → Mobius: POST /v1/calling/web/devices/.../call - 200 OK (Call initiation) -→ **[2026-02-13T10:00:01Z]** Mobius → CPAPI: GET /features - 200 OK (Feature retrieval) - -**Do NOT skip any HTTP interactions.** If there are 50 requests, list all 50. - ---- -### 📞 SIP Communication Flow (Detailed) -List ALL SIP messages in strict chronological order, after the HTTP Communication Flow. -Keep Mobius, SSE, MSE, and WxCAS as separate participants. - -→ **[Timestamp]** Source → Destination: SIP Method/Response - Call-ID: xxx - Description -→ **[Timestamp]** Mobius → SSE: SIP INVITE - Call-ID: SSE0520... - Initial call setup -→ **[Timestamp]** SSE → Mobius: 100 Trying - Call-ID: SSE0520... - Call being processed -→ **[Timestamp]** SSE → WxCAS: INVITE - Call-ID: SSE0520... - Routing to app server -→ **[Timestamp]** WxCAS → SSE: 200 OK - Call-ID: SSE0520... - Call accepted -→ **[Timestamp]** SSE → Mobius: 200 OK - Call-ID: SSE0520... - Final response - -Include SDP summary when available (codec, media type, ICE candidates count). -**Do NOT skip any SIP messages.** Reconstruct the COMPLETE dialog. - ---- +_JSON_OUTPUT_SCHEMA = """\ +## Output Format + +Output ONLY valid JSON — no markdown fences, no preamble, no explanation outside the JSON. +Your analysis from the sections above must be captured in the structured fields below. + +{ + "new_identifiers": { + "session_ids": [""], + "call_ids": [""], + "sip_call_ids": [""], + "sse_call_ids": [""], + "tracking_ids": [""], + "user_ids": [""], + "device_ids": [""], + "trace_ids": [""] + }, + "events": [ + { + "timestamp": "", + "type": "HTTP|SIP|media|routing|registration|websocket|error", + "source": "", + "destination": "", + "detail": "" + } + ], + "errors": [ + { + "timestamp": "", + "code": "", + "service": "", + "message": "", + "suspected_cause": "", + "context": "", + "suggested_fix": "", + "impact": "" + } + ], + "state_updates": [ + { + "timestamp": "", + "transition": "", + "from_state": "", + "to_state": "" + } + ], + "evidence_refs": [ + { + "doc_id": "", + "index": "", + "timestamp": "", + "category": "mobius|sse_mse|wxcas", + "relevance": "" + } + ], + "delta_summary": "<2-4 sentence summary of what THIS batch reveals that is NEW compared to the prior summary. Include: call type if identifiable, key milestones, errors found, cross-service correlations, timing anomalies.>" +} + +**Capture EVERY HTTP request/response and EVERY SIP message as individual events.** +Do NOT skip any. If there are 50 HTTP requests, produce 50 event entries. +If there are 20 SIP messages, produce 20 event entries. +Include SDP summaries (codec, media type, ICE candidates count) in SIP event details when available. + +If no items exist for a category, use an empty list []. """ @@ -222,17 +189,17 @@ def _make_model() -> SessionLiteLlm: # ═══════════════════════════════════════════════════════════════════════════════ mobius_error_skill = load_skill_from_dir( - Path(__file__).parent / "skills" / "mobius_error_id_skill" + Path(__file__).parent / "skills" / "mobius-error-id-skill" ) mobius_skill_toolset = skill_toolset.SkillToolset(skills=[mobius_error_skill]) architecture_endpoints_skill = load_skill_from_dir( - Path(__file__).parent / "skills" / "architecture_endpoints_skill" + Path(__file__).parent / "skills" / "architecture-endpoints-skill" ) architecture_skill_toolset = skill_toolset.SkillToolset(skills=[architecture_endpoints_skill]) sip_flow_skill = load_skill_from_dir( - Path(__file__).parent / "skills" / "sip_flow_skill" + Path(__file__).parent / "skills" / "sip-flow-skill" ) sip_flow_skill_toolset = skill_toolset.SkillToolset(skills=[sip_flow_skill]) @@ -244,7 +211,6 @@ def _make_model() -> SessionLiteLlm: calling_agent = LlmAgent( model=_make_model(), name="calling_agent", - output_key="analyze_results", tools=[mobius_skill_toolset, architecture_skill_toolset, sip_flow_skill_toolset], instruction=f"""You are a senior VoIP/WebRTC debugging expert with deep expertise in HTTP, WebRTC, SIP, SDP, RTP, SRTP, DTLS, ICE, TCP, UDP, TLS, and related protocols. You produce EXHAUSTIVE, production-grade debug analyses that leave no log entry unexamined. @@ -252,12 +218,13 @@ def _make_model() -> SessionLiteLlm: Use the **architecture_endpoints_skill** for service roles, signaling/media paths, and WebRTC Calling architecture (see references/architecture_and_endpoints.md — endpoints and WebRTC Calling sections). Use the **sip_flow_skill** for SIP message sequences, response code meanings, SDP negotiation details, SIP timers, and common failure patterns (see references/sip_flows.md). +Use the **mobius_error_id_skill** when you encounter mobius-error codes or unexpected HTTP status codes from Mobius. -**Log Sources — Analyze ALL of them thoroughly:** -1. **Mobius logs** from {{{{mobius_logs}}}} (logstash-wxm-app indexes) — HTTP/WebSocket signaling, SIP translation, device registration -2. **SSE/MSE logs** from {{{{sse_mse_logs}}}} (logstash-wxcalling indexes) — SIP edge signaling, media relay -3. **WxCAS logs** from {{{{wxcas_logs}}}} (logstash-wxcalling indexes) — Call routing, destination resolution, application server logic -4. **SDK/Client logs** from {{{{sdk_logs}}}} (uploaded by user) — Client-side SDK perspective (browser/app WebRTC logs) +**Log Sources in this batch — recognize them by their tags/index patterns:** +1. **Mobius logs** (logstash-wxm-app indexes, tags: mobius) — HTTP/WebSocket signaling, SIP translation, device registration +2. **SSE/MSE logs** (logstash-wxcalling indexes, tags: sse, mse) — SIP edge signaling, media relay +3. **WxCAS logs** (logstash-wxcalling indexes, tags: wxcas) — Call routing, destination resolution, application server logic +4. **SDK/Client logs** (uploaded by user) — Client-side SDK perspective (browser/app WebRTC logs) When SDK/Client logs are present, these provide the browser/app perspective. Correlate with server-side logs when both are available. @@ -277,7 +244,7 @@ def _make_model() -> SessionLiteLlm: {_ANALYSIS_POINTS} -{_OUTPUT_STRUCTURE} +{_JSON_OUTPUT_SCHEMA} """, ) @@ -289,7 +256,6 @@ def _make_model() -> SessionLiteLlm: contact_center_agent = LlmAgent( model=_make_model(), name="contact_center_agent", - output_key="analyze_results", tools=[architecture_skill_toolset, sip_flow_skill_toolset], instruction=f"""You are a senior VoIP/Contact Center debugging expert with deep expertise in HTTP, WebRTC, SIP, SDP, RTP, SRTP, DTLS, ICE, TCP, UDP, TLS, and related protocols. You produce EXHAUSTIVE, production-grade debug analyses that leave no log entry unexamined. @@ -298,17 +264,20 @@ def _make_model() -> SessionLiteLlm: Use the **architecture_endpoints_skill** for service roles and Contact Center architecture (see references/architecture_and_endpoints.md — endpoints and Contact Center sections). Use the **sip_flow_skill** for SIP message sequences, response code meanings, SDP negotiation details, SIP timers, and common failure patterns (see references/sip_flows.md). -**Log Sources — Analyze ALL of them thoroughly:** -1. **Mobius logs** from {{{{mobius_logs}}}} (logstash-wxm-app indexes) — HTTP/WebSocket signaling, SIP translation -2. **SSE/MSE logs** from {{{{sse_mse_logs}}}} (logstash-wxcalling indexes) — SIP edge signaling, media relay -3. **WxCAS logs** from {{{{wxcas_logs}}}} (logstash-wxcalling indexes) — Call routing logic -4. **SDK/Client logs** from {{{{sdk_logs}}}} (uploaded by user) — Client-side SDK perspective +**Log Sources in this batch — recognize them by their tags/index patterns:** +1. **Mobius logs** (logstash-wxm-app indexes, tags: mobius) — HTTP/WebSocket signaling, SIP translation +2. **SSE/MSE logs** (logstash-wxcalling indexes, tags: sse, mse) — SIP edge signaling, media relay +3. **WxCAS logs** (logstash-wxcalling indexes, tags: wxcas) — Call routing logic +4. **SDK/Client logs** (uploaded by user) — Client-side SDK perspective When SDK/Client logs are present, these provide the browser/app perspective. Correlate with server-side logs when both are available. **Cross-source correlation is CRITICAL:** - The SAME call appears in multiple log sources with different perspectives - Correlate using shared IDs: Call-ID, Session ID, Tracking ID +- Mobius logs show the browser↔server HTTP side +- SSE logs show the SIP signaling side of the same events +- WxCAS logs show routing decisions - Present a UNIFIED view that stitches together all perspectives Because the search was exhaustive (BFS), you may see logs from MULTIPLE call legs, @@ -316,7 +285,7 @@ def _make_model() -> SessionLiteLlm: {_ANALYSIS_POINTS} -{_OUTPUT_STRUCTURE} +{_JSON_OUTPUT_SCHEMA} """, ) @@ -325,35 +294,21 @@ def _make_model() -> SessionLiteLlm: # Coordinator: Routes to calling or contact center based on serviceIndicator # ═══════════════════════════════════════════════════════════════════════════════ - -def _ensure_state_defaults(callback_context) -> None: - """Guarantee optional state keys exist so {var} references don't KeyError.""" - callback_context.state.setdefault("sdk_logs", "") - - -analyze_agent = LlmAgent( - name="analyze_agent_v2", - output_key="analyze_results", +batch_analysis_agent = LlmAgent( + name="batch_analysis_agent", model=_make_model(), - before_agent_callback=_ensure_state_defaults, instruction=""" -Context: You are analyzing logs from a WebRTC Calling or contact center flow, -which involves talking to different endpoints using protocols like HTTP, SIP, -WebRTC, SDP, RTP, TLS. - -You have access to the full search results from an exhaustive BFS search: -- Search summary: {search_summary} -- Mobius logs: {mobius_logs} -- SSE/MSE logs: {sse_mse_logs} -- WxCAS logs: {wxcas_logs} -- SDK/Client logs: {sdk_logs} - -Use `serviceIndicator` from logs to classify the session: +You are a log analysis router. You will receive a batch of condensed log entries +from a Webex Calling / Contact Center platform. + +Look at the log entries for `serviceIndicator` fields to classify the session: - `calling`, `guestCalling` → WebRTC Calling Flow, transfer to `calling_agent` - `contactCenter` → Contact Center Flow, transfer to `contact_center_agent` If no `serviceIndicator` is found, default to `calling_agent`. + +Transfer the FULL user message (batch data) to the selected agent. """, - description="Routes analysis to Calling or ContactCenter agent based on serviceIndicator in logs.", + description="Routes batch analysis to Calling or ContactCenter agent based on serviceIndicator in logs.", sub_agents=[calling_agent, contact_center_agent], ) diff --git a/agents/analyze_agent_v2/incremental.py b/agents/analyze_agent_v2/incremental.py new file mode 100644 index 0000000..5bebc45 --- /dev/null +++ b/agents/analyze_agent_v2/incremental.py @@ -0,0 +1,794 @@ +""" +Incremental Map-Reduce Analysis — processes log batches as they arrive from search. + +MAP step invokes batch_analysis_agent (from agent.py) via ADK Runner, giving each +batch the full power of calling_agent / contact_center_agent with skills and routing. +A single temporary ADK session is created per analysis run (not per batch) and +destroyed when the run finishes. + +Exports a clean function interface consumed by search_agent_v2: + - new_rolling_analysis() → empty rolling state + - map_batch() → MAP: one batch via ADK Runner → structured JSON + - reduce() → REDUCE: merge map output into rolling state + - compress_analysis_summary() → shrink rolling summary when it exceeds token cap + - format_to_markdown() → convert final rolling state to markdown report + - run_analysis_consumer() → asyncio.Queue consumer loop (producer-consumer pattern) + - analyze_upload_only() → single-pass analysis for SDK-only uploads +""" + +import asyncio +import json +import logging +import os +import uuid +from typing import Any + +import litellm +from dotenv import load_dotenv +from pathlib import Path + +from google.adk.runners import Runner +from google.adk.sessions import InMemorySessionService +from google.genai import types + +env_path = Path(__file__).parent.parent / ".env" +load_dotenv(dotenv_path=env_path) + +logger = logging.getLogger(__name__) + +from analyze_agent_v2.agent import batch_analysis_agent + +# ═══════════════════════════════════════════════════════════════════════════════ +# ADK Runner setup (reused across all map_batch calls) +# ═══════════════════════════════════════════════════════════════════════════════ + +_APP_NAME = "log-analyzer-incremental" +_USER_ID = "incremental-pipeline" + +_session_service = InMemorySessionService() +_runner = Runner( + agent=batch_analysis_agent, + app_name=_APP_NAME, + session_service=_session_service, +) + +# ═══════════════════════════════════════════════════════════════════════════════ +# Constants +# ═══════════════════════════════════════════════════════════════════════════════ + +CHARS_PER_TOKEN_ESTIMATE = 4 +ROLLING_SUMMARY_TOKEN_CAP = 4_000 +TIMELINE_MAX_EVENTS = 50 + +_IDENTIFIER_KEYS = [ + "session_ids", + "call_ids", + "tracking_ids", + "user_ids", + "device_ids", + "trace_ids", + "sip_call_ids", + "sse_call_ids", +] + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Data Structures +# ═══════════════════════════════════════════════════════════════════════════════ + + +def new_rolling_analysis() -> dict: + """Factory: returns an empty rolling_analysis structure.""" + return { + "identifiers": {k: [] for k in _IDENTIFIER_KEYS}, + "timeline": [], + "errors": [], + "state_machine": [], + "cross_service_correlations": [], + "summary": "", + "evidence_count": 0, + "batch_count": 0, + } + + +def _estimate_tokens(text: str) -> int: + """Estimate token count from character length.""" + return len(text) // CHARS_PER_TOKEN_ESTIMATE + + +def _get_llm_config() -> tuple[str, str]: + """Return (api_key, api_base) for LLM calls.""" + api_key = ( + os.environ.get("OPENAI_API_KEY") + or os.environ.get("AZURE_OPENAI_API_KEY") + or "pending-oauth" + ) + api_base = os.environ["AZURE_OPENAI_ENDPOINT"] + return api_key, api_base + + +def _parse_json_from_llm(raw: Any) -> dict: + """Extract a JSON object from LLM output, handling markdown code blocks.""" + import re + + if isinstance(raw, dict): + return raw + raw = str(raw) + try: + return json.loads(raw) + except (json.JSONDecodeError, TypeError): + pass + m = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", raw, re.DOTALL) + if m: + try: + return json.loads(m.group(1)) + except (json.JSONDecodeError, TypeError): + pass + m = re.search(r"\{.*\}", raw, re.DOTALL) + if m: + try: + return json.loads(m.group(0)) + except (json.JSONDecodeError, TypeError): + pass + logger.warning("[_parse_json_from_llm] Could not extract JSON, returning empty dict") + return {} + + +# ═══════════════════════════════════════════════════════════════════════════════ +# MAP Step — invokes batch_analysis_agent via ADK Runner +# ═══════════════════════════════════════════════════════════════════════════════ + +_MAP_USER_TEMPLATE = """\ +## Prior Analysis Summary +{compact_memory} + +## Log Batch (analyze this) +{batch_json} +""" + + +async def map_batch( + condensed_hits: list[dict], + compact_memory: str, + session_id: str, +) -> dict: + """MAP step: analyze one batch of log entries via ADK Runner. + + Sends the batch + compact_memory as a user message to the shared session + and collects the structured JSON response from the batch_analysis_agent + (which routes to calling_agent or contact_center_agent with full skills). + + Args: + condensed_hits: list of condensed log entries (from extract_id_fields_for_llm) + compact_memory: the rolling_analysis["summary"] from prior batches (few KB) + session_id: the shared session ID for this analysis run + + Returns: + MapOutput dict matching the JSON schema, or empty dict on failure. + """ + batch_json = json.dumps(condensed_hits, default=str) + + user_content = _MAP_USER_TEMPLATE.format( + compact_memory=compact_memory or "(No prior analysis — this is the first batch)", + batch_json=batch_json, + ) + + try: + user_message = types.Content( + role="user", + parts=[types.Part.from_text(text=user_content)], + ) + + final_text = "" + async for event in _runner.run_async( + user_id=_USER_ID, + session_id=session_id, + new_message=user_message, + ): + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + final_text = part.text + + result = _parse_json_from_llm(final_text or "{}") + + logger.info( + f"[map_batch] ADK Runner result: " + f"events={len(result.get('events', []))}, " + f"errors={len(result.get('errors', []))}, " + f"state_updates={len(result.get('state_updates', []))}, " + f"evidence_refs={len(result.get('evidence_refs', []))}" + ) + return result + + except Exception as e: + logger.error(f"[map_batch] ADK Runner call failed: {e}") + return {} + + + + +# ═══════════════════════════════════════════════════════════════════════════════ +# REDUCE Step +# ═══════════════════════════════════════════════════════════════════════════════ + + +def reduce( + rolling: dict, + map_output: dict, + evidence_index: list[dict], +) -> tuple[dict, list[dict]]: + """REDUCE step: merge one map_batch output into the rolling analysis. + + Pure Python — no LLM calls. Deduplicates identifiers, appends events/errors/ + state_updates, moves evidence_refs to the separate evidence_index, and appends + the delta_summary to the rolling summary. + + Args: + rolling: the current rolling_analysis dict (mutated in place and returned) + map_output: the structured dict returned by map_batch() + evidence_index: the accumulated evidence list (mutated in place and returned) + + Returns: + (updated rolling_analysis, updated evidence_index) + """ + if not map_output: + return rolling, evidence_index + + rolling["batch_count"] += 1 + + # ── Merge identifiers (deduplicated) ── + new_ids = map_output.get("new_identifiers", {}) + for key in _IDENTIFIER_KEYS: + existing = set(rolling["identifiers"].get(key, [])) + for val in new_ids.get(key, []): + val = str(val).strip() + if val and val not in existing: + existing.add(val) + rolling["identifiers"].setdefault(key, []).append(val) + + # ── Append timeline events (capped at TIMELINE_MAX_EVENTS) ── + new_events = map_output.get("events", []) + rolling["timeline"].extend(new_events) + if len(rolling["timeline"]) > TIMELINE_MAX_EVENTS: + rolling["timeline"] = _prune_timeline(rolling["timeline"]) + + # ── Append errors (never pruned) ── + new_errors = map_output.get("errors", []) + rolling["errors"].extend(new_errors) + + # ── Append state machine transitions ── + new_states = map_output.get("state_updates", []) + rolling["state_machine"].extend(new_states) + + # ── Move evidence_refs to separate index ── + new_evidence = map_output.get("evidence_refs", []) + evidence_index.extend(new_evidence) + rolling["evidence_count"] = len(evidence_index) + + # ── Append delta_summary to rolling summary ── + delta = map_output.get("delta_summary", "") + if delta: + if rolling["summary"]: + rolling["summary"] = f"{rolling['summary']}\n\n[Batch {rolling['batch_count']}] {delta}" + else: + rolling["summary"] = f"[Batch {rolling['batch_count']}] {delta}" + + logger.info( + f"[reduce] Batch {rolling['batch_count']}: " + f"+{len(new_events)} events, +{len(new_errors)} errors, " + f"+{len(new_states)} state_updates, +{len(new_evidence)} evidence_refs, " + f"summary={_estimate_tokens(rolling['summary'])} tokens" + ) + + return rolling, evidence_index + + +def _prune_timeline(timeline: list[dict]) -> list[dict]: + """Keep timeline within TIMELINE_MAX_EVENTS by removing low-value entries. + + Preserves: errors, first/last events, SIP milestones, state changes. + Removes: routine success HTTP requests, redundant info entries. + """ + if len(timeline) <= TIMELINE_MAX_EVENTS: + return timeline + + high_priority_types = {"SIP", "error", "routing", "media", "registration"} + + high = [] + low = [] + for event in timeline: + etype = event.get("type", "") + detail = event.get("detail", "") + is_error = "error" in etype.lower() or "error" in detail.lower() + is_high = etype in high_priority_types or is_error + if is_high: + high.append(event) + else: + low.append(event) + + remaining_slots = TIMELINE_MAX_EVENTS - len(high) + if remaining_slots > 0: + kept_low = low[:remaining_slots] + else: + kept_low = [] + high = high[:TIMELINE_MAX_EVENTS] + + result = high + kept_low + result.sort(key=lambda e: e.get("timestamp", "")) + + logger.info( + f"[_prune_timeline] Pruned {len(timeline)} -> {len(result)} events " + f"({len(high)} high-priority, {len(kept_low)} low-priority kept)" + ) + return result + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Compression +# ═══════════════════════════════════════════════════════════════════════════════ + +_ANALYSIS_COMPRESS_INSTRUCTION = """\ +You are a log analysis compressor. The rolling analysis summary below has grown \ +too large and must be compressed to approximately HALF its current length. + +MUST preserve: +1. ALL errors — timestamps, codes, services, suspected causes (never drop these) +2. ALL correlation-critical IDs (session IDs, call IDs, tracking IDs linking services) +3. Key timeline milestones (first event, last event, SIP state transitions, error events) +4. Cross-service correlation evidence +5. Any unresolved questions or anomalies + +MAY abbreviate or remove: +- Redundant success confirmations +- Verbose details of normal/expected HTTP 200 responses +- Duplicate information across batch summaries +- Routine registration or keep-alive events + +Output the compressed summary directly, no preamble or explanation.\ +""" + + +async def compress_analysis_summary(rolling: dict) -> dict: + """Compress rolling_analysis['summary'] when it exceeds ROLLING_SUMMARY_TOKEN_CAP. + + Calls the LLM to produce a shorter version that preserves errors, IDs, and + key milestones. Mutates and returns the rolling dict. + """ + summary = rolling.get("summary", "") + current_tokens = _estimate_tokens(summary) + + if current_tokens <= ROLLING_SUMMARY_TOKEN_CAP: + return rolling + + logger.info( + f"[compress_analysis_summary] Summary at {current_tokens} tokens " + f"(cap={ROLLING_SUMMARY_TOKEN_CAP}), compressing..." + ) + + api_key, api_base = _get_llm_config() + + try: + response = await litellm.acompletion( + model="openai/gpt-4.1", + api_key=api_key, + api_base=api_base, + extra_headers={"x-cisco-app": "microservice-log-analyzer"}, + messages=[ + {"role": "system", "content": _ANALYSIS_COMPRESS_INSTRUCTION}, + {"role": "user", "content": summary}, + ], + temperature=0, + ) + + compressed = response.choices[0].message.content or summary + old_tokens = current_tokens + new_tokens = _estimate_tokens(compressed) + + rolling["summary"] = compressed + logger.info( + f"[compress_analysis_summary] Compressed: {old_tokens} -> {new_tokens} tokens " + f"(saved ~{old_tokens - new_tokens})" + ) + + except Exception as e: + logger.error(f"[compress_analysis_summary] Compression failed: {e}") + + return rolling + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Format to Markdown +# ═══════════════════════════════════════════════════════════════════════════════ + + +def format_to_markdown( + rolling: dict, + evidence_index: list[dict], + search_summary: str = "", +) -> str: + """Convert the final rolling_analysis + evidence_index into a markdown report. + + The output mirrors the section structure expected by downstream agents + (sequence_diagram, chat_agent). + """ + lines: list[str] = [] + errors = rolling.get("errors", []) + timeline = rolling.get("timeline", []) + ids = rolling.get("identifiers", {}) + summary = rolling.get("summary", "") + + # ── Root Cause Analysis ── + lines.append("### Root Cause Analysis\n") + if not errors: + lines.append( + "No errors or issues detected. The flow appears to have completed normally.\n" + ) + else: + for i, err in enumerate(errors, 1): + ts = err.get("timestamp", "unknown") + code = err.get("code", "N/A") + svc = err.get("service", "unknown") + msg = err.get("message", "") + cause = err.get("suspected_cause", "") + ctx = err.get("context", "") + fix = err.get("suggested_fix", "") + impact = err.get("impact", "") + + lines.append(f"{i}. [{ts}] — `{code}`\n") + lines.append("| Field | Detail |") + lines.append("|-------|--------|") + lines.append(f"| Service | {svc} |") + lines.append(f"| Description | {msg} |") + if ctx: + lines.append(f"| Context | {ctx} |") + lines.append(f"| Root Cause | {cause} |") + if fix: + lines.append(f"| Suggested Fix | {fix} |") + if impact: + lines.append(f"| Impact | {impact} |") + lines.append("") + + # ── Extracted Identifiers ── + lines.append("### Extracted Identifiers\n") + label_map = { + "tracking_ids": "Tracking ID", + "call_ids": "Call ID (Mobius)", + "sip_call_ids": "Call ID (SIP)", + "sse_call_ids": "Call ID (SSE)", + "session_ids": "Session ID", + "user_ids": "User ID", + "device_ids": "Device ID", + "trace_ids": "Trace ID", + } + has_any_id = False + for key, label in label_map.items(): + vals = ids.get(key, []) + if vals: + has_any_id = True + lines.append(f"- {label}: `{'`, `'.join(vals)}`") + if not has_any_id: + lines.append("No identifiers extracted.") + lines.append("") + + # ── Search Scope ── + if search_summary: + lines.append("### Search Scope\n") + lines.append(search_summary) + lines.append("") + + # ── Cross-Service Correlation ── + lines.append("### Cross-Service Correlation\n") + corrs = rolling.get("cross_service_correlations", []) + if corrs: + for c in corrs: + lines.append(f"- {c}") + else: + if "cross" in summary.lower() or "correlat" in summary.lower(): + lines.append("(See Final Outcome below for cross-service details)") + else: + lines.append("No explicit cross-service correlations captured.") + lines.append("") + + # ── Timing Analysis ── + lines.append("### Timing Analysis\n") + if timeline: + first_ts = timeline[0].get("timestamp", "") + last_ts = timeline[-1].get("timestamp", "") + sip_evts = [e for e in timeline if e.get("type") == "SIP"] + http_evts = [e for e in timeline if e.get("type") == "HTTP"] + error_evts = [e for e in timeline if e.get("type") == "error"] + + lines.append("| Metric | Value |") + lines.append("|--------|-------|") + lines.append(f"| First event | {first_ts} |") + lines.append(f"| Last event | {last_ts} |") + lines.append(f"| Total events | {len(timeline)} |") + if http_evts: + lines.append(f"| HTTP requests | {len(http_evts)} |") + if sip_evts: + lines.append(f"| SIP messages | {len(sip_evts)} |") + if error_evts: + lines.append(f"| Error events | {len(error_evts)} |") + else: + lines.append("No timeline events captured.") + lines.append("") + + # ── Final Outcome ── + lines.append("### Final Outcome\n") + if summary: + lines.append(summary) + else: + lines.append("Analysis produced no summary.") + lines.append("") + + # ── Communication Flow (split by protocol) ── + if timeline: + http_evts = [e for e in timeline if e.get("type") == "HTTP"] + sip_evts = [e for e in timeline if e.get("type") == "SIP"] + other_evts = [e for e in timeline if e.get("type") not in ("HTTP", "SIP")] + + if http_evts: + lines.append(f"### HTTP Communication Flow ({len(http_evts)} requests)\n") + for ev in http_evts: + ts = ev.get("timestamp", "?") + src = ev.get("source", "?") + dst = ev.get("destination", "?") + detail = ev.get("detail", "") + lines.append(f"- [{ts}] {src} \u2192 {dst}: {detail}") + lines.append("") + + if sip_evts: + lines.append(f"### SIP Communication Flow ({len(sip_evts)} messages)\n") + for ev in sip_evts: + ts = ev.get("timestamp", "?") + src = ev.get("source", "?") + dst = ev.get("destination", "?") + detail = ev.get("detail", "") + lines.append(f"- [{ts}] {src} \u2192 {dst}: {detail}") + lines.append("") + + if other_evts: + lines.append(f"### Other Events ({len(other_evts)})\n") + for ev in other_evts: + ts = ev.get("timestamp", "?") + etype = ev.get("type", "") + src = ev.get("source", "?") + dst = ev.get("destination", "?") + detail = ev.get("detail", "") + lines.append(f"- [{ts}] `{etype}` {src} \u2192 {dst}: {detail}") + lines.append("") + + # ── Evidence References ── + if evidence_index: + lines.append(f"### Evidence Index ({len(evidence_index)} references)\n") + display_refs = evidence_index[:25] + lines.append("| # | Doc ID | Index | Category | Timestamp | Relevance |") + lines.append("|---|--------|-------|----------|-----------|-----------|") + for i, ref in enumerate(display_refs, 1): + doc_id = ref.get("doc_id", "?") + idx = ref.get("index", "?") + ts = ref.get("timestamp", "?") + cat = ref.get("category", "?") + rel = ref.get("relevance", "") + lines.append(f"| {i} | `{doc_id}` | {idx} | {cat} | {ts} | {rel} |") + if len(evidence_index) > 25: + lines.append(f"\n*... and {len(evidence_index) - 25} more references*") + lines.append("") + + # ── Stats ── + lines.append( + f"*Analysis: {rolling.get('batch_count', 0)} batches processed, " + f"{len(errors)} errors found, " + f"{len(timeline)} events captured, " + f"{rolling.get('evidence_count', 0)} evidence references collected.*" + ) + + return "\n".join(lines) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Producer-Consumer: analysis consumer loop +# ═══════════════════════════════════════════════════════════════════════════════ + +SENTINEL = None # pushed by the producer to signal "no more batches" + + +async def run_analysis_consumer( + queue: "asyncio.Queue[list[dict] | None]", + search_summary: str = "", + sdk_logs: str = "", +) -> tuple[str, dict, list[dict]]: + """Consume condensed hit batches from an asyncio.Queue and run MAP-REDUCE. + + The search producer pushes list[dict] items (condensed hits per page) onto + the queue, then pushes SENTINEL (None) when done. This consumer processes + them one-at-a-time with map_batch -> reduce, compressing the summary when + it exceeds the token cap. + + After all search batches are consumed, if sdk_logs is provided the consumer + chunks them and processes those batches too — building a unified + rolling_analysis covering both data sources. + + A single temporary ADK session is created for the entire analysis run and + destroyed when the consumer finishes (or on error). + + Args: + queue: asyncio.Queue fed by the search producer; items are + list[dict] (condensed hits) or None (sentinel). + search_summary: optional search_summary string for the final markdown. + sdk_logs: optional raw SDK log text to analyze after search batches. + + Returns: + (markdown_report, rolling_analysis, evidence_index) + """ + rolling = new_rolling_analysis() + evidence_index: list[dict] = [] + + session_id = f"analysis-run-{uuid.uuid4().hex[:12]}" + await _session_service.create_session( + app_name=_APP_NAME, + user_id=_USER_ID, + session_id=session_id, + ) + logger.info(f"[analysis_consumer] Created shared session {session_id}") + + try: + batch_num = 0 + + # Phase 1: consume search batches from the queue + while True: + item = await queue.get() + if item is SENTINEL: + queue.task_done() + logger.info("[analysis_consumer] Received sentinel, search batches done") + break + + batch_num += 1 + condensed_hits = item + logger.info( + f"[analysis_consumer] Processing search batch {batch_num} " + f"({len(condensed_hits)} entries)" + ) + + compact_memory = rolling["summary"] + + map_output = await map_batch(condensed_hits, compact_memory, session_id) + if map_output: + rolling, evidence_index = reduce(rolling, map_output, evidence_index) + + summary_tokens = _estimate_tokens(rolling.get("summary", "")) + if summary_tokens > ROLLING_SUMMARY_TOKEN_CAP: + rolling = await compress_analysis_summary(rolling) + + queue.task_done() + + # Phase 2: chunk and process SDK logs (if provided) + sdk_batches = chunk_sdk_logs(sdk_logs) + if sdk_batches: + logger.info( + f"[analysis_consumer] Processing {len(sdk_batches)} SDK log batches " + f"({sum(len(b) for b in sdk_batches)} lines)" + ) + for condensed in sdk_batches: + batch_num += 1 + logger.info( + f"[analysis_consumer] Processing SDK batch {batch_num} " + f"({len(condensed)} entries)" + ) + + compact_memory = rolling["summary"] + map_output = await map_batch(condensed, compact_memory, session_id) + if map_output: + rolling, evidence_index = reduce(rolling, map_output, evidence_index) + + summary_tokens = _estimate_tokens(rolling.get("summary", "")) + if summary_tokens > ROLLING_SUMMARY_TOKEN_CAP: + rolling = await compress_analysis_summary(rolling) + + finally: + try: + await _session_service.delete_session( + app_name=_APP_NAME, + user_id=_USER_ID, + session_id=session_id, + ) + logger.info(f"[analysis_consumer] Destroyed shared session {session_id}") + except Exception: + pass + + markdown = format_to_markdown(rolling, evidence_index, search_summary) + logger.info( + f"[analysis_consumer] Done — {batch_num} batches, " + f"{len(evidence_index)} evidence refs, " + f"{_estimate_tokens(markdown)} tokens in report" + ) + return markdown, rolling, evidence_index + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Upload-only (SDK logs pasted directly, no search) +# ═══════════════════════════════════════════════════════════════════════════════ + +_UPLOAD_BATCH_SIZE = 200 + + +def chunk_sdk_logs(sdk_logs: str, batch_size: int = _UPLOAD_BATCH_SIZE) -> list[list[dict]]: + """Split raw SDK log text into batches of condensed dicts. + + Each dict has {"raw_line": , "line_num": <1-based line number>}. + Returns an empty list if sdk_logs is blank. + """ + if not sdk_logs or not sdk_logs.strip(): + return [] + lines = sdk_logs.strip().splitlines() + batches: list[list[dict]] = [] + for start in range(0, len(lines), batch_size): + batch_lines = lines[start : start + batch_size] + batches.append([ + {"raw_line": line, "line_num": start + i + 1} + for i, line in enumerate(batch_lines) + ]) + return batches + + +async def analyze_upload_only( + sdk_logs: str, +) -> tuple[str, dict, list[dict]]: + """Analyze SDK logs that were uploaded directly (no OpenSearch search). + + Splits the raw log text into line-based batches and runs the same + map -> reduce -> compress pipeline. A single temporary ADK session is + created for the entire upload analysis and destroyed at the end. + + Args: + sdk_logs: raw log text pasted or uploaded by the user. + + Returns: + (markdown_report, rolling_analysis, evidence_index) + """ + batches = chunk_sdk_logs(sdk_logs) + if not batches: + return "(No SDK logs provided)", new_rolling_analysis(), [] + + logger.info(f"[analyze_upload_only] Processing {sum(len(b) for b in batches)} lines in {len(batches)} batches") + + rolling = new_rolling_analysis() + evidence_index: list[dict] = [] + + session_id = f"upload-run-{uuid.uuid4().hex[:12]}" + await _session_service.create_session( + app_name=_APP_NAME, + user_id=_USER_ID, + session_id=session_id, + ) + logger.info(f"[analyze_upload_only] Created shared session {session_id}") + + try: + for condensed in batches: + compact_memory = rolling["summary"] + map_output = await map_batch(condensed, compact_memory, session_id) + + if map_output: + rolling, evidence_index = reduce(rolling, map_output, evidence_index) + + summary_tokens = _estimate_tokens(rolling.get("summary", "")) + if summary_tokens > ROLLING_SUMMARY_TOKEN_CAP: + rolling = await compress_analysis_summary(rolling) + finally: + try: + await _session_service.delete_session( + app_name=_APP_NAME, + user_id=_USER_ID, + session_id=session_id, + ) + logger.info(f"[analyze_upload_only] Destroyed shared session {session_id}") + except Exception: + pass + + markdown = format_to_markdown(rolling, evidence_index, search_summary="(SDK log upload)") + logger.info( + f"[analyze_upload_only] Done — {rolling['batch_count']} batches, " + f"{len(evidence_index)} evidence refs" + ) + return markdown, rolling, evidence_index diff --git a/agents/analyze_agent_v2/skills/architecture_endpoints_skill/SKILL.md b/agents/analyze_agent_v2/skills/architecture-endpoints-skill/SKILL.md similarity index 60% rename from agents/analyze_agent_v2/skills/architecture_endpoints_skill/SKILL.md rename to agents/analyze_agent_v2/skills/architecture-endpoints-skill/SKILL.md index 231201a..08d6af2 100644 --- a/agents/analyze_agent_v2/skills/architecture_endpoints_skill/SKILL.md +++ b/agents/analyze_agent_v2/skills/architecture-endpoints-skill/SKILL.md @@ -11,10 +11,4 @@ When analyzing Mobius, SSE, MSE, or WxCAS logs, use this skill to look up: - **Signaling and media paths**: End-to-end flow for WebRTC Calling vs. Contact Center (browser → Mobius → SSE → …). - **Call types and routing**: WebRTC-to-WebRTC, WebRTC-to-PSTN, WebRTC-to-Desk Phone, and Contact Center flows. -Consult the reference documents: - -- **references/architecture_and_endpoints.md** — service roles and endpoint descriptions. -- **references/calling_flow.md** — WebRTC Calling end-to-end architecture (signaling, media, call types). -- **references/contact_center_flow.md** — Contact Center end-to-end architecture (signaling, media, Kamailio/RTMS/RAS, health ping, timers, failover). - -Use them to attribute log lines to the correct service and to explain signaling/media paths in your analysis. +Consult the reference document **references/architecture_and_endpoints.md** for the full descriptions. Use it to attribute log lines to the correct service and to explain signaling/media paths in your analysis. diff --git a/agents/analyze_agent_v2/skills/architecture-endpoints-skill/references/architecture_and_endpoints.md b/agents/analyze_agent_v2/skills/architecture-endpoints-skill/references/architecture_and_endpoints.md new file mode 100644 index 0000000..86a87c7 --- /dev/null +++ b/agents/analyze_agent_v2/skills/architecture-endpoints-skill/references/architecture_and_endpoints.md @@ -0,0 +1,53 @@ +# Architecture and Endpoints Reference + +## Endpoints — service roles and descriptions + +- **Webex SDK/Client** (Web or native app making the request): Chrome extension or any third-party web application that consumes the Webex Calling SDK. + +- **Mobius**: Microservice that interworks between WebRTC and SIP to enable Webex Calling users to register and make calls using a web browser. It translates browser-originated signaling (HTTP/WSS) into SIP for backend communication. + - **Mobius Multi-Instance Architecture**: Multiple Mobius servers are deployed across different geographic regions (e.g., US, EU, APAC). When a user initiates a call from their browser, their geolocation (based on IP) is used to route them to the nearest Mobius instance using a GeoDNS or load balancer. + - Mobius talks to the following components: + - **CPAPI** (Cisco Platform API): User entitlement and application metadata. + - **CXAPI** (Webex Calling Call Control API): Stateless micro-service that implements the messaging logic behind the Webex Calling Call Control API. When incoming requests are received, it validates that the customer/user the request is made on behalf of belongs to Webex Calling and has the appropriate scope and roles. It then converts the request to the appropriate OCI requests and routes it to the OCIRouter to route to the correct WxC deployment. + +- **U2C (User to Cluster)**: Microservice that helps services find other service instances across multiple Data Centers. It takes a user's email or UUID and optional service name, and returns the catalog containing the service URLs. + +- **WDM (Webex squared Device Manager)**: Microservice responsible for registering a device and proxying feature toggles and settings for the user to bootstrap the Webex clients. + - If WDM shows many 502 responses, possible failing dependencies: Common Identity CI, Mercury API, U2C, Feature Service, Cassandra, Redis. + - If WDM is generating errors, either Locus will produce 502 responses or the clients will show an error. + +- **SSE (Signalling Service Edge)**: Edge component for SIP signalling. It communicates with two endpoints — Mobius and the application server WxCAS. + +- **MSE (Media Service Engine)**: Edge component for media relay that handles RTP for WebRTC clients. + +- **Webex Calling Application Server (WxCAS)**: Core control application in Webex Calling responsible for enabling communication between source SSE and destination SSE. + +- **Mercury**: Webex's real-time messaging and signaling service that establishes WebSocket connections and exchanges information in the form of events. Mobius uses Mercury to send events to the SDK. The SDK establishes a Mercury connection (WebSocket) to receive communication from Mobius. + +--- + +## WebRTC Calling — End-to-End Architecture + +**Signaling Path**: Browser → Mobius (HTTP/WSS→SIP translation) → SSE (SIP edge) → WxCAS (Application Server) → Destination + +**Media Path**: Browser ↔ MSE (DTLS-SRTP) ↔ Destination + +**Call Types & Routing:** + +- **WebRTC to WebRTC**: WxCAS resolves destination browser → Mobius notifies Browser 2 → Both browsers establish DTLS-SRTP with their local MSE. +- **WebRTC to PSTN**: WxCAS resolves PSTN destination → SSE signals toward Local Gateway → Browser↔MSE1 (DTLS-SRTP), MSE1↔MSE2 (RTP), MSE2→LGW (RTP→PSTN). +- **WebRTC to Desk Phone**: WxCAS resolves desk phone → SSE coordinates with MSE → Browser↔MSE1 (DTLS-SRTP), MSE1↔MSE2 (RTP), MSE2→Desk Phone. + +--- + +## Contact Center — End-to-End Architecture + +**Signaling Path**: Browser → Mobius → SSE → Kamailio (SIP proxy) → Destination + +**Media Path**: Browser ↔ MSE ↔ Destination + +**Additional Contact Center Components:** + +- **Kamailio**: SIP proxy for Contact Center — handles SIP REGISTER, stores registration details on RTMS Application Server, routes calls to the appropriate destination. +- **RTMS**: Real-time microservice enabling persistent WebSocket connections between clients and backend services. +- **RAS** (Registration, Activation, and provisioning Service): Stores SIP REGISTER Contact and Path headers with expiry, maintains metrics for WebRTC active sessions and calls to WebRTC phones. diff --git a/agents/analyze_agent_v2/skills/architecture_endpoints_skill/references/calling_flow.md b/agents/analyze_agent_v2/skills/architecture-endpoints-skill/references/calling_flow.md similarity index 100% rename from agents/analyze_agent_v2/skills/architecture_endpoints_skill/references/calling_flow.md rename to agents/analyze_agent_v2/skills/architecture-endpoints-skill/references/calling_flow.md diff --git a/agents/analyze_agent_v2/skills/architecture_endpoints_skill/references/contact_center_flow.md b/agents/analyze_agent_v2/skills/architecture-endpoints-skill/references/contact_center_flow.md similarity index 100% rename from agents/analyze_agent_v2/skills/architecture_endpoints_skill/references/contact_center_flow.md rename to agents/analyze_agent_v2/skills/architecture-endpoints-skill/references/contact_center_flow.md diff --git a/agents/analyze_agent_v2/skills/architecture_endpoints_skill/references/architecture_and_endpoints.md b/agents/analyze_agent_v2/skills/architecture_endpoints_skill/references/architecture_and_endpoints.md deleted file mode 100644 index 9245699..0000000 --- a/agents/analyze_agent_v2/skills/architecture_endpoints_skill/references/architecture_and_endpoints.md +++ /dev/null @@ -1,101 +0,0 @@ -# Architecture and Endpoints Reference - -## Endpoints — service roles and descriptions - -- **Webex SDK/Client** (Web or native app making the request): Chrome extension or any third-party web application that consumes the Webex Calling SDK. - -- **Mobius**: Microservice that interworks between WebRTC and SIP to enable Webex Calling users to register and make calls using a web browser. It translates browser-originated signaling (HTTP/WSS) into SIP for backend communication. - - **Mobius Multi-Instance Architecture**: Multiple Mobius servers are deployed across different geographic regions (e.g., US, EU, APAC). When a user initiates a call from their browser, their geolocation (based on IP) is used to route them to the nearest Mobius instance using a GeoDNS or load balancer. - - Mobius talks to the following components: - - **CPAPI** (Cisco Platform API): User entitlement and application metadata. - - **CXAPI** (Webex Calling Call Control API): Stateless micro-service that implements the messaging logic behind the Webex Calling Call Control API. When incoming requests are received, it validates that the customer/user the request is made on behalf of belongs to Webex Calling and has the appropriate scope and roles. It then converts the request to the appropriate OCI requests and routes it to the OCIRouter to route to the correct WxC deployment. - -- **U2C (User to Cluster)**: Microservice that helps services find other service instances across multiple Data Centers. It takes a user's email or UUID and optional service name, and returns the catalog containing the service URLs. - -- **WDM (Webex squared Device Manager)**: Microservice responsible for registering a device and proxying feature toggles and settings for the user to bootstrap the Webex clients. - - If WDM shows many 502 responses, possible failing dependencies: Common Identity CI, Mercury API, U2C, Feature Service, Cassandra, Redis. - - If WDM is generating errors, either Locus will produce 502 responses or the clients will show an error. - -- **SSE (Signalling Service Edge)**: Edge component for SIP signalling. It communicates with two endpoints — Mobius and the application server WxCAS. - -- **MSE (Media Service Engine)**: Edge component for media relay that handles RTP for WebRTC clients. - -- **Webex Calling Application Server (WxCAS)**: Core control application in Webex Calling responsible for enabling communication between source SSE and destination SSE. - -- **Mercury**: Webex's real-time messaging and signaling service that establishes WebSocket connections and exchanges information in the form of events. Mobius uses Mercury to send events to the SDK. The SDK establishes a Mercury connection (WebSocket) to receive communication from Mobius. - ---- - -## High-Level Component Topology (MSE Option with Mobius in WxC) - -``` - ┌─────────────────────────────────────────────────┐ - │ Webex Cloud │ - │ │ - │ ┌─────────┐ ┌──────────┐ │ - WebSocket │ │ Mercury │ │ Webex CI │ │ -Registered ┌──────────────────────┼──>│ │ │ │ │ -User │ │ └─────────┘ └──────────┘ │ - [Browser │ │ │ - Extension] │ │ Async events ┌────────┐ │ - │ │ REST API (HTTPS) │ │ │ CXAPI │ │ - │ │ │ ▼ │ 3PCC │ │ - │ └──────────────────────┼──> ┌──────────────┐ └───┬────┘ │ - │ │ │ Mobius │ │ Supplementary │ - │ │ │ Micro Service │ │ Services for calls│ - │ Enterprise │ └──────┬───────┘ │ │ - │ Web Server │ │ Provisioning │ │ - │ │ │ data query ┌──┴─────┐ ┌────────┐│ - │ │ └────────────>│ CPAPI │ │ CH UI ││ - │ │ └────────┘ │(Control││ - │ │ │ Hub) ││ - │ └──────────────────────────────────────┴────────┘│ - │ │ - │ Media (DTLS-SRTP) ┌─────────────────────────────────────────────────┐ - │ STUN consent, ICE │ Webex Calling │ - │ │ │ - │ ┌───────────────────┼──> ┌───────┐ mTLS SIP persistent ┌─────────┐ │ - │ │ │ │ MSE │ connection w/ │ SSE │ │ - │ │ │ │ │ webRTC domain in │ │ │ - │ │ │ └───┬───┘ cert SAN └────┬────┘ │ - │ │ │ │ RTP │ │ - │ │ │ ┌───┴───┐ ┌────┴──────┐│ - ▼ ▼ │ │ MSE │ │WxC Call ││ -Peer User [Peer Device] │ │ │◄────── RTP ─────────>│Control ││ - Media (SRTP) │ └───────┘ │(AS/WxCAS)││ - │ └────┬──────┘│ - │ ┌───────┐ ┌────┴──────┐│ - │ │ SSE │ │OCI Router ││ - │ └───────┘ └───────────┘│ - └─────────────────────────────────────────────────┘ - -Legend: - ───────> signaling - ═══════> media - - - - -> media control done by Mobius - [dark] new components - [light] existing components, need change -``` - -### Connection Types Between Components - -| From | To | Protocol | Purpose | -|------|----|----------|---------| -| Browser/Extension | Mercury | WebSocket | Async event delivery (call notifications, status updates) | -| Browser/Extension | Mobius | REST API (HTTPS) | Call control signaling (register, call, hold, transfer) | -| Browser/Extension | MSE | DTLS-SRTP | Encrypted media (audio/video), STUN consent, ICE | -| Mobius | Mercury | Internal | Async event push to SDK/Client | -| Mobius | CPAPI | HTTPS | Provisioning data query, user entitlements | -| Mobius | CXAPI (3PCC) | HTTPS | Supplementary call services | -| Mobius | SSE | mTLS SIP | SIP signaling (persistent connection, webRTC domain in cert SAN) | -| MSE | MSE | RTP | Media relay between caller and callee media engines | -| SSE | WxC Call Control (WxCAS) | SIP | Call routing, destination resolution | -| WxCAS | OCI Router | Internal | Route OCI requests to correct WxC deployment | -| Admin | Control Hub (CH UI) | HTTPS | Administration and configuration | -| Control Hub | CPAPI | HTTPS | Provisioning and configuration | - ---- - -For **WebRTC Calling** flow details (signaling path, media path, call types & routing), see **references/calling_flow.md**. - -For **Contact Center** flow details (signaling path, media path, Kamailio/RTMS/RAS, health ping, timers, Kafka failover, inter-regional failover), see **references/contact_center_flow.md**. diff --git a/agents/analyze_agent_v2/skills/mobius_error_id_skill/SKILL.md b/agents/analyze_agent_v2/skills/mobius-error-id-skill/SKILL.md similarity index 100% rename from agents/analyze_agent_v2/skills/mobius_error_id_skill/SKILL.md rename to agents/analyze_agent_v2/skills/mobius-error-id-skill/SKILL.md diff --git a/agents/analyze_agent_v2/skills/mobius_error_id_skill/references/mobius_error_ids.md b/agents/analyze_agent_v2/skills/mobius-error-id-skill/references/mobius_error_ids.md similarity index 96% rename from agents/analyze_agent_v2/skills/mobius_error_id_skill/references/mobius_error_ids.md rename to agents/analyze_agent_v2/skills/mobius-error-id-skill/references/mobius_error_ids.md index 50e8dd8..9f3b508 100644 --- a/agents/analyze_agent_v2/skills/mobius_error_id_skill/references/mobius_error_ids.md +++ b/agents/analyze_agent_v2/skills/mobius-error-id-skill/references/mobius_error_ids.md @@ -313,13 +313,13 @@ Reference for Mobius HTTP response codes and `mobius-error` codes. Use this to i --- -## Timers (keepalive and unregistration) +## Useful Mobius log filters (Kibana / OpenSearch) -When correlating registration/call failures or unexpected unregisters, use these values (Mobius Basic Training and 2 AM Guide): +- `tags: mobius` — all Mobius-related logs +- `fields.eventType`: `mobiusapp` (application), `sipmsg` (SIP), `httpmsg` (HTTP) +- `fields.id`: `mobiusreg`, `mobiuscall`, `mobiusconn`, `mobiusdns`, `mobiuscallkeepalive`, `mobiusregkeepalive` +- `fields.localAlias`: `mobius-reg-app`, `mobius-sip-app`, `mobius-call-app` +- `fields.response_status` — HTTP response to client +- `fields.mobiusCallId`, `fields.sipCallId`, `fields.DEVICE_ID`, `fields.USER_ID`, `fields.WEBEX_TRACKINGID` -- **Registration keepalive:** Browser sends keepalive every **30 seconds**. After **5 missed** keepalives, Mobius triggers **unregistration**. So ~150 seconds of no keepalive → unregister. -- **Call keepalive:** Browser sends keepalive during a call; valid **within 15 minutes**. -- **SIP APP – Registration refresh:** Refreshes with SSE at **3/4 of the Expires** header from REGISTER. -- **SIP APP – Options:** OPTIONS ping to SSE every **35 seconds** to check connectivity. - -If you see unregistration (e.g. 404 or 503 with "Call/s exist for this device") shortly after gaps in logs, consider keepalive miss (network, client suspend, or load) as a cause. +Index pattern: `wxm-app:logs*` (EU may use a separate index). diff --git a/agents/analyze_agent_v2/skills/sip_flow_skill/SKILL.md b/agents/analyze_agent_v2/skills/sip-flow-skill/SKILL.md similarity index 100% rename from agents/analyze_agent_v2/skills/sip_flow_skill/SKILL.md rename to agents/analyze_agent_v2/skills/sip-flow-skill/SKILL.md diff --git a/agents/analyze_agent_v2/skills/sip_flow_skill/references/sip_flows.md b/agents/analyze_agent_v2/skills/sip-flow-skill/references/sip_flows.md similarity index 100% rename from agents/analyze_agent_v2/skills/sip_flow_skill/references/sip_flows.md rename to agents/analyze_agent_v2/skills/sip-flow-skill/references/sip_flows.md diff --git a/agents/chat_agent/agent.py b/agents/chat_agent/agent.py index 3b24bf4..feff7a1 100644 --- a/agents/chat_agent/agent.py +++ b/agents/chat_agent/agent.py @@ -1,13 +1,138 @@ +import json +import math import os from pathlib import Path from dotenv import load_dotenv from google.adk.agents import LlmAgent +from google.adk.models.lite_llm import LiteLlm +from google.adk.tools import FunctionTool +from google.adk.tools.tool_context import ToolContext from oauth_context import SessionLiteLlm env_path = Path(__file__).parent.parent / ".env" load_dotenv(dotenv_path=env_path) +from analyze_agent_v2.agent import ( + architecture_skill_toolset, + sip_flow_skill_toolset, + mobius_skill_toolset as mobius_error_skill_toolset, +) +from search_agent_v2.agent import _log_cache + +_SERVICE_KEY_MAP = { + "mobius": "mobius_logs", + "sse_mse": "sse_mse_logs", + "sse": "sse_mse_logs", + "mse": "sse_mse_logs", + "wxcas": "wxcas_logs", + "sdk": "sdk_logs", +} + + +def _get_state_or_cache(tool_context: ToolContext, key: str) -> str: + """Read from tool_context.state first; fall back to the module-level log cache.""" + value = tool_context.state.get(key, "") + if value: + return value + session_id = tool_context._invocation_context.session.id + return _log_cache.get(session_id, {}).get(key, "") + + +def _parse_log_entries(raw: str) -> list[dict]: + """Parse a JSON log string into a list of dicts, returning [] on failure.""" + if not raw: + return [] + try: + entries = json.loads(raw) + return entries if isinstance(entries, list) else [] + except (json.JSONDecodeError, TypeError): + return [] + + +def get_raw_logs(service: str, page: int, tool_context: ToolContext, page_size: int = 30) -> dict: + """Retrieve a paginated chunk of raw logs for a specific service. + + Returns only one page at a time so the full log set is never loaded + into the LLM context. Call with page=1 for the first chunk, then + increment to get subsequent chunks. + + Args: + service: One of "mobius", "sse_mse", "wxcas", "sdk". + Use "all" to get a count summary of all services (no log entries). + page: 1-based page number. Start with 1. + page_size: Number of log entries per page (default 30, max 50). + + Returns: + A dict with: entries (list), page, total_pages, total_entries, has_more. + """ + service_lower = service.lower().strip() + page_size = max(1, min(page_size, 50)) + + if service_lower == "all": + summary = {} + for svc, key in [("mobius", "mobius_logs"), ("sse_mse", "sse_mse_logs"), + ("wxcas", "wxcas_logs"), ("sdk", "sdk_logs")]: + entries = _parse_log_entries(_get_state_or_cache(tool_context, key)) + summary[svc] = len(entries) + return { + "message": "Use get_raw_logs with a specific service name and page=1 to fetch entries.", + "log_counts": summary, + } + + state_key = _SERVICE_KEY_MAP.get(service_lower) + if not state_key: + return { + "error": f"Unknown service '{service}'. Use one of: mobius, sse_mse, wxcas, sdk, all.", + } + + all_entries = _parse_log_entries(_get_state_or_cache(tool_context, state_key)) + total = len(all_entries) + + if total == 0: + return {"entries": [], "page": 1, "total_pages": 0, + "total_entries": 0, "has_more": False, + "message": f"No {service} logs available in the current analysis."} + + total_pages = math.ceil(total / page_size) + page = max(1, min(page, total_pages)) + start = (page - 1) * page_size + end = start + page_size + chunk = all_entries[start:end] + + return { + "entries": chunk, + "page": page, + "total_pages": total_pages, + "total_entries": total, + "has_more": page < total_pages, + } + + +def get_sequence_diagram(tool_context: ToolContext) -> dict: + """Retrieve the PlantUML sequence diagram for the current analysis. + + Returns: + A dict with the diagram code, or a message if not available. + """ + diagram = tool_context.state.get("sequence_diagram", "") + if not diagram: + return {"diagram": "", "message": "No sequence diagram available for the current analysis."} + return {"diagram": diagram} + + +def get_search_summary(tool_context: ToolContext) -> dict: + """Retrieve the search statistics for the current analysis. + + Returns: + A dict with log counts, BFS depth, environments, and IDs searched. + """ + summary = _get_state_or_cache(tool_context, "search_summary") + if not summary: + return {"summary": "", "message": "No search summary available."} + return {"summary": summary} + + chat_agent = LlmAgent( model=SessionLiteLlm( model="openai/gpt-4.1", @@ -18,6 +143,14 @@ description="Conversational assistant for the Webex Calling Log Analyzer.", name="chat_agent", output_key="chat_response", + tools=[ + FunctionTool(get_raw_logs), + FunctionTool(get_sequence_diagram), + FunctionTool(get_search_summary), + architecture_skill_toolset, + sip_flow_skill_toolset, + mobius_error_skill_toolset, + ], instruction="""You are a conversational assistant for the Webex Calling Log Analyzer. You help engineers explore and understand analysis results produced by the log-analysis pipeline. You are READ-ONLY — you never run searches, never @@ -27,16 +160,15 @@ AVAILABLE CONTEXT ================================================================ -These state variables are injected from prior pipeline agents. -They may be EMPTY if no search has been run yet. +Primary analysis (always in context): + {analyze_results} + +The following data is available ON-DEMAND via tools (not loaded +into context by default — call the tool only when needed): - Analysis (primary source of truth) : {analyze_results} - Search statistics : {search_summary} - Sequence diagram (PlantUML) : {sequence_diagram} - Raw Mobius logs : {mobius_logs} - Raw SSE/MSE logs : {sse_mse_logs} - Raw WxCAS logs : {wxcas_logs} - SDK/Client logs (uploaded) : {sdk_logs} + get_raw_logs(service, page) — paginated raw logs (one chunk at a time) + get_sequence_diagram() — PlantUML sequence diagram + get_search_summary() — search statistics (log counts, BFS depth, IDs) ================================================================ RULE 0 — CONTEXT TRACKING (READ THIS FIRST) @@ -118,12 +250,16 @@ - Be concise. Lead with the direct answer. Expand only if asked. - Always cite exact timestamps and identifiers: - "At **06:58:18.075Z**, **Mobius** sent **SIP 480** + "At 06:58:18.075Z, Mobius sent SIP 480 (Call-ID: SSE065806...)." Never say: "later in the logs", "around that time". -- Use markdown: bold for services/IDs, bullet lists for clarity. +- Use markdown sparingly: bullet lists for clarity, bold only for + section headings or a single critical keyword per sentence. + Do NOT bold timestamps, service names, IDs, or status codes inline. + Overuse of bold makes the output hard to read. - Engineers prefer precision over explanation. Facts first. - Professional tone. No fluff, no storytelling, no emojis. +- Never output bare bullet markers (- or *) on otherwise empty lines. ================================================================ HANDLING SPECIFIC REQUEST TYPES @@ -156,28 +292,67 @@ ── RAW LOG REQUESTS ("show logs", "give me the raw Mobius logs") ── -Clarify which service if not specified: - "Which logs? Mobius, SSE/MSE, or WxCAS?" -Return logs as stored — preserve JSON, sort by @timestamp ascending. -If user asks for ALL logs, warn: "This is a large output. Continue?" +get_raw_logs is PAGINATED — it returns one chunk at a time, not all +logs at once. This keeps context small and responses fast. + + get_raw_logs(service, page, page_size=30) + service: "mobius", "sse_mse", "wxcas", "sdk", or "all" + page: 1-based page number (start with 1) + page_size: entries per page (default 30, max 50) + + Returns: { entries, page, total_pages, total_entries, has_more } + +**Usage pattern:** + 1. If user doesn't specify which service, ask: + "Which logs? Mobius, SSE/MSE, WxCAS, or SDK?" + 2. Call get_raw_logs(service="mobius", page=1) for the first chunk. + 3. Present the entries in a JSON code block. + 4. Report pagination: "**[Page 1/N]** (30 of 142 entries). + Reply 'next' for the next page, or 'stop' to end." + 5. When user says "next"/"continue", call with page=2, page=3, etc. + 6. If user asks for "all" services, call get_raw_logs("all", page=1) + first to get the count per service, then fetch one service at a + time starting with page=1. ── DIAGRAM REQUESTS ("show diagram", "give PlantUML") ── -Return {sequence_diagram} in a code block. Do NOT return it for -non-diagram questions. For modifications, generate updated PlantUML -keeping the same style. +Call get_sequence_diagram() and return the result in a code block. +Do NOT call this tool for non-diagram questions. +For modifications, generate updated PlantUML keeping the same style. ── SEARCH STATISTICS ("how many logs?", "what was searched?") ── -Use {search_summary} for log counts, BFS depth, environments, IDs. +Call get_search_summary() for log counts, BFS depth, environments, IDs. ── TIMING ("how long did the call take?", "setup time?") ── Extract timestamps from analysis. Calculate and present durations. -── TELECOM CONCEPTS ("what is ICE?", "what is SIP 480?") ── +── TELECOM CONCEPTS ("what is ICE?", "what is SIP 480?", "what does + mobius-error 115 mean?", "explain the role of SSE") ── -2–3 sentences max. Just enough to understand the analysis. +You have three reference skills you can consult for accurate answers: + + • architecture_endpoints_skill — service roles (Mobius, SSE, MSE, + WxCAS, CPAPI, Mercury, etc.), signaling/media paths, call types + and routing (WebRTC-to-PSTN, Contact Center, etc.), and topology. + Use when the user asks about what a service does, how traffic flows, + or how components connect. + + • sip_flow_skill — SIP message sequences (INVITE, BYE, REFER, etc.), + SIP response code meanings (480, 488, 503, etc.), SDP negotiation, + SIP timers, and common failure patterns (one-way audio, 32s drops). + Use when the user asks about SIP codes, call setup flows, or + protocol-level behavior. + + • mobius_error_id_skill — Mobius-specific error codes (101–121), + their meanings, root causes, user impact, and what to check in logs. + Use when the user asks about a mobius-error code or a Mobius HTTP + error (403/503/etc.) in the context of registration or calls. + +Use these skills to give precise, reference-backed answers rather than +relying on general knowledge. Keep answers concise (2–5 sentences) +unless the user asks for more detail. ── NEW / UNKNOWN IDENTIFIER ── @@ -185,13 +360,6 @@ "This identifier does not appear in the current analysis. Please run a new search with that ID." -── COMPARISON REQUESTS ("compare the two searches") ── - -If conversation history contains results from multiple searches, -compare based on what you remember from the conversation. Note that -only the LATEST results are in state — earlier results may have been -overwritten. Be transparent about what you can and cannot compare. - ================================================================ WHAT YOU MUST NEVER DO ================================================================ diff --git a/agents/query_analyzer/__init__.py b/agents/query_analyzer/__init__.py new file mode 100644 index 0000000..02c597e --- /dev/null +++ b/agents/query_analyzer/__init__.py @@ -0,0 +1 @@ +from . import agent diff --git a/agents/query_analyzer/agent.py b/agents/query_analyzer/agent.py new file mode 100644 index 0000000..1200662 --- /dev/null +++ b/agents/query_analyzer/agent.py @@ -0,0 +1,370 @@ +import os +from pathlib import Path +from dotenv import load_dotenv + +from google.adk.agents import BaseAgent, LlmAgent +from google.adk.agents.invocation_context import InvocationContext +from google.adk.events import Event +from google.adk.models.lite_llm import LiteLlm + +env_path = Path(__file__).parent.parent / ".env" +load_dotenv(dotenv_path=env_path) + +from root_agent_v2.agent import root_agent as pipeline +from chat_agent.agent import chat_agent + +query_analyzer = LlmAgent( + model=LiteLlm( + + ), + name='query-analyzer', + description='', + instruction=""" +You are the Query Analyzer agent. + +Your role is to analyze each user query and delegate the request to the correct agent. + +You MUST NOT answer user questions. +You MUST NOT analyze logs. +You MUST NOT generate explanations. + +You ONLY decide delegation. + +Available sub-agents: + +1) pipeline + Responsible for: + - Searching OpenSearch logs + - Extracting identifiers + - Building call flows + - Finding sequence failures + - Error investigation + +2) chat_agent + Responsible for: + - Follow-up conversations + - Explanations + - Clarifications + - Asking user for missing information + - Answering general questions + +-------------------------------------------------- + +SYSTEM PURPOSE + +The pipeline retrieves information about: + +- Error IDs +- Tracking IDs +- Session IDs +- Call flows +- Sequence failures +- Service errors + +from OpenSearch logs. + +The chat_agent uses already available information in the session state +to communicate with the user. + +-------------------------------------------------- + +CORE PRINCIPLE + +QueryAnalyzer is a lightweight router. + +It should: + +1) Inspect the query +2) Check session state +3) Delegate + +It should NOT: + +- Think deeply +- Analyze logs +- Interpret results + +The QueryAnalyzer should primarily check session state and delegate. + +-------------------------------------------------- + +SESSION STATE + +You have access to: + +Latest Search Results: +{latest_search_results} + +Latest Analyze Results: +{analyze_results} + +These contain all information already retrieved from logs. + +If the required information already exists in state: + +→ Delegate to chat_agent + +Do NOT run pipeline again. + +-------------------------------------------------- + +QUERY CLASSIFICATION + +Queries fall into the following categories: + +-------------------------------------------------- + +1) INFO QUERY → PIPELINE + +These queries request log information. + +Examples: + +- webex-js-sdk_abc123 +- errorId_12345 +- trackingId_xxx +- sessionId_xxx +- callId_xxx + +Examples: + +"Investigate this error" + +"Analyze this call" + +"Find sequence failures" + +"Search logs for this tracking ID" + +If NEW information must be fetched: + +→ Delegate to pipeline + +-------------------------------------------------- + +2) FOLLOW-UP QUERY → CHAT AGENT + +These queries refer to previous results. + +Examples: + +"Explain more" + +"Why did it fail" + +"What happened" + +"Show root cause" + +"Which service failed" + +"Explain the ROAP issue" + +These queries use EXISTING information. + +→ Delegate to chat_agent + +Always. + +-------------------------------------------------- + +3) NO QUERY PARAMETERS → CHAT AGENT + +If the query contains NO identifiers such as: + +- error id +- tracking id +- session id +- call id +- meeting id +- device id + +Then pipeline cannot run. + +Examples: + +"What is ROAP?" + +"Explain SIP flow" + +"What does 403 mean?" + +"Call failed" + +"Something broke" + +In these cases: + +→ Delegate to chat_agent + +The chat agent will ask the user for: + +- Tracking ID +- Error ID +- Session ID +- Environment + +-------------------------------------------------- + +4) STATE-DEPENDENT QUERY → CHECK STATE FIRST + +These queries might require logs OR might be follow-ups. + +Examples: + +"Show the flow" + +"Show failures" + +"What errors occurred?" + +"Analyze this session" + +Decision process: + +Step 1: + +Check: + +{latest_search_results} + +{analyze_results} + +Step 2: + +If relevant information exists: + +→ Delegate to chat_agent + +Step 3: + +If information does NOT exist: + +→ Delegate to pipeline + +-------------------------------------------------- + +5) MIXED QUERIES + +Mixed queries contain both identifiers and explanation requests. + +Examples: + +"Check this error and explain" + +"Analyze trackingId_123 and summarize" + +Decision process: + +Step 1: + +Check state. + +Step 2: + +If identifier already exists in state: + +→ chat_agent + +Step 3: + +If identifier does NOT exist in state: + +→ pipeline + +-------------------------------------------------- + +6) MISSING INFORMATION + +If the query does not contain enough information for log search: + +Examples: + +"Call failed" + +"Error occurred" + +"Logs missing" + +Do NOT run pipeline. + +→ Delegate to chat_agent + +Chat agent will request more details from user. + +-------------------------------------------------- + +STATE RULES + +Rule 1: + +If state is empty: + +latest_search_results = empty +analyze_results = empty + +AND query contains identifiers: + +→ pipeline + +Otherwise: + +→ chat_agent + +-------------------------------------------------- + +Rule 2: + +If identifier exists in state: + +→ chat_agent + +-------------------------------------------------- + +Rule 3: + +If identifier does not exist in state: + +→ pipeline + +-------------------------------------------------- + +Rule 4: + +If unsure: + +Check state first. + +If state contains relevant information: + +→ chat_agent + +Otherwise: + +→ pipeline + +-------------------------------------------------- + +FINAL RULES + +You MUST ONLY delegate. + +Never answer user questions. + +Never explain routing. + +Never analyze logs. + +Never ask user questions. + +Only delegate. + +Valid targets: + +pipeline +chat_agent +""" + , + sub_agents=[pipeline,chat_agent] +) + diff --git a/agents/query_router/agent.py b/agents/query_router/agent.py index 1082821..c44b0fe 100644 --- a/agents/query_router/agent.py +++ b/agents/query_router/agent.py @@ -16,7 +16,7 @@ load_dotenv(dotenv_path=env_path) from root_agent_v2.agent import root_agent as pipeline -from analyze_agent_v2.agent import analyze_agent +from analyze_agent_v2.incremental import analyze_upload_only from visualAgent.agent import sequence_diagram_agent logger = logging.getLogger(__name__) @@ -391,10 +391,12 @@ async def _run_async_impl( # Store SDK logs if uploaded if upload_only: - ctx.session.state["sdk_logs"] = search_params.get("sdk_logs", "") - logger.info("[query_analyzer] Upload-only mode — skipping search, running analyze + visualize") - async for event in analyze_agent.run_async(ctx): - yield event + sdk_logs = search_params.get("sdk_logs", "") + ctx.session.state["sdk_logs"] = sdk_logs + logger.info("[query_analyzer] Upload-only mode — running incremental analysis + visualize") + markdown, _rolling, _evidence = await analyze_upload_only(sdk_logs) + ctx.session.state["analyze_results"] = markdown + ctx.session.state["analysis_evidence"] = json.dumps(_evidence, default=str) async for event in sequence_diagram_agent.run_async(ctx): yield event else: @@ -408,6 +410,7 @@ async def _run_async_impl( logger.info("[query_analyzer] Running pipeline") async for event in pipeline.run_async(ctx): yield event + else: logger.info("[query_analyzer] Skipping pipeline — passing to chat_agent") return diff --git a/agents/root_agent_v2/agent.py b/agents/root_agent_v2/agent.py index d628764..d0bb2dd 100644 --- a/agents/root_agent_v2/agent.py +++ b/agents/root_agent_v2/agent.py @@ -1,7 +1,8 @@ """ -Root Agent v2 — Sequential pipeline using search_agent_v2 and analyze_agent_v2. +Root Agent v2 — Sequential pipeline: search (with inline incremental analysis) +then sequence diagram generation. -Pipeline: search_agent_v2 → analyze_agent_v2 → sequence_diagram_agent +Pipeline: search_agent_v2 (includes map-reduce analysis) → sequence_diagram_agent Run standalone: adk web agents/root_agent_v2 """ @@ -17,16 +18,11 @@ env_path = Path(__file__).parent.parent / ".env" load_dotenv(dotenv_path=env_path) -# ═══════════════════════════════════════════════════════════════════════════════ -# OAuth Token Initialization — DISABLED (token provided via Webex OAuth login) -# ═══════════════════════════════════════════════════════════════════════════════ - # ═══════════════════════════════════════════════════════════════════════════════ # Import sub-agents # ═══════════════════════════════════════════════════════════════════════════════ from search_agent_v2.agent import search_agent -from analyze_agent_v2.agent import analyze_agent from visualAgent.agent import sequence_diagram_agent logging.info("✓ root_agent_v2: All sub-agents imported successfully") @@ -37,10 +33,10 @@ root_agent = SequentialAgent( name="MicroserviceLogAnalyzerV2", - sub_agents=[search_agent, analyze_agent, sequence_diagram_agent], + sub_agents=[search_agent, sequence_diagram_agent], description=( "Executes a full log analysis pipeline: " - "exhaustive BFS search → analysis (calling/contact-center routing) → " + "exhaustive BFS search with inline incremental analysis → " "PlantUML sequence diagram generation." ), ) diff --git a/agents/search_agent_v2/agent.py b/agents/search_agent_v2/agent.py index 47fe34e..1618167 100644 --- a/agents/search_agent_v2/agent.py +++ b/agents/search_agent_v2/agent.py @@ -19,6 +19,8 @@ import time import requests from collections import deque + +_log_cache: dict[str, dict[str, str]] = {} from pathlib import Path from typing import Any, AsyncGenerator, Optional from typing_extensions import override @@ -29,6 +31,13 @@ from google.adk.agents import LlmAgent, BaseAgent from google.adk.agents.invocation_context import InvocationContext from google.adk.events import Event +from google.adk.models.lite_llm import LiteLlm + +from analyze_agent_v2.incremental import ( + run_analysis_consumer, + format_to_markdown, + SENTINEL, +) from opensearchpy import OpenSearch, RequestsHttpConnection from oauth_context import SessionLiteLlm, get_oauth_token @@ -1062,7 +1071,8 @@ async def _process_hits_progressive( seen_hit_ids: set[str], category: str, id_extractor_instruction: str, - ) -> tuple[dict, int]: + analysis_queue: "asyncio.Queue | None" = None, + ) -> tuple[str, dict, int]: """ Process a page of hits: deduplicate, extract IDs. Returns (extracted_ids, new_unique_count). @@ -1096,6 +1106,13 @@ async def _process_hits_progressive( f"{len(condensed)} entries for LLM" ) + if analysis_queue is not None: + await analysis_queue.put(condensed) + logger.info( + f"[_process_hits_progressive] Pushed {len(condensed)} entries " + f"to analysis queue" + ) + extracted = await _extract_ids_from_batch(condensed, id_extractor_instruction) logger.info( f"[_process_hits_progressive] LLM results: " @@ -1252,6 +1269,15 @@ async def _run_async_impl( TIME_PADDING_HOURS = 2 derived_time_range: tuple[str, str] | None = None + analysis_queue: asyncio.Queue = asyncio.Queue() + analysis_task = asyncio.create_task( + run_analysis_consumer( + queue=analysis_queue, + sdk_logs=ctx.session.state.get("sdk_logs", ""), + ) + ) + logger.info(f"[{self.name}] Analysis consumer task started in background") + for ident in identifiers: id_val = ident["value"] id_type = ident.get("type", "unknown") @@ -1382,6 +1408,7 @@ async def _prefetch_pages(idx: str, q: dict, out: asyncio.Queue) -> None: seen_hit_ids=seen_hit_ids, category=category, id_extractor_instruction=self.id_extractor.instruction, + analysis_queue=analysis_queue, ) depth_new_hits += new_count @@ -1503,6 +1530,24 @@ async def _prefetch_pages(idx: str, q: dict, out: asyncio.Queue) -> None: f"Frontier: {len(frontier)} pending" ) + # ══════════════════════════════════════════════════════════════════════ + # Step 3.5: Signal analysis consumer to finish and await results + # ══════════════════════════════════════════════════════════════════════ + await analysis_queue.put(SENTINEL) + logger.info(f"[{self.name}] Sent sentinel to analysis consumer, awaiting results...") + try: + analysis_markdown, analysis_rolling, analysis_evidence = await analysis_task + logger.info( + f"[{self.name}] Analysis consumer finished: " + f"{analysis_rolling.get('batch_count', 0)} batches, " + f"{len(analysis_evidence)} evidence refs" + ) + except Exception as e: + logger.error(f"[{self.name}] Analysis consumer failed: {e}") + analysis_markdown = "" + analysis_rolling = {} + analysis_evidence = [] + # ══════════════════════════════════════════════════════════════════════ # Step 4: Store final results in session state # ══════════════════════════════════════════════════════════════════════ @@ -1538,6 +1583,26 @@ async def _prefetch_pages(idx: str, q: dict, out: asyncio.Queue) -> None: [hit.get("_source", {}) for hit in all_logs["wxcas"]], default=str ) + _log_cache[ctx.session.id] = { + "mobius_logs": ctx.session.state["mobius_logs"], + "sse_mse_logs": ctx.session.state["sse_mse_logs"], + "wxcas_logs": ctx.session.state["wxcas_logs"], + "all_logs": ctx.session.state["all_logs"], + "search_summary": ctx.session.state["search_summary"], + } + ctx.session.state["chunk_analysis_summary"] = analysis_rolling.get("summary", "") + + # Re-format analysis markdown now that search_summary is available + if analysis_rolling: + analysis_markdown = format_to_markdown( + analysis_rolling, + analysis_evidence, + search_summary=ctx.session.state.get("search_summary", ""), + ) + ctx.session.state["analyze_results"] = analysis_markdown + ctx.session.state["analysis_evidence"] = json.dumps(analysis_evidence, default=str) + _log_cache[ctx.session.id]["analyze_results"] = analysis_markdown + logger.info( f"[{self.name}] == Search complete ==\n" f" Mobius: {len(all_logs['mobius'])} logs\n" diff --git a/agents/visualAgent/agent.py b/agents/visualAgent/agent.py index d0bc46a..21d7ff7 100644 --- a/agents/visualAgent/agent.py +++ b/agents/visualAgent/agent.py @@ -77,12 +77,21 @@ ✓ Example: participant "Webex SDK/Client" as Client #E3F2FD ✓ Example: participant "Mobius" as Mobius #BBDEFB +**CRITICAL: ALL entities used in arrow messages MUST be declared as participants FIRST.** +If the analysis mentions a device, user, or entity by a long ID (e.g. d8ac9405-e6c7-30e9-...), +declare it as a participant with a short alias: +✓ CORRECT: participant "Device d8ac9405" as Device #E8EAF6 + Then use: Mobius -> Device: Start Client Event +✗ WRONG: Mobius -> Device d8ac9405-e6c7-30e9-b60f-1613fe6f2986: Start Client Event + (spaces in name and undeclared participant cause syntax errors) + ✗ FORBIDDEN (causes diagram type misdetection): - Using actor keyword: actor "Name" as Alias - Using entity, boundary, control, database, collections keywords - Using stereotypes: participant "Name" as Alias <> - Empty angle brackets: participant "Name" as Alias <> - Missing color assignment: participant "Name" as Alias + - Using undeclared names in arrow messages **RULE 2a: Color Assignment (MANDATORY)** ALL participants MUST have direct color assignment: @@ -105,8 +114,9 @@ - No spaces: Client->Mobius (WRONG) - Double arrows: Client ->> Mobius (wrong notation) - Bidirectional: Client <-> Mobius (not supported) - - Multiple arrows on same line + - Multiple/chained arrows on same line: A -> B -[#00AA00]-> C (WRONG - use two separate lines) - Wrong spacing: Client- >Mobius or Client -> Mobius (uneven spaces) + - Undeclared participants: EVERY name in an arrow MUST be a declared participant alias - **CRITICAL:** Splitting message across lines (see below) **CRITICAL LINE BREAK RULE FOR ARROWS:** @@ -199,6 +209,8 @@ - Missing endlegend - Not wrapping long values - Unescaped special characters breaking table + - Using backslash \ at end of a source line for line continuation (NOT valid PlantUML) + - Splitting a legend table row across multiple source lines **RULE 6: Color Codes (STRICT HEX FORMAT)** @@ -447,6 +459,9 @@ 2. Splitting legend table rows across lines 3. Breaking IDs/URLs with line breaks 4. Using actual line breaks instead of \n for display wrapping +5. Using undeclared participant names in arrows — declare ALL entities first +6. Chaining multiple arrows on one line (WRONG: A -> B -[#color]-> C — split into two lines) +7. Using backslash \ at end of source line for continuation (NOT valid PlantUML — use \n inside strings) Return the PlantUML diagram now.''' ) diff --git a/log-analyzer-frontend/components/analysis-view.tsx b/log-analyzer-frontend/components/analysis-view.tsx index c4b2501..1569b06 100644 --- a/log-analyzer-frontend/components/analysis-view.tsx +++ b/log-analyzer-frontend/components/analysis-view.tsx @@ -15,16 +15,24 @@ export function AnalysisView({ analysis }: AnalysisViewProps) { return ( -
+

{children}

, - h2: ({ children }) =>

{children}

, - h3: ({ children }) =>

{children}

, - p: ({ children }) =>

{children}

, - ul: ({ children }) =>
    {children}
, - ol: ({ children }) =>
    {children}
, - li: ({ children }) =>
  • {children}
  • , + h1: ({ children }) =>

    {children}

    , + h2: ({ children }) =>

    {children}

    , + h3: ({ children }) =>

    {children}

    , + p: ({ children }) => { + if (!children || (Array.isArray(children) && children.every((c: any) => c === null || c === undefined || c === ""))) return null + return

    {children}

    + }, + ul: ({ children }) =>
      {children}
    , + ol: ({ children }) =>
      {children}
    , + li: ({ children }) => { + if (!children || (typeof children === "string" && !(children as string).trim())) return null + return
  • {children}
  • + }, + hr: () =>
    , + strong: ({ children }) => {children}, code: ({ children }) => ( {children} ), @@ -40,7 +48,7 @@ export function AnalysisView({ analysis }: AnalysisViewProps) { ), }} > - {analysis} + {analysis.replace(/^[-*]\s*$/gm, "").replace(/\n{3,}/g, "\n\n")}
    diff --git a/log-analyzer-frontend/components/chat-panel.tsx b/log-analyzer-frontend/components/chat-panel.tsx index 0b1cd7f..537c0c3 100644 --- a/log-analyzer-frontend/components/chat-panel.tsx +++ b/log-analyzer-frontend/components/chat-panel.tsx @@ -7,6 +7,12 @@ import { Input } from "@/components/ui/input" import { Send, Square, Bot, User, Loader2 } from "lucide-react" import ReactMarkdown from "react-markdown" +function cleanMarkdown(md: string): string { + return md + .replace(/^[-*]\s*$/gm, "") // remove bullet-only lines (no content) + .replace(/\n{3,}/g, "\n\n") // collapse excessive blank lines +} + export interface ChatMessage { id: string role: "user" | "assistant" @@ -23,24 +29,29 @@ interface ChatPanelProps { const markdownComponents = { h1: ({ children }: any) => ( -

    {children}

    +

    {children}

    ), h2: ({ children }: any) => ( -

    {children}

    +

    {children}

    ), h3: ({ children }: any) => ( -

    {children}

    - ), - p: ({ children }: any) => ( -

    {children}

    +

    {children}

    ), + p: ({ children }: any) => { + if (!children || (Array.isArray(children) && children.every((c: any) => c === null || c === undefined || c === ""))) return null + return

    {children}

    + }, ul: ({ children }: any) => ( -
      {children}
    +
      {children}
    ), ol: ({ children }: any) => ( -
      {children}
    +
      {children}
    ), - li: ({ children }: any) =>
  • {children}
  • , + li: ({ children }: any) => { + if (!children || (typeof children === "string" && !children.trim())) return null + return
  • {children}
  • + }, + hr: () =>
    , code: ({ children, className }: any) => { const isBlock = className?.includes("language-") if (isBlock) { @@ -65,7 +76,7 @@ const markdownComponents = { ), strong: ({ children }: any) => ( - {children} + {children} ), table: ({ children }: any) => (
    @@ -73,7 +84,7 @@ const markdownComponents = {
    ), th: ({ children }: any) => ( - + {children} ), @@ -171,9 +182,9 @@ export function ChatPanel({ messages, loading, chatDisabled, onSendMessage, onSt {msg.content}

    ) : ( -
    +
    - {msg.content} + {cleanMarkdown(msg.content)}
    )} diff --git a/log-analyzer-frontend/components/chat-view.tsx b/log-analyzer-frontend/components/chat-view.tsx index e17f8ab..6f97f18 100644 --- a/log-analyzer-frontend/components/chat-view.tsx +++ b/log-analyzer-frontend/components/chat-view.tsx @@ -15,16 +15,24 @@ export function ChatView({ chatResponse }: ChatViewProps) { return ( -
    +

    {children}

    , - h2: ({ children }) =>

    {children}

    , - h3: ({ children }) =>

    {children}

    , - p: ({ children }) =>

    {children}

    , - ul: ({ children }) =>
      {children}
    , - ol: ({ children }) =>
      {children}
    , - li: ({ children }) =>
  • {children}
  • , + h1: ({ children }) =>

    {children}

    , + h2: ({ children }) =>

    {children}

    , + h3: ({ children }) =>

    {children}

    , + p: ({ children }) => { + if (!children || (Array.isArray(children) && children.every((c: any) => c === null || c === undefined || c === ""))) return null + return

    {children}

    + }, + ul: ({ children }) =>
      {children}
    , + ol: ({ children }) =>
      {children}
    , + li: ({ children }) => { + if (!children || (typeof children === "string" && !(children as string).trim())) return null + return
  • {children}
  • + }, + hr: () =>
    , + strong: ({ children }) => {children}, code: ({ children }) => ( {children} ), @@ -40,7 +48,7 @@ export function ChatView({ chatResponse }: ChatViewProps) { ), }} > - {chatResponse} + {chatResponse.replace(/^[-*]\s*$/gm, "").replace(/\n{3,}/g, "\n\n")}
    diff --git a/log-analyzer-frontend/lib/session-manager.ts b/log-analyzer-frontend/lib/session-manager.ts index 31e5b86..7722219 100644 --- a/log-analyzer-frontend/lib/session-manager.ts +++ b/log-analyzer-frontend/lib/session-manager.ts @@ -42,7 +42,7 @@ export class SessionManager { if (this.sessionCreated) return const controller = new AbortController() - const timeoutId = setTimeout(() => controller.abort(), 600000) + const timeoutId = setTimeout(() => controller.abort(), 1500000) try { const response = await fetch( @@ -74,7 +74,7 @@ export class SessionManager { async sendMessage(text: string): Promise { const controller = new AbortController() - const timeoutId = setTimeout(() => controller.abort(), 600000) + const timeoutId = setTimeout(() => controller.abort(), 1500000) this.activeController = controller this.activeTimeout = timeoutId