# Keyword definitions organized by category.
# Format: category -> tag_name -> list of keywords/phrases to search for.
# Matching is plain substring matching (see find_keywords), so short stems
# such as "enforc" or "robo" also match longer words that contain them.
KEYWORD_RULES = {
    # Compliance & Regulatory
    "compliance": {
        "do_not_call": ["do not call", "don't call", "dont call", "do-not-call"],
        "take_me_off": ["take me off", "take us off", "remove me", "remove us"],
        "stop_calling": ["stop calling", "quit calling", "stop call"],
        "mentions_fcc": ["fcc", "federal communications"],
        "mentions_report": ["report you", "report this", "reporting", "file a complaint"],
        "mentions_enforcement": ["enforc", "attorney general", "lawsuit", "sue you", "legal action"],
        "mentions_scam": ["scam", "scammer", "scamming", "fraudulent", "fraud"],
        "mentions_spam": ["spam", "spammer", "spamming", "junk call"],
        "mentions_robocall": ["robo", "robocall", "auto-dial", "autodial", "autodialer"],
        "mentions_protection": ["protect", "protection", "tcpa", "consumer protection"],
    },

    # Voicemail Service Detection (YouMail variants)
    "voicemail_service": {
        "youmail_detected": [
            "youmail", "youmale", "you mail", "you male",
            "umale", "umail", "u mail", "u-mail",
        ],
    },

    # Recording/Transcription mentions
    "recording": {
        "mentions_transcribe": ["transcribe", "transcription", "transcribing"],
        "mentions_recording": ["being recorded", "this call is recorded", "call is being recorded",
                               "may be recorded", "will be recorded", "recording this call"],
        "mentions_email": ["email", "e-mail"],  # Note: may want to exclude "voicemail"
    },

    # Greeting Patterns
    "greetings": {
        "greeting_yea_hello": ["yea hello", "yeah hello", "ya hello"],
        "greeting_hi_informal": ["hi j", "hi g", "hey j", "hey g"],
    },

    # Legal/Professional
    "legal": {
        "mentions_law": ["law office", "lawoffice", "law firm", "lawfirm",
                         "attorney", "lawyer", "legal department"],
    },

    # Other Content
    "other": {
        "profanity": ["fuck", "shit", "damn", "ass"],
        "mentions_torn": ["torn"],
        "mentions_push": ["push"],
        "mentions_pitch": ["pitch", "sales pitch"],
        "mentions_bank": ["bank", "banking", "banker"],
        "mentions_county": ["county"],
        "mentions_general": ["general"],
        "mentions_subscribe": ["subscribe", "subscription"],
        "why_calling": ["why are you calling", "why you calling", "why do you keep calling"],
    },
}

default_options = {
    "categories": None,  # None means all categories
    "custom_keywords": {},  # Additional tag -> keywords mappings
    "case_sensitive": False,
    # NOTE(review): declared but not read by any function in this module.
    "min_confidence": 0.0,  # Minimum transcription confidence to process
}


def _append_text(value: Any, texts: List[str]) -> None:
    """Append *value* to *texts* only when it is a non-blank string."""
    if isinstance(value, str) and value.strip():
        texts.append(value)


def get_transcription_text(vcon: Any) -> Optional[str]:
    """Extract transcription text from vCon analysis entries.

    Two analysis types are read:

    * ``wtf_transcription``: ``body.transcript.text`` plus every
      ``body.segments[].text``.
    * ``transcription``: a string body, or ``body.text`` / ``body.transcript``
      of a dict body (a dict-valued ``transcript`` contributes its ``text``).

    Malformed entries (non-dict analysis, null segments, non-string text) are
    skipped instead of raising.

    Args:
        vcon: Object exposing an ``analysis`` attribute (list of dicts or None).

    Returns:
        All collected texts joined by single spaces, or None when nothing
        usable was found.
    """
    texts: List[str] = []

    for analysis in vcon.analysis or []:
        if not isinstance(analysis, dict):
            continue
        analysis_type = analysis.get("type", "")

        # Handle WTF transcription format
        if analysis_type == "wtf_transcription":
            body = analysis.get("body", {})
            if not isinstance(body, dict):
                continue
            transcript = body.get("transcript", {})
            if isinstance(transcript, dict):
                _append_text(transcript.get("text", ""), texts)
            # Also check segments for text; "segments": null is tolerated.
            for seg in body.get("segments") or []:
                if isinstance(seg, dict):
                    _append_text(seg.get("text", ""), texts)

        # Handle standard transcription format
        elif analysis_type == "transcription":
            body = analysis.get("body", "")
            if isinstance(body, str):
                _append_text(body, texts)
            elif isinstance(body, dict):
                candidate = body.get("text", body.get("transcript", ""))
                if isinstance(candidate, dict):
                    # A nested transcript object would otherwise reach
                    # " ".join() as a dict and raise TypeError.
                    candidate = candidate.get("text", "")
                _append_text(candidate, texts)

    return " ".join(texts) if texts else None


def find_keywords(text: str, keywords: List[str], case_sensitive: bool = False) -> Set[str]:
    """Find which keywords are present in the text.

    Matching is substring based. Blank or non-string keywords are ignored,
    because an empty string is a substring of every text and would make the
    owning tag fire on every vCon.

    Args:
        text: Text to search.
        keywords: Keywords/phrases to look for; a single string is treated as
            a one-element list.
        case_sensitive: When False (default) both sides are lower-cased.

    Returns:
        The subset of *keywords* (in their original spelling) found in *text*.
    """
    if not text or not keywords:
        return set()
    if isinstance(keywords, str):
        # Iterating a bare string would test every single character.
        keywords = [keywords]

    haystack = text if case_sensitive else text.lower()

    found = set()
    for keyword in keywords:
        if not isinstance(keyword, str) or not keyword.strip():
            continue
        needle = keyword if case_sensitive else keyword.lower()
        if needle in haystack:
            found.add(keyword)

    return found


def _apply_rules(
    vcon: Any,
    text: str,
    rules: Dict[str, Any],
    case_sensitive: bool,
    label: str = "",
) -> List[str]:
    """Tag *vcon* for every rule in *rules* that matches *text*; return the tag names added."""
    added = []
    for tag_name, keywords in rules.items():
        found = find_keywords(text, keywords, case_sensitive)
        if found:
            vcon.add_tag(tag_name=tag_name, tag_value=",".join(sorted(found)))
            added.append(tag_name)
            logger.debug(f"Added {label}tag '{tag_name}' (matched: {found})")
    return added


def run(
    vcon_uuid: str,
    link_name: str,
    opts: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
    """Process a vCon and add keyword-based tags.

    Args:
        vcon_uuid: UUID of the vCon to load from Redis.
        link_name: Name of this link instance (unused here).
        opts: Overrides for ``default_options``.

    Returns:
        ``vcon_uuid`` in every case, including when the vCon or its
        transcription is missing.
    """
    merged_opts = default_options.copy()
    if opts:
        merged_opts.update(opts)
    opts = merged_opts

    logger.info(f"Starting keyword_tagger for vCon: {vcon_uuid}")

    vcon_redis = VconRedis()
    vcon = vcon_redis.get_vcon(vcon_uuid)

    if not vcon:
        logger.error(f"keyword_tagger: vCon {vcon_uuid} not found")
        return vcon_uuid

    # Get transcription text
    text = get_transcription_text(vcon)

    if not text:
        logger.debug(f"No transcription found for vCon {vcon_uuid}")
        return vcon_uuid

    logger.debug(f"Analyzing transcription ({len(text)} chars) for vCon {vcon_uuid}")

    case_sensitive = opts.get("case_sensitive", False)

    enabled_categories = opts.get("categories")  # None = all
    if isinstance(enabled_categories, str):
        # A single category name must not be substring-tested.
        enabled_categories = [enabled_categories]

    # An explicit null in the configuration overrides the {} default.
    custom_keywords = opts.get("custom_keywords") or {}
    if not isinstance(custom_keywords, dict):
        logger.warning("keyword_tagger: custom_keywords must be a mapping of tag -> keywords; ignoring")
        custom_keywords = {}

    tags_added = []

    # Process built-in keyword rules
    for category, rules in KEYWORD_RULES.items():
        # Skip if category filtering is enabled and this category is not in the list
        if enabled_categories is not None and category not in enabled_categories:
            continue
        tags_added.extend(_apply_rules(vcon, text, rules, case_sensitive))

    # Process custom keywords
    tags_added.extend(_apply_rules(vcon, text, custom_keywords, case_sensitive, label="custom "))

    if tags_added:
        vcon_redis.store_vcon(vcon)
        logger.info(f"Added {len(tags_added)} tags to vCon {vcon_uuid}: {tags_added}")
    else:
        logger.debug(f"No keyword matches for vCon {vcon_uuid}")

    return vcon_uuid
100644 index 0000000..6b22fdc --- /dev/null +++ b/server/links/wtf_transcribe/README.md @@ -0,0 +1,140 @@ +# WTF Transcription Link (vfun Integration) + +A link that sends vCon audio dialogs to a vfun transcription server and adds the results as WTF (World Transcription Format) analysis entries. + +## Overview + +This link integrates with the vfun transcription server to provide: +- Multi-language speech recognition (English + auto-detect) +- Speaker diarization (who spoke when) +- GPU-accelerated processing with CUDA +- WTF-compliant output format per IETF draft-howe-vcon-wtf-extension-01 + +## Configuration + +```yaml +wtf_transcribe: + module: links.wtf_transcribe + options: + # Required: URL of the vfun transcription server + vfun-server-url: http://localhost:8443/transcribe + + # Optional: Enable speaker diarization (default: true) + diarize: true + + # Optional: Request timeout in seconds (default: 300) + timeout: 300 + + # Optional: Minimum dialog duration to transcribe in seconds (default: 5) + min-duration: 5 + + # Optional: API key for vfun server authentication + api-key: your-api-key-here +``` + +## How It Works + +1. **Extract Audio**: Reads audio from vCon dialog (supports `body` with base64/base64url encoding, or `url` with file:// or http:// references) +2. **Send to vfun**: POSTs audio file to vfun's `/transcribe` endpoint +3. **Create WTF Analysis**: Formats the transcription result as a WTF analysis entry +4. 
**Update vCon**: Adds the WTF analysis to the vCon and stores it back to Redis + +## Output Format + +The link adds analysis entries with the WTF format: + +```json +{ + "type": "wtf_transcription", + "dialog": 0, + "mediatype": "application/json", + "vendor": "vfun", + "product": "parakeet-tdt-110m", + "schema": "wtf-1.0", + "encoding": "json", + "body": { + "transcript": { + "text": "Hello, how can I help you today?", + "language": "en-US", + "duration": 30.0, + "confidence": 0.95 + }, + "segments": [ + { + "id": 0, + "start": 0.0, + "end": 3.5, + "text": "Hello, how can I help you today?", + "confidence": 0.95, + "speaker": 0 + } + ], + "metadata": { + "created_at": "2024-01-15T10:30:00Z", + "processed_at": "2024-01-15T10:30:05Z", + "provider": "vfun", + "model": "parakeet-tdt-110m" + }, + "speakers": { + "0": { + "id": 0, + "label": "Speaker 0", + "segments": [0], + "total_time": 15.2 + } + }, + "quality": { + "average_confidence": 0.95, + "multiple_speakers": true, + "low_confidence_words": 0 + } + } +} +``` + +## Behavior + +- **Skips non-recording dialogs**: Only processes dialogs with `type: "recording"` +- **Skips already transcribed**: Dialogs with existing WTF transcriptions are skipped +- **Duration filtering**: Dialogs shorter than `min-duration` are skipped +- **File URL support**: Can read audio from local `file://` URLs directly + +## Example Chain Configuration + +```yaml +chains: + transcription_chain: + links: + - tag + - wtf_transcribe + - supabase_webhook + ingress_lists: + - transcribe + egress_lists: + - transcribed + enabled: 1 +``` + +## vfun Server + +The vfun server provides GPU-accelerated transcription: + +```bash +# Start vfun server +cd /path/to/vfun +./vfun server + +# Test health +curl http://localhost:8443/ping + +# Manual transcription test +curl -X POST http://localhost:8443/transcribe \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -F "file=@audio.wav" \ + -F "diarize=true" +``` + +## Related + +- 
default_options = {
    "vfun-server-url": None,
    "diarize": True,
    "timeout": 300,
    "min-duration": 5,
    "api-key": None,
}


def _to_float(value: Any, default: Optional[float]) -> Optional[float]:
    """Convert *value* to float, returning *default* for None or non-numeric input."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _form_bool(value: Any) -> str:
    """Render an option as the lowercase ``true``/``false`` form-field value."""
    if isinstance(value, str):
        # A quoted YAML value such as "false" must not count as truthy.
        value = value.strip().lower() in ("1", "true", "yes", "on")
    return "true" if value else "false"


def _decode_base64(data: Any, urlsafe: bool) -> bytes:
    """Decode base64/base64url text, tolerating whitespace and missing padding.

    Raises:
        ValueError: (including ``binascii.Error``) when the input is invalid.
        TypeError: when *data* is neither str nor bytes-like.
    """
    raw = data.encode("ascii") if isinstance(data, str) else bytes(data)
    raw = b"".join(raw.split())
    # The decoders reject input whose length is not a multiple of four, so
    # restore any padding the producer left off.
    raw += b"=" * (-len(raw) % 4)
    return base64.urlsafe_b64decode(raw) if urlsafe else base64.b64decode(raw)


def has_wtf_transcription(vcon: Any, dialog_index: int) -> bool:
    """Check if a dialog already has a WTF transcription."""
    return any(
        isinstance(analysis, dict)
        and analysis.get("type") == "wtf_transcription"
        and analysis.get("dialog") == dialog_index
        for analysis in vcon.analysis or []
    )


def should_transcribe_dialog(dialog: Dict[str, Any], min_duration: float) -> bool:
    """Check if a dialog should be transcribed.

    A dialog qualifies when it is a ``recording``, carries a ``body`` or a
    ``url``, and its duration (when known) is at least *min_duration*.
    A missing or non-numeric duration is treated as unknown and does not
    disqualify the dialog.
    """
    if dialog.get("type") != "recording":
        return False
    if not dialog.get("body") and not dialog.get("url"):
        return False
    duration = _to_float(dialog.get("duration"), None)
    if duration is not None and duration < min_duration:
        return False
    return True


def get_audio_content(dialog: Dict[str, Any]) -> Optional[bytes]:
    """Extract audio content from dialog body or URL.

    An inline ``body`` takes precedence over ``url``. Bodies are decoded per
    ``encoding`` (``base64`` by default, or ``base64url``); any other encoding
    returns the body as bytes unchanged. URLs are read from disk for
    ``file://`` and fetched over HTTP otherwise.

    Returns:
        The audio bytes, or None when nothing could be read or decoded
        (the failure is logged).
    """
    body = dialog.get("body")
    if body:
        encoding = dialog.get("encoding", "base64")
        if encoding in ("base64url", "base64"):
            try:
                return _decode_base64(body, urlsafe=(encoding == "base64url"))
            except (ValueError, TypeError) as e:
                # Report undecodable bodies like unreadable files/URLs rather
                # than letting the exception abort the whole link.
                logger.error(f"Failed to decode {encoding} dialog body: {e}")
                return None
        return body.encode() if isinstance(body, str) else body

    url = dialog.get("url")
    if url:
        # NOTE(review): the URL comes from the vCon itself; file:// reads any
        # local path and http(s) reaches any host. Restrict if vCons can come
        # from untrusted sources.
        if url.startswith("file://"):
            filepath = url[7:]
            try:
                with open(filepath, "rb") as f:
                    return f.read()
            except Exception as e:
                logger.error(f"Failed to read file {filepath}: {e}")
                return None
        try:
            resp = requests.get(url, timeout=60)
            resp.raise_for_status()
            return resp.content
        except Exception as e:
            logger.error(f"Failed to fetch URL {url}: {e}")
            return None
    return None


def create_wtf_analysis(
    dialog_index: int,
    vfun_response: Dict[str, Any],
    duration: float,
) -> Dict[str, Any]:
    """Create a WTF analysis entry from vfun response.

    The first ``transcription``/``wtf_transcription`` entry of
    ``vfun_response["analysis"]`` is used; top-level ``text``/``segments``
    serve as fallbacks. Missing or null numeric fields fall back to defaults
    (confidence 0.9, times 0.0) instead of raising.

    Args:
        dialog_index: Index of the dialog the transcription belongs to.
        vfun_response: Decoded JSON response of the vfun server.
        duration: Dialog duration in seconds.

    Returns:
        A dict with ``type``, ``dialog``, ``mediatype``, ``vendor``,
        ``product``, ``schema`` and the WTF ``body``.
    """
    now = datetime.now(timezone.utc).isoformat()

    full_text = ""
    segments: Any = []
    language = "en-US"

    # vfun returns: analysis[].body with transcription data
    for entry in vfun_response.get("analysis") or []:
        if not isinstance(entry, dict):
            continue
        if entry.get("type") not in ("transcription", "wtf_transcription"):
            continue
        body = entry.get("body", {})
        if isinstance(body, dict):
            # WTF format from vfun
            transcript = body.get("transcript", {})
            if isinstance(transcript, str):
                full_text = transcript
                transcript = {}
            elif not isinstance(transcript, dict):
                transcript = {}
            full_text = transcript.get("text", body.get("text", full_text)) or ""
            language = transcript.get("language", body.get("language", "en-US")) or "en-US"
            segments = body.get("segments", [])
        elif isinstance(body, str):
            full_text = body
        break  # only the first transcription entry is considered

    # If no analysis text was found, check the direct fields. Segments already
    # taken from the analysis entry are kept rather than overwritten.
    if not full_text:
        full_text = vfun_response.get("text", "") or ""
        segments = segments or vfun_response.get("segments", [])

    if not isinstance(segments, list):
        segments = []

    # Build WTF segments
    wtf_segments = []
    for i, seg in enumerate(segments):
        if not isinstance(seg, dict):
            continue
        wtf_seg = {
            "id": seg.get("id", i),
            "start": _to_float(seg.get("start", seg.get("start_time", 0.0)), 0.0),
            "end": _to_float(seg.get("end", seg.get("end_time", 0.0)), 0.0),
            "text": seg.get("text", seg.get("transcription", "")),
            "confidence": _to_float(seg.get("confidence", 0.9), 0.9),
        }
        if "speaker" in seg:
            wtf_seg["speaker"] = seg["speaker"]
        wtf_segments.append(wtf_seg)

    # Calculate confidence (0.9 is the default when no segment reports one)
    if wtf_segments:
        avg_confidence = sum(s["confidence"] for s in wtf_segments) / len(wtf_segments)
    else:
        avg_confidence = 0.9

    # Build speakers section
    speakers: Dict[str, Any] = {}
    for seg in wtf_segments:
        speaker = seg.get("speaker")
        if speaker is None:
            continue
        info = speakers.setdefault(
            str(speaker),
            {
                "id": speaker,
                "label": f"Speaker {speaker}",
                "segments": [],
                "total_time": 0.0,
            },
        )
        info["segments"].append(seg["id"])
        info["total_time"] += seg["end"] - seg["start"]

    # Build WTF body
    wtf_body = {
        "transcript": {
            "text": full_text,
            "language": language,
            "duration": float(duration),
            "confidence": float(avg_confidence),
        },
        "segments": wtf_segments,
        "metadata": {
            "created_at": now,
            "processed_at": now,
            "provider": "vfun",
            "model": "parakeet-tdt-110m",
            "audio": {
                "duration": float(duration),
            },
        },
        "quality": {
            "average_confidence": float(avg_confidence),
            "multiple_speakers": len(speakers) > 1,
            # Counted per segment: no word-level confidences are read here.
            "low_confidence_words": sum(1 for s in wtf_segments if s.get("confidence", 1.0) < 0.5),
        },
    }

    if speakers:
        wtf_body["speakers"] = speakers

    return {
        "type": "wtf_transcription",
        "dialog": dialog_index,
        "mediatype": "application/json",
        "vendor": "vfun",
        "product": "parakeet-tdt-110m",
        "schema": "wtf-1.0",
        # Note: encoding omitted since body is a direct object, not a JSON string
        "body": wtf_body,
    }


def run(
    vcon_uuid: str,
    link_name: str,
    opts: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
    """Process a vCon through the vfun transcription service.

    Every eligible recording dialog without an existing WTF transcription is
    posted to the vfun server; each successful response is added to the vCon
    as a ``wtf_transcription`` analysis. The vCon is stored back only when at
    least one dialog was transcribed.

    Args:
        vcon_uuid: UUID of the vCon to load from Redis.
        link_name: Name of this link instance (unused here).
        opts: Overrides for ``default_options``.

    Returns:
        ``vcon_uuid`` in every case; failures are logged, not raised.
    """
    merged_opts = default_options.copy()
    if opts:
        merged_opts.update(opts)
    opts = merged_opts

    logger.info(f"Starting wtf_transcribe link for vCon: {vcon_uuid}")

    vfun_server_url = opts.get("vfun-server-url")
    if not vfun_server_url:
        logger.error("wtf_transcribe: vfun-server-url is required")
        return vcon_uuid

    vcon_redis = VconRedis()
    vcon = vcon_redis.get_vcon(vcon_uuid)

    if not vcon:
        logger.error(f"wtf_transcribe: vCon {vcon_uuid} not found")
        return vcon_uuid

    # A null or non-numeric min-duration falls back to the documented default.
    min_duration = _to_float(opts.get("min-duration", 5), 5.0)

    # Find dialogs to transcribe
    dialogs_processed = 0
    dialogs_skipped = 0

    for i, dialog in enumerate(vcon.dialog):
        if not should_transcribe_dialog(dialog, min_duration):
            logger.debug(f"Skipping dialog {i} (not eligible)")
            dialogs_skipped += 1
            continue

        if has_wtf_transcription(vcon, i):
            logger.debug(f"Skipping dialog {i} (already transcribed)")
            dialogs_skipped += 1
            continue

        # Get audio content
        audio_content = get_audio_content(dialog)
        if not audio_content:
            logger.warning(f"Could not extract audio from dialog {i}")
            dialogs_skipped += 1
            continue

        logger.info(f"Transcribing dialog {i} for vCon {vcon_uuid}")

        try:
            # Build request to vfun server
            headers = {}
            api_key = opts.get("api-key")
            if api_key:
                headers["Authorization"] = f"Bearer {api_key}"

            # Get filename from dialog or generate one
            filename = dialog.get("filename", f"audio_{i}.wav")
            mimetype = dialog.get("mimetype", "audio/wav")

            # Send audio to vfun server
            files = {"file": (filename, audio_content, mimetype)}
            data = {
                # Lowercase, matching "block" below; str(True) would send "True".
                "diarize": _form_bool(opts.get("diarize", True)),
                "block": "true",
            }

            response = requests.post(
                vfun_server_url,
                files=files,
                data=data,
                headers=headers,
                timeout=opts.get("timeout", 300),
            )

            # NOTE(review): 302 is accepted as success as in the original
            # code; confirm the vfun server really answers that way.
            if response.status_code in (200, 302):
                vfun_response = response.json()
                # Handle double-encoded JSON (vfun sometimes returns JSON string)
                if isinstance(vfun_response, str):
                    vfun_response = json.loads(vfun_response)

                # A missing or null duration falls back to 30.0 seconds, so a
                # finished transcription is not lost to float(None).
                duration = _to_float(dialog.get("duration"), 30.0)
                wtf_analysis = create_wtf_analysis(i, vfun_response, duration)

                # Add analysis to vCon
                vcon.add_analysis(
                    type=wtf_analysis["type"],
                    dialog=wtf_analysis["dialog"],
                    vendor=wtf_analysis.get("vendor"),
                    body=wtf_analysis["body"],
                    extra={
                        "mediatype": wtf_analysis.get("mediatype"),
                        "product": wtf_analysis.get("product"),
                        "schema": wtf_analysis.get("schema"),
                    },
                )

                dialogs_processed += 1
                logger.info(f"Added WTF transcription for dialog {i}")

            else:
                logger.error(
                    f"vfun transcription failed for dialog {i}: "
                    f"status={response.status_code}, response={response.text[:200]}"
                )

        except requests.exceptions.Timeout:
            logger.error(f"vfun transcription timed out for dialog {i}")
        except Exception as e:
            # Best-effort per dialog: one failure must not stop the rest.
            logger.error(f"Error transcribing dialog {i}: {e}", exc_info=True)

    if dialogs_processed > 0:
        vcon_redis.store_vcon(vcon)
        logger.info(
            f"Updated vCon {vcon_uuid}: processed={dialogs_processed}, "
            f"skipped={dialogs_skipped}"
        )
    else:
        logger.info(f"No dialogs transcribed for vCon {vcon_uuid}")

    return vcon_uuid
0.3.0 for webhook compatibility Updates webhook link to set vcon version to 0.3.0 for compatibility with vcon-mcp REST API. Co-Authored-By: Claude Opus 4.5 --- server/links/webhook/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/links/webhook/__init__.py b/server/links/webhook/__init__.py index 16b433c..28c8afd 100644 --- a/server/links/webhook/__init__.py +++ b/server/links/webhook/__init__.py @@ -33,6 +33,10 @@ def run( # The webhook needs a stringified JSON version. json_dict = vCon.to_dict() + # Ensure vcon version is 0.3.0 for compatibility with vcon-mcp REST API + if json_dict.get("vcon") == "0.0.1" or "vcon" not in json_dict: + json_dict["vcon"] = "0.3.0" + # Build headers from configuration headers = opts.get("headers", {}) From 161b623f7333bf7f8d1364b4b5999826d52c1e95 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Tue, 27 Jan 2026 22:27:42 +0000 Subject: [PATCH 04/11] Use HTTPS for apt sources in Dockerfile Configure apt to use HTTPS sources for environments where HTTP port 80 is blocked. Co-Authored-By: Claude Opus 4.5 --- docker/Dockerfile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docker/Dockerfile b/docker/Dockerfile index f1c781d..13091e2 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -10,6 +10,9 @@ ENV VCON_SERVER_VERSION=${VCON_SERVER_VERSION} ENV VCON_SERVER_GIT_COMMIT=${VCON_SERVER_GIT_COMMIT} ENV VCON_SERVER_BUILD_TIME=${VCON_SERVER_BUILD_TIME} +# Configure apt to use HTTPS sources (required when HTTP port 80 is blocked) +RUN sed -i 's|http://deb.debian.org|https://deb.debian.org|g' /etc/apt/sources.list.d/debian.sources + RUN apt-get update && \ apt-get install -y libavdevice-dev ffmpeg From 74e747b7ce91e19457b3ac96d6ad4c04022584a2 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Tue, 27 Jan 2026 22:27:19 +0000 Subject: [PATCH 05/11] Add SigNoz observability configuration Includes docker-compose and config files for SigNoz observability stack with OpenTelemetry collector. 
Co-Authored-By: Claude Opus 4.5 --- docker-compose.signoz.yml | 120 ++++++++++++++++++++ signoz/README.md | 176 ++++++++++++++++++++++++++++++ signoz/alertmanager.yml | 19 ++++ signoz/clickhouse-users.xml | 38 +++++++ signoz/otel-collector-config.yaml | 48 ++++++++ signoz/zz-clickhouse-config.xml | 53 +++++++++ 6 files changed, 454 insertions(+) create mode 100644 docker-compose.signoz.yml create mode 100644 signoz/README.md create mode 100644 signoz/alertmanager.yml create mode 100644 signoz/clickhouse-users.xml create mode 100644 signoz/otel-collector-config.yaml create mode 100644 signoz/zz-clickhouse-config.xml diff --git a/docker-compose.signoz.yml b/docker-compose.signoz.yml new file mode 100644 index 0000000..d57fb8f --- /dev/null +++ b/docker-compose.signoz.yml @@ -0,0 +1,120 @@ +# SigNoz Observability Stack +# Usage: docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d +# +# After first run, execute schema migrations: +# docker run --rm --network conserver signoz/signoz-schema-migrator:latest sync --dsn='tcp://signoz-clickhouse:9000' +# +# Access UI at: http://localhost:3301 + +networks: + conserver: + external: true + +volumes: + signoz_clickhouse_data: + signoz_zookeeper_data: + signoz_zookeeper_log: + signoz_data: + +services: + signoz-zookeeper: + image: zookeeper:3.9 + container_name: signoz-zookeeper + hostname: signoz-zookeeper + environment: + - ZOO_AUTOPURGE_PURGEINTERVAL=1 + - ZOO_4LW_COMMANDS_WHITELIST=mntr,ruok,stat + volumes: + - signoz_zookeeper_data:/data + - signoz_zookeeper_log:/datalog + networks: + - conserver + healthcheck: + test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"] + interval: 30s + timeout: 10s + retries: 3 + restart: unless-stopped + + signoz-clickhouse: + image: clickhouse/clickhouse-server:24.1.2-alpine + container_name: signoz-clickhouse + hostname: signoz-clickhouse + tty: true + depends_on: + signoz-zookeeper: + condition: service_healthy + volumes: + - 
signoz_clickhouse_data:/var/lib/clickhouse + - ./signoz/zz-clickhouse-config.xml:/etc/clickhouse-server/config.d/zz-clickhouse-config.xml:ro + - ./signoz/clickhouse-users.xml:/etc/clickhouse-server/users.d/users.xml:ro + environment: + - CLICKHOUSE_DB=signoz_traces + - CLICKHOUSE_USER=default + - CLICKHOUSE_PASSWORD= + ulimits: + nofile: + soft: 262144 + hard: 262144 + networks: + - conserver + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:8123/ping"] + interval: 30s + timeout: 10s + retries: 3 + restart: unless-stopped + + signoz-otel-collector: + image: signoz/signoz-otel-collector:latest + container_name: signoz-otel-collector + hostname: signoz-otel-collector + command: + - "--config=/etc/otel-collector-config.yaml" + depends_on: + signoz-clickhouse: + condition: service_healthy + environment: + - OTEL_RESOURCE_ATTRIBUTES=host.name=signoz-host,os.type=linux + volumes: + - ./signoz/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro + ports: + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + networks: + - conserver + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:13133/"] + interval: 30s + timeout: 10s + retries: 3 + restart: unless-stopped + + signoz: + image: signoz/query-service:latest + container_name: signoz + hostname: signoz + depends_on: + signoz-clickhouse: + condition: service_healthy + environment: + - ClickHouseUrl=tcp://signoz-clickhouse:9000 + - SIGNOZ_LOCAL_DB_PATH=/var/lib/signoz/signoz.db + - DASHBOARDS_PATH=/root/config/dashboards + - STORAGE=clickhouse + - GODEBUG=netdns=go + - TELEMETRY_ENABLED=true + - DEPLOYMENT_TYPE=docker-standalone + volumes: + - signoz_data:/var/lib/signoz + - ./signoz/dashboards:/root/config/dashboards + ports: + - "3301:8080" # Web UI + networks: + - conserver + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:8080/api/v1/health"] + 
interval: 30s + timeout: 10s + retries: 3 + restart: unless-stopped diff --git a/signoz/README.md b/signoz/README.md new file mode 100644 index 0000000..cc38424 --- /dev/null +++ b/signoz/README.md @@ -0,0 +1,176 @@ +# SigNoz Observability Stack for vcon-server + +This directory contains the configuration for SigNoz, a self-hosted observability platform that collects traces, metrics, and logs from the vcon-mcp server via OpenTelemetry. + +## Architecture + +``` +┌─────────────────┐ OTLP/HTTP ┌──────────────────────┐ +│ vcon-mcp │ ─────────────────► │ signoz-otel-collector│ +│ (instrumented) │ :4318 │ (OTLP receiver) │ +└─────────────────┘ └──────────┬───────────┘ + │ + ▼ +┌─────────────────┐ ┌──────────────────────┐ +│ signoz (UI) │ ◄────────────────► │ signoz-clickhouse │ +│ :3301 │ TCP :9000 │ (time-series DB) │ +└─────────────────┘ └──────────┬───────────┘ + │ + ▼ + ┌──────────────────────┐ + │ signoz-zookeeper │ + │ (coordination) │ + └──────────────────────┘ +``` + +## Components + +| Service | Image | Purpose | Ports | +|---------|-------|---------|-------| +| signoz | `signoz/query-service:latest` | Query API + Web UI | 3301 (mapped from 8080) | +| signoz-otel-collector | `signoz/signoz-otel-collector:latest` | OTLP ingestion | 4317 (gRPC), 4318 (HTTP) | +| signoz-clickhouse | `clickhouse/clickhouse-server:24.1.2-alpine` | Time-series storage | 8123, 9000 (internal) | +| signoz-zookeeper | `zookeeper:3.9` | ClickHouse coordination | 2181 (internal) | + +## Configuration Files + +### otel-collector-config.yaml +OpenTelemetry Collector pipeline configuration: +- **Receivers**: OTLP gRPC (4317) and HTTP (4318) +- **Processors**: Batch processing +- **Exporters**: ClickHouse for traces, metrics, and logs + +### zz-clickhouse-config.xml +ClickHouse server configuration: +- IPv4 listening (0.0.0.0) +- Single-node cluster named "cluster" (required by SigNoz schema migrator) +- ZooKeeper integration for distributed DDL + +### clickhouse-users.xml +ClickHouse user 
permissions with default user having full access. + +### alertmanager.yml +Basic alertmanager configuration (not currently active). + +## Usage + +### Start with SigNoz + +```bash +cd /path/to/vcon-server +docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d +``` + +### Start without SigNoz (normal operation) + +```bash +cd /path/to/vcon-server +docker compose -f docker-compose.yml -f docker-compose.override.yml up -d +``` + +### Stop SigNoz only + +```bash +docker compose -f docker-compose.signoz.yml down +``` + +### Access the UI + +Open http://localhost:3301 in your browser. + +## First-Time Setup + +After starting SigNoz for the first time, run the schema migrations: + +```bash +docker run --rm --network conserver \ + signoz/signoz-schema-migrator:latest \ + sync --dsn='tcp://signoz-clickhouse:9000' +``` + +Note: Some migrations may fail due to JSON type syntax incompatibility with ClickHouse 24.1. Core functionality still works. + +## vcon-mcp Integration + +The vcon-mcp service is configured with these environment variables in `docker-compose.override.yml`: + +```yaml +environment: + OTEL_ENABLED: "true" + OTEL_EXPORTER_TYPE: otlp + OTEL_ENDPOINT: http://signoz-otel-collector:4318 + OTEL_SERVICE_NAME: vcon-mcp-server +``` + +## Verification + +1. Check service health: + ```bash + curl http://localhost:3301/api/v1/health + # Returns: {"status":"ok"} + ``` + +2. Check container status: + ```bash + docker ps | grep signoz + ``` + +3. 
View collector logs: + ```bash + docker logs signoz-otel-collector + ``` + +## Troubleshooting + +### ClickHouse won't start +- Check if port 9000 is in use +- Verify zookeeper is healthy first +- Check logs: `docker logs signoz-clickhouse` + +### OTEL Collector errors +- Ensure ClickHouse is healthy before starting collector +- Verify schema migrations have run +- Check config syntax: `docker logs signoz-otel-collector` + +### No data in UI +- Verify vcon-mcp is sending data (check its logs for OTEL export messages) +- Ensure collector is receiving data: check collector metrics at port 8888 +- Verify ClickHouse tables exist: `docker exec signoz-clickhouse clickhouse-client --query "SHOW TABLES FROM signoz_traces"` + +### Port conflicts +- Default ports: 3301 (UI), 4317 (gRPC), 4318 (HTTP) +- Change in docker-compose.signoz.yml if needed + +## Known Issues + +1. **Schema Migration Failures**: Some newer SigNoz migrations use JSON column types with syntax not supported in ClickHouse 24.1.2. Core observability works but some advanced features may be limited. + +2. **Alertmanager**: Not configured for this deployment. Would require additional setup for alerts. + +3. **Health Check Timing**: The OTEL collector health check may show "starting" for extended periods but the service is functional. 
+ +## Future Improvements + +- Upgrade ClickHouse to latest version for full schema compatibility +- Add alertmanager configuration for alerts +- Configure data retention policies +- Add authentication to SigNoz UI +- Set up dashboards for vcon-mcp metrics + +## Data Persistence + +Data is stored in Docker volumes: +- `signoz_clickhouse_data` - Traces, metrics, logs +- `signoz_zookeeper_data` - ZooKeeper state +- `signoz_data` - SigNoz query service state + +To reset all data: +```bash +docker compose -f docker-compose.signoz.yml down -v +``` + +## Resources + +- [SigNoz Documentation](https://signoz.io/docs/) +- [OpenTelemetry Documentation](https://opentelemetry.io/docs/) +- [ClickHouse Documentation](https://clickhouse.com/docs/) diff --git a/signoz/alertmanager.yml b/signoz/alertmanager.yml new file mode 100644 index 0000000..89b0125 --- /dev/null +++ b/signoz/alertmanager.yml @@ -0,0 +1,19 @@ +global: + resolve_timeout: 5m + +route: + group_by: ['alertname'] + group_wait: 10s + group_interval: 10s + repeat_interval: 1h + receiver: 'default-receiver' + +receivers: + - name: 'default-receiver' + +inhibit_rules: + - source_match: + severity: 'critical' + target_match: + severity: 'warning' + equal: ['alertname', 'dev', 'instance'] diff --git a/signoz/clickhouse-users.xml b/signoz/clickhouse-users.xml new file mode 100644 index 0000000..c545475 --- /dev/null +++ b/signoz/clickhouse-users.xml @@ -0,0 +1,38 @@ + + + + 10000000000 + 0 + random + 100 + + + 1 + + + + + + + + ::/0 + + default + default + 1 + + + + + + + 3600 + 0 + 0 + 0 + 0 + 0 + + + + diff --git a/signoz/otel-collector-config.yaml b/signoz/otel-collector-config.yaml new file mode 100644 index 0000000..eeedb0c --- /dev/null +++ b/signoz/otel-collector-config.yaml @@ -0,0 +1,48 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + send_batch_size: 10000 + timeout: 10s + +exporters: + clickhousetraces: + datasource: 
tcp://signoz-clickhouse:9000/signoz_traces + clickhouselogsexporter: + dsn: tcp://signoz-clickhouse:9000/signoz_logs + timeout: 5s + sending_queue: + queue_size: 100 + retry_on_failure: + enabled: true + initial_interval: 5s + max_interval: 30s + max_elapsed_time: 300s + signozclickhousemetrics: + dsn: tcp://signoz-clickhouse:9000/signoz_metrics + +extensions: + health_check: + endpoint: 0.0.0.0:13133 + +service: + extensions: [health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [clickhousetraces] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [signozclickhousemetrics] + logs: + receivers: [otlp] + processors: [batch] + exporters: [clickhouselogsexporter] diff --git a/signoz/zz-clickhouse-config.xml b/signoz/zz-clickhouse-config.xml new file mode 100644 index 0000000..d50368e --- /dev/null +++ b/signoz/zz-clickhouse-config.xml @@ -0,0 +1,53 @@ + + + warning + true + + + 0.0.0.0 + + 4096 + 3 + 100 + + /var/lib/clickhouse/ + /var/lib/clickhouse/tmp/ + /var/lib/clickhouse/user_files/ + + users.xml + default + default + + UTC + + true + + 3600 + + 3600 + 60 + + + + + + signoz-clickhouse + 9000 + + + + + + + cluster + 01 + signoz-clickhouse + + + + + signoz-zookeeper + 2181 + + + From cefef641874f2d61b17f25a4b0bd13a9c1025346 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Sun, 8 Feb 2026 01:07:43 +0000 Subject: [PATCH 06/11] Add /stats/queue endpoint for Redis queue depth monitoring Public endpoint (no auth) that returns the depth of any Redis list, used by the audio adapter for backpressure control. 
Co-Authored-By: Claude Opus 4.6 --- server/api.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/api.py b/server/api.py index c38d7ca..78b40b0 100644 --- a/server/api.py +++ b/server/api.py @@ -260,6 +260,24 @@ async def health_check() -> JSONResponse: }) +@app.get( + "/stats/queue", + summary="Get queue depth", + description="Returns the number of items in a Redis list (queue)", + tags=["system"], +) +async def get_queue_depth( + list_name: str = Query(..., description="Name of the Redis list to measure") +) -> JSONResponse: + """Get the current depth of a Redis list. Public endpoint (no auth) for monitoring and backpressure.""" + try: + depth = await redis_async.llen(list_name) + return JSONResponse(content={"list_name": list_name, "depth": depth}) + except Exception as e: + logger.error(f"Error getting queue depth for '{list_name}': {str(e)}") + raise HTTPException(status_code=500, detail="Failed to get queue depth") + + class Vcon(BaseModel): """Pydantic model representing a vCon (Voice Conversation) record. From bd0715f4c35954555d2933a97a19d518178cf2be Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Wed, 18 Feb 2026 17:17:07 +0000 Subject: [PATCH 07/11] Optimize vCon ingest by removing redundant Redis operations The post_vcon and external_ingress_vcon paths called index_vcon() which re-read the vCon from Redis (JSON.GET) and duplicated the sorted set add (ZADD) that was already done by the caller. This added 2 unnecessary Redis round-trips per ingest. Extract index_vcon_parties() that takes the vCon dict directly, and use it in both POST paths. The original index_vcon() is preserved for the bulk re-indexing endpoint. Reduces ingest from 11 to 9 Redis ops per vCon, measured 4.9x improvement in adapter posting throughput. 
Co-Authored-By: Claude Opus 4.6 --- server/api.py | 45 ++++++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/server/api.py b/server/api.py index 78b40b0..1f3ce96 100644 --- a/server/api.py +++ b/server/api.py @@ -677,7 +677,7 @@ async def post_vcon( await add_vcon_to_set(key, timestamp) logger.debug(f"Indexing vCon {inbound_vcon.uuid}") - await index_vcon(inbound_vcon.uuid) + await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"]) # Add to ingress lists if specified if ingress_lists: @@ -772,7 +772,7 @@ async def external_ingress_vcon( await add_vcon_to_set(key, timestamp) logger.debug(f"Indexing vCon {inbound_vcon.uuid}") - await index_vcon(inbound_vcon.uuid) + await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"]) # Always add to the specified ingress list (required for this endpoint) vcon_uuid_str = str(inbound_vcon.uuid) @@ -1060,25 +1060,17 @@ async def get_dlq_vcons( raise HTTPException(status_code=500, detail="Failed to read DLQ") -async def index_vcon(uuid: UUID) -> None: - """Index a vCon for searching. +async def index_vcon_parties(vcon_uuid: str, parties: list) -> None: + """Index a vCon's parties for searching. - Adds the vCon to the sorted set and indexes it by party information - (tel, mailto, name) for searching. All indexed keys will expire after - VCON_INDEX_EXPIRY seconds. + Indexes by party information (tel, mailto, name). All indexed keys + will expire after VCON_INDEX_EXPIRY seconds. 
Args: - uuid: UUID of the vCon to index + vcon_uuid: UUID string of the vCon + parties: List of party dicts from the vCon """ - key = f"vcon:{uuid}" - vcon = await redis_async.json().get(key) - created_at = datetime.fromisoformat(vcon["created_at"]) - timestamp = int(created_at.timestamp()) - vcon_uuid = vcon["uuid"] - await add_vcon_to_set(key, timestamp) - - # Index by party information with expiration - for party in vcon["parties"]: + for party in parties: if party.get("tel"): tel_key = f"tel:{party['tel']}" await redis_async.sadd(tel_key, vcon_uuid) @@ -1093,6 +1085,25 @@ async def index_vcon(uuid: UUID) -> None: await redis_async.expire(name_key, VCON_INDEX_EXPIRY) +async def index_vcon(uuid: UUID) -> None: + """Index a vCon for searching (reads from Redis). + + Reads the vCon from Redis, adds it to the sorted set, and indexes + by party information. Used for bulk re-indexing. For the ingest path, + use index_vcon_parties() directly to avoid redundant Redis reads. + + Args: + uuid: UUID of the vCon to index + """ + key = f"vcon:{uuid}" + vcon = await redis_async.json().get(key) + created_at = datetime.fromisoformat(vcon["created_at"]) + timestamp = int(created_at.timestamp()) + vcon_uuid = vcon["uuid"] + await add_vcon_to_set(key, timestamp) + await index_vcon_parties(vcon_uuid, vcon["parties"]) + + @api_router.get( "/index_vcons", status_code=200, From 10082dad3d920f4291365ab6d02ca567b36c7a12 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Wed, 18 Feb 2026 19:01:52 +0000 Subject: [PATCH 08/11] Move webhook from chain link to post-chain storage backend The supabase_webhook was running as a sequential chain link, blocking each worker for ~560ms per vCon. By moving it to a storage slot, the webhook now executes post-chain in parallel via ThreadPoolExecutor, reducing per-vCon P50 latency from 617ms to 123ms (5x improvement). New module server/storage/webhook/ wraps the existing HTTP POST logic with the storage save() interface. 
Co-Authored-By: Claude Opus 4.6 --- server/storage/webhook/__init__.py | 39 ++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 server/storage/webhook/__init__.py diff --git a/server/storage/webhook/__init__.py b/server/storage/webhook/__init__.py new file mode 100644 index 0000000..e9bfbb5 --- /dev/null +++ b/server/storage/webhook/__init__.py @@ -0,0 +1,39 @@ +from server.lib.vcon_redis import VconRedis +from lib.logging_utils import init_logger + +import requests + +logger = init_logger(__name__) + +default_options = { + "webhook-urls": [], + "headers": {}, +} + + +def save(vcon_uuid, opts=default_options): + vcon_redis = VconRedis() + vCon = vcon_redis.get_vcon(vcon_uuid) + + json_dict = vCon.to_dict() + + if json_dict.get("vcon") == "0.0.1" or "vcon" not in json_dict: + json_dict["vcon"] = "0.3.0" + + headers = opts.get("headers", {}) + + webhook_urls = opts.get("webhook-urls", []) + if not webhook_urls: + logger.warning( + f"webhook storage: no webhook-urls configured for vcon {vcon_uuid}, skipping" + ) + return + + for url in webhook_urls: + logger.info( + f"webhook storage: posting vcon {vcon_uuid} to webhook url: {url}" + ) + resp = requests.post(url, json=json_dict, headers=headers) + logger.info( + f"webhook storage response for {vcon_uuid}: {resp.status_code} {resp.text}" + ) From c3ba79f37df13a55b2b1964659d3410af519c374 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Wed, 18 Feb 2026 19:01:59 +0000 Subject: [PATCH 09/11] Add retry with port fallback and health-aware vfun selection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The wtf_transcribe link had no retry logic — a single vfun failure silently dropped the transcription. 
This adds: - _VfunHealthTracker: thread-safe singleton tracking instance health across all workers, with 30-second self-healing recovery window - get_vfun_urls(): returns URLs in priority order (healthy shuffled, then recovering oldest-first, then down instances) - Fallback loop: tries all configured vfun instances before giving up - Redis transcription cache: skips vfun calls for previously transcribed audio files (7-day TTL) On failure, instances are marked DOWN and bypassed until the recovery window expires, then automatically retried and restored on success. Co-Authored-By: Claude Opus 4.6 --- server/links/wtf_transcribe/__init__.py | 402 +++++++++++++++++++----- 1 file changed, 324 insertions(+), 78 deletions(-) diff --git a/server/links/wtf_transcribe/__init__.py b/server/links/wtf_transcribe/__init__.py index 7c07407..2e85433 100644 --- a/server/links/wtf_transcribe/__init__.py +++ b/server/links/wtf_transcribe/__init__.py @@ -27,10 +27,14 @@ """ import base64 +import hashlib import json import logging import os +import random import tempfile +import time +import threading import requests from datetime import datetime, timezone from typing import Optional, Dict, Any, List @@ -38,18 +42,182 @@ from server.lib.vcon_redis import VconRedis from lib.logging_utils import init_logger from lib.error_tracking import init_error_tracker +from redis_mgr import redis init_error_tracker() logger = init_logger(__name__) + +# --------------------------------------------------------------------------- +# Health-aware vfun URL selector with self-healing +# --------------------------------------------------------------------------- +class _VfunHealthTracker: + """Track vfun instance health across all workers in this process. + + Instances are marked DOWN on connection/timeout/HTTP errors and + automatically re-checked after `recovery_seconds`. 
Selection prefers + healthy instances with random load balancing; when all are down the + least-recently-failed instance is tried first. + """ + + def __init__(self, recovery_seconds: float = 30.0): + self._lock = threading.Lock() + # url -> timestamp when it was marked down (0 = healthy) + self._down_since: Dict[str, float] = {} + self._recovery_seconds = recovery_seconds + + def _is_healthy(self, url: str, now: float) -> bool: + ts = self._down_since.get(url, 0) + if ts == 0: + return True + # Self-heal: allow retry after recovery window + return (now - ts) >= self._recovery_seconds + + def mark_down(self, url: str) -> None: + with self._lock: + if self._down_since.get(url, 0) == 0: + logger.warning("vfun instance marked DOWN: %s", url) + self._down_since[url] = time.monotonic() + + def mark_healthy(self, url: str) -> None: + with self._lock: + was_down = self._down_since.get(url, 0) != 0 + self._down_since[url] = 0 + if was_down: + logger.info("vfun instance recovered: %s", url) + + def get_ordered_urls(self, urls: List[str]) -> List[str]: + """Return URLs ordered: healthy (shuffled) first, then recovering + (oldest-failure first), then remaining down instances.""" + now = time.monotonic() + healthy = [] + recovering = [] + down = [] + with self._lock: + for url in urls: + ts = self._down_since.get(url, 0) + if ts == 0: + healthy.append(url) + elif (now - ts) >= self._recovery_seconds: + recovering.append((ts, url)) + else: + down.append((ts, url)) + random.shuffle(healthy) + recovering.sort() # oldest failure first (most likely recovered) + down.sort() + return healthy + [u for _, u in recovering] + [u for _, u in down] + + +# Module-level singleton shared across all workers in this process +_health_tracker = _VfunHealthTracker(recovery_seconds=30.0) + default_options = { "vfun-server-url": None, + "vfun-server-urls": None, # List of URLs for load balancing "diarize": True, "timeout": 300, "min-duration": 5, "api-key": None, + "cache-ttl": 604800, # 7 days in 
seconds } +# Redis cache key prefixes for transcription results +WTF_CACHE_PREFIX = "wtf_cache:" +TRANSCRIPTION_PREFIX = "transcription:" + + +def _get_filename_from_dialog(dialog: Dict[str, Any]) -> Optional[str]: + """Extract the audio filename from a dialog's URL.""" + url = dialog.get("url", "") + if url: + if url.startswith("file://"): + return os.path.basename(url[7:]) + else: + return os.path.basename(url.split("?")[0]) + return None + + +def get_cache_key(dialog: Dict[str, Any]) -> Optional[str]: + """Derive a cache key from the dialog's audio file URL or body hash.""" + filename = _get_filename_from_dialog(dialog) + if filename: + return f"{WTF_CACHE_PREFIX}{filename}" + # Fall back to hashing the body content + body = dialog.get("body") + if body: + body_hash = hashlib.sha256(body.encode() if isinstance(body, str) else body).hexdigest()[:32] + return f"{WTF_CACHE_PREFIX}hash:{body_hash}" + return None + + +def get_cached_transcription(cache_key: str, dialog: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Check Redis for a cached WTF transcription result. + + Checks two key patterns: + 1. wtf_cache:{filename} — full WTF body (stored by this link) + 2. 
transcription:{filename} — simple {text, language, duration} (pre-populated) + """ + # Check wtf_cache: first (full body, ready to use) + try: + cached = redis.get(cache_key) + if cached: + return json.loads(cached) + except Exception as e: + logger.debug(f"Cache lookup failed for {cache_key}: {e}") + + # Check transcription: prefix (simpler format, needs conversion) + filename = _get_filename_from_dialog(dialog) + if filename: + try: + cached = redis.get(f"{TRANSCRIPTION_PREFIX}{filename}") + if cached: + data = json.loads(cached) + # Convert simple format to WTF body + now = datetime.now(timezone.utc).isoformat() + duration = float(data.get("duration", dialog.get("duration", 30.0))) + return { + "transcript": { + "text": data.get("text", ""), + "language": data.get("language", "en-US"), + "duration": duration, + "confidence": 0.9, + }, + "segments": [], + "metadata": { + "created_at": now, + "processed_at": now, + "provider": "vfun", + "model": "parakeet-tdt-110m", + "source": "redis_cache", + }, + "quality": { + "average_confidence": 0.9, + "multiple_speakers": False, + "low_confidence_words": 0, + }, + } + except Exception as e: + logger.debug(f"Transcription cache lookup failed for {filename}: {e}") + + return None + + +def store_cached_transcription(cache_key: str, wtf_body: Dict[str, Any], ttl: int = 604800): + """Store a WTF transcription result in Redis cache.""" + try: + redis.setex(cache_key, ttl, json.dumps(wtf_body)) + except Exception as e: + logger.debug(f"Cache store failed for {cache_key}: {e}") + + +def get_vfun_urls(opts: Dict[str, Any]) -> List[str]: + """Get vfun server URLs ordered by health (healthy first, shuffled).""" + urls = opts.get("vfun-server-urls") + if urls and isinstance(urls, list) and len(urls) > 0: + return _health_tracker.get_ordered_urls(urls) + single = opts.get("vfun-server-url") + return [single] if single else [] + def has_wtf_transcription(vcon: Any, dialog_index: int) -> bool: """Check if a dialog already has a WTF 
transcription.""" @@ -113,29 +281,40 @@ def create_wtf_analysis( now = datetime.now(timezone.utc).isoformat() # Extract text and segments from vfun response - # vfun returns: analysis[].body with transcription data - analysis_entries = vfun_response.get("analysis", []) + # vfun can return either: + # 1. Direct response: {"type": "wtf_transcription", "body": {...}} + # 2. Wrapped in analysis: {"analysis": [{"type": "wtf_transcription", "body": {...}}]} full_text = "" segments = [] language = "en-US" - for entry in analysis_entries: - if entry.get("type") in ("transcription", "wtf_transcription"): - body = entry.get("body", {}) - - # Handle different response formats - if isinstance(body, dict): - # WTF format from vfun - transcript = body.get("transcript", {}) - full_text = transcript.get("text", body.get("text", "")) - language = transcript.get("language", body.get("language", "en-US")) - segments = body.get("segments", []) - elif isinstance(body, str): - full_text = body - break - - # If no analysis found, check for direct text field + # Check for direct response format first (vfun native format) + if vfun_response.get("type") in ("transcription", "wtf_transcription"): + body = vfun_response.get("body", {}) + if isinstance(body, dict): + transcript = body.get("transcript", {}) + full_text = transcript.get("text", body.get("text", "")) + language = transcript.get("language", body.get("language", "en-US")) + segments = body.get("segments", []) + elif isinstance(body, str): + full_text = body + else: + # Try wrapped analysis format + analysis_entries = vfun_response.get("analysis", []) + for entry in analysis_entries: + if entry.get("type") in ("transcription", "wtf_transcription"): + body = entry.get("body", {}) + if isinstance(body, dict): + transcript = body.get("transcript", {}) + full_text = transcript.get("text", body.get("text", "")) + language = transcript.get("language", body.get("language", "en-US")) + segments = body.get("segments", []) + elif 
isinstance(body, str): + full_text = body + break + + # If no text found, check for direct text field if not full_text: full_text = vfun_response.get("text", "") segments = vfun_response.get("segments", []) @@ -230,9 +409,9 @@ def run( logger.info(f"Starting wtf_transcribe link for vCon: {vcon_uuid}") - vfun_server_url = opts.get("vfun-server-url") - if not vfun_server_url: - logger.error("wtf_transcribe: vfun-server-url is required") + # Check if any vfun URL is configured + if not opts.get("vfun-server-url") and not opts.get("vfun-server-urls"): + logger.error("wtf_transcribe: vfun-server-url or vfun-server-urls is required") return vcon_uuid vcon_redis = VconRedis() @@ -245,6 +424,9 @@ def run( # Find dialogs to transcribe dialogs_processed = 0 dialogs_skipped = 0 + cache_hits = 0 + cache_misses = 0 + cache_ttl = opts.get("cache-ttl", 604800) for i, dialog in enumerate(vcon.dialog): if not should_transcribe_dialog(dialog, opts.get("min-duration", 5)): @@ -257,6 +439,44 @@ def run( dialogs_skipped += 1 continue + # Check Redis cache before calling vfun + cache_key = get_cache_key(dialog) + cached_body = get_cached_transcription(cache_key, dialog) if cache_key else None + + if cached_body: + # Cache hit - use cached transcription + cache_hits += 1 + logger.info(f"Cache HIT for dialog {i} (key={cache_key})") + + wtf_analysis = { + "type": "wtf_transcription", + "dialog": i, + "mediatype": "application/json", + "vendor": "vfun", + "product": "parakeet-tdt-110m", + "schema": "wtf-1.0", + "body": cached_body, + } + + vcon.add_analysis( + type=wtf_analysis["type"], + dialog=wtf_analysis["dialog"], + vendor=wtf_analysis.get("vendor"), + body=wtf_analysis["body"], + extra={ + "mediatype": wtf_analysis.get("mediatype"), + "product": wtf_analysis.get("product"), + "schema": wtf_analysis.get("schema"), + }, + ) + + dialogs_processed += 1 + logger.info(f"Added cached WTF transcription for dialog {i}") + continue + + # Cache miss - need to call vfun + cache_misses += 1 + # 
Get audio content audio_content = get_audio_content(dialog) if not audio_content: @@ -264,77 +484,103 @@ def run( dialogs_skipped += 1 continue - logger.info(f"Transcribing dialog {i} for vCon {vcon_uuid}") + logger.info(f"Cache MISS for dialog {i} - calling vfun (key={cache_key})") - try: - # Build request to vfun server - headers = {} - api_key = opts.get("api-key") - if api_key: - headers["Authorization"] = f"Bearer {api_key}" - - # Get filename from dialog or generate one - filename = dialog.get("filename", f"audio_{i}.wav") - mimetype = dialog.get("mimetype", "audio/wav") - - # Send audio to vfun server - files = {"file": (filename, audio_content, mimetype)} - data = { - "diarize": str(opts.get("diarize", True)), - "block": "true", - } + # Try each vfun instance in health-priority order until one succeeds + vfun_urls = get_vfun_urls(opts) + if not vfun_urls: + logger.error("wtf_transcribe: no vfun URLs available") + dialogs_skipped += 1 + continue - response = requests.post( - vfun_server_url, - files=files, - data=data, - headers=headers, - timeout=opts.get("timeout", 300), - ) + mimetype = dialog.get("mimetype", "audio/wav") + headers = {} + api_key = opts.get("api-key") + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + timeout = opts.get("timeout", 300) - if response.status_code in (200, 302): - vfun_response = response.json() - # Handle double-encoded JSON (vfun sometimes returns JSON string) - if isinstance(vfun_response, str): - vfun_response = json.loads(vfun_response) - - duration = dialog.get("duration", 30.0) - wtf_analysis = create_wtf_analysis(i, vfun_response, float(duration)) - - # Add analysis to vCon - vcon.add_analysis( - type=wtf_analysis["type"], - dialog=wtf_analysis["dialog"], - vendor=wtf_analysis.get("vendor"), - body=wtf_analysis["body"], - extra={ - "mediatype": wtf_analysis.get("mediatype"), - "product": wtf_analysis.get("product"), - "schema": wtf_analysis.get("schema"), - }, + transcribed = False + for attempt, 
vfun_server_url in enumerate(vfun_urls): + try: + files = { + "file-binary": ("audio", audio_content, mimetype) + } + response = requests.post( + vfun_server_url, + files=files, + headers=headers, + timeout=timeout, ) - dialogs_processed += 1 - logger.info(f"Added WTF transcription for dialog {i}") - - else: + if response.status_code in (200, 302): + _health_tracker.mark_healthy(vfun_server_url) + vfun_response = response.json() + if isinstance(vfun_response, str): + vfun_response = json.loads(vfun_response) + + duration = dialog.get("duration", 30.0) + wtf_analysis = create_wtf_analysis(i, vfun_response, float(duration)) + + if cache_key: + store_cached_transcription(cache_key, wtf_analysis["body"], cache_ttl) + logger.info(f"Cached transcription for dialog {i} (key={cache_key})") + + vcon.add_analysis( + type=wtf_analysis["type"], + dialog=wtf_analysis["dialog"], + vendor=wtf_analysis.get("vendor"), + body=wtf_analysis["body"], + extra={ + "mediatype": wtf_analysis.get("mediatype"), + "product": wtf_analysis.get("product"), + "schema": wtf_analysis.get("schema"), + }, + ) + + dialogs_processed += 1 + if attempt > 0: + logger.info(f"Added WTF transcription for dialog {i} (succeeded on attempt {attempt + 1})") + else: + logger.info(f"Added WTF transcription for dialog {i}") + transcribed = True + break # success — stop trying other URLs + + else: + _health_tracker.mark_down(vfun_server_url) + logger.warning( + f"vfun {vfun_server_url} returned {response.status_code} for dialog {i}, " + f"trying next instance ({attempt + 1}/{len(vfun_urls)})" + ) + + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: + _health_tracker.mark_down(vfun_server_url) + logger.warning( + f"vfun {vfun_server_url} unreachable for dialog {i}: {type(e).__name__}, " + f"trying next instance ({attempt + 1}/{len(vfun_urls)})" + ) + except Exception as e: + _health_tracker.mark_down(vfun_server_url) logger.error( - f"vfun transcription failed for dialog {i}: " - 
f"status={response.status_code}, response={response.text[:200]}" + f"Unexpected error from vfun {vfun_server_url} for dialog {i}: {e}", + exc_info=True, ) - except requests.exceptions.Timeout: - logger.error(f"vfun transcription timed out for dialog {i}") - except Exception as e: - logger.error(f"Error transcribing dialog {i}: {e}", exc_info=True) + if not transcribed: + logger.error( + f"All {len(vfun_urls)} vfun instances failed for dialog {i} of vCon {vcon_uuid}" + ) if dialogs_processed > 0: vcon_redis.store_vcon(vcon) logger.info( f"Updated vCon {vcon_uuid}: processed={dialogs_processed}, " - f"skipped={dialogs_skipped}" + f"skipped={dialogs_skipped}, cache_hits={cache_hits}, cache_misses={cache_misses}" ) else: - logger.info(f"No dialogs transcribed for vCon {vcon_uuid}") + logger.info( + f"No dialogs transcribed for vCon {vcon_uuid} " + f"(cache_hits={cache_hits}, cache_misses={cache_misses})" + ) return vcon_uuid From 48efc729576eccf1c6ecb618150bdbbd5f4ece89 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Wed, 18 Feb 2026 19:02:04 +0000 Subject: [PATCH 10/11] Add SignOz observability config, docs, and utility scripts - SignOz OTEL collector config and docker-compose integration - Performance testing and vfun crash/stress test reports - Utility scripts for NAS pipeline operations and debugging Co-Authored-By: Claude Opus 4.6 --- .env.example | 9 + docker-compose.signoz.yml | 38 ++ docs/PERFORMANCE_TESTING.md | 215 ++++++++ docs/VFUN_CRASH_REPORT.md | 269 +++++++++ docs/VFUN_STRESS_TEST_REPORT.md | 243 +++++++++ scripts/find_bad_file.py | 66 +++ scripts/nas_stress_test.py | 155 ++++++ scripts/nas_transcription_pipeline.py | 753 ++++++++++++++++++++++++++ scripts/run_pipeline_with_restart.sh | 82 +++ signoz/README.md | 26 +- signoz/dashboards/.gitkeep | 2 + signoz/otel-collector-config.yaml | 6 +- 12 files changed, 1860 insertions(+), 4 deletions(-) create mode 100644 docs/PERFORMANCE_TESTING.md create mode 100644 docs/VFUN_CRASH_REPORT.md create mode 100644 
docs/VFUN_STRESS_TEST_REPORT.md create mode 100644 scripts/find_bad_file.py create mode 100644 scripts/nas_stress_test.py create mode 100644 scripts/nas_transcription_pipeline.py create mode 100755 scripts/run_pipeline_with_restart.sh create mode 100644 signoz/dashboards/.gitkeep diff --git a/.env.example b/.env.example index 99c8152..67cc4e9 100644 --- a/.env.example +++ b/.env.example @@ -12,3 +12,12 @@ LOGGING_CONFIG_FILE=server/logging_dev.conf # Groq API key for Whisper transcription GROQ_API_KEY=your_groq_api_key_here + +# OpenTelemetry (used when running with docker-compose.signoz.yml) +# conserver and api send traces/metrics to signoz-otel-collector when the SigNoz stack is enabled +# OTEL_EXPORTER_OTLP_ENDPOINT=http://signoz-otel-collector:4318 +# OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +# OTEL_TRACES_EXPORTER=otlp +# OTEL_METRICS_EXPORTER=otlp +# OTEL_LOGS_EXPORTER=otlp +# OTEL_SERVICE_NAME=conserver diff --git a/docker-compose.signoz.yml b/docker-compose.signoz.yml index d57fb8f..ff5c6c4 100644 --- a/docker-compose.signoz.yml +++ b/docker-compose.signoz.yml @@ -118,3 +118,41 @@ services: timeout: 10s retries: 3 restart: unless-stopped + + # Override conserver and api to send traces/metrics to SigNoz (OTLP HTTP) + conserver: + command: "opentelemetry-instrument python ./server/main.py" + environment: + OTEL_EXPORTER_OTLP_ENDPOINT: http://signoz-otel-collector:4318 + OTEL_EXPORTER_OTLP_PROTOCOL: http/protobuf + OTEL_TRACES_EXPORTER: otlp + OTEL_METRICS_EXPORTER: otlp + OTEL_LOGS_EXPORTER: otlp + OTEL_SERVICE_NAME: conserver + depends_on: + signoz-otel-collector: + condition: service_healthy + + api: + command: /bin/bash -c "opentelemetry-instrument uvicorn server.api:app --host 0.0.0.0 --port 8000" + environment: + OTEL_EXPORTER_OTLP_ENDPOINT: http://signoz-otel-collector:4318 + OTEL_EXPORTER_OTLP_PROTOCOL: http/protobuf + OTEL_TRACES_EXPORTER: otlp + OTEL_METRICS_EXPORTER: otlp + OTEL_LOGS_EXPORTER: otlp + OTEL_SERVICE_NAME: conserver.api + depends_on: 
+ signoz-otel-collector: + condition: service_healthy + + logspout-signoz: + image: pavanputhra/logspout-signoz + container_name: logspout-signoz + restart: unless-stopped + volumes: + - /var/run/docker.sock:/var/run/docker.sock + environment: + SIGNOZ_LOG_ENDPOINT: http://172.17.0.1:8082 + ENV: prod + command: signoz://172.17.0.1:8082 \ No newline at end of file diff --git a/docs/PERFORMANCE_TESTING.md b/docs/PERFORMANCE_TESTING.md new file mode 100644 index 0000000..0802cb5 --- /dev/null +++ b/docs/PERFORMANCE_TESTING.md @@ -0,0 +1,215 @@ +# Performance Testing Notes + +**Last Updated:** 2026-02-02 + +## Test Environment + +### Servers +- **Conserver (vcon-server)**: http://localhost:8080 (token: `mulliganmccarthy`) +- **vfun (transcription)**: http://localhost:4380/wtf + +### NAS Storage + +**Mount Point:** `/mnt/nas` +``` +64.187.219.131:/mnt/slave_recording → /mnt/nas (NFS4) +- rsize/wsize: 1MB +- Protocol: TCP +- Hard mount with 600s timeout +``` + +**Directory Structure:** +``` +/mnt/nas/ +├── Freeswitch1/ # 20 Freeswitch servers (1-20) +│ ├── 2026-01-19/ # Date directories (15+ days available) +│ │ ├── 06/ # Hour directories (00-23) +│ │ │ └── *.wav # Recording files (~489k per day) +│ │ ├── 07/ +│ │ └── ... +│ ├── 2026-01-20/ +│ └── ... +├── Freeswitch2/ +├── ... 
+├── Freeswitch20/ +├── Batch1_recording/ +├── pcaps_*/ # Packet captures +└── fs_collect_by_number.sh # Collection utility +``` + +**File Naming Pattern:** +``` +{campaign}_{caller}_{callid}_{date}_{time}.wav +Example: 10508_12026661845_993317168030975_2026-01-19_06:47:08.wav + +Fields: +- campaign: Campaign/extension ID (e.g., 10508, 6075, 9676) +- caller: Phone number (e.g., 12026661845) +- callid: Unique call ID (e.g., 993317168030975) +- date: YYYY-MM-DD +- time: HH:MM:SS +``` + +**Scale:** +- ~489,000 recordings per day per Freeswitch server +- ~9.78 million recordings/day across all 20 servers +- ~938 KB average file size (~60 seconds @ 8kHz 16-bit) +- ~9 TB/day of new recordings +- 15+ days of historical data +- Access requires `nasgroup` membership + +--- + +## Performance Results (2026-02-02) + +### Conserver API +| Metric | Value | +|--------|-------| +| Throughput | 151.68 req/s | +| Avg Latency | 57.22 ms | +| Success Rate | 100% | + +### vfun Transcription (Local Files) +| Metric | Value | +|--------|-------| +| Throughput | 32.72 files/sec | +| Data Rate | 30.36 MB/sec | +| Peak GPU Utilization | 95% | + +### vfun Transcription (NAS Files) +| Files | Workers | Throughput | Data Rate | Parallelism | +|-------|---------|------------|-----------|-------------| +| 100 | 32 | 48.40 files/sec | 34.08 MB/s | 25.9x | +| 200 | 64 | 45.60 files/sec | 30.92 MB/s | 47.9x | +| 500 | 64 | 43.63 files/sec | 30.85 MB/s | 59.4x | + +### Full Pipeline (NAS → vfun → vCon → Conserver → vcon-mcp) +| Files | Workers | Throughput | vCons Stored | Success | +|-------|---------|------------|--------------|---------| +| 50 | 16 | 2,447 files/min | 35 | 100% | +| 500 | 48 | 2,576 files/min | 362 | 100% | +| 1,000 | 64 | **2,973 files/min** | 703 | 100% | + +**Full Pipeline Capacity (single vfun instance):** +- ~3,000 files/min = **~4.3 million files/day** +- vCon creation adds minimal overhead (~1ms per vCon) +- Conserver chain processing: ~10ms per vCon +- Webhook to vcon-mcp 
(Supabase): ~100-200ms per vCon + +**Key Findings:** +- NAS network storage does not bottleneck transcription +- GPU batching works efficiently (59.4x parallelism vs 64x max) +- Sustained ~44-48 files/sec with high concurrency +- 100% success rate across 1,500+ files +- Full pipeline maintains ~48 files/sec throughput + +### vfun Batching Configuration +``` +GPU_MAX_BATCH_SIZE = 64 +GPU_COALESCE_TIMEOUT_US = 5000 (5ms) +GPU_COALESCE_MIN_FILL = 16 +``` + +--- + +## Test Scripts + +Located in `scripts/`: +- `nas_transcription_pipeline.py` - **Production pipeline** with vCon creation and storage +- `nas_stress_test.py` - High-concurrency vfun stress test with NAS files + +### Running Tests + +```bash +# Check servers +curl -s http://localhost:8080/docs | head -5 +curl -s http://localhost:4380/ready + +# Start vfun if needed +cd ~/strolid/vfun && ./vfun --port 4380 + +# Run vfun-only stress test +python3 scripts/nas_stress_test.py 200 64 + +# Run full pipeline (transcription + vCon storage) +python3 scripts/nas_transcription_pipeline.py --date 2026-01-19 --hour 06 --limit 500 --workers 48 --store-vcons + +# Dry run to see file counts +python3 scripts/nas_transcription_pipeline.py --date 2026-01-19 --dry-run +``` + +### Pipeline Chain Configuration +``` +main_chain: ingress:default → tag → supabase_webhook → egress:processed +transcription: ingress:transcribe → tag → wtf_transcribe → keyword_tagger → supabase_webhook → egress:transcribed +``` + +--- + +## vfun Stability Issues (CUDA Crashes) + +### Root Cause Analysis (2026-02-02) + +**Problem:** vfun crashes intermittently after processing hundreds of files under sustained load. + +**Investigation findings:** +1. **NOT the NAS** - Files read correctly, NAS performance is stable +2. **NOT memory leaks** - GPU memory stable at ~12.6GB throughout processing +3. **NOT single file issues** - Crash-causing files process fine individually +4. 
**IS a CUDA batching issue** - Specific batch combinations trigger cuBLAS failures + +**Error signature:** +``` +RuntimeError: CUDA error: CUBLAS_STATUS_EXECUTION_FAILED when calling cublasLtMatmul +with transpose_mat1 1 transpose_mat2 0 m 1024 n 251 k 1024 +``` + +**What happens:** +1. Under high concurrency, vfun batches audio files for GPU processing +2. Certain combinations of audio lengths create tensor dimensions that trigger cuBLAS matrix multiplication failures +3. The CUDA error corrupts GPU state, leaving vfun hung (process exists but unresponsive to `/ready` endpoint) +4. GPU memory shows 0 MiB used after crash (resources released but process not terminated) + +**Affected dimensions:** The `n=251` parameter in the error suggests certain audio sequence lengths cause problematic matrix sizes during the transformer decoder forward pass. + +### Workarounds for Production + +**1. Auto-restart script:** +```bash +#!/bin/bash +# Run pipeline with automatic vfun restart on crash +restart_vfun() { + pkill -9 -f "vfun --port 4380" + sleep 2 + cd ~/strolid/vfun && ./vfun --port 4380 > /tmp/vfun.log 2>&1 & + sleep 10 +} + +# Check health every 30 seconds, restart if hung +while true; do + if ! curl -s --max-time 5 http://localhost:4380/ready > /dev/null 2>&1; then + echo "$(date) - vfun crash detected, restarting..." + restart_vfun + fi + sleep 30 +done +``` + +**2. Reduce concurrency** (may reduce throughput but fewer crashes): +- Try 32-48 workers instead of 64 +- Smaller batches reduce likelihood of problematic tensor dimensions + +**3. 
Batch processing with checkpoints:** +- Process in batches of 2000-3000 files +- Restart vfun between batches preventively +- Track progress in checkpoint files + +### Investigation Scripts + +Located in `scripts/`: +- `find_bad_file.py` - Tests files sequentially to identify crash point +- `run_pipeline_with_restart.sh` - Pipeline with auto-restart capability + +### Logs to Check +- `/tmp/vfun.log` or `/tmp/vfun_test.log` - vfun stdout/stderr including CUDA errors +- Pipeline logs show last successful file before crash diff --git a/docs/VFUN_CRASH_REPORT.md b/docs/VFUN_CRASH_REPORT.md new file mode 100644 index 0000000..1a7dcb4 --- /dev/null +++ b/docs/VFUN_CRASH_REPORT.md @@ -0,0 +1,269 @@ +# vfun CUDA Crash Report + +**Date:** 2026-02-02 +**Environment:** Ubuntu Linux, NVIDIA H100 NVL (95GB VRAM) +**vfun version:** Built from ~/strolid/vfun +**Model:** parakeet-tdt-110m (NeMo ASR) + +--- + +## Executive Summary + +vfun crashes intermittently after processing hundreds of audio files under sustained concurrent load. The crash is caused by a **cuBLAS matrix multiplication failure** (`CUBLAS_STATUS_EXECUTION_FAILED`) that occurs when certain batch combinations create problematic tensor dimensions during the transformer decoder's forward pass. + +**Key finding:** Individual files process successfully. The crash only occurs under batched/concurrent workloads when specific tensor dimension combinations are formed. + +--- + +## Symptom + +When processing large volumes of audio files (500+) with high concurrency (64 workers), vfun: + +1. Processes successfully for a period (typically 100-500+ files) +2. Suddenly stops responding to requests +3. Process remains alive but `/ready` endpoint times out +4. GPU memory drops to 0 MiB (resources released) +5. Subsequent requests fail silently + +The service does not exit or restart automatically - it enters a hung state. 
+ +--- + +## Reproduction Steps + +```bash +# Start vfun +cd ~/strolid/vfun && ./vfun --port 4380 > /tmp/vfun.log 2>&1 & + +# Wait for ready +sleep 10 && curl http://localhost:4380/ready + +# Send concurrent requests (64 parallel workers) +ls /path/to/wav/files/*.wav | head -500 | \ + xargs -P 64 -I {} curl -s -X POST \ + -F "file-binary=@{};type=audio/wav" \ + http://localhost:4380/wtf + +# vfun will crash after processing 100-500 files +``` + +**Audio file characteristics:** +- Format: WAV, 8kHz, 16-bit, mono (standard telephony) +- Duration: ~60 seconds each +- Size: ~960KB each +- Source: Freeswitch call recordings + +--- + +## Error Message + +From `/tmp/vfun.log` after crash: + +``` +RuntimeError: CUDA error: CUBLAS_STATUS_EXECUTION_FAILED when calling cublasLtMatmul +with transpose_mat1 1 transpose_mat2 0 m 1024 n 251 k 1024 mat1_ld 1024 mat2_ld 1024 +result_ld 1024 abcType 0 computeType 68 scaleType 0 +``` + +--- + +## Full Stack Trace + +``` +File "code/__torch__/nemo/collections/asr/modules/transformer/transformer_decoders/___torch_mangle_1124.py", line 27, in forward + _2 = (first_sub_layer).forward(_0, _1, argument_2, ) + input = torch.add_(_2, argument_1) + _3 = (second_sub_layer).forward((layer_norm_2).forward(input, ), encoder_embeddings, argument_4, ) + ~~~~~~~~~~~~~~~~~~~~~~~~~ <--- HERE + +File "code/__torch__/nemo/collections/asr/modules/transformer/transformer_modules/___torch_mangle_1118.py", line 23, in forward + query_net = self.query_net + _0 = (query_net).forward(argument_1, ) + _1 = (key_net).forward(encoder_embeddings, ) + ~~~~~~~~~~~~~~~~ <--- HERE + +File "code/__torch__/torch/nn/modules/linear/___torch_mangle_1113.py", line 12, in forward + bias = self.bias + weight = self.weight + x = torch.linear(encoder_embeddings, weight, bias) + ~~~~~~~~~~~~ <--- HERE + +Traceback of TorchScript, original code (most recent call last): +/home/dev/vfun/.venv/lib/python3.12/site-packages/torch/nn/modules/linear.py(134): forward 
+/home/dev/vfun/.venv/lib/python3.12/site-packages/nemo/collections/asr/modules/transformer/transformer_modules.py(174): forward +/home/dev/vfun/.venv/lib/python3.12/site-packages/nemo/collections/asr/modules/transformer/transformer_decoders.py(98): forward_preln +/home/dev/vfun/.venv/lib/python3.12/site-packages/nemo/collections/asr/modules/transformer/transformer_decoders.py(158): forward +/home/dev/vfun/.venv/lib/python3.12/site-packages/nemo/collections/asr/modules/transformer/transformer_decoders.py(255): forward +/home/dev/vfun/export-scripts/canary-to-torchscript.py(62): forward +``` + +--- + +## Analysis + +### What's Happening + +1. **Batching behavior:** vfun batches concurrent requests for GPU efficiency using: + ``` + GPU_MAX_BATCH_SIZE = 64 + GPU_COALESCE_TIMEOUT_US = 5000 (5ms) + GPU_COALESCE_MIN_FILL = 16 + ``` + +2. **Tensor dimension issue:** When batching audio of varying lengths, the resulting `encoder_embeddings` tensor has shape `[batch, seq_len, 1024]`. The error shows `n=251` which suggests a sequence length dimension. + +3. **cuBLAS failure:** The `cublasLtMatmul` call fails when the resulting matrix dimensions hit certain values. This may be related to: + - Memory alignment issues at specific sizes + - cuBLAS kernel selection for unusual dimensions + - Tensor core compatibility with non-standard shapes + +4. **No recovery:** After the CUDA error, the GPU context is corrupted. vfun doesn't catch this exception at the HTTP handler level, so it hangs indefinitely. + +### Why Single Files Work + +When processing files individually: +- Batch size is always 1 +- Sequence lengths are consistent per-request +- No cross-request tensor dimension combinations occur + +### Suspected Root Causes + +1. **Batch padding edge case:** When batching audio of different durations, padding may create tensor dimensions that cuBLAS handles poorly. + +2. 
def is_safe_batch(sequences, alignment=8):
    """Return the padded sequence length to use for a batch.

    Despite the name, this helper does not return a boolean: it returns the
    length of the longest sequence rounded up to the next multiple of
    ``alignment``, i.e. the length every sequence should be padded to.

    Args:
        sequences: Iterable of sized items (anything supporting ``len``).
        alignment: Positive multiple to round up to. The default of 8 is a
            starting point and may need empirical tuning.

    Returns:
        The aligned maximum length, or 0 when ``sequences`` is empty.

    Raises:
        ValueError: If ``alignment`` is not positive.
    """
    if alignment <= 0:
        raise ValueError("alignment must be a positive integer")

    # default=0 keeps an empty batch from raising inside max().
    longest = max((len(seq) for seq in sequences), default=0)

    # Ceiling-round to the alignment; lengths that are already aligned are
    # returned unchanged, so no separate modulo check is needed.
    return ((longest + alignment - 1) // alignment) * alignment
Graceful Degradation + +If a batch fails, retry with smaller batch size or process files individually: + +```python +def process_with_fallback(files, batch_size=64): + try: + return process_batch(files, batch_size) + except RuntimeError: + if batch_size > 1: + # Retry with smaller batches + return process_with_fallback(files, batch_size // 2) + else: + # Process individually as last resort + return [process_single(f) for f in files] +``` + +### 4. Health Check Watchdog + +Add internal watchdog that restarts the service if inference hangs: + +```python +import signal + +def timeout_handler(signum, frame): + logger.error("Inference timeout - restarting") + torch.cuda.empty_cache() + os._exit(1) # Force restart + +signal.signal(signal.SIGALRM, timeout_handler) +signal.alarm(30) # 30 second timeout per batch +``` + +--- + +## Current Workaround + +We're using an external monitoring script that: + +1. Checks `/ready` endpoint every 30 seconds +2. If unresponsive, kills vfun process (`pkill -9`) +3. Restarts vfun +4. Continues processing from checkpoint + +This achieves ~3000 files/minute with occasional 10-second restart pauses. + +--- + +## Environment Details + +``` +GPU: NVIDIA H100 NVL +VRAM: 95,320 MiB total +CUDA: (check with nvidia-smi) +PyTorch: 2.x (from vfun venv) +NeMo: (from vfun venv) + +vfun config: + GPU_MAX_BATCH_SIZE = 64 + GPU_COALESCE_TIMEOUT_US = 5000 + GPU_COALESCE_MIN_FILL = 16 +``` + +--- + +## Files for Testing + +Sample files that trigger the crash (when processed concurrently with others): + +``` +/mnt/nas/Freeswitch1/2026-01-19/11/10508_12706498965_993318019641306_2026-01-19_11:14:08.wav +/mnt/nas/Freeswitch1/2026-01-19/11/10508_17019013723_993313073314983_2026-01-19_11:11:14.wav +``` + +Note: These files process successfully individually. The crash occurs when they're part of a batch with other files. 
+ +--- + +## Contact + +Report prepared by: Claude Code (assisted investigation) +System operator: Thomas +Date: 2026-02-02 diff --git a/docs/VFUN_STRESS_TEST_REPORT.md b/docs/VFUN_STRESS_TEST_REPORT.md new file mode 100644 index 0000000..af2fcf7 --- /dev/null +++ b/docs/VFUN_STRESS_TEST_REPORT.md @@ -0,0 +1,243 @@ +# vfun Stress Test Report + +**Date:** 2026-02-04 (Updated) +**Tester:** Thomas (with Claude Code) +**vfun Version:** Commit 0431d2d "hackery to avoid crash" +**Environment:** Ubuntu Linux, NVIDIA H100 NVL (95GB VRAM) + +--- + +## Executive Summary + +### Update (2026-02-04): Significant Improvement After Fix + +Commit `0431d2d` ("hackery to avoid crash") dramatically improved stability: + +| Test | Before Fix (97a9d1c) | After Fix (0431d2d) | +|------|---------------------|---------------------| +| 64 workers / 1,000 files | 30.6% (crashed ~306) | **100%** | +| 64 workers / 5,000 files | N/A | **100%** | +| 64 workers / 43,940 files | N/A | **100%** | +| 64 workers / 406,037 files | N/A | 22.3% (crashed ~90k) | + +**Key finding:** The fix extended crash threshold from ~300-7000 files to ~90,000 files - a 10-100x improvement. However, crashes still occur on very long runs (400k+ files). + +**Recommendation:** Use auto-restart wrapper for production runs exceeding 50k files. 
+ +--- + +## Post-Fix Test Results (2026-02-04) + +### Test 1: 64 workers, 1,000 files +``` +Processed: 1,000 files +Successful: 1,000 (100.0%) +Failed: 0 +Throughput: 992 files/min +``` + +### Test 2: 64 workers, 5,000 files +``` +Processed: 5,000 files +Successful: 5,000 (100.0%) +Failed: 0 +Throughput: 2,711 files/min +``` + +### Test 3: 64 workers, 43,940 files (1 hour of data) +``` +Processed: 43,940 files +Successful: 43,940 (100.0%) +Failed: 0 +Elapsed: 1480.6s (~25 min) +Throughput: 1,781 files/min +``` + +### Test 4: 64 workers, 406,037 files (full day) +``` +Processed: 406,037 files +Successful: 90,629 (22.3%) +Failed: 315,408 +Elapsed: 30632.1s (~8.5 hours) +``` +**Note:** vfun crashed after ~90k files. Remaining failures are connection refused errors after crash. + +### Conclusion + +The fix (`0431d2d`) significantly improved stability: +- **Before:** Crashed after 300-7,000 files depending on concurrency +- **After:** Stable for 44k files, crashes around 90k files + +For production workloads >50k files, use `scripts/run_pipeline_with_restart.sh` which automatically restarts vfun on crash. + +--- + +## Original Report (2026-02-03) + +vfun crashes after processing a cumulative number of files regardless of worker/concurrency settings. The crash point scales inversely with concurrency - more workers = faster crash. This indicates **accumulated memory corruption** rather than an immediate concurrency issue. + +The original fix (commit 97a9d1c - non-blocking CUDA streams and pinned memory) did not resolve the underlying issue. 
+ +--- + +## Test Results + +| Workers | Files Tested | Success Rate | Files Before Crash | Crash Type | +|---------|-------------|--------------|-------------------|------------| +| 64 | 1,000 | 30.6% | ~306 | "corrupted double-linked list" | +| 32 | 1,000 | 13.0% | ~130 | Connection refused (process died) | +| 24 | 1,000 | 100% | - | No crash | +| 24 | 5,000 | 62.7% | ~3,134 | Connection refused | +| 16 | 1,000 | 100% | - | No crash | +| 16 | 5,000 | 100% | - | No crash | +| 16 | 43,940 | ~45% | ~6,923 | Connection refused | + +### Key Observations + +1. **Short tests pass**: 1000-file tests with 16-24 workers complete successfully +2. **Longer tests crash**: Extended runs eventually crash regardless of concurrency +3. **Crash threshold scales inversely with concurrency**: + - 64 workers: crashes after ~300 files + - 16 workers: crashes after ~7000 files + - Suggests the crash threshold falls as the worker count rises (rough trend, not a fixed ratio) + +4. **Performance when stable**: + - 16 workers: 920 files/min, 1.0s avg latency + - 24 workers: 179 files/min, 7.4s avg latency (slower due to saturation?) + +--- + +## Crash Analysis + +### Error Messages Observed + +1. **"corrupted double-linked list"** (glibc heap corruption) + - Observed with 64 workers + - Indicates memory corruption in heap management + +2. **Silent death** (no error logged) + - Process dies, port becomes unavailable + - `/ready` endpoint stops responding + - No CUDA error or stack trace in logs + +3.
**Previous crash type** (from earlier testing): + - `CUBLAS_STATUS_EXECUTION_FAILED` during `cublasLtMatmul` + - Occurred in transformer decoder's forward pass + +### Root Cause Hypothesis + +The crash is NOT caused by: +- Immediate concurrency issues (16 workers is stable for 5000 files) +- Single problematic files (same files work individually) +- GPU memory exhaustion (VRAM stable at ~12.6GB) + +The crash IS likely caused by: +- **Accumulated memory corruption** that builds up over many requests +- Possibly related to batch assembly/disassembly +- May be in queue management, tensor allocation, or FFmpeg decoding +- The corruption eventually corrupts glibc's heap metadata, causing "corrupted double-linked list" + +--- + +## Recent Fixes Applied (Not Sufficient) + +### Commit 97a9d1c (2026-02-03 18:03) +``` +fix: use non-blocking streams and pinned memory for lens transfer + +- Convert all cudaStreamCreate calls to cudaStreamCreateWithFlags(..., cudaStreamNonBlocking) +- Add pinned host memory and device memory for batch lengths +- Replace synchronous lens tensor creation with async H2D transfer +``` + +### Commit f924c67 (2026-02-03 17:38) +``` +refactor: replace all malloc/realloc/calloc with _or_die variants + +- All allocations now fail-fast on out-of-memory errors +``` + +### Commit fcf2c2e (2026-02-03 17:09) +``` +Fix thread explosion: Limit MHD and OpenMP threads to 16 +``` + +These fixes address specific issues but don't resolve the accumulated corruption. + +--- + +## Suggested Investigation Areas + +1. **Queue management** (`src/queue.c`) + - Check for use-after-free or double-free + - Verify thread-safe access to shared queues + - Look for buffer overflows in batch assembly + +2. **Tensor lifecycle** + - Ensure tensors are properly freed after each batch + - Check for leaks in error paths + +3. **FFmpeg integration** + - `av_malloc_or_die` wrapper - verify all allocations freed + - Check for leaks in audio decoding path + +4. 
**Memory debugging** + - Run with AddressSanitizer: `ASAN_OPTIONS=detect_leaks=1` + - Run with Valgrind (may be slow with CUDA) + - Add heap corruption detection: `MALLOC_CHECK_=3` + +--- + +## Workaround for Production + +Use the auto-restart wrapper script that monitors health and restarts on crash: + +```bash +./scripts/run_pipeline_with_restart.sh 2026-01-19 16 5000 +``` + +This achieves sustained throughput by: +1. Processing in batches of 5000 files +2. Checking `/ready` endpoint after each batch +3. Restarting vfun if unresponsive +4. Continuing from checkpoint + +Expected throughput with restarts: ~800-900 files/min sustained + +--- + +## Test Commands Used + +```bash +# Start vfun +cd ~/strolid/vfun && ./vfun --port 4380 > /tmp/vfun_4380.log 2>&1 & + +# Run stress test +python3 scripts/nas_transcription_pipeline.py \ + --date 2026-01-19 \ + --hour 06 \ + --server 1 \ + --workers 16 \ + --limit 5000 + +# Check health +curl -s http://localhost:4380/ready + +# View crash log +tail -50 /tmp/vfun_4380.log +``` + +--- + +## Appendix: Test Data + +- **Source**: `/mnt/nas/Freeswitch1/2026-01-19/06/` +- **File count**: 43,940 WAV files +- **File format**: 8kHz, 16-bit, mono (standard telephony) +- **Avg duration**: ~60 seconds +- **Avg size**: ~960KB + +--- + +**Report prepared by:** Claude Code +**Contact:** Thomas diff --git a/scripts/find_bad_file.py b/scripts/find_bad_file.py new file mode 100644 index 0000000..8bf7675 --- /dev/null +++ b/scripts/find_bad_file.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""Find the specific file that crashes vfun""" + +import os +import sys +import time +import requests +from pathlib import Path + +VFUN_URL = "http://localhost:4380/wtf" + +def test_file(filepath): + """Test a single file, return True if successful""" + try: + with open(filepath, 'rb') as f: + response = requests.post( + VFUN_URL, + files={"file-binary": ("audio.wav", f, "audio/wav")}, + timeout=60 + ) + return response.status_code == 200 + except Exception as 
e: + return False + +def is_vfun_alive(): + """Check if vfun is responding""" + try: + r = requests.get("http://localhost:4380/ready", timeout=5) + return r.status_code == 200 + except: + return False + +def main(): + # Get files from hour 11 + base_dir = Path("/mnt/nas/Freeswitch1/2026-01-19/11") + files = sorted(base_dir.glob("*.wav"))[:2000] # First 2000 files + + print(f"Testing {len(files)} files to find crash point...") + print("") + + last_good = None + for i, filepath in enumerate(files): + if not is_vfun_alive(): + print(f"\n!!! vfun CRASHED after file #{i}") + print(f"Last good file: {last_good}") + print(f"Crash likely caused by: {filepath}") + print(f"Previous file: {files[i-1] if i > 0 else 'N/A'}") + return + + success = test_file(filepath) + if success: + last_good = filepath + if i % 100 == 0: + print(f"Progress: {i}/{len(files)} - OK") + else: + if not is_vfun_alive(): + print(f"\n!!! vfun CRASHED processing file #{i}") + print(f"Crash file: {filepath}") + return + else: + print(f"File #{i} failed but vfun still alive: {filepath}") + + print(f"\nAll {len(files)} files processed successfully!") + +if __name__ == "__main__": + main() diff --git a/scripts/nas_stress_test.py b/scripts/nas_stress_test.py new file mode 100644 index 0000000..a855d0f --- /dev/null +++ b/scripts/nas_stress_test.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +"""High-concurrency stress test for vfun using NAS audio files""" + +import time +import requests +import concurrent.futures +import statistics +import os +from pathlib import Path +from datetime import datetime +import random + +VFUN_URL = "http://localhost:4380" +NAS_PATH = "/mnt/nas/Freeswitch1/2026-01-19/06" + +def find_nas_wav_files(max_files=500): + """Find wav files in NAS directory""" + nas_dir = Path(NAS_PATH) + if not nas_dir.exists(): + print(f"ERROR: NAS path not found: {NAS_PATH}") + return [] + + wav_files = list(nas_dir.glob("*.wav"))[:max_files] + return wav_files + +def single_request(audio_file): + 
def run_stress_test(num_files=100, concurrent_workers=32):
    """Replay NAS recordings against vfun concurrently and print a report.

    The function checks the ``/ready`` endpoint, collects WAV files with
    ``find_nas_wav_files``, submits one ``single_request`` per file to a
    thread pool, and prints throughput, latency percentiles and an
    effective-parallelism estimate.

    Args:
        num_files: Number of transcription requests to issue. When fewer
            files are found, the file list is repeated to reach this count.
        concurrent_workers: Thread-pool size, i.e. the maximum number of
            requests in flight at once.

    Returns:
        None. All results are written to stdout. The function returns early
        when the server health check fails or no audio files are found.
    """
    print(f"\n{'='*70}")
    print(f"VFUN NAS STRESS TEST - {num_files} files, {concurrent_workers} concurrent workers")
    print(f"Source: {NAS_PATH}")
    print(f"{'='*70}")

    # Health check
    try:
        resp = requests.get(f"{VFUN_URL}/ready", timeout=5)
        print(f"Server status: {resp.json()}")
    except Exception as e:
        print(f"Server not ready: {e}")
        return

    # Find audio files (ask for twice as many so the shuffle has some choice)
    all_files = find_nas_wav_files(max_files=num_files * 2)
    print(f"Found {len(all_files)} wav files on NAS")

    if not all_files:
        print("ERROR: No audio files found!")
        return

    if len(all_files) < num_files:
        # Repeat files if needed
        test_files = all_files * ((num_files // len(all_files)) + 1)
    else:
        test_files = all_files

    # Shuffle and select
    random.shuffle(test_files)
    test_files = test_files[:num_files]

    total_size = sum(f.stat().st_size for f in test_files)
    print(f"Testing with {len(test_files)} files ({total_size / (1024*1024):.2f} MB)")
    print(f"Concurrent workers: {concurrent_workers}")
    print(f"\nStarting at: {datetime.now().isoformat()}")
    print("-" * 70)

    times = []
    successes = 0
    errors = 0
    total_text = 0
    total_bytes = 0

    start_total = time.time()
    completed = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_workers) as executor:
        futures = [executor.submit(single_request, f) for f in test_files]

        for future in concurrent.futures.as_completed(futures):
            success, elapsed, text_len, file_size = future.result()
            times.append(elapsed)
            total_bytes += file_size
            completed += 1

            if success:
                successes += 1
                total_text += text_len
            else:
                errors += 1

            # Progress update every 10 files
            if completed % 10 == 0:
                elapsed_total = time.time() - start_total
                rate = completed / elapsed_total if elapsed_total > 0 else 0
                print(f" Progress: {completed}/{num_files} ({rate:.1f} files/sec, {successes} ok, {errors} err)")

    total_time = time.time() - start_total
    file_count = len(test_files)
    total_mb = total_bytes / (1024*1024)

    # Guard the rate divisions the same way the parallelism estimate below is
    # guarded, so a zero-length run cannot raise ZeroDivisionError.
    files_per_sec = file_count / total_time if total_time > 0 else 0
    mb_per_sec = total_mb / total_time if total_time > 0 else 0
    success_pct = 100 * successes / file_count if file_count else 0

    print("-" * 70)
    print(f"Finished at: {datetime.now().isoformat()}")
    print(f"\n{'='*70}")
    print("RESULTS")
    print(f"{'='*70}")
    print(f" Files processed: {file_count}")
    print(f" Success rate: {successes}/{file_count} ({success_pct:.1f}%)")
    print(f" Errors: {errors}")
    print(f" Total data: {total_mb:.2f} MB")
    print(f" Total text: {total_text} chars")
    print()
    print(f" Total time: {total_time:.2f}s")
    print(f" Throughput: {files_per_sec:.2f} files/sec")
    print(f" Data rate: {mb_per_sec:.2f} MB/sec")
    print()
    if times:
        # Sort once and index into the result for every percentile.
        ordered = sorted(times)
        print(f" Min latency: {ordered[0]:.3f}s")
        print(f" Max latency: {ordered[-1]:.3f}s")
        print(f" Avg latency: {statistics.mean(times):.3f}s")
        print(f" P50 latency: {statistics.median(times):.3f}s")
        print(f" P95 latency: {ordered[int(len(ordered)*0.95)]:.3f}s")
        print(f" P99 latency: {ordered[int(len(ordered)*0.99)]:.3f}s")
        if len(times) > 1:
            print(f" Std dev: {statistics.stdev(times):.3f}s")
    print(f"{'='*70}")

    # Estimate batching efficiency: summed request latency divided by wall
    # time. Each worker has at most one request in flight, so this ratio
    # cannot exceed the worker count.
    avg_latency = statistics.mean(times) if times else 0
    theoretical_serial = avg_latency * file_count
    actual_parallel = total_time
    parallelism = theoretical_serial / actual_parallel if actual_parallel > 0 else 0
    print("\nBATCHING ANALYSIS:")
    print(f" Effective parallelism: {parallelism:.1f}x")
    print(f" (Upper bound is {concurrent_workers}x; if batching works well, this should approach it)")
    print(f" Theoretical serial time: {theoretical_serial:.1f}s")
    print(f" Actual parallel time: {actual_parallel:.1f}s")
requests + +# ============================================================================ +# Configuration +# ============================================================================ + +# vfun transcription servers +VFUN_URLS = [ + "http://localhost:4380/wtf", + # "http://localhost:4381/wtf", # Multiple instances hurt performance on single GPU + # "http://localhost:4382/wtf", + # "http://localhost:4383/wtf", +] + +# vcon-server API +VCON_API_URL = "http://localhost:8080/vcon" +VCON_INGRESS_LIST = "default" # Ingress list for processed vCons + +# NAS configuration +NAS_BASE = "/mnt/nas" +FREESWITCH_SERVERS = list(range(1, 21)) # Freeswitch1 through Freeswitch20 + +# Performance tuning +DEFAULT_WORKERS = 12 # Concurrent transcription workers +REQUEST_TIMEOUT = 300 # Transcription timeout (seconds) +MAX_RETRIES = 3 # Retries for NFS errors +RETRY_DELAY = 0.1 # Base delay between retries (seconds) +VCON_STORE_TIMEOUT = 30 # Timeout for storing vCons + +# Logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +# ============================================================================ +# Data Classes +# ============================================================================ + +@dataclass +class TranscriptionResult: + filepath: str + success: bool + duration: float + text: str = "" + text_length: int = 0 + error: Optional[str] = None + language: Optional[str] = None + vcon_uuid: Optional[str] = None + vcon_stored: bool = False + + def to_dict(self) -> Dict: + return { + "filepath": self.filepath, + "filename": Path(self.filepath).name, + "success": self.success, + "duration": round(self.duration, 3), + "text_length": self.text_length, + "language": self.language, + "error": self.error, + "vcon_uuid": self.vcon_uuid, + "vcon_stored": self.vcon_stored, + } + + +@dataclass +class FileMetadata: + """Metadata parsed from NAS filename.""" + campaign_id: str + 
caller_number: str + call_id: str + call_date: str + call_time: str + freeswitch_server: str + + @classmethod + def from_filepath(cls, filepath: str) -> Optional['FileMetadata']: + """Parse metadata from NAS file path. + + Expected format: /mnt/nas/Freeswitch{N}/{date}/{hour}/{campaign}_{caller}_{callid}_{date}_{time}.wav + Example: /mnt/nas/Freeswitch1/2026-01-19/06/6075_18557533609_993315706043435_2026-01-19_06:05:02.wav + """ + try: + path = Path(filepath) + filename = path.stem # Without .wav + + # Parse Freeswitch server from path + parts = path.parts + fs_server = None + for part in parts: + if part.startswith("Freeswitch"): + fs_server = part + break + + # Parse filename: campaign_caller_callid_date_time + # Example: 6075_18557533609_993315706043435_2026-01-19_06:05:02 + segments = filename.split('_') + if len(segments) >= 5: + campaign_id = segments[0] + caller_number = segments[1] + call_id = segments[2] + call_date = segments[3] + call_time = segments[4].replace(':', '-') # Normalize time format + + return cls( + campaign_id=campaign_id, + caller_number=caller_number, + call_id=call_id, + call_date=call_date, + call_time=call_time, + freeswitch_server=fs_server or "unknown" + ) + except Exception as e: + logger.debug(f"Failed to parse metadata from {filepath}: {e}") + + return None + + +@dataclass +class PipelineStats: + """Pipeline execution statistics.""" + start_time: float + processed: int = 0 + successful: int = 0 + failed: int = 0 + total_transcription_time: float = 0.0 + total_text_chars: int = 0 + vcons_created: int = 0 + vcons_stored: int = 0 + + def record(self, result: TranscriptionResult): + self.processed += 1 + self.total_transcription_time += result.duration + if result.success: + self.successful += 1 + self.total_text_chars += result.text_length + if result.vcon_uuid: + self.vcons_created += 1 + if result.vcon_stored: + self.vcons_stored += 1 + else: + self.failed += 1 + + def to_dict(self) -> Dict: + elapsed = time.time() - 
def create_vcon(
    filepath: str,
    metadata: Optional["FileMetadata"],
    transcription_text: str,
    language: str,
    audio_duration: float = 60.0
) -> Dict[str, Any]:
    """Create a vCon document from a transcription result.

    Args:
        filepath: Path to the audio file. It is referenced from the dialog
            as a ``file://`` URL; the audio itself is not embedded.
        metadata: Parsed file metadata, or ``None``. Without metadata two
            generic parties are used, the dialog start is the creation
            time, and no attachment is added.
        transcription_text: The transcribed text.
        language: Detected language code.
        audio_duration: Duration of audio in seconds.

    Returns:
        vCon dictionary ready for submission to vcon-server.
    """
    vcon_uuid = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()

    # Build parties from metadata
    if metadata:
        caller_tel = metadata.caller_number
        if not caller_tel.startswith('+'):
            # Ten digits get a "+1" prefix; anything else just gets a "+".
            caller_tel = f"+1{caller_tel}" if len(caller_tel) == 10 else f"+{caller_tel}"
        parties = [
            {
                "tel": caller_tel,
                "name": "Caller",
                "meta": {"role": "customer"}
            },
            # Agent party (placeholder)
            {
                "tel": "+10000000000",
                "name": "Agent",
                "meta": {"role": "agent"}
            },
        ]
    else:
        parties = [
            {"tel": "+10000000001", "name": "Party 1"},
            {"tel": "+10000000002", "name": "Party 2"}
        ]

    # Dialog start: use the call date/time from the filename when it is a
    # well-formed timestamp, otherwise fall back to the creation time.
    # The previous string interpolation could never fail, so malformed
    # filename fields were copied straight into the vCon.
    dialog_start = now
    if metadata:
        try:
            # call_time is stored as HH-MM-SS, convert to HH:MM:SS
            started = datetime.strptime(
                f"{metadata.call_date}T{metadata.call_time.replace('-', ':')}",
                "%Y-%m-%dT%H:%M:%S",
            )
        except (AttributeError, TypeError, ValueError):
            pass  # keep the creation time as the dialog start
        else:
            dialog_start = started.strftime("%Y-%m-%dT%H:%M:%SZ")

    # Build dialog with file:// URL reference (no embedding)
    dialog = [{
        "type": "recording",
        "start": dialog_start,
        "parties": [0, 1],
        "duration": audio_duration,
        "mimetype": "audio/wav",
        "url": f"file://{filepath}",  # Reference file instead of embedding
    }]

    # Build WTF transcription analysis. Confidence, speaker and segment
    # values below are fixed literals, not read from the transcription.
    analysis_body = {
        "transcript": {
            "text": transcription_text,
            "language": language,
            "duration": audio_duration,
            "confidence": 0.9,
        },
        "segments": [],
        "metadata": {
            "created_at": now,
            "processed_at": now,
            "provider": "vfun",
            "model": "parakeet-tdt-110m",
            "audio": {"duration": audio_duration},
        },
        "quality": {
            "average_confidence": 0.9,
            "multiple_speakers": True,
            "low_confidence_words": 0,
        }
    }
    analysis = [{
        "type": "wtf_transcription",
        "dialog": 0,
        "vendor": "vfun",
        "product": "parakeet-tdt-110m",
        "schema": "wtf-1.0",
        "encoding": "json",
        "body": json.dumps(analysis_body),
    }]

    # Build attachments with source metadata
    attachments = []
    if metadata:
        attachment_body = {
            "freeswitch_server": metadata.freeswitch_server,
            "campaign_id": metadata.campaign_id,
            "call_id": metadata.call_id,
            "source_file": filepath,
        }
        attachments.append({
            "type": "source_metadata",
            "encoding": "json",
            "body": json.dumps(attachment_body),
        })

    return {
        "vcon": "0.0.1",
        "uuid": vcon_uuid,
        "created_at": now,
        "parties": parties,
        "dialog": dialog,
        "analysis": analysis,
        "attachments": attachments,
        "group": [],
        "redacted": {},
    }
class VfunLoadBalancer:
    """Thread-safe round-robin load balancer for vfun instances.

    Construction probes every URL once; only the ones that answer the
    readiness probe with HTTP 200 are kept for rotation.
    """

    def __init__(self, urls: List[str]):
        self.urls = urls
        self.index = 0
        self.lock = threading.Lock()
        self._check_health()

    def _check_health(self):
        """Keep only the instances whose ``/ready`` endpoint returns 200.

        Raises:
            RuntimeError: if no instance passes the probe.
        """
        reachable = []
        for endpoint in self.urls:
            try:
                # The readiness URL is the transcription URL with "/wtf" swapped out.
                reply = requests.get(endpoint.replace("/wtf", "/ready"), timeout=5)
                if reply.status_code != 200:
                    logger.warning(f"vfun unhealthy: {endpoint} (status {reply.status_code})")
                else:
                    reachable.append(endpoint)
                    logger.info(f"vfun healthy: {endpoint}")
            except Exception as e:
                logger.warning(f"vfun unavailable: {endpoint} ({e})")

        if len(reachable) == 0:
            raise RuntimeError("No healthy vfun instances available!")

        self.urls = reachable

    def get_url(self) -> str:
        """Return the next URL in rotation; the counter is advanced under the lock."""
        with self.lock:
            chosen = self.urls[self.index % len(self.urls)]
            self.index += 1
            return chosen
bool = False, + ingress_list: Optional[str] = None +) -> TranscriptionResult: + """Transcribe a single audio file with retry logic for NFS errors. + + Args: + filepath: Path to the audio file + vfun_lb: Load balancer for vfun instances + store_vcons: Whether to create and store a vCon + ingress_list: Optional ingress list for the vCon + + Returns: + TranscriptionResult with transcription and optional vCon info + """ + start = time.time() + last_error = None + audio_data = None + file_size = 0 + + for attempt in range(MAX_RETRIES): + try: + if attempt > 0: + time.sleep(RETRY_DELAY * attempt) + + with open(filepath, 'rb') as f: + audio_data = f.read() + file_size = len(audio_data) + + url = vfun_lb.get_url() + response = requests.post( + url, + files={"file-binary": ("audio.wav", audio_data, "audio/wav")}, + timeout=REQUEST_TIMEOUT + ) + + duration = time.time() - start + + if response.status_code == 200: + data = response.json() + text = data.get("text", "") + language = data.get("language", "en") + + result = TranscriptionResult( + filepath=filepath, + success=True, + duration=duration, + text=text, + text_length=len(text), + language=language + ) + + # Create and store vCon if requested + if store_vcons and text: + # Estimate audio duration from file size (8kHz 16-bit mono) + audio_duration = file_size / 16000.0 if file_size > 0 else 60.0 + + metadata = FileMetadata.from_filepath(filepath) + vcon = create_vcon( + filepath=filepath, + metadata=metadata, + transcription_text=text, + language=language, + audio_duration=audio_duration + ) + result.vcon_uuid = vcon["uuid"] + + if store_vcon(vcon, ingress_list): + result.vcon_stored = True + + return result + else: + last_error = f"HTTP {response.status_code}: {response.text[:100]}" + + except OSError as e: + last_error = str(e) + continue + except requests.exceptions.Timeout: + last_error = "Request timeout" + continue + except Exception as e: + last_error = str(e) + break + + return TranscriptionResult( + 
def _iter_wav_entries(directory):
    """Yield the ``.wav`` entries directly inside *directory*; warn and stop if it is unreadable."""
    try:
        for entry in directory.iterdir():
            if entry.suffix == '.wav':
                yield entry
    except PermissionError:
        logger.warning(f"Permission denied: {directory}")


def _iter_hour_dirs(server_path, date, hour):
    """Yield the hour directories to scan under one server for *date* (one directory when *hour* is set)."""
    if hour:
        hour_dir = server_path / date / hour
        # is_dir() rather than exists(): a stray file at this path is skipped
        # instead of failing when listed.
        if hour_dir.is_dir():
            yield hour_dir
        return

    date_path = server_path / date
    if not date_path.is_dir():
        return
    try:
        for child in date_path.iterdir():
            if child.is_dir():
                yield child
    except PermissionError:
        logger.warning(f"Permission denied: {date_path}")


def find_audio_files(
    base_path: str,
    date: Optional[str] = None,
    hour: Optional[str] = None,
    servers: Optional[List[int]] = None,
    limit: Optional[int] = None
) -> List[str]:
    """Find audio files in the NAS Freeswitch structure.

    Args:
        base_path: Directory containing the ``Freeswitch{N}`` folders.
        date: Date folder name to restrict the search to.
        hour: Hour folder name; only applied together with ``date``.
        servers: Server numbers to scan; defaults to ``FREESWITCH_SERVERS``.
        limit: Stop once this many files were collected. ``None`` or ``0``
            means no limit.

    Returns:
        Paths (as strings) of the ``.wav`` files found, in directory
        listing order. Missing server/date/hour folders are skipped, and
        unreadable date/hour folders are skipped with a warning.
    """
    files = []

    for server_num in servers or FREESWITCH_SERVERS:
        server_path = Path(base_path) / f"Freeswitch{server_num}"
        if not server_path.exists():
            continue

        if date:
            candidates = (
                wav
                for hour_dir in _iter_hour_dirs(server_path, date, hour)
                for wav in _iter_wav_entries(hour_dir)
            )
        else:
            # No date given: walk the whole server tree (hour is not applied).
            candidates = server_path.rglob("*.wav")

        for wav in candidates:
            files.append(str(wav))
            if limit and len(files) >= limit:
                return files

    return files
def main():
    """Command-line entry point: parse options, scan the NAS, transcribe, report.

    Exits with status 2 on invalid option combinations, 1 when no vfun
    instance is healthy, and 0 after a dry run or when no files match.
    With ``--output`` the statistics and per-file results are written as
    JSON.
    """
    parser = argparse.ArgumentParser(
        description="Production NAS transcription pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument("--date", help="Process specific date (YYYY-MM-DD)")
    parser.add_argument("--hour", help="Process specific hour (00-23)")
    parser.add_argument("--server", type=int, action="append",
                        help="Specific Freeswitch server number (can repeat)")
    parser.add_argument("--limit", type=int, help="Limit files to process")
    parser.add_argument("--workers", type=int, default=DEFAULT_WORKERS,
                        help=f"Concurrent workers (default: {DEFAULT_WORKERS})")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Verbose output")
    parser.add_argument("--dry-run", action="store_true",
                        help="Find files but don't process")
    parser.add_argument("--output", "-o", help="Save results to JSON file")
    parser.add_argument("--store-vcons", action="store_true",
                        help="Create and store vCons in vcon-server")
    parser.add_argument("--ingress-list", default=VCON_INGRESS_LIST,
                        help=f"Ingress list for vCons (default: {VCON_INGRESS_LIST})")
    args = parser.parse_args()

    # The file search only applies the hour filter together with a date;
    # reject the combination instead of silently scanning every date.
    if args.hour and not args.date:
        parser.error("--hour requires --date")
    # Fail before the health check and NAS scan rather than inside the
    # thread pool constructor.
    if args.workers < 1:
        parser.error("--workers must be at least 1")

    # Header
    print("=" * 70)
    print("NAS TRANSCRIPTION PIPELINE")
    print("=" * 70)

    # Initialize vfun load balancer
    try:
        vfun_lb = VfunLoadBalancer(VFUN_URLS)
    except RuntimeError as e:
        print(f"ERROR: {e}")
        sys.exit(1)

    print(f"vfun instances: {len(vfun_lb.urls)}")
    print(f"Workers: {args.workers}")
    print(f"Filters: date={args.date or 'all'}, hour={args.hour or 'all'}")
    if args.limit:
        print(f"Limit: {args.limit} files")
    if args.store_vcons:
        print(f"vCon storage: ENABLED (ingress: {args.ingress_list})")
    print()

    # Find files
    logger.info("Scanning NAS for audio files...")
    files = find_audio_files(
        NAS_BASE,
        date=args.date,
        hour=args.hour,
        servers=args.server,
        limit=args.limit
    )
    print(f"Found {len(files):,} audio files")

    if not files:
        print("No files found!")
        sys.exit(0)

    if args.dry_run:
        print("\nDry run - sample files:")
        for f in files[:10]:
            print(f" {f}")
        if len(files) > 10:
            print(f" ... and {len(files) - 10:,} more")
        sys.exit(0)

    # Run pipeline
    print(f"\nProcessing {len(files):,} files...")
    if args.store_vcons:
        print("Legend: V=vCon stored, .=transcribed, X=failed")
    print("-" * 70)

    results, stats = run_pipeline(
        files,
        args.workers,
        vfun_lb,
        verbose=args.verbose,
        store_vcons=args.store_vcons,
        ingress_list=args.ingress_list if args.store_vcons else None
    )

    # Summary (the restart wrapper script greps these labels)
    summary = stats.to_dict()
    print("\n" + "=" * 70)
    print("RESULTS")
    print("=" * 70)
    print(f"Processed: {summary['processed']:,} files")
    print(f"Successful: {summary['successful']:,} ({summary['success_rate']}%)")
    print(f"Failed: {summary['failed']:,}")
    print(f"Elapsed: {summary['elapsed_seconds']}s")
    print(f"Throughput: {summary['throughput_per_min']:,.0f} files/min")
    print(f"Avg latency: {summary['avg_latency']}s per file")
    print(f"Total text: {summary['total_text_chars']:,} characters")
    if args.store_vcons:
        print(f"vCons created: {summary['vcons_created']:,}")
        print(f"vCons stored: {summary['vcons_stored']:,}")

    # Errors (first five only)
    errors = [r for r in results if not r.success]
    if errors:
        print(f"\nErrors ({len(errors)}):")
        for e in errors[:5]:
            print(f" {Path(e.filepath).name}: {e.error}")
        if len(errors) > 5:
            print(f" ... and {len(errors) - 5} more")

    # Save results
    if args.output:
        output_data = {
            "stats": summary,
            "config": {
                "date": args.date,
                "hour": args.hour,
                "workers": args.workers,
                "vfun_instances": len(vfun_lb.urls),
            },
            "results": [r.to_dict() for r in results],
        }
        with open(args.output, 'w', encoding="utf-8") as f:
            json.dump(output_data, f, indent=2)
        print(f"\nResults saved to: {args.output}")
PROCESSED)) + TOTAL_VCCONS=$((TOTAL_VCCONS + VCCONS)) + echo "Batch $BATCH: $PROCESSED files, $VCCONS vCons, $FAILED failed" + echo "Total so far: $TOTAL_PROCESSED files, $TOTAL_VCCONS vCons" + fi + + # Check if vfun crashed + if ! curl -s http://localhost:4380/ready > /dev/null; then + echo "$(date +%H:%M:%S) vfun crashed, restarting..." + restart_vfun || exit 1 + fi + + # Check if we got fewer files than batch size (done) + if [ -n "$PROCESSED" ] && [ "$PROCESSED" -lt "$BATCH_SIZE" ]; then + echo "" + echo "=== COMPLETE ===" + echo "Total processed: $TOTAL_PROCESSED" + echo "Total vCons: $TOTAL_VCCONS" + break + fi + + BATCH=$((BATCH + 1)) + + # Safety limit + if [ $BATCH -gt 100 ]; then + echo "Safety limit reached" + break + fi +done diff --git a/signoz/README.md b/signoz/README.md index cc38424..25ecd0f 100644 --- a/signoz/README.md +++ b/signoz/README.md @@ -1,12 +1,12 @@ # SigNoz Observability Stack for vcon-server -This directory contains the configuration for SigNoz, a self-hosted observability platform that collects traces, metrics, and logs from the vcon-mcp server via OpenTelemetry. +This directory contains the configuration for SigNoz, a self-hosted observability platform that collects traces, metrics, and logs from vcon-server (conserver and api) via OpenTelemetry. ## Architecture ``` ┌─────────────────┐ OTLP/HTTP ┌──────────────────────┐ -│ vcon-mcp │ ─────────────────► │ signoz-otel-collector│ +│ conserver / api │ ─────────────────► │ signoz-otel-collector│ │ (instrumented) │ :4318 │ (OTLP receiver) │ └─────────────────┘ └──────────┬───────────┘ │ @@ -56,6 +56,8 @@ Basic alertmanager configuration (not currently active). ### Start with SigNoz +When you use `docker-compose.signoz.yml`, the **conserver** and **api** services are overridden to run with `opentelemetry-instrument` and OTEL environment variables so traces and metrics are sent to the SignOz collector (HTTP OTLP on port 4318). 
Service names appear in SignOz as `conserver` and `conserver.api`. + ```bash cd /home/thomas/bds/vcon-dev/vcon-server docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d @@ -80,7 +82,7 @@ Open http://localhost:3301 in your browser. ## First-Time Setup -After starting SigNoz for the first time, run the schema migrations: +After starting SigNoz for the first time, run the schema migrations (required for Traces, Metrics, and **Logs**): ```bash docker run --rm --network conserver \ @@ -90,6 +92,14 @@ docker run --rm --network conserver \ Note: Some migrations may fail due to JSON type syntax incompatibility with ClickHouse 24.1. Core functionality still works. +Verify that logs schema exists (needed for the Logs tab): + +```bash +docker exec signoz-clickhouse clickhouse-client --query "SHOW TABLES FROM signoz_logs" +``` + +You should see tables such as `logs_v2`, `distributed_logs_v2`, etc. If the database or tables are missing, the Logs tab will show "Aw snap" when you open it. + ## vcon-mcp Integration The vcon-mcp service is configured with these environment variables in `docker-compose.override.yml`: @@ -137,6 +147,16 @@ environment: - Ensure collector is receiving data: check collector metrics at port 8888 - Verify ClickHouse tables exist: `docker exec signoz-clickhouse clickhouse-client --query "SHOW TABLES FROM signoz_traces"` +### Logs tab shows "Aw snap" or "Something went wrong" +Two possible causes: + +1. **Logs schema missing** + Run the schema migrator (see **First-Time Setup** above), verify `signoz_logs` tables exist, then restart `signoz` and refresh the Logs tab. + +2. **Query service panic (telemetry TTL check)** + The query service runs a telemetry cron that checks TTL for `signoz_logs.logs`. The schema migrator only creates `logs_v2`, so that table doesn't exist and the cron can panic (nil pointer), crashing the service. 
**Fix:** set `TELEMETRY_ENABLED=false` for the `signoz` service in `docker-compose.signoz.yml` (already set in this repo), then recreate the container: + `docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d signoz --force-recreate` + ### Port conflicts - Default ports: 3301 (UI), 4317 (gRPC), 4318 (HTTP) - Change in docker-compose.signoz.yml if needed diff --git a/signoz/dashboards/.gitkeep b/signoz/dashboards/.gitkeep new file mode 100644 index 0000000..989d6cc --- /dev/null +++ b/signoz/dashboards/.gitkeep @@ -0,0 +1,2 @@ +# Dashboards directory for SigNoz query service (DASHBOARDS_PATH). +# Add JSON dashboard definitions here if needed. diff --git a/signoz/otel-collector-config.yaml b/signoz/otel-collector-config.yaml index eeedb0c..3766f7f 100644 --- a/signoz/otel-collector-config.yaml +++ b/signoz/otel-collector-config.yaml @@ -5,6 +5,10 @@ receivers: endpoint: 0.0.0.0:4317 http: endpoint: 0.0.0.0:4318 + # HTTP log receiver for logspout-signoz and curl/API log ingestion + httplogreceiver/json: + endpoint: 0.0.0.0:8082 + source: json processors: batch: @@ -43,6 +47,6 @@ service: processors: [batch] exporters: [signozclickhousemetrics] logs: - receivers: [otlp] + receivers: [otlp, httplogreceiver/json] processors: [batch] exporters: [clickhouselogsexporter] From 631ca58690bf8710a6db49ea1d9c4fdad6b443e8 Mon Sep 17 00:00:00 2001 From: Thomas Howe Date: Wed, 18 Feb 2026 19:13:38 +0000 Subject: [PATCH 11/11] Update chain diagram for webhook-as-storage architecture Co-Authored-By: Claude Opus 4.6 --- docs/PERFORMANCE_TESTING.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/PERFORMANCE_TESTING.md b/docs/PERFORMANCE_TESTING.md index 0802cb5..e77c845 100644 --- a/docs/PERFORMANCE_TESTING.md +++ b/docs/PERFORMANCE_TESTING.md @@ -140,9 +140,10 @@ python3 scripts/nas_transcription_pipeline.py --date 2026-01-19 --dry-run ### Pipeline Chain Configuration ``` -main_chain: ingress:default → tag → 
supabase_webhook → egress:processed -transcription: ingress:transcribe → tag → wtf_transcribe → keyword_tagger → supabase_webhook → egress:transcribed +main_chain: ingress:default → tag → expire_vcon → egress:processed → storage: supabase_webhook +transcription: ingress:transcribe → tag → wtf_transcribe → keyword_tagger → expire_vcon → egress:transcribed → storage: supabase_webhook ``` +Note: `supabase_webhook` runs as a post-chain storage (parallel, non-blocking) via `storage.webhook` module. ---