diff --git a/.env.example b/.env.example
index 99c8152..67cc4e9 100644
--- a/.env.example
+++ b/.env.example
@@ -12,3 +12,12 @@ LOGGING_CONFIG_FILE=server/logging_dev.conf
# Groq API key for Whisper transcription
GROQ_API_KEY=your_groq_api_key_here
+
+# OpenTelemetry (used when running with docker-compose.signoz.yml)
+# conserver and api send traces/metrics to signoz-otel-collector when the SignOz stack is enabled
+# OTEL_EXPORTER_OTLP_ENDPOINT=http://signoz-otel-collector:4318
+# OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
+# OTEL_TRACES_EXPORTER=otlp
+# OTEL_METRICS_EXPORTER=otlp
+# OTEL_LOGS_EXPORTER=otlp
+# OTEL_SERVICE_NAME=conserver
diff --git a/docker-compose.signoz.yml b/docker-compose.signoz.yml
new file mode 100644
index 0000000..ff5c6c4
--- /dev/null
+++ b/docker-compose.signoz.yml
@@ -0,0 +1,158 @@
+# SigNoz Observability Stack
+# Usage: docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d
+#
+# After first run, execute schema migrations:
+# docker run --rm --network conserver signoz/signoz-schema-migrator:latest sync --dsn='tcp://signoz-clickhouse:9000'
+#
+# Access UI at: http://localhost:3301
+
+networks:
+ conserver:
+ external: true
+
+volumes:
+ signoz_clickhouse_data:
+ signoz_zookeeper_data:
+ signoz_zookeeper_log:
+ signoz_data:
+
+services:
+ signoz-zookeeper:
+ image: zookeeper:3.9
+ container_name: signoz-zookeeper
+ hostname: signoz-zookeeper
+ environment:
+ - ZOO_AUTOPURGE_PURGEINTERVAL=1
+ - ZOO_4LW_COMMANDS_WHITELIST=mntr,ruok,stat
+ volumes:
+ - signoz_zookeeper_data:/data
+ - signoz_zookeeper_log:/datalog
+ networks:
+ - conserver
+ healthcheck:
+ test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ restart: unless-stopped
+
+ signoz-clickhouse:
+ image: clickhouse/clickhouse-server:24.1.2-alpine
+ container_name: signoz-clickhouse
+ hostname: signoz-clickhouse
+ tty: true
+ depends_on:
+ signoz-zookeeper:
+ condition: service_healthy
+ volumes:
+ - signoz_clickhouse_data:/var/lib/clickhouse
+ - ./signoz/zz-clickhouse-config.xml:/etc/clickhouse-server/config.d/zz-clickhouse-config.xml:ro
+ - ./signoz/clickhouse-users.xml:/etc/clickhouse-server/users.d/users.xml:ro
+ environment:
+ - CLICKHOUSE_DB=signoz_traces
+ - CLICKHOUSE_USER=default
+ - CLICKHOUSE_PASSWORD=
+ ulimits:
+ nofile:
+ soft: 262144
+ hard: 262144
+ networks:
+ - conserver
+ healthcheck:
+ test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:8123/ping"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ restart: unless-stopped
+
+ signoz-otel-collector:
+ image: signoz/signoz-otel-collector:latest
+ container_name: signoz-otel-collector
+ hostname: signoz-otel-collector
+ command:
+ - "--config=/etc/otel-collector-config.yaml"
+ depends_on:
+ signoz-clickhouse:
+ condition: service_healthy
+ environment:
+ - OTEL_RESOURCE_ATTRIBUTES=host.name=signoz-host,os.type=linux
+ volumes:
+ - ./signoz/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
+ ports:
+ - "4317:4317" # OTLP gRPC receiver
+ - "4318:4318" # OTLP HTTP receiver
+ networks:
+ - conserver
+ healthcheck:
+ test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:13133/"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ restart: unless-stopped
+
+ signoz:
+ image: signoz/query-service:latest
+ container_name: signoz
+ hostname: signoz
+ depends_on:
+ signoz-clickhouse:
+ condition: service_healthy
+ environment:
+ - ClickHouseUrl=tcp://signoz-clickhouse:9000
+ - SIGNOZ_LOCAL_DB_PATH=/var/lib/signoz/signoz.db
+ - DASHBOARDS_PATH=/root/config/dashboards
+ - STORAGE=clickhouse
+ - GODEBUG=netdns=go
+ - TELEMETRY_ENABLED=true
+ - DEPLOYMENT_TYPE=docker-standalone
+ volumes:
+ - signoz_data:/var/lib/signoz
+ - ./signoz/dashboards:/root/config/dashboards
+ ports:
+ - "3301:8080" # Web UI
+ networks:
+ - conserver
+ healthcheck:
+ test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://127.0.0.1:8080/api/v1/health"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ restart: unless-stopped
+
+ # Override conserver and api to send traces/metrics to SignOz (OTLP HTTP)
+ conserver:
+ command: "opentelemetry-instrument python ./server/main.py"
+ environment:
+ OTEL_EXPORTER_OTLP_ENDPOINT: http://signoz-otel-collector:4318
+ OTEL_EXPORTER_OTLP_PROTOCOL: http/protobuf
+ OTEL_TRACES_EXPORTER: otlp
+ OTEL_METRICS_EXPORTER: otlp
+ OTEL_LOGS_EXPORTER: otlp
+ OTEL_SERVICE_NAME: conserver
+ depends_on:
+ signoz-otel-collector:
+ condition: service_healthy
+
+ api:
+ command: /bin/bash -c "opentelemetry-instrument uvicorn server.api:app --host 0.0.0.0 --port 8000"
+ environment:
+ OTEL_EXPORTER_OTLP_ENDPOINT: http://signoz-otel-collector:4318
+ OTEL_EXPORTER_OTLP_PROTOCOL: http/protobuf
+ OTEL_TRACES_EXPORTER: otlp
+ OTEL_METRICS_EXPORTER: otlp
+ OTEL_LOGS_EXPORTER: otlp
+ OTEL_SERVICE_NAME: conserver.api
+ depends_on:
+ signoz-otel-collector:
+ condition: service_healthy
+
+ logspout-signoz:
+ image: pavanputhra/logspout-signoz
+ container_name: logspout-signoz
+ restart: unless-stopped
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
+ environment:
+ SIGNOZ_LOG_ENDPOINT: http://172.17.0.1:8082
+ ENV: prod
+ command: signoz://172.17.0.1:8082
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
index f1c781d..13091e2 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -10,6 +10,9 @@ ENV VCON_SERVER_VERSION=${VCON_SERVER_VERSION}
ENV VCON_SERVER_GIT_COMMIT=${VCON_SERVER_GIT_COMMIT}
ENV VCON_SERVER_BUILD_TIME=${VCON_SERVER_BUILD_TIME}
+# Configure apt to use HTTPS sources (required when HTTP port 80 is blocked)
+RUN sed -i 's|http://deb.debian.org|https://deb.debian.org|g' /etc/apt/sources.list.d/debian.sources
+
RUN apt-get update && \
apt-get install -y libavdevice-dev ffmpeg
diff --git a/docs/PERFORMANCE_TESTING.md b/docs/PERFORMANCE_TESTING.md
new file mode 100644
index 0000000..e77c845
--- /dev/null
+++ b/docs/PERFORMANCE_TESTING.md
@@ -0,0 +1,216 @@
+# Performance Testing Notes
+
+**Last Updated:** 2026-02-02
+
+## Test Environment
+
+### Servers
+- **Conserver (vcon-server)**: http://localhost:8080 (token: `mulliganmccarthy`)
+- **vfun (transcription)**: http://localhost:4380/wtf
+
+### NAS Storage
+
+**Mount Point:** `/mnt/nas`
+```
+64.187.219.131:/mnt/slave_recording → /mnt/nas (NFS4)
+- rsize/wsize: 1MB
+- Protocol: TCP
+- Hard mount with 600s timeout
+```
+
+**Directory Structure:**
+```
+/mnt/nas/
+├── Freeswitch1/ # 20 Freeswitch servers (1-20)
+│ ├── 2026-01-19/ # Date directories (15+ days available)
+│ │ ├── 06/ # Hour directories (00-23)
+│ │ │ └── *.wav # Recording files (~489k per day)
+│ │ ├── 07/
+│ │ └── ...
+│ ├── 2026-01-20/
+│ └── ...
+├── Freeswitch2/
+├── ...
+├── Freeswitch20/
+├── Batch1_recording/
+├── pcaps_*/ # Packet captures
+└── fs_collect_by_number.sh # Collection utility
+```
+
+**File Naming Pattern:**
+```
+{campaign}_{caller}_{callid}_{date}_{time}.wav
+Example: 10508_12026661845_993317168030975_2026-01-19_06:47:08.wav
+
+Fields:
+- campaign: Campaign/extension ID (e.g., 10508, 6075, 9676)
+- caller: Phone number (e.g., 12026661845)
+- callid: Unique call ID (e.g., 993317168030975)
+- date: YYYY-MM-DD
+- time: HH:MM:SS
+```
+
+**Scale:**
+- ~489,000 recordings per day per Freeswitch server
+- ~9.78 million recordings/day across all 20 servers
+- ~938 KB average file size (~60 seconds @ 8kHz 16-bit)
+- ~9 TB/day of new recordings
+- 15+ days of historical data
+- Access requires `nasgroup` membership
+
+---
+
+## Performance Results (2026-02-02)
+
+### Conserver API
+| Metric | Value |
+|--------|-------|
+| Throughput | 151.68 req/s |
+| Avg Latency | 57.22 ms |
+| Success Rate | 100% |
+
+### vfun Transcription (Local Files)
+| Metric | Value |
+|--------|-------|
+| Throughput | 32.72 files/sec |
+| Data Rate | 30.36 MB/sec |
+| Peak GPU Utilization | 95% |
+
+### vfun Transcription (NAS Files)
+| Files | Workers | Throughput | Data Rate | Parallelism |
+|-------|---------|------------|-----------|-------------|
+| 100 | 32 | 48.40 files/sec | 34.08 MB/s | 25.9x |
+| 200 | 64 | 45.60 files/sec | 30.92 MB/s | 47.9x |
+| 500 | 64 | 43.63 files/sec | 30.85 MB/s | 59.4x |
+
+### Full Pipeline (NAS → vfun → vCon → Conserver → vcon-mcp)
+| Files | Workers | Throughput | vCons Stored | Success |
+|-------|---------|------------|--------------|---------|
+| 50 | 16 | 2,447 files/min | 35 | 100% |
+| 500 | 48 | 2,576 files/min | 362 | 100% |
+| 1,000 | 64 | **2,973 files/min** | 703 | 100% |
+
+**Full Pipeline Capacity (single vfun instance):**
+- ~3,000 files/min = **~4.3 million files/day**
+- vCon creation adds minimal overhead (~1ms per vCon)
+- Conserver chain processing: ~10ms per vCon
+- Webhook to vcon-mcp (Supabase): ~100-200ms per vCon
+
+**Key Findings:**
+- NAS network storage does not bottleneck transcription
+- GPU batching works efficiently (59.4x parallelism vs 64x max)
+- Sustained ~44-48 files/sec with high concurrency
+- 100% success rate across 1,500+ files
+- Full pipeline maintains ~48 files/sec throughput
+
+### vfun Batching Configuration
+```
+GPU_MAX_BATCH_SIZE = 64
+GPU_COALESCE_TIMEOUT_US = 5000 (5ms)
+GPU_COALESCE_MIN_FILL = 16
+```
+
+---
+
+## Test Scripts
+
+Located in `scripts/`:
+- `nas_transcription_pipeline.py` - **Production pipeline** with vCon creation and storage
+- `nas_stress_test.py` - High-concurrency vfun stress test with NAS files
+
+### Running Tests
+
+```bash
+# Check servers
+curl -s http://localhost:8080/docs | head -5
+curl -s http://localhost:4380/ready
+
+# Start vfun if needed
+cd ~/strolid/vfun && ./vfun --port 4380
+
+# Run vfun-only stress test
+python3 scripts/nas_stress_test.py 200 64
+
+# Run full pipeline (transcription + vCon storage)
+python3 scripts/nas_transcription_pipeline.py --date 2026-01-19 --hour 06 --limit 500 --workers 48 --store-vcons
+
+# Dry run to see file counts
+python3 scripts/nas_transcription_pipeline.py --date 2026-01-19 --dry-run
+```
+
+### Pipeline Chain Configuration
+```
+main_chain: ingress:default → tag → expire_vcon → egress:processed → storage: supabase_webhook
+transcription: ingress:transcribe → tag → wtf_transcribe → keyword_tagger → expire_vcon → egress:transcribed → storage: supabase_webhook
+```
+Note: `supabase_webhook` runs as a post-chain storage (parallel, non-blocking) via `storage.webhook` module.
+
+---
+
+## vfun Stability Issues (CUDA Crashes)
+
+### Root Cause Analysis (2026-02-02)
+
+**Problem:** vfun crashes intermittently after processing hundreds of files under sustained load.
+
+**Investigation findings:**
+1. **NOT the NAS** - Files read correctly, NAS performance is stable
+2. **NOT memory leaks** - GPU memory stable at ~12.6GB throughout processing
+3. **NOT single file issues** - Crash-causing files process fine individually
+4. **IS a CUDA batching issue** - Specific batch combinations trigger cuBLAS failures
+
+**Error signature:**
+```
+RuntimeError: CUDA error: CUBLAS_STATUS_EXECUTION_FAILED when calling cublasLtMatmul
+with transpose_mat1 1 transpose_mat2 0 m 1024 n 251 k 1024
+```
+
+**What happens:**
+1. Under high concurrency, vfun batches audio files for GPU processing
+2. Certain combinations of audio lengths create tensor dimensions that trigger cuBLAS matrix multiplication failures
+3. The CUDA error corrupts GPU state, leaving vfun hung (process exists but unresponsive to `/ready` endpoint)
+4. GPU memory shows 0 MiB used after crash (resources released but process not terminated)
+
+**Affected dimensions:** The `n=251` parameter in the error suggests certain audio sequence lengths cause problematic matrix sizes during the transformer decoder forward pass.
+
+### Workarounds for Production
+
+**1. Auto-restart script:**
+```bash
+#!/bin/bash
+# Run pipeline with automatic vfun restart on crash
+restart_vfun() {
+ pkill -9 -f "vfun --port 4380"
+ sleep 2
+ cd ~/strolid/vfun && ./vfun --port 4380 > /tmp/vfun.log 2>&1 &
+ sleep 10
+}
+
+# Check health every 30 seconds, restart if hung
+while true; do
+ if ! curl -s --max-time 5 http://localhost:4380/ready > /dev/null 2>&1; then
+ echo "$(date) - vfun crash detected, restarting..."
+ restart_vfun
+ fi
+ sleep 30
+done
+```
+
+**2. Reduce concurrency** (may reduce throughput but fewer crashes):
+- Try 32-48 workers instead of 64
+- Smaller batches reduce likelihood of problematic tensor dimensions
+
+**3. Batch processing with checkpoints:**
+- Process in batches of 2000-3000 files
+- Restart vfun between batches preventively
+- Track progress in checkpoint files
+
+### Investigation Scripts
+
+Located in `scripts/`:
+- `find_bad_file.py` - Tests files sequentially to identify crash point
+- `run_pipeline_with_restart.sh` - Pipeline with auto-restart capability
+
+### Logs to Check
+- `/tmp/vfun.log` or `/tmp/vfun_test.log` - vfun stdout/stderr including CUDA errors
+- Pipeline logs show last successful file before crash
diff --git a/docs/VFUN_CRASH_REPORT.md b/docs/VFUN_CRASH_REPORT.md
new file mode 100644
index 0000000..1a7dcb4
--- /dev/null
+++ b/docs/VFUN_CRASH_REPORT.md
@@ -0,0 +1,269 @@
+# vfun CUDA Crash Report
+
+**Date:** 2026-02-02
+**Environment:** Ubuntu Linux, NVIDIA H100 NVL (95GB VRAM)
+**vfun version:** Built from ~/strolid/vfun
+**Model:** parakeet-tdt-110m (NeMo ASR)
+
+---
+
+## Executive Summary
+
+vfun crashes intermittently after processing hundreds of audio files under sustained concurrent load. The crash is caused by a **cuBLAS matrix multiplication failure** (`CUBLAS_STATUS_EXECUTION_FAILED`) that occurs when certain batch combinations create problematic tensor dimensions during the transformer decoder's forward pass.
+
+**Key finding:** Individual files process successfully. The crash only occurs under batched/concurrent workloads when specific tensor dimension combinations are formed.
+
+---
+
+## Symptom
+
+When processing large volumes of audio files (500+) with high concurrency (64 workers), vfun:
+
+1. Processes successfully for a period (typically 100-500+ files)
+2. Suddenly stops responding to requests
+3. Process remains alive but `/ready` endpoint times out
+4. GPU memory drops to 0 MiB (resources released)
+5. Subsequent requests fail silently
+
+The service does not exit or restart automatically - it enters a hung state.
+
+---
+
+## Reproduction Steps
+
+```bash
+# Start vfun
+cd ~/strolid/vfun && ./vfun --port 4380 > /tmp/vfun.log 2>&1 &
+
+# Wait for ready
+sleep 10 && curl http://localhost:4380/ready
+
+# Send concurrent requests (64 parallel workers)
+ls /path/to/wav/files/*.wav | head -500 | \
+ xargs -P 64 -I {} curl -s -X POST \
+ -F "file-binary=@{};type=audio/wav" \
+ http://localhost:4380/wtf
+
+# vfun will crash after processing 100-500 files
+```
+
+**Audio file characteristics:**
+- Format: WAV, 8kHz, 16-bit, mono (standard telephony)
+- Duration: ~60 seconds each
+- Size: ~960KB each
+- Source: Freeswitch call recordings
+
+---
+
+## Error Message
+
+From `/tmp/vfun.log` after crash:
+
+```
+RuntimeError: CUDA error: CUBLAS_STATUS_EXECUTION_FAILED when calling cublasLtMatmul
+with transpose_mat1 1 transpose_mat2 0 m 1024 n 251 k 1024 mat1_ld 1024 mat2_ld 1024
+result_ld 1024 abcType 0 computeType 68 scaleType 0
+```
+
+---
+
+## Full Stack Trace
+
+```
+File "code/__torch__/nemo/collections/asr/modules/transformer/transformer_decoders/___torch_mangle_1124.py", line 27, in forward
+ _2 = (first_sub_layer).forward(_0, _1, argument_2, )
+ input = torch.add_(_2, argument_1)
+ _3 = (second_sub_layer).forward((layer_norm_2).forward(input, ), encoder_embeddings, argument_4, )
+ ~~~~~~~~~~~~~~~~~~~~~~~~~ <--- HERE
+
+File "code/__torch__/nemo/collections/asr/modules/transformer/transformer_modules/___torch_mangle_1118.py", line 23, in forward
+ query_net = self.query_net
+ _0 = (query_net).forward(argument_1, )
+ _1 = (key_net).forward(encoder_embeddings, )
+ ~~~~~~~~~~~~~~~~ <--- HERE
+
+File "code/__torch__/torch/nn/modules/linear/___torch_mangle_1113.py", line 12, in forward
+ bias = self.bias
+ weight = self.weight
+ x = torch.linear(encoder_embeddings, weight, bias)
+ ~~~~~~~~~~~~ <--- HERE
+
+Traceback of TorchScript, original code (most recent call last):
+/home/dev/vfun/.venv/lib/python3.12/site-packages/torch/nn/modules/linear.py(134): forward
+/home/dev/vfun/.venv/lib/python3.12/site-packages/nemo/collections/asr/modules/transformer/transformer_modules.py(174): forward
+/home/dev/vfun/.venv/lib/python3.12/site-packages/nemo/collections/asr/modules/transformer/transformer_decoders.py(98): forward_preln
+/home/dev/vfun/.venv/lib/python3.12/site-packages/nemo/collections/asr/modules/transformer/transformer_decoders.py(158): forward
+/home/dev/vfun/.venv/lib/python3.12/site-packages/nemo/collections/asr/modules/transformer/transformer_decoders.py(255): forward
+/home/dev/vfun/export-scripts/canary-to-torchscript.py(62): forward
+```
+
+---
+
+## Analysis
+
+### What's Happening
+
+1. **Batching behavior:** vfun batches concurrent requests for GPU efficiency using:
+ ```
+ GPU_MAX_BATCH_SIZE = 64
+ GPU_COALESCE_TIMEOUT_US = 5000 (5ms)
+ GPU_COALESCE_MIN_FILL = 16
+ ```
+
+2. **Tensor dimension issue:** When batching audio of varying lengths, the resulting `encoder_embeddings` tensor has shape `[batch, seq_len, 1024]`. The error shows `n=251` which suggests a sequence length dimension.
+
+3. **cuBLAS failure:** The `cublasLtMatmul` call fails when the resulting matrix dimensions hit certain values. This may be related to:
+ - Memory alignment issues at specific sizes
+ - cuBLAS kernel selection for unusual dimensions
+ - Tensor core compatibility with non-standard shapes
+
+4. **No recovery:** After the CUDA error, the GPU context is corrupted. vfun doesn't catch this exception at the HTTP handler level, so it hangs indefinitely.
+
+### Why Single Files Work
+
+When processing files individually:
+- Batch size is always 1
+- Sequence lengths are consistent per-request
+- No cross-request tensor dimension combinations occur
+
+### Suspected Root Causes
+
+1. **Batch padding edge case:** When batching audio of different durations, padding may create tensor dimensions that cuBLAS handles poorly.
+
+2. **Missing dimension validation:** The model may not validate that input dimensions are compatible with cuBLAS kernel requirements before calling `torch.linear()`.
+
+3. **CUDA error handling:** The exception isn't caught and handled gracefully - the service should restart or reset the CUDA context.
+
+---
+
+## Observations
+
+| Test | Result |
+|------|--------|
+| Single file processing | Always succeeds |
+| Sequential processing (1 worker) | Succeeds for 1000+ files |
+| Parallel processing (64 workers) | Crashes after 100-500 files |
+| Same "crash file" sent individually | Succeeds |
+| Reduced concurrency (8 workers) | Still crashes, just takes longer |
+
+**GPU Memory:** Stable at ~12.6GB during processing until crash. No memory leak observed.
+
+**File Characteristics:** Crash files have no distinguishing features - same format, similar duration, similar content to files that succeed.
+
+---
+
+## Suggested Fixes
+
+### 1. Exception Handling (Quick Fix)
+
+Wrap the inference call in try/except and reset CUDA context on failure:
+
+```python
+try:
+ result = model.forward(batch)
+except RuntimeError as e:
+ if "CUDA" in str(e) or "cuBLAS" in str(e):
+ torch.cuda.empty_cache()
+ torch.cuda.synchronize()
+ # Return error response instead of hanging
+ return {"error": "CUDA error, please retry"}
+ raise
+```
+
+### 2. Dimension Validation (Preventive)
+
+Before batching, validate that resulting tensor dimensions are safe:
+
+```python
+def is_safe_batch(sequences):
+ """Check if batch dimensions are compatible with cuBLAS"""
+ max_len = max(len(s) for s in sequences)
+ # Avoid problematic dimensions (may need empirical tuning)
+ if max_len % 8 != 0: # Ensure alignment
+ max_len = ((max_len + 7) // 8) * 8
+ return max_len
+```
+
+### 3. Graceful Degradation
+
+If a batch fails, retry with smaller batch size or process files individually:
+
+```python
+def process_with_fallback(files, batch_size=64):
+ try:
+ return process_batch(files, batch_size)
+ except RuntimeError:
+ if batch_size > 1:
+ # Retry with smaller batches
+ return process_with_fallback(files, batch_size // 2)
+ else:
+ # Process individually as last resort
+ return [process_single(f) for f in files]
+```
+
+### 4. Health Check Watchdog
+
+Add internal watchdog that restarts the service if inference hangs:
+
+```python
+import signal
+
+def timeout_handler(signum, frame):
+ logger.error("Inference timeout - restarting")
+ torch.cuda.empty_cache()
+ os._exit(1) # Force restart
+
+signal.signal(signal.SIGALRM, timeout_handler)
+signal.alarm(30) # 30 second timeout per batch
+```
+
+---
+
+## Current Workaround
+
+We're using an external monitoring script that:
+
+1. Checks `/ready` endpoint every 30 seconds
+2. If unresponsive, kills vfun process (`pkill -9`)
+3. Restarts vfun
+4. Continues processing from checkpoint
+
+This achieves ~3000 files/minute with occasional 10-second restart pauses.
+
+---
+
+## Environment Details
+
+```
+GPU: NVIDIA H100 NVL
+VRAM: 95,320 MiB total
+CUDA: (check with nvidia-smi)
+PyTorch: 2.x (from vfun venv)
+NeMo: (from vfun venv)
+
+vfun config:
+ GPU_MAX_BATCH_SIZE = 64
+ GPU_COALESCE_TIMEOUT_US = 5000
+ GPU_COALESCE_MIN_FILL = 16
+```
+
+---
+
+## Files for Testing
+
+Sample files that trigger the crash (when processed concurrently with others):
+
+```
+/mnt/nas/Freeswitch1/2026-01-19/11/10508_12706498965_993318019641306_2026-01-19_11:14:08.wav
+/mnt/nas/Freeswitch1/2026-01-19/11/10508_17019013723_993313073314983_2026-01-19_11:11:14.wav
+```
+
+Note: These files process successfully individually. The crash occurs when they're part of a batch with other files.
+
+---
+
+## Contact
+
+Report prepared by: Claude Code (assisted investigation)
+System operator: Thomas
+Date: 2026-02-02
diff --git a/docs/VFUN_STRESS_TEST_REPORT.md b/docs/VFUN_STRESS_TEST_REPORT.md
new file mode 100644
index 0000000..af2fcf7
--- /dev/null
+++ b/docs/VFUN_STRESS_TEST_REPORT.md
@@ -0,0 +1,243 @@
+# vfun Stress Test Report
+
+**Date:** 2026-02-04 (Updated)
+**Tester:** Thomas (with Claude Code)
+**vfun Version:** Commit 0431d2d "hackery to avoid crash"
+**Environment:** Ubuntu Linux, NVIDIA H100 NVL (95GB VRAM)
+
+---
+
+## Executive Summary
+
+### Update (2026-02-04): Significant Improvement After Fix
+
+Commit `0431d2d` ("hackery to avoid crash") dramatically improved stability:
+
+| Test | Before Fix (97a9d1c) | After Fix (0431d2d) |
+|------|---------------------|---------------------|
+| 64 workers / 1,000 files | 30.6% (crashed ~306) | **100%** |
+| 64 workers / 5,000 files | N/A | **100%** |
+| 64 workers / 43,940 files | N/A | **100%** |
+| 64 workers / 406,037 files | N/A | 22.3% (crashed ~90k) |
+
+**Key finding:** The fix extended crash threshold from ~300-7000 files to ~90,000 files - a 10-100x improvement. However, crashes still occur on very long runs (400k+ files).
+
+**Recommendation:** Use auto-restart wrapper for production runs exceeding 50k files.
+
+---
+
+## Post-Fix Test Results (2026-02-04)
+
+### Test 1: 64 workers, 1,000 files
+```
+Processed: 1,000 files
+Successful: 1,000 (100.0%)
+Failed: 0
+Throughput: 992 files/min
+```
+
+### Test 2: 64 workers, 5,000 files
+```
+Processed: 5,000 files
+Successful: 5,000 (100.0%)
+Failed: 0
+Throughput: 2,711 files/min
+```
+
+### Test 3: 64 workers, 43,940 files (1 hour of data)
+```
+Processed: 43,940 files
+Successful: 43,940 (100.0%)
+Failed: 0
+Elapsed: 1480.6s (~25 min)
+Throughput: 1,781 files/min
+```
+
+### Test 4: 64 workers, 406,037 files (full day)
+```
+Processed: 406,037 files
+Successful: 90,629 (22.3%)
+Failed: 315,408
+Elapsed: 30632.1s (~8.5 hours)
+```
+**Note:** vfun crashed after ~90k files. Remaining failures are connection refused errors after crash.
+
+### Conclusion
+
+The fix (`0431d2d`) significantly improved stability:
+- **Before:** Crashed after 300-7,000 files depending on concurrency
+- **After:** Stable for 44k files, crashes around 90k files
+
+For production workloads >50k files, use `scripts/run_pipeline_with_restart.sh` which automatically restarts vfun on crash.
+
+---
+
+## Original Report (2026-02-03)
+
+vfun crashes after processing a cumulative number of files regardless of worker/concurrency settings. The crash point scales inversely with concurrency - more workers = faster crash. This indicates **accumulated memory corruption** rather than an immediate concurrency issue.
+
+The original fix (commit 97a9d1c - non-blocking CUDA streams and pinned memory) did not resolve the underlying issue.
+
+---
+
+## Test Results
+
+| Workers | Files Tested | Success Rate | Files Before Crash | Crash Type |
+|---------|-------------|--------------|-------------------|------------|
+| 64 | 1,000 | 30.6% | ~306 | "corrupted double-linked list" |
+| 32 | 1,000 | 13.0% | ~130 | Connection refused (process died) |
+| 24 | 1,000 | 100% | - | No crash |
+| 24 | 5,000 | 62.7% | ~3,134 | Connection refused |
+| 16 | 1,000 | 100% | - | No crash |
+| 16 | 5,000 | 100% | - | No crash |
+| 16 | 43,940 | ~45% | ~6,923 | Connection refused |
+
+### Key Observations
+
+1. **Short tests pass**: 1000-file tests with 16-24 workers complete successfully
+2. **Longer tests crash**: Extended runs eventually crash regardless of concurrency
+3. **Crash threshold scales with concurrency**:
+ - 64 workers: crashes after ~300 files
+ - 16 workers: crashes after ~7000 files
+ - Suggests ~300 * workers ≈ crash threshold (rough approximation)
+
+4. **Performance when stable**:
+ - 16 workers: 920 files/min, 1.0s avg latency
+ - 24 workers: 179 files/min, 7.4s avg latency (slower due to saturation?)
+
+---
+
+## Crash Analysis
+
+### Error Messages Observed
+
+1. **"corrupted double-linked list"** (glibc heap corruption)
+ - Observed with 64 workers
+ - Indicates memory corruption in heap management
+
+2. **Silent death** (no error logged)
+ - Process dies, port becomes unavailable
+ - `/ready` endpoint stops responding
+ - No CUDA error or stack trace in logs
+
+3. **Previous crash type** (from earlier testing):
+ - `CUBLAS_STATUS_EXECUTION_FAILED` during `cublasLtMatmul`
+ - Occurred in transformer decoder's forward pass
+
+### Root Cause Hypothesis
+
+The crash is NOT caused by:
+- Immediate concurrency issues (16 workers is stable for 5000 files)
+- Single problematic files (same files work individually)
+- GPU memory exhaustion (VRAM stable at ~12.6GB)
+
+The crash IS likely caused by:
+- **Accumulated memory corruption** that builds up over many requests
+- Possibly related to batch assembly/disassembly
+- May be in queue management, tensor allocation, or FFmpeg decoding
+- The corruption eventually corrupts glibc's heap metadata, causing "corrupted double-linked list"
+
+---
+
+## Recent Fixes Applied (Not Sufficient)
+
+### Commit 97a9d1c (2026-02-03 18:03)
+```
+fix: use non-blocking streams and pinned memory for lens transfer
+
+- Convert all cudaStreamCreate calls to cudaStreamCreateWithFlags(..., cudaStreamNonBlocking)
+- Add pinned host memory and device memory for batch lengths
+- Replace synchronous lens tensor creation with async H2D transfer
+```
+
+### Commit f924c67 (2026-02-03 17:38)
+```
+refactor: replace all malloc/realloc/calloc with _or_die variants
+
+- All allocations now fail-fast on out-of-memory errors
+```
+
+### Commit fcf2c2e (2026-02-03 17:09)
+```
+Fix thread explosion: Limit MHD and OpenMP threads to 16
+```
+
+These fixes address specific issues but don't resolve the accumulated corruption.
+
+---
+
+## Suggested Investigation Areas
+
+1. **Queue management** (`src/queue.c`)
+ - Check for use-after-free or double-free
+ - Verify thread-safe access to shared queues
+ - Look for buffer overflows in batch assembly
+
+2. **Tensor lifecycle**
+ - Ensure tensors are properly freed after each batch
+ - Check for leaks in error paths
+
+3. **FFmpeg integration**
+ - `av_malloc_or_die` wrapper - verify all allocations freed
+ - Check for leaks in audio decoding path
+
+4. **Memory debugging**
+ - Run with AddressSanitizer: `ASAN_OPTIONS=detect_leaks=1`
+ - Run with Valgrind (may be slow with CUDA)
+ - Add heap corruption detection: `MALLOC_CHECK_=3`
+
+---
+
+## Workaround for Production
+
+Use the auto-restart wrapper script that monitors health and restarts on crash:
+
+```bash
+./scripts/run_pipeline_with_restart.sh 2026-01-19 16 5000
+```
+
+This achieves sustained throughput by:
+1. Processing in batches of 5000 files
+2. Checking `/ready` endpoint after each batch
+3. Restarting vfun if unresponsive
+4. Continuing from checkpoint
+
+Expected throughput with restarts: ~800-900 files/min sustained
+
+---
+
+## Test Commands Used
+
+```bash
+# Start vfun
+cd ~/strolid/vfun && ./vfun --port 4380 > /tmp/vfun_4380.log 2>&1 &
+
+# Run stress test
+python3 scripts/nas_transcription_pipeline.py \
+ --date 2026-01-19 \
+ --hour 06 \
+ --server 1 \
+ --workers 16 \
+ --limit 5000
+
+# Check health
+curl -s http://localhost:4380/ready
+
+# View crash log
+tail -50 /tmp/vfun_4380.log
+```
+
+---
+
+## Appendix: Test Data
+
+- **Source**: `/mnt/nas/Freeswitch1/2026-01-19/06/`
+- **File count**: 43,940 WAV files
+- **File format**: 8kHz, 16-bit, mono (standard telephony)
+- **Avg duration**: ~60 seconds
+- **Avg size**: ~960KB
+
+---
+
+**Report prepared by:** Claude Code
+**Contact:** Thomas
diff --git a/scripts/find_bad_file.py b/scripts/find_bad_file.py
new file mode 100644
index 0000000..8bf7675
--- /dev/null
+++ b/scripts/find_bad_file.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+"""Find the specific file that crashes vfun"""
+
+import os
+import sys
+import time
+import requests
+from pathlib import Path
+
+VFUN_URL = "http://localhost:4380/wtf"
+
+def test_file(filepath):
+ """Test a single file, return True if successful"""
+ try:
+ with open(filepath, 'rb') as f:
+ response = requests.post(
+ VFUN_URL,
+ files={"file-binary": ("audio.wav", f, "audio/wav")},
+ timeout=60
+ )
+ return response.status_code == 200
+ except Exception as e:
+ return False
+
+def is_vfun_alive():
+ """Check if vfun is responding"""
+ try:
+ r = requests.get("http://localhost:4380/ready", timeout=5)
+ return r.status_code == 200
+ except:
+ return False
+
+def main():
+ # Get files from hour 11
+ base_dir = Path("/mnt/nas/Freeswitch1/2026-01-19/11")
+ files = sorted(base_dir.glob("*.wav"))[:2000] # First 2000 files
+
+ print(f"Testing {len(files)} files to find crash point...")
+ print("")
+
+ last_good = None
+ for i, filepath in enumerate(files):
+ if not is_vfun_alive():
+ print(f"\n!!! vfun CRASHED after file #{i}")
+ print(f"Last good file: {last_good}")
+ print(f"Crash likely caused by: {filepath}")
+ print(f"Previous file: {files[i-1] if i > 0 else 'N/A'}")
+ return
+
+ success = test_file(filepath)
+ if success:
+ last_good = filepath
+ if i % 100 == 0:
+ print(f"Progress: {i}/{len(files)} - OK")
+ else:
+ if not is_vfun_alive():
+ print(f"\n!!! vfun CRASHED processing file #{i}")
+ print(f"Crash file: {filepath}")
+ return
+ else:
+ print(f"File #{i} failed but vfun still alive: {filepath}")
+
+ print(f"\nAll {len(files)} files processed successfully!")
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/nas_stress_test.py b/scripts/nas_stress_test.py
new file mode 100644
index 0000000..a855d0f
--- /dev/null
+++ b/scripts/nas_stress_test.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+"""High-concurrency stress test for vfun using NAS audio files"""
+
+import time
+import requests
+import concurrent.futures
+import statistics
+import os
+from pathlib import Path
+from datetime import datetime
+import random
+
+VFUN_URL = "http://localhost:4380"
+NAS_PATH = "/mnt/nas/Freeswitch1/2026-01-19/06"
+
+def find_nas_wav_files(max_files=500):
+ """Find wav files in NAS directory"""
+ nas_dir = Path(NAS_PATH)
+ if not nas_dir.exists():
+ print(f"ERROR: NAS path not found: {NAS_PATH}")
+ return []
+
+ wav_files = list(nas_dir.glob("*.wav"))[:max_files]
+ return wav_files
+
+def single_request(audio_file):
+ """Make a single transcription request"""
+ start = time.time()
+ try:
+ with open(audio_file, 'rb') as f:
+ files = {'file-binary': (audio_file.name, f, 'audio/wav')}
+ resp = requests.post(f"{VFUN_URL}/wtf", files=files, timeout=120)
+ elapsed = time.time() - start
+ if resp.status_code == 200:
+ result = resp.json()
+ text = result.get('text', '')
+ return (True, elapsed, len(text), audio_file.stat().st_size)
+ return (False, elapsed, 0, audio_file.stat().st_size)
+ except Exception as e:
+ elapsed = time.time() - start
+ return (False, elapsed, 0, 0)
+
+def run_stress_test(num_files=100, concurrent_workers=32):
+ """Run stress test with high concurrency"""
+ print(f"\n{'='*70}")
+ print(f"VFUN NAS STRESS TEST - {num_files} files, {concurrent_workers} concurrent workers")
+ print(f"Source: {NAS_PATH}")
+ print(f"{'='*70}")
+
+ # Health check
+ try:
+ resp = requests.get(f"{VFUN_URL}/ready", timeout=5)
+ print(f"Server status: {resp.json()}")
+ except Exception as e:
+ print(f"Server not ready: {e}")
+ return
+
+ # Find audio files
+ all_files = find_nas_wav_files(max_files=num_files * 2)
+ print(f"Found {len(all_files)} wav files on NAS")
+
+ if len(all_files) == 0:
+ print("ERROR: No audio files found!")
+ return
+
+ if len(all_files) < num_files:
+ # Repeat files if needed
+ test_files = all_files * ((num_files // len(all_files)) + 1)
+ else:
+ test_files = all_files
+
+ # Shuffle and select
+ random.shuffle(test_files)
+ test_files = test_files[:num_files]
+
+ total_size = sum(f.stat().st_size for f in test_files)
+ print(f"Testing with {len(test_files)} files ({total_size / (1024*1024):.2f} MB)")
+ print(f"Concurrent workers: {concurrent_workers}")
+ print(f"\nStarting at: {datetime.now().isoformat()}")
+ print("-" * 70)
+
+ times = []
+ successes = 0
+ errors = 0
+ total_text = 0
+ total_bytes = 0
+
+ start_total = time.time()
+ completed = 0
+
+ with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_workers) as executor:
+ futures = {executor.submit(single_request, f): f for f in test_files}
+
+ for future in concurrent.futures.as_completed(futures):
+ success, elapsed, text_len, file_size = future.result()
+ times.append(elapsed)
+ total_bytes += file_size
+ completed += 1
+
+ if success:
+ successes += 1
+ total_text += text_len
+ else:
+ errors += 1
+
+ # Progress update every 10 files
+ if completed % 10 == 0:
+ elapsed_total = time.time() - start_total
+ rate = completed / elapsed_total if elapsed_total > 0 else 0
+ print(f" Progress: {completed}/{num_files} ({rate:.1f} files/sec, {successes} ok, {errors} err)")
+
+ total_time = time.time() - start_total
+
+ print("-" * 70)
+ print(f"Finished at: {datetime.now().isoformat()}")
+ print(f"\n{'='*70}")
+ print("RESULTS")
+ print(f"{'='*70}")
+ print(f" Files processed: {len(test_files)}")
+ print(f" Success rate: {successes}/{len(test_files)} ({100*successes/len(test_files):.1f}%)")
+ print(f" Errors: {errors}")
+ print(f" Total data: {total_bytes / (1024*1024):.2f} MB")
+ print(f" Total text: {total_text} chars")
+ print()
+ print(f" Total time: {total_time:.2f}s")
+ print(f" Throughput: {len(test_files)/total_time:.2f} files/sec")
+ print(f" Data rate: {total_bytes / (1024*1024) / total_time:.2f} MB/sec")
+ print()
+ if times:
+ print(f" Min latency: {min(times):.3f}s")
+ print(f" Max latency: {max(times):.3f}s")
+ print(f" Avg latency: {statistics.mean(times):.3f}s")
+ print(f" P50 latency: {statistics.median(times):.3f}s")
+ print(f" P95 latency: {sorted(times)[int(len(times)*0.95)]:.3f}s")
+ print(f" P99 latency: {sorted(times)[int(len(times)*0.99)]:.3f}s")
+ if len(times) > 1:
+ print(f" Std dev: {statistics.stdev(times):.3f}s")
+ print(f"{'='*70}")
+
+ # Estimate batching efficiency
+ avg_latency = statistics.mean(times) if times else 0
+ theoretical_serial = avg_latency * len(test_files)
+ actual_parallel = total_time
+ parallelism = theoretical_serial / actual_parallel if actual_parallel > 0 else 0
+ print(f"\nBATCHING ANALYSIS:")
+ print(f" Effective parallelism: {parallelism:.1f}x")
+ print(f" (If batching works well, this should be > {concurrent_workers}x)")
+ print(f" Theoretical serial time: {theoretical_serial:.1f}s")
+ print(f" Actual parallel time: {actual_parallel:.1f}s")
+
+if __name__ == "__main__":
+ import sys
+ num_files = int(sys.argv[1]) if len(sys.argv) > 1 else 100
+ workers = int(sys.argv[2]) if len(sys.argv) > 2 else 32
+ run_stress_test(num_files=num_files, concurrent_workers=workers)
diff --git a/scripts/nas_transcription_pipeline.py b/scripts/nas_transcription_pipeline.py
new file mode 100644
index 0000000..434174d
--- /dev/null
+++ b/scripts/nas_transcription_pipeline.py
@@ -0,0 +1,753 @@
+#!/usr/bin/env python3
+"""
+Production NAS Transcription Pipeline
+
+High-throughput transcription pipeline for processing phone call recordings
+from NAS storage using the vfun transcription server.
+
+Performance characteristics (single vfun instance):
+- Throughput: ~2,500 files/minute
+- Latency: ~0.4s per file
+- Capacity: ~3.5M files/day per vfun instance
+
+Usage:
+ # Process specific date/hour (transcription only)
+ python3 nas_transcription_pipeline.py --date 2026-01-19 --hour 06 --limit 100
+
+ # Process and store vCons in vcon-server
+ python3 nas_transcription_pipeline.py --date 2026-01-19 --store-vcons
+
+ # Process all files from a date
+ python3 nas_transcription_pipeline.py --date 2026-01-19 --workers 16
+
+ # Dry run to see file counts
+ python3 nas_transcription_pipeline.py --date 2026-01-19 --dry-run
+
+ # Save results to JSON
+ python3 nas_transcription_pipeline.py --date 2026-01-19 --output results.json
+"""
+
+import os
+import sys
+import time
+import json
+import uuid
+import queue
+import threading
+import argparse
+import logging
+from pathlib import Path
+from datetime import datetime, timezone
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from dataclasses import dataclass, asdict
+from typing import Optional, List, Dict, Any
+import requests
+
+# ============================================================================
+# Configuration
+# ============================================================================
+
+# vfun transcription servers
+VFUN_URLS = [
+ "http://localhost:4380/wtf",
+ # "http://localhost:4381/wtf", # Multiple instances hurt performance on single GPU
+ # "http://localhost:4382/wtf",
+ # "http://localhost:4383/wtf",
+]
+
+# vcon-server API
+VCON_API_URL = "http://localhost:8080/vcon"
+VCON_INGRESS_LIST = "default" # Ingress list for processed vCons
+
+# NAS configuration
+NAS_BASE = "/mnt/nas"
+FREESWITCH_SERVERS = list(range(1, 21)) # Freeswitch1 through Freeswitch20
+
+# Performance tuning
+DEFAULT_WORKERS = 12 # Concurrent transcription workers
+REQUEST_TIMEOUT = 300 # Transcription timeout (seconds)
+MAX_RETRIES = 3 # Retries for NFS errors
+RETRY_DELAY = 0.1 # Base delay between retries (seconds)
+VCON_STORE_TIMEOUT = 30 # Timeout for storing vCons
+
+# Logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+
+# ============================================================================
+# Data Classes
+# ============================================================================
+
+@dataclass
+class TranscriptionResult:
+ filepath: str
+ success: bool
+ duration: float
+ text: str = ""
+ text_length: int = 0
+ error: Optional[str] = None
+ language: Optional[str] = None
+ vcon_uuid: Optional[str] = None
+ vcon_stored: bool = False
+
+ def to_dict(self) -> Dict:
+ return {
+ "filepath": self.filepath,
+ "filename": Path(self.filepath).name,
+ "success": self.success,
+ "duration": round(self.duration, 3),
+ "text_length": self.text_length,
+ "language": self.language,
+ "error": self.error,
+ "vcon_uuid": self.vcon_uuid,
+ "vcon_stored": self.vcon_stored,
+ }
+
+
+@dataclass
+class FileMetadata:
+ """Metadata parsed from NAS filename."""
+ campaign_id: str
+ caller_number: str
+ call_id: str
+ call_date: str
+ call_time: str
+ freeswitch_server: str
+
+ @classmethod
+ def from_filepath(cls, filepath: str) -> Optional['FileMetadata']:
+ """Parse metadata from NAS file path.
+
+ Expected format: /mnt/nas/Freeswitch{N}/{date}/{hour}/{campaign}_{caller}_{callid}_{date}_{time}.wav
+ Example: /mnt/nas/Freeswitch1/2026-01-19/06/6075_18557533609_993315706043435_2026-01-19_06:05:02.wav
+ """
+ try:
+ path = Path(filepath)
+ filename = path.stem # Without .wav
+
+ # Parse Freeswitch server from path
+ parts = path.parts
+ fs_server = None
+ for part in parts:
+ if part.startswith("Freeswitch"):
+ fs_server = part
+ break
+
+ # Parse filename: campaign_caller_callid_date_time
+ # Example: 6075_18557533609_993315706043435_2026-01-19_06:05:02
+ segments = filename.split('_')
+ if len(segments) >= 5:
+ campaign_id = segments[0]
+ caller_number = segments[1]
+ call_id = segments[2]
+ call_date = segments[3]
+ call_time = segments[4].replace(':', '-') # Normalize time format
+
+ return cls(
+ campaign_id=campaign_id,
+ caller_number=caller_number,
+ call_id=call_id,
+ call_date=call_date,
+ call_time=call_time,
+ freeswitch_server=fs_server or "unknown"
+ )
+ except Exception as e:
+ logger.debug(f"Failed to parse metadata from {filepath}: {e}")
+
+ return None
+
+
+@dataclass
+class PipelineStats:
+ """Pipeline execution statistics."""
+ start_time: float
+ processed: int = 0
+ successful: int = 0
+ failed: int = 0
+ total_transcription_time: float = 0.0
+ total_text_chars: int = 0
+ vcons_created: int = 0
+ vcons_stored: int = 0
+
+ def record(self, result: TranscriptionResult):
+ self.processed += 1
+ self.total_transcription_time += result.duration
+ if result.success:
+ self.successful += 1
+ self.total_text_chars += result.text_length
+ if result.vcon_uuid:
+ self.vcons_created += 1
+ if result.vcon_stored:
+ self.vcons_stored += 1
+ else:
+ self.failed += 1
+
+ def to_dict(self) -> Dict:
+ elapsed = time.time() - self.start_time
+ return {
+ "processed": self.processed,
+ "successful": self.successful,
+ "failed": self.failed,
+ "success_rate": round(self.successful / self.processed * 100, 1) if self.processed > 0 else 0,
+ "elapsed_seconds": round(elapsed, 1),
+ "throughput_per_min": round(self.processed / elapsed * 60, 1) if elapsed > 0 else 0,
+ "avg_latency": round(self.total_transcription_time / self.processed, 3) if self.processed > 0 else 0,
+ "total_text_chars": self.total_text_chars,
+ "vcons_created": self.vcons_created,
+ "vcons_stored": self.vcons_stored,
+ }
+
+
+# ============================================================================
+# vCon Creation
+# ============================================================================
+
+def create_vcon(
+ filepath: str,
+ metadata: Optional[FileMetadata],
+ transcription_text: str,
+ language: str,
+ audio_duration: float = 60.0
+) -> Dict[str, Any]:
+ """Create a vCon document from transcription result.
+
+ Args:
+ filepath: Path to the audio file
+ metadata: Parsed file metadata
+ transcription_text: The transcribed text
+ language: Detected language code
+ audio_duration: Duration of audio in seconds
+
+ Returns:
+ vCon dictionary ready for submission to vcon-server
+ """
+ vcon_uuid = str(uuid.uuid4())
+ now = datetime.now(timezone.utc).isoformat()
+
+ # Build parties from metadata
+ parties = []
+ if metadata:
+ # Caller party
+ caller_tel = metadata.caller_number
+ if not caller_tel.startswith('+'):
+ caller_tel = f"+1{caller_tel}" if len(caller_tel) == 10 else f"+{caller_tel}"
+ parties.append({
+ "tel": caller_tel,
+ "name": "Caller",
+ "meta": {"role": "customer"}
+ })
+ # Agent party (placeholder)
+ parties.append({
+ "tel": "+10000000000",
+ "name": "Agent",
+ "meta": {"role": "agent"}
+ })
+ else:
+ parties = [
+ {"tel": "+10000000001", "name": "Party 1"},
+ {"tel": "+10000000002", "name": "Party 2"}
+ ]
+
+ # Build dialog with file:// URL reference (no embedding)
+ dialog_start = now
+ if metadata:
+ try:
+ # call_time is stored as HH-MM-SS, convert to HH:MM:SSZ
+ dialog_start = f"{metadata.call_date}T{metadata.call_time.replace('-', ':')}Z"
+ except:
+ pass
+
+ dialog = [{
+ "type": "recording",
+ "start": dialog_start,
+ "parties": [0, 1],
+ "duration": audio_duration,
+ "mimetype": "audio/wav",
+ "url": f"file://{filepath}", # Reference file instead of embedding
+ }]
+
+ # Build WTF transcription analysis
+ analysis_body = {
+ "transcript": {
+ "text": transcription_text,
+ "language": language,
+ "duration": audio_duration,
+ "confidence": 0.9,
+ },
+ "segments": [],
+ "metadata": {
+ "created_at": now,
+ "processed_at": now,
+ "provider": "vfun",
+ "model": "parakeet-tdt-110m",
+ "audio": {"duration": audio_duration},
+ },
+ "quality": {
+ "average_confidence": 0.9,
+ "multiple_speakers": True,
+ "low_confidence_words": 0,
+ }
+ }
+ analysis = [{
+ "type": "wtf_transcription",
+ "dialog": 0,
+ "vendor": "vfun",
+ "product": "parakeet-tdt-110m",
+ "schema": "wtf-1.0",
+ "encoding": "json",
+ "body": json.dumps(analysis_body),
+ }]
+
+ # Build attachments with source metadata
+ attachments = []
+ if metadata:
+ attachment_body = {
+ "freeswitch_server": metadata.freeswitch_server,
+ "campaign_id": metadata.campaign_id,
+ "call_id": metadata.call_id,
+ "source_file": filepath,
+ }
+ attachments.append({
+ "type": "source_metadata",
+ "encoding": "json",
+ "body": json.dumps(attachment_body),
+ })
+
+ return {
+ "vcon": "0.0.1",
+ "uuid": vcon_uuid,
+ "created_at": now,
+ "parties": parties,
+ "dialog": dialog,
+ "analysis": analysis,
+ "attachments": attachments,
+ "group": [],
+ "redacted": {},
+ }
+
+
+def store_vcon(vcon: Dict[str, Any], ingress_list: Optional[str] = None) -> bool:
+ """Store a vCon in vcon-server.
+
+ Args:
+ vcon: The vCon document to store
+ ingress_list: Optional ingress list to add the vCon to
+
+ Returns:
+ True if stored successfully, False otherwise
+ """
+ try:
+ params = {}
+ if ingress_list:
+ params["ingress_lists"] = [ingress_list]
+
+ response = requests.post(
+ VCON_API_URL,
+ params=params,
+ json=vcon,
+ timeout=VCON_STORE_TIMEOUT
+ )
+
+ if response.status_code == 201:
+ return True
+ else:
+ logger.warning(f"Failed to store vCon {vcon['uuid']}: {response.status_code} {response.text[:100]}")
+ return False
+
+ except Exception as e:
+ logger.warning(f"Error storing vCon {vcon['uuid']}: {e}")
+ return False
+
+
+# ============================================================================
+# Load Balancer
+# ============================================================================
+
+class VfunLoadBalancer:
+ """Thread-safe round-robin load balancer for vfun instances."""
+
+ def __init__(self, urls: List[str]):
+ self.urls = urls
+ self.index = 0
+ self.lock = threading.Lock()
+ self._check_health()
+
+ def _check_health(self):
+ """Check which vfun instances are healthy."""
+ healthy = []
+ for url in self.urls:
+ try:
+ ready_url = url.replace("/wtf", "/ready")
+ resp = requests.get(ready_url, timeout=5)
+ if resp.status_code == 200:
+ healthy.append(url)
+ logger.info(f"vfun healthy: {url}")
+ else:
+ logger.warning(f"vfun unhealthy: {url} (status {resp.status_code})")
+ except Exception as e:
+ logger.warning(f"vfun unavailable: {url} ({e})")
+
+ if not healthy:
+ raise RuntimeError("No healthy vfun instances available!")
+
+ self.urls = healthy
+
+ def get_url(self) -> str:
+ with self.lock:
+ url = self.urls[self.index % len(self.urls)]
+ self.index += 1
+ return url
+
+
+# ============================================================================
+# Transcription
+# ============================================================================
+
+def transcribe_file(
+ filepath: str,
+ vfun_lb: VfunLoadBalancer,
+ store_vcons: bool = False,
+ ingress_list: Optional[str] = None
+) -> TranscriptionResult:
+ """Transcribe a single audio file with retry logic for NFS errors.
+
+ Args:
+ filepath: Path to the audio file
+ vfun_lb: Load balancer for vfun instances
+ store_vcons: Whether to create and store a vCon
+ ingress_list: Optional ingress list for the vCon
+
+ Returns:
+ TranscriptionResult with transcription and optional vCon info
+ """
+ start = time.time()
+ last_error = None
+ audio_data = None
+ file_size = 0
+
+ for attempt in range(MAX_RETRIES):
+ try:
+ if attempt > 0:
+ time.sleep(RETRY_DELAY * attempt)
+
+ with open(filepath, 'rb') as f:
+ audio_data = f.read()
+ file_size = len(audio_data)
+
+ url = vfun_lb.get_url()
+ response = requests.post(
+ url,
+ files={"file-binary": ("audio.wav", audio_data, "audio/wav")},
+ timeout=REQUEST_TIMEOUT
+ )
+
+ duration = time.time() - start
+
+ if response.status_code == 200:
+ data = response.json()
+ text = data.get("text", "")
+ language = data.get("language", "en")
+
+ result = TranscriptionResult(
+ filepath=filepath,
+ success=True,
+ duration=duration,
+ text=text,
+ text_length=len(text),
+ language=language
+ )
+
+ # Create and store vCon if requested
+ if store_vcons and text:
+ # Estimate audio duration from file size (8kHz 16-bit mono)
+ audio_duration = file_size / 16000.0 if file_size > 0 else 60.0
+
+ metadata = FileMetadata.from_filepath(filepath)
+ vcon = create_vcon(
+ filepath=filepath,
+ metadata=metadata,
+ transcription_text=text,
+ language=language,
+ audio_duration=audio_duration
+ )
+ result.vcon_uuid = vcon["uuid"]
+
+ if store_vcon(vcon, ingress_list):
+ result.vcon_stored = True
+
+ return result
+ else:
+ last_error = f"HTTP {response.status_code}: {response.text[:100]}"
+
+ except OSError as e:
+ last_error = str(e)
+ continue
+ except requests.exceptions.Timeout:
+ last_error = "Request timeout"
+ continue
+ except Exception as e:
+ last_error = str(e)
+ break
+
+ return TranscriptionResult(
+ filepath=filepath,
+ success=False,
+ duration=time.time() - start,
+ error=last_error
+ )
+
+
+# ============================================================================
+# File Discovery
+# ============================================================================
+
+def find_audio_files(
+ base_path: str,
+ date: Optional[str] = None,
+ hour: Optional[str] = None,
+ servers: Optional[List[int]] = None,
+ limit: Optional[int] = None
+) -> List[str]:
+ """Find audio files in the NAS Freeswitch structure."""
+ files = []
+ servers = servers or FREESWITCH_SERVERS
+
+ for server_num in servers:
+ server_path = Path(base_path) / f"Freeswitch{server_num}"
+ if not server_path.exists():
+ continue
+
+ if date and hour:
+ search_path = server_path / date / hour
+ if search_path.exists():
+ for f in search_path.iterdir():
+ if f.suffix == '.wav':
+ files.append(str(f))
+ if limit and len(files) >= limit:
+ return files
+ elif date:
+ date_path = server_path / date
+ if date_path.exists():
+ for hour_dir in date_path.iterdir():
+ if hour_dir.is_dir():
+ try:
+ for f in hour_dir.iterdir():
+ if f.suffix == '.wav':
+ files.append(str(f))
+ if limit and len(files) >= limit:
+ return files
+ except PermissionError:
+ logger.warning(f"Permission denied: {hour_dir}")
+ continue
+ else:
+ for f in server_path.rglob("*.wav"):
+ files.append(str(f))
+ if limit and len(files) >= limit:
+ return files
+
+ return files
+
+
+# ============================================================================
+# Pipeline Execution
+# ============================================================================
+
+def run_pipeline(
+ files: List[str],
+ workers: int,
+ vfun_lb: VfunLoadBalancer,
+ verbose: bool = False,
+ store_vcons: bool = False,
+ ingress_list: Optional[str] = None
+) -> tuple[List[TranscriptionResult], PipelineStats]:
+ """Execute the transcription pipeline.
+
+ Args:
+ files: List of audio file paths to process
+ workers: Number of concurrent workers
+ vfun_lb: Load balancer for vfun instances
+ verbose: Whether to print detailed output
+ store_vcons: Whether to create and store vCons
+ ingress_list: Optional ingress list for vCons
+
+ Returns:
+ Tuple of (results list, stats)
+ """
+ stats = PipelineStats(start_time=time.time())
+ results = []
+ lock = threading.Lock()
+
+ def process_and_record(filepath):
+ result = transcribe_file(filepath, vfun_lb, store_vcons, ingress_list)
+ with lock:
+ stats.record(result)
+ results.append(result)
+
+ if verbose:
+ status = "✓" if result.success else "✗"
+ filename = Path(filepath).name[:50]
+ print(f" {status} {filename}: {result.duration:.2f}s", end="")
+ if result.success:
+ extra = f" ({result.text_length} chars, {result.language})"
+ if result.vcon_stored:
+ extra += f" [vCon: {result.vcon_uuid[:8]}...]"
+ print(extra)
+ else:
+ print(f" ERROR: {result.error}")
+ else:
+ if result.vcon_stored:
+ sys.stdout.write("V") # V for vCon stored
+ elif result.success:
+ sys.stdout.write(".")
+ else:
+ sys.stdout.write("X")
+ sys.stdout.flush()
+
+ return result
+
+ with ThreadPoolExecutor(max_workers=workers) as executor:
+ futures = [executor.submit(process_and_record, f) for f in files]
+ for future in as_completed(futures):
+ future.result() # Raise any exceptions
+
+ if not verbose:
+ print()
+
+ return results, stats
+
+
+# ============================================================================
+# Main
+# ============================================================================
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Production NAS transcription pipeline",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog=__doc__
+ )
+ parser.add_argument("--date", help="Process specific date (YYYY-MM-DD)")
+ parser.add_argument("--hour", help="Process specific hour (00-23)")
+ parser.add_argument("--server", type=int, action="append",
+ help="Specific Freeswitch server number (can repeat)")
+ parser.add_argument("--limit", type=int, help="Limit files to process")
+ parser.add_argument("--workers", type=int, default=DEFAULT_WORKERS,
+ help=f"Concurrent workers (default: {DEFAULT_WORKERS})")
+ parser.add_argument("--verbose", "-v", action="store_true",
+ help="Verbose output")
+ parser.add_argument("--dry-run", action="store_true",
+ help="Find files but don't process")
+ parser.add_argument("--output", "-o", help="Save results to JSON file")
+ parser.add_argument("--store-vcons", action="store_true",
+ help="Create and store vCons in vcon-server")
+ parser.add_argument("--ingress-list", default=VCON_INGRESS_LIST,
+ help=f"Ingress list for vCons (default: {VCON_INGRESS_LIST})")
+ args = parser.parse_args()
+
+ # Header
+ print("=" * 70)
+ print("NAS TRANSCRIPTION PIPELINE")
+ print("=" * 70)
+
+ # Initialize vfun load balancer
+ try:
+ vfun_lb = VfunLoadBalancer(VFUN_URLS)
+ except RuntimeError as e:
+ print(f"ERROR: {e}")
+ sys.exit(1)
+
+ print(f"vfun instances: {len(vfun_lb.urls)}")
+ print(f"Workers: {args.workers}")
+ print(f"Filters: date={args.date or 'all'}, hour={args.hour or 'all'}")
+ if args.limit:
+ print(f"Limit: {args.limit} files")
+ if args.store_vcons:
+ print(f"vCon storage: ENABLED (ingress: {args.ingress_list})")
+ print()
+
+ # Find files
+ logger.info("Scanning NAS for audio files...")
+ files = find_audio_files(
+ NAS_BASE,
+ date=args.date,
+ hour=args.hour,
+ servers=args.server,
+ limit=args.limit
+ )
+ print(f"Found {len(files):,} audio files")
+
+ if not files:
+ print("No files found!")
+ sys.exit(0)
+
+ if args.dry_run:
+ print("\nDry run - sample files:")
+ for f in files[:10]:
+ print(f" {f}")
+ if len(files) > 10:
+ print(f" ... and {len(files) - 10:,} more")
+ sys.exit(0)
+
+ # Run pipeline
+ print(f"\nProcessing {len(files):,} files...")
+ if args.store_vcons:
+ print("Legend: V=vCon stored, .=transcribed, X=failed")
+ print("-" * 70)
+
+ results, stats = run_pipeline(
+ files,
+ args.workers,
+ vfun_lb,
+ verbose=args.verbose,
+ store_vcons=args.store_vcons,
+ ingress_list=args.ingress_list if args.store_vcons else None
+ )
+
+ # Summary
+ summary = stats.to_dict()
+ print("\n" + "=" * 70)
+ print("RESULTS")
+ print("=" * 70)
+ print(f"Processed: {summary['processed']:,} files")
+ print(f"Successful: {summary['successful']:,} ({summary['success_rate']}%)")
+ print(f"Failed: {summary['failed']:,}")
+ print(f"Elapsed: {summary['elapsed_seconds']}s")
+ print(f"Throughput: {summary['throughput_per_min']:,.0f} files/min")
+ print(f"Avg latency: {summary['avg_latency']}s per file")
+ print(f"Total text: {summary['total_text_chars']:,} characters")
+ if args.store_vcons:
+ print(f"vCons created: {summary['vcons_created']:,}")
+ print(f"vCons stored: {summary['vcons_stored']:,}")
+
+ # Errors
+ errors = [r for r in results if not r.success]
+ if errors:
+ print(f"\nErrors ({len(errors)}):")
+ for e in errors[:5]:
+ print(f" {Path(e.filepath).name}: {e.error}")
+ if len(errors) > 5:
+ print(f" ... and {len(errors) - 5} more")
+
+ # Save results
+ if args.output:
+ output_data = {
+ "stats": summary,
+ "config": {
+ "date": args.date,
+ "hour": args.hour,
+ "workers": args.workers,
+ "vfun_instances": len(vfun_lb.urls),
+ },
+ "results": [r.to_dict() for r in results],
+ }
+ with open(args.output, 'w') as f:
+ json.dump(output_data, f, indent=2)
+ print(f"\nResults saved to: {args.output}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/run_pipeline_with_restart.sh b/scripts/run_pipeline_with_restart.sh
new file mode 100755
index 0000000..0c46e55
--- /dev/null
+++ b/scripts/run_pipeline_with_restart.sh
@@ -0,0 +1,82 @@
+#!/bin/bash
+# Auto-restart pipeline when vfun crashes
+
+DATE="${1:-2026-01-19}"
+WORKERS="${2:-64}"
+BATCH_SIZE="${3:-5000}"
+
+echo "=== Pipeline with Auto-Restart ==="
+echo "Date: $DATE"
+echo "Workers: $WORKERS"
+echo "Batch size: $BATCH_SIZE"
+echo ""
+
+restart_vfun() {
+ echo "$(date +%H:%M:%S) Restarting vfun..."
+ pkill -9 -f "vfun --port 4380" 2>/dev/null
+ sleep 2
+ cd ~/strolid/vfun && ./vfun --port 4380 > /tmp/vfun_4380.log 2>&1 &
+ sleep 18
+ if curl -s http://localhost:4380/ready > /dev/null; then
+ echo "$(date +%H:%M:%S) vfun ready"
+ return 0
+ else
+ echo "$(date +%H:%M:%S) vfun failed to start"
+ return 1
+ fi
+}
+
+# Initial start
+restart_vfun || exit 1
+
+TOTAL_PROCESSED=0
+TOTAL_VCCONS=0
+BATCH=1
+
+while true; do
+ echo ""
+ echo "=== Batch $BATCH (limit $BATCH_SIZE) ==="
+
+ # Run pipeline
+ OUTPUT=$(python3 scripts/nas_transcription_pipeline.py \
+ --date "$DATE" \
+ --server 1 \
+ --workers "$WORKERS" \
+ --limit "$BATCH_SIZE" \
+ --store-vcons 2>&1)
+
+ # Extract stats
+ PROCESSED=$(echo "$OUTPUT" | grep "Processed:" | grep -oE '[0-9,]+' | head -1 | tr -d ',')
+ VCCONS=$(echo "$OUTPUT" | grep "vCons stored:" | grep -oE '[0-9,]+' | tr -d ',')
+ FAILED=$(echo "$OUTPUT" | grep "Failed:" | grep -oE '[0-9,]+' | tr -d ',')
+
+ if [ -n "$PROCESSED" ]; then
+ TOTAL_PROCESSED=$((TOTAL_PROCESSED + PROCESSED))
+ TOTAL_VCCONS=$((TOTAL_VCCONS + VCCONS))
+ echo "Batch $BATCH: $PROCESSED files, $VCCONS vCons, $FAILED failed"
+ echo "Total so far: $TOTAL_PROCESSED files, $TOTAL_VCCONS vCons"
+ fi
+
+ # Check if vfun crashed
+ if ! curl -s http://localhost:4380/ready > /dev/null; then
+ echo "$(date +%H:%M:%S) vfun crashed, restarting..."
+ restart_vfun || exit 1
+ fi
+
+ # Check if we got fewer files than batch size (done)
+ if [ -n "$PROCESSED" ] && [ "$PROCESSED" -lt "$BATCH_SIZE" ]; then
+ echo ""
+ echo "=== COMPLETE ==="
+ echo "Total processed: $TOTAL_PROCESSED"
+ echo "Total vCons: $TOTAL_VCCONS"
+ break
+ fi
+
+ BATCH=$((BATCH + 1))
+
+ # Safety limit
+ if [ $BATCH -gt 100 ]; then
+ echo "Safety limit reached"
+ break
+ fi
+done
diff --git a/server/api.py b/server/api.py
index c38d7ca..1f3ce96 100644
--- a/server/api.py
+++ b/server/api.py
@@ -260,6 +260,24 @@ async def health_check() -> JSONResponse:
})
+@app.get(
+ "/stats/queue",
+ summary="Get queue depth",
+ description="Returns the number of items in a Redis list (queue)",
+ tags=["system"],
+)
+async def get_queue_depth(
+ list_name: str = Query(..., description="Name of the Redis list to measure")
+) -> JSONResponse:
+ """Get the current depth of a Redis list. Public endpoint (no auth) for monitoring and backpressure."""
+ try:
+ depth = await redis_async.llen(list_name)
+ return JSONResponse(content={"list_name": list_name, "depth": depth})
+ except Exception as e:
+ logger.error(f"Error getting queue depth for '{list_name}': {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to get queue depth")
+
+
class Vcon(BaseModel):
"""Pydantic model representing a vCon (Voice Conversation) record.
@@ -659,7 +677,7 @@ async def post_vcon(
await add_vcon_to_set(key, timestamp)
logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
- await index_vcon(inbound_vcon.uuid)
+ await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"])
# Add to ingress lists if specified
if ingress_lists:
@@ -754,7 +772,7 @@ async def external_ingress_vcon(
await add_vcon_to_set(key, timestamp)
logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
- await index_vcon(inbound_vcon.uuid)
+ await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"])
# Always add to the specified ingress list (required for this endpoint)
vcon_uuid_str = str(inbound_vcon.uuid)
@@ -1042,25 +1060,17 @@ async def get_dlq_vcons(
raise HTTPException(status_code=500, detail="Failed to read DLQ")
-async def index_vcon(uuid: UUID) -> None:
- """Index a vCon for searching.
+async def index_vcon_parties(vcon_uuid: str, parties: list) -> None:
+ """Index a vCon's parties for searching.
- Adds the vCon to the sorted set and indexes it by party information
- (tel, mailto, name) for searching. All indexed keys will expire after
- VCON_INDEX_EXPIRY seconds.
+ Indexes by party information (tel, mailto, name). All indexed keys
+ will expire after VCON_INDEX_EXPIRY seconds.
Args:
- uuid: UUID of the vCon to index
+ vcon_uuid: UUID string of the vCon
+ parties: List of party dicts from the vCon
"""
- key = f"vcon:{uuid}"
- vcon = await redis_async.json().get(key)
- created_at = datetime.fromisoformat(vcon["created_at"])
- timestamp = int(created_at.timestamp())
- vcon_uuid = vcon["uuid"]
- await add_vcon_to_set(key, timestamp)
-
- # Index by party information with expiration
- for party in vcon["parties"]:
+ for party in parties:
if party.get("tel"):
tel_key = f"tel:{party['tel']}"
await redis_async.sadd(tel_key, vcon_uuid)
@@ -1075,6 +1085,25 @@ async def index_vcon(uuid: UUID) -> None:
await redis_async.expire(name_key, VCON_INDEX_EXPIRY)
+async def index_vcon(uuid: UUID) -> None:
+ """Index a vCon for searching (reads from Redis).
+
+ Reads the vCon from Redis, adds it to the sorted set, and indexes
+ by party information. Used for bulk re-indexing. For the ingest path,
+ use index_vcon_parties() directly to avoid redundant Redis reads.
+
+ Args:
+ uuid: UUID of the vCon to index
+ """
+ key = f"vcon:{uuid}"
+ vcon = await redis_async.json().get(key)
+ created_at = datetime.fromisoformat(vcon["created_at"])
+ timestamp = int(created_at.timestamp())
+ vcon_uuid = vcon["uuid"]
+ await add_vcon_to_set(key, timestamp)
+ await index_vcon_parties(vcon_uuid, vcon["parties"])
+
+
@api_router.get(
"/index_vcons",
status_code=200,
diff --git a/server/links/keyword_tagger/__init__.py b/server/links/keyword_tagger/__init__.py
new file mode 100644
index 0000000..a7e8ccf
--- /dev/null
+++ b/server/links/keyword_tagger/__init__.py
@@ -0,0 +1,211 @@
+"""Keyword Tagger Link
+
+This link searches vCon transcriptions for specific keywords and phrases,
+then adds corresponding tags based on matches.
+
+Categories:
+- Compliance & Regulatory (do_not_call, take_me_off, stop_calling, fcc, etc.)
+- Voicemail Service Detection (youmail variants)
+- Recording/Transcription mentions
+- Greeting Patterns
+- Legal/Professional
+- Other Content
+
+Configuration options:
+ categories: List of category names to enable (default: all)
+ custom_keywords: Dict of additional tag -> keywords mappings
+ case_sensitive: Whether to match case-sensitively (default: false)
+"""
+
+import re
+from typing import Any, Dict, List, Optional, Set
+
+from server.lib.vcon_redis import VconRedis
+from lib.logging_utils import init_logger
+
+logger = init_logger(__name__)
+
+# Keyword definitions organized by category
+# Format: tag_name -> list of keywords/phrases to search for
+KEYWORD_RULES = {
+ # Compliance & Regulatory
+ "compliance": {
+ "do_not_call": ["do not call", "don't call", "dont call", "do-not-call"],
+ "take_me_off": ["take me off", "take us off", "remove me", "remove us"],
+ "stop_calling": ["stop calling", "quit calling", "stop call"],
+ "mentions_fcc": ["fcc", "federal communications"],
+ "mentions_report": ["report you", "report this", "reporting", "file a complaint"],
+ "mentions_enforcement": ["enforc", "attorney general", "lawsuit", "sue you", "legal action"],
+ "mentions_scam": ["scam", "scammer", "scamming", "fraudulent", "fraud"],
+ "mentions_spam": ["spam", "spammer", "spamming", "junk call"],
+ "mentions_robocall": ["robo", "robocall", "auto-dial", "autodial", "autodialer"],
+ "mentions_protection": ["protect", "protection", "tcpa", "consumer protection"],
+ },
+
+ # Voicemail Service Detection (YouMail variants)
+ "voicemail_service": {
+ "youmail_detected": [
+ "youmail", "youmale", "you mail", "you male",
+ "umale", "umail", "u mail", "u-mail",
+ ],
+ },
+
+ # Recording/Transcription mentions
+ "recording": {
+ "mentions_transcribe": ["transcribe", "transcription", "transcribing"],
+ "mentions_recording": ["being recorded", "this call is recorded", "call is being recorded",
+ "may be recorded", "will be recorded", "recording this call"],
+ "mentions_email": ["email", "e-mail"], # Note: may want to exclude "voicemail"
+ },
+
+ # Greeting Patterns
+ "greetings": {
+ "greeting_yea_hello": ["yea hello", "yeah hello", "ya hello"],
+ "greeting_hi_informal": ["hi j", "hi g", "hey j", "hey g"],
+ },
+
+ # Legal/Professional
+ "legal": {
+ "mentions_law": ["law office", "lawoffice", "law firm", "lawfirm",
+ "attorney", "lawyer", "legal department"],
+ },
+
+ # Other Content
+ "other": {
+ "profanity": ["fuck", "shit", "damn", "ass"],
+ "mentions_torn": ["torn"],
+ "mentions_push": ["push"],
+ "mentions_pitch": ["pitch", "sales pitch"],
+ "mentions_bank": ["bank", "banking", "banker"],
+ "mentions_county": ["county"],
+ "mentions_general": ["general"],
+ "mentions_subscribe": ["subscribe", "subscription"],
+ "why_calling": ["why are you calling", "why you calling", "why do you keep calling"],
+ },
+}
+
+default_options = {
+ "categories": None, # None means all categories
+ "custom_keywords": {}, # Additional tag -> keywords mappings
+ "case_sensitive": False,
+ "min_confidence": 0.0, # Minimum transcription confidence to process
+}
+
+
+def get_transcription_text(vcon: Any) -> Optional[str]:
+ """Extract transcription text from vCon analysis entries."""
+ texts = []
+
+ for analysis in vcon.analysis:
+ analysis_type = analysis.get("type", "")
+
+ # Handle WTF transcription format
+ if analysis_type == "wtf_transcription":
+ body = analysis.get("body", {})
+ if isinstance(body, dict):
+ transcript = body.get("transcript", {})
+ if isinstance(transcript, dict):
+ text = transcript.get("text", "")
+ if text:
+ texts.append(text)
+ # Also check segments for text
+ segments = body.get("segments", [])
+ for seg in segments:
+ seg_text = seg.get("text", "")
+ if seg_text:
+ texts.append(seg_text)
+
+ # Handle standard transcription format
+ elif analysis_type == "transcription":
+ body = analysis.get("body", "")
+ if isinstance(body, str):
+ texts.append(body)
+ elif isinstance(body, dict):
+ text = body.get("text", body.get("transcript", ""))
+ if text:
+ texts.append(text)
+
+ if texts:
+ return " ".join(texts)
+ return None
+
+
+def find_keywords(text: str, keywords: List[str], case_sensitive: bool = False) -> Set[str]:
+ """Find which keywords are present in the text."""
+ if not case_sensitive:
+ text = text.lower()
+
+ found = set()
+ for keyword in keywords:
+ search_keyword = keyword if case_sensitive else keyword.lower()
+ if search_keyword in text:
+ found.add(keyword)
+
+ return found
+
+
+def run(
+ vcon_uuid: str,
+ link_name: str,
+ opts: Dict[str, Any] = None,
+) -> Optional[str]:
+ """Process a vCon and add keyword-based tags."""
+ merged_opts = default_options.copy()
+ if opts:
+ merged_opts.update(opts)
+ opts = merged_opts
+
+ logger.info(f"Starting keyword_tagger for vCon: {vcon_uuid}")
+
+ vcon_redis = VconRedis()
+ vcon = vcon_redis.get_vcon(vcon_uuid)
+
+ if not vcon:
+ logger.error(f"keyword_tagger: vCon {vcon_uuid} not found")
+ return vcon_uuid
+
+ # Get transcription text
+ text = get_transcription_text(vcon)
+
+ if not text:
+ logger.debug(f"No transcription found for vCon {vcon_uuid}")
+ return vcon_uuid
+
+ logger.debug(f"Analyzing transcription ({len(text)} chars) for vCon {vcon_uuid}")
+
+ case_sensitive = opts.get("case_sensitive", False)
+ enabled_categories = opts.get("categories") # None = all
+ custom_keywords = opts.get("custom_keywords", {})
+
+ tags_added = []
+
+ # Process built-in keyword rules
+ for category, rules in KEYWORD_RULES.items():
+ # Skip if category filtering is enabled and this category is not in the list
+ if enabled_categories is not None and category not in enabled_categories:
+ continue
+
+ for tag_name, keywords in rules.items():
+ found = find_keywords(text, keywords, case_sensitive)
+ if found:
+ vcon.add_tag(tag_name=tag_name, tag_value=",".join(sorted(found)))
+ tags_added.append(tag_name)
+ logger.debug(f"Added tag '{tag_name}' (matched: {found})")
+
+ # Process custom keywords
+ for tag_name, keywords in custom_keywords.items():
+ if isinstance(keywords, str):
+ keywords = [keywords]
+ found = find_keywords(text, keywords, case_sensitive)
+ if found:
+ vcon.add_tag(tag_name=tag_name, tag_value=",".join(sorted(found)))
+ tags_added.append(tag_name)
+ logger.debug(f"Added custom tag '{tag_name}' (matched: {found})")
+
+ if tags_added:
+ vcon_redis.store_vcon(vcon)
+ logger.info(f"Added {len(tags_added)} tags to vCon {vcon_uuid}: {tags_added}")
+ else:
+ logger.debug(f"No keyword matches for vCon {vcon_uuid}")
+
+ return vcon_uuid
diff --git a/server/links/webhook/__init__.py b/server/links/webhook/__init__.py
index 16b433c..28c8afd 100644
--- a/server/links/webhook/__init__.py
+++ b/server/links/webhook/__init__.py
@@ -33,6 +33,10 @@ def run(
# The webhook needs a stringified JSON version.
json_dict = vCon.to_dict()
+ # Ensure vcon version is 0.3.0 for compatibility with vcon-mcp REST API
+ if json_dict.get("vcon") == "0.0.1" or "vcon" not in json_dict:
+ json_dict["vcon"] = "0.3.0"
+
# Build headers from configuration
headers = opts.get("headers", {})
diff --git a/server/links/wtf_transcribe/README.md b/server/links/wtf_transcribe/README.md
new file mode 100644
index 0000000..6b22fdc
--- /dev/null
+++ b/server/links/wtf_transcribe/README.md
@@ -0,0 +1,140 @@
+# WTF Transcription Link (vfun Integration)
+
+A link that sends vCon audio dialogs to a vfun transcription server and adds the results as WTF (World Transcription Format) analysis entries.
+
+## Overview
+
+This link integrates with the vfun transcription server to provide:
+- Multi-language speech recognition (English + auto-detect)
+- Speaker diarization (who spoke when)
+- GPU-accelerated processing with CUDA
+- WTF-compliant output format per IETF draft-howe-vcon-wtf-extension-01
+
+## Configuration
+
+```yaml
+wtf_transcribe:
+ module: links.wtf_transcribe
+ options:
+ # Required: URL of the vfun transcription server
+ vfun-server-url: http://localhost:8443/transcribe
+
+ # Optional: Enable speaker diarization (default: true)
+ diarize: true
+
+ # Optional: Request timeout in seconds (default: 300)
+ timeout: 300
+
+ # Optional: Minimum dialog duration to transcribe in seconds (default: 5)
+ min-duration: 5
+
+ # Optional: API key for vfun server authentication
+ api-key: your-api-key-here
+```
+
+## How It Works
+
+1. **Extract Audio**: Reads audio from vCon dialog (supports `body` with base64/base64url encoding, or `url` with file:// or http:// references)
+2. **Send to vfun**: POSTs audio file to vfun's `/transcribe` endpoint
+3. **Create WTF Analysis**: Formats the transcription result as a WTF analysis entry
+4. **Update vCon**: Adds the WTF analysis to the vCon and stores it back to Redis
+
+## Output Format
+
+The link adds analysis entries with the WTF format:
+
+```json
+{
+ "type": "wtf_transcription",
+ "dialog": 0,
+ "mediatype": "application/json",
+ "vendor": "vfun",
+ "product": "parakeet-tdt-110m",
+ "schema": "wtf-1.0",
+ "encoding": "json",
+ "body": {
+ "transcript": {
+ "text": "Hello, how can I help you today?",
+ "language": "en-US",
+ "duration": 30.0,
+ "confidence": 0.95
+ },
+ "segments": [
+ {
+ "id": 0,
+ "start": 0.0,
+ "end": 3.5,
+ "text": "Hello, how can I help you today?",
+ "confidence": 0.95,
+ "speaker": 0
+ }
+ ],
+ "metadata": {
+ "created_at": "2024-01-15T10:30:00Z",
+ "processed_at": "2024-01-15T10:30:05Z",
+ "provider": "vfun",
+ "model": "parakeet-tdt-110m"
+ },
+ "speakers": {
+ "0": {
+ "id": 0,
+ "label": "Speaker 0",
+ "segments": [0],
+ "total_time": 15.2
+ }
+ },
+ "quality": {
+ "average_confidence": 0.95,
+ "multiple_speakers": true,
+ "low_confidence_words": 0
+ }
+ }
+}
+```
+
+## Behavior
+
+- **Skips non-recording dialogs**: Only processes dialogs with `type: "recording"`
+- **Skips already transcribed**: Dialogs with existing WTF transcriptions are skipped
+- **Duration filtering**: Dialogs shorter than `min-duration` are skipped
+- **File URL support**: Can read audio from local `file://` URLs directly
+
+## Example Chain Configuration
+
+```yaml
+chains:
+ transcription_chain:
+ links:
+ - tag
+ - wtf_transcribe
+ - supabase_webhook
+ ingress_lists:
+ - transcribe
+ egress_lists:
+ - transcribed
+ enabled: 1
+```
+
+## vfun Server
+
+The vfun server provides GPU-accelerated transcription:
+
+```bash
+# Start vfun server
+cd /path/to/vfun
+./vfun server
+
+# Test health
+curl http://localhost:8443/ping
+
+# Manual transcription test
+curl -X POST http://localhost:8443/transcribe \
+ -H "Authorization: Bearer YOUR_API_KEY" \
+ -F "file=@audio.wav" \
+ -F "diarize=true"
+```
+
+## Related
+
+- [vfun](https://github.com/strolid/vfun) - GPU-accelerated transcription server
+- [draft-howe-vcon-wtf-extension](https://datatracker.ietf.org/doc/html/draft-howe-vcon-wtf-extension) - IETF WTF specification
diff --git a/server/links/wtf_transcribe/__init__.py b/server/links/wtf_transcribe/__init__.py
new file mode 100644
index 0000000..2e85433
--- /dev/null
+++ b/server/links/wtf_transcribe/__init__.py
@@ -0,0 +1,586 @@
+"""WTF Transcription Link (vfun integration)
+
+This link sends vCon audio dialogs to a vfun transcription server and adds
+the results as WTF (World Transcription Format) analysis entries.
+
+The vfun server provides:
+- Multi-language speech recognition (English + auto-detect)
+- Speaker diarization (who spoke when)
+- GPU-accelerated processing with CUDA
+
+Configuration options:
+ vfun-server-url: URL of the vfun transcription server (required)
+ diarize: Enable speaker diarization (default: true)
+ timeout: Request timeout in seconds (default: 300)
+ min-duration: Minimum dialog duration to transcribe in seconds (default: 5)
+ api-key: Optional API key for vfun server authentication
+
+Example configuration in config.yml:
+ wtf_transcribe:
+ module: links.wtf_transcribe
+ options:
+ vfun-server-url: http://localhost:8443/transcribe
+ diarize: true
+ timeout: 300
+ min-duration: 5
+ api-key: your-api-key-here
+"""
+
+import base64
+import hashlib
+import json
+import logging
+import os
+import random
+import tempfile
+import time
+import threading
+import requests
+from datetime import datetime, timezone
+from typing import Optional, Dict, Any, List
+
+from server.lib.vcon_redis import VconRedis
+from lib.logging_utils import init_logger
+from lib.error_tracking import init_error_tracker
+from redis_mgr import redis
+
+init_error_tracker()
+logger = init_logger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# Health-aware vfun URL selector with self-healing
+# ---------------------------------------------------------------------------
+class _VfunHealthTracker:
+ """Track vfun instance health across all workers in this process.
+
+ Instances are marked DOWN on connection/timeout/HTTP errors and
+ automatically re-checked after `recovery_seconds`. Selection prefers
+ healthy instances with random load balancing; when all are down the
+ least-recently-failed instance is tried first.
+ """
+
+ def __init__(self, recovery_seconds: float = 30.0):
+ self._lock = threading.Lock()
+ # url -> timestamp when it was marked down (0 = healthy)
+ self._down_since: Dict[str, float] = {}
+ self._recovery_seconds = recovery_seconds
+
+ def _is_healthy(self, url: str, now: float) -> bool:
+ ts = self._down_since.get(url, 0)
+ if ts == 0:
+ return True
+ # Self-heal: allow retry after recovery window
+ return (now - ts) >= self._recovery_seconds
+
+ def mark_down(self, url: str) -> None:
+ with self._lock:
+ if self._down_since.get(url, 0) == 0:
+ logger.warning("vfun instance marked DOWN: %s", url)
+ self._down_since[url] = time.monotonic()
+
+ def mark_healthy(self, url: str) -> None:
+ with self._lock:
+ was_down = self._down_since.get(url, 0) != 0
+ self._down_since[url] = 0
+ if was_down:
+ logger.info("vfun instance recovered: %s", url)
+
+ def get_ordered_urls(self, urls: List[str]) -> List[str]:
+ """Return URLs ordered: healthy (shuffled) first, then recovering
+ (oldest-failure first), then remaining down instances."""
+ now = time.monotonic()
+ healthy = []
+ recovering = []
+ down = []
+ with self._lock:
+ for url in urls:
+ ts = self._down_since.get(url, 0)
+ if ts == 0:
+ healthy.append(url)
+ elif (now - ts) >= self._recovery_seconds:
+ recovering.append((ts, url))
+ else:
+ down.append((ts, url))
+ random.shuffle(healthy)
+ recovering.sort() # oldest failure first (most likely recovered)
+ down.sort()
+ return healthy + [u for _, u in recovering] + [u for _, u in down]
+
+
+# Module-level singleton shared across all workers in this process
+_health_tracker = _VfunHealthTracker(recovery_seconds=30.0)
+
+default_options = {
+ "vfun-server-url": None,
+ "vfun-server-urls": None, # List of URLs for load balancing
+ "diarize": True,
+ "timeout": 300,
+ "min-duration": 5,
+ "api-key": None,
+ "cache-ttl": 604800, # 7 days in seconds
+}
+
+# Redis cache key prefixes for transcription results
+WTF_CACHE_PREFIX = "wtf_cache:"
+TRANSCRIPTION_PREFIX = "transcription:"
+
+
+def _get_filename_from_dialog(dialog: Dict[str, Any]) -> Optional[str]:
+ """Extract the audio filename from a dialog's URL."""
+ url = dialog.get("url", "")
+ if url:
+ if url.startswith("file://"):
+ return os.path.basename(url[7:])
+ else:
+ return os.path.basename(url.split("?")[0])
+ return None
+
+
+def get_cache_key(dialog: Dict[str, Any]) -> Optional[str]:
+ """Derive a cache key from the dialog's audio file URL or body hash."""
+ filename = _get_filename_from_dialog(dialog)
+ if filename:
+ return f"{WTF_CACHE_PREFIX}{filename}"
+ # Fall back to hashing the body content
+ body = dialog.get("body")
+ if body:
+ body_hash = hashlib.sha256(body.encode() if isinstance(body, str) else body).hexdigest()[:32]
+ return f"{WTF_CACHE_PREFIX}hash:{body_hash}"
+ return None
+
+
+def get_cached_transcription(cache_key: str, dialog: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+ """Check Redis for a cached WTF transcription result.
+
+ Checks two key patterns:
+ 1. wtf_cache:{filename} — full WTF body (stored by this link)
+ 2. transcription:{filename} — simple {text, language, duration} (pre-populated)
+ """
+ # Check wtf_cache: first (full body, ready to use)
+ try:
+ cached = redis.get(cache_key)
+ if cached:
+ return json.loads(cached)
+ except Exception as e:
+ logger.debug(f"Cache lookup failed for {cache_key}: {e}")
+
+ # Check transcription: prefix (simpler format, needs conversion)
+ filename = _get_filename_from_dialog(dialog)
+ if filename:
+ try:
+ cached = redis.get(f"{TRANSCRIPTION_PREFIX}{filename}")
+ if cached:
+ data = json.loads(cached)
+ # Convert simple format to WTF body
+ now = datetime.now(timezone.utc).isoformat()
+ duration = float(data.get("duration", dialog.get("duration", 30.0)))
+ return {
+ "transcript": {
+ "text": data.get("text", ""),
+ "language": data.get("language", "en-US"),
+ "duration": duration,
+ "confidence": 0.9,
+ },
+ "segments": [],
+ "metadata": {
+ "created_at": now,
+ "processed_at": now,
+ "provider": "vfun",
+ "model": "parakeet-tdt-110m",
+ "source": "redis_cache",
+ },
+ "quality": {
+ "average_confidence": 0.9,
+ "multiple_speakers": False,
+ "low_confidence_words": 0,
+ },
+ }
+ except Exception as e:
+ logger.debug(f"Transcription cache lookup failed for {filename}: {e}")
+
+ return None
+
+
+def store_cached_transcription(cache_key: str, wtf_body: Dict[str, Any], ttl: int = 604800):
+ """Store a WTF transcription result in Redis cache."""
+ try:
+ redis.setex(cache_key, ttl, json.dumps(wtf_body))
+ except Exception as e:
+ logger.debug(f"Cache store failed for {cache_key}: {e}")
+
+
+def get_vfun_urls(opts: Dict[str, Any]) -> List[str]:
+ """Get vfun server URLs ordered by health (healthy first, shuffled)."""
+ urls = opts.get("vfun-server-urls")
+ if urls and isinstance(urls, list) and len(urls) > 0:
+ return _health_tracker.get_ordered_urls(urls)
+ single = opts.get("vfun-server-url")
+ return [single] if single else []
+
+
+def has_wtf_transcription(vcon: Any, dialog_index: int) -> bool:
+ """Check if a dialog already has a WTF transcription."""
+ for analysis in vcon.analysis:
+ if (analysis.get("type") == "wtf_transcription" and
+ analysis.get("dialog") == dialog_index):
+ return True
+ return False
+
+
+def should_transcribe_dialog(dialog: Dict[str, Any], min_duration: float) -> bool:
+ """Check if a dialog should be transcribed."""
+ if dialog.get("type") != "recording":
+ return False
+ if not dialog.get("body") and not dialog.get("url"):
+ return False
+ duration = dialog.get("duration")
+ if duration is not None and float(duration) < min_duration:
+ return False
+ return True
+
+
+def get_audio_content(dialog: Dict[str, Any]) -> Optional[bytes]:
+ """Extract audio content from dialog body or URL."""
+ if dialog.get("body"):
+ encoding = dialog.get("encoding", "base64")
+ if encoding == "base64url":
+ return base64.urlsafe_b64decode(dialog["body"])
+ elif encoding == "base64":
+ return base64.b64decode(dialog["body"])
+ else:
+ return dialog["body"].encode() if isinstance(dialog["body"], str) else dialog["body"]
+
+ if dialog.get("url"):
+ url = dialog["url"]
+ if url.startswith("file://"):
+ filepath = url[7:]
+ try:
+ with open(filepath, "rb") as f:
+ return f.read()
+ except Exception as e:
+ logger.error(f"Failed to read file {filepath}: {e}")
+ return None
+ else:
+ try:
+ resp = requests.get(url, timeout=60)
+ resp.raise_for_status()
+ return resp.content
+ except Exception as e:
+ logger.error(f"Failed to fetch URL {url}: {e}")
+ return None
+ return None
+
+
+def create_wtf_analysis(
+ dialog_index: int,
+ vfun_response: Dict[str, Any],
+ duration: float,
+) -> Dict[str, Any]:
+ """Create a WTF analysis entry from vfun response."""
+ now = datetime.now(timezone.utc).isoformat()
+
+ # Extract text and segments from vfun response
+ # vfun can return either:
+ # 1. Direct response: {"type": "wtf_transcription", "body": {...}}
+ # 2. Wrapped in analysis: {"analysis": [{"type": "wtf_transcription", "body": {...}}]}
+
+ full_text = ""
+ segments = []
+ language = "en-US"
+
+ # Check for direct response format first (vfun native format)
+ if vfun_response.get("type") in ("transcription", "wtf_transcription"):
+ body = vfun_response.get("body", {})
+ if isinstance(body, dict):
+ transcript = body.get("transcript", {})
+ full_text = transcript.get("text", body.get("text", ""))
+ language = transcript.get("language", body.get("language", "en-US"))
+ segments = body.get("segments", [])
+ elif isinstance(body, str):
+ full_text = body
+ else:
+ # Try wrapped analysis format
+ analysis_entries = vfun_response.get("analysis", [])
+ for entry in analysis_entries:
+ if entry.get("type") in ("transcription", "wtf_transcription"):
+ body = entry.get("body", {})
+ if isinstance(body, dict):
+ transcript = body.get("transcript", {})
+ full_text = transcript.get("text", body.get("text", ""))
+ language = transcript.get("language", body.get("language", "en-US"))
+ segments = body.get("segments", [])
+ elif isinstance(body, str):
+ full_text = body
+ break
+
+ # If no text found, check for direct text field
+ if not full_text:
+ full_text = vfun_response.get("text", "")
+ segments = vfun_response.get("segments", [])
+
+ # Calculate confidence
+ if segments:
+ confidences = [s.get("confidence", 0.9) for s in segments]
+ avg_confidence = sum(confidences) / len(confidences)
+ else:
+ avg_confidence = 0.9
+
+ # Build WTF segments
+ wtf_segments = []
+ for i, seg in enumerate(segments):
+ wtf_seg = {
+ "id": seg.get("id", i),
+ "start": float(seg.get("start", seg.get("start_time", 0.0))),
+ "end": float(seg.get("end", seg.get("end_time", 0.0))),
+ "text": seg.get("text", seg.get("transcription", "")),
+ "confidence": float(seg.get("confidence", 0.9)),
+ }
+ if "speaker" in seg:
+ wtf_seg["speaker"] = seg["speaker"]
+ wtf_segments.append(wtf_seg)
+
+ # Build speakers section
+ speakers = {}
+ for seg in wtf_segments:
+ speaker = seg.get("speaker")
+ if speaker is not None:
+ speaker_key = str(speaker)
+ if speaker_key not in speakers:
+ speakers[speaker_key] = {
+ "id": speaker,
+ "label": f"Speaker {speaker}",
+ "segments": [],
+ "total_time": 0.0,
+ }
+ speakers[speaker_key]["segments"].append(seg["id"])
+ speakers[speaker_key]["total_time"] += seg["end"] - seg["start"]
+
+ # Build WTF body
+ wtf_body = {
+ "transcript": {
+ "text": full_text,
+ "language": language,
+ "duration": float(duration),
+ "confidence": float(avg_confidence),
+ },
+ "segments": wtf_segments,
+ "metadata": {
+ "created_at": now,
+ "processed_at": now,
+ "provider": "vfun",
+ "model": "parakeet-tdt-110m",
+ "audio": {
+ "duration": float(duration),
+ },
+ },
+ "quality": {
+ "average_confidence": float(avg_confidence),
+ "multiple_speakers": len(speakers) > 1,
+ "low_confidence_words": sum(1 for s in wtf_segments if s.get("confidence", 1.0) < 0.5),
+ },
+ }
+
+ if speakers:
+ wtf_body["speakers"] = speakers
+
+ return {
+ "type": "wtf_transcription",
+ "dialog": dialog_index,
+ "mediatype": "application/json",
+ "vendor": "vfun",
+ "product": "parakeet-tdt-110m",
+ "schema": "wtf-1.0",
+ # Note: encoding omitted since body is a direct object, not a JSON string
+ "body": wtf_body,
+ }
+
+
+def run(
+ vcon_uuid: str,
+ link_name: str,
+ opts: Dict[str, Any] = None,
+) -> Optional[str]:
+ """Process a vCon through the vfun transcription service."""
+ merged_opts = default_options.copy()
+ if opts:
+ merged_opts.update(opts)
+ opts = merged_opts
+
+ logger.info(f"Starting wtf_transcribe link for vCon: {vcon_uuid}")
+
+ # Check if any vfun URL is configured
+ if not opts.get("vfun-server-url") and not opts.get("vfun-server-urls"):
+ logger.error("wtf_transcribe: vfun-server-url or vfun-server-urls is required")
+ return vcon_uuid
+
+ vcon_redis = VconRedis()
+ vcon = vcon_redis.get_vcon(vcon_uuid)
+
+ if not vcon:
+ logger.error(f"wtf_transcribe: vCon {vcon_uuid} not found")
+ return vcon_uuid
+
+ # Find dialogs to transcribe
+ dialogs_processed = 0
+ dialogs_skipped = 0
+ cache_hits = 0
+ cache_misses = 0
+ cache_ttl = opts.get("cache-ttl", 604800)
+
+ for i, dialog in enumerate(vcon.dialog):
+ if not should_transcribe_dialog(dialog, opts.get("min-duration", 5)):
+ logger.debug(f"Skipping dialog {i} (not eligible)")
+ dialogs_skipped += 1
+ continue
+
+ if has_wtf_transcription(vcon, i):
+ logger.debug(f"Skipping dialog {i} (already transcribed)")
+ dialogs_skipped += 1
+ continue
+
+ # Check Redis cache before calling vfun
+ cache_key = get_cache_key(dialog)
+ cached_body = get_cached_transcription(cache_key, dialog) if cache_key else None
+
+ if cached_body:
+ # Cache hit - use cached transcription
+ cache_hits += 1
+ logger.info(f"Cache HIT for dialog {i} (key={cache_key})")
+
+ wtf_analysis = {
+ "type": "wtf_transcription",
+ "dialog": i,
+ "mediatype": "application/json",
+ "vendor": "vfun",
+ "product": "parakeet-tdt-110m",
+ "schema": "wtf-1.0",
+ "body": cached_body,
+ }
+
+ vcon.add_analysis(
+ type=wtf_analysis["type"],
+ dialog=wtf_analysis["dialog"],
+ vendor=wtf_analysis.get("vendor"),
+ body=wtf_analysis["body"],
+ extra={
+ "mediatype": wtf_analysis.get("mediatype"),
+ "product": wtf_analysis.get("product"),
+ "schema": wtf_analysis.get("schema"),
+ },
+ )
+
+ dialogs_processed += 1
+ logger.info(f"Added cached WTF transcription for dialog {i}")
+ continue
+
+ # Cache miss - need to call vfun
+ cache_misses += 1
+
+ # Get audio content
+ audio_content = get_audio_content(dialog)
+ if not audio_content:
+ logger.warning(f"Could not extract audio from dialog {i}")
+ dialogs_skipped += 1
+ continue
+
+ logger.info(f"Cache MISS for dialog {i} - calling vfun (key={cache_key})")
+
+ # Try each vfun instance in health-priority order until one succeeds
+ vfun_urls = get_vfun_urls(opts)
+ if not vfun_urls:
+ logger.error("wtf_transcribe: no vfun URLs available")
+ dialogs_skipped += 1
+ continue
+
+ mimetype = dialog.get("mimetype", "audio/wav")
+ headers = {}
+ api_key = opts.get("api-key")
+ if api_key:
+ headers["Authorization"] = f"Bearer {api_key}"
+ timeout = opts.get("timeout", 300)
+
+ transcribed = False
+ for attempt, vfun_server_url in enumerate(vfun_urls):
+ try:
+ files = {
+ "file-binary": ("audio", audio_content, mimetype)
+ }
+ response = requests.post(
+ vfun_server_url,
+ files=files,
+ headers=headers,
+ timeout=timeout,
+ )
+
+ if response.status_code in (200, 302):
+ _health_tracker.mark_healthy(vfun_server_url)
+ vfun_response = response.json()
+ if isinstance(vfun_response, str):
+ vfun_response = json.loads(vfun_response)
+
+ duration = dialog.get("duration", 30.0)
+ wtf_analysis = create_wtf_analysis(i, vfun_response, float(duration))
+
+ if cache_key:
+ store_cached_transcription(cache_key, wtf_analysis["body"], cache_ttl)
+ logger.info(f"Cached transcription for dialog {i} (key={cache_key})")
+
+ vcon.add_analysis(
+ type=wtf_analysis["type"],
+ dialog=wtf_analysis["dialog"],
+ vendor=wtf_analysis.get("vendor"),
+ body=wtf_analysis["body"],
+ extra={
+ "mediatype": wtf_analysis.get("mediatype"),
+ "product": wtf_analysis.get("product"),
+ "schema": wtf_analysis.get("schema"),
+ },
+ )
+
+ dialogs_processed += 1
+ if attempt > 0:
+ logger.info(f"Added WTF transcription for dialog {i} (succeeded on attempt {attempt + 1})")
+ else:
+ logger.info(f"Added WTF transcription for dialog {i}")
+ transcribed = True
+ break # success — stop trying other URLs
+
+ else:
+ _health_tracker.mark_down(vfun_server_url)
+ logger.warning(
+ f"vfun {vfun_server_url} returned {response.status_code} for dialog {i}, "
+ f"trying next instance ({attempt + 1}/{len(vfun_urls)})"
+ )
+
+ except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
+ _health_tracker.mark_down(vfun_server_url)
+ logger.warning(
+ f"vfun {vfun_server_url} unreachable for dialog {i}: {type(e).__name__}, "
+ f"trying next instance ({attempt + 1}/{len(vfun_urls)})"
+ )
+ except Exception as e:
+ _health_tracker.mark_down(vfun_server_url)
+ logger.error(
+ f"Unexpected error from vfun {vfun_server_url} for dialog {i}: {e}",
+ exc_info=True,
+ )
+
+ if not transcribed:
+ logger.error(
+ f"All {len(vfun_urls)} vfun instances failed for dialog {i} of vCon {vcon_uuid}"
+ )
+
+ if dialogs_processed > 0:
+ vcon_redis.store_vcon(vcon)
+ logger.info(
+ f"Updated vCon {vcon_uuid}: processed={dialogs_processed}, "
+ f"skipped={dialogs_skipped}, cache_hits={cache_hits}, cache_misses={cache_misses}"
+ )
+ else:
+ logger.info(
+ f"No dialogs transcribed for vCon {vcon_uuid} "
+ f"(cache_hits={cache_hits}, cache_misses={cache_misses})"
+ )
+
+ return vcon_uuid
diff --git a/server/storage/webhook/__init__.py b/server/storage/webhook/__init__.py
new file mode 100644
index 0000000..e9bfbb5
--- /dev/null
+++ b/server/storage/webhook/__init__.py
@@ -0,0 +1,39 @@
+from server.lib.vcon_redis import VconRedis
+from lib.logging_utils import init_logger
+
+import requests
+
+logger = init_logger(__name__)
+
+default_options = {
+ "webhook-urls": [],
+ "headers": {},
+}
+
+
+def save(vcon_uuid, opts=default_options):
+ vcon_redis = VconRedis()
+ vCon = vcon_redis.get_vcon(vcon_uuid)
+
+ json_dict = vCon.to_dict()
+
+ if json_dict.get("vcon") == "0.0.1" or "vcon" not in json_dict:
+ json_dict["vcon"] = "0.3.0"
+
+ headers = opts.get("headers", {})
+
+ webhook_urls = opts.get("webhook-urls", [])
+ if not webhook_urls:
+ logger.warning(
+ f"webhook storage: no webhook-urls configured for vcon {vcon_uuid}, skipping"
+ )
+ return
+
+ for url in webhook_urls:
+ logger.info(
+ f"webhook storage: posting vcon {vcon_uuid} to webhook url: {url}"
+ )
+ resp = requests.post(url, json=json_dict, headers=headers)
+ logger.info(
+ f"webhook storage response for {vcon_uuid}: {resp.status_code} {resp.text}"
+ )
diff --git a/signoz/README.md b/signoz/README.md
new file mode 100644
index 0000000..25ecd0f
--- /dev/null
+++ b/signoz/README.md
@@ -0,0 +1,196 @@
+# SigNoz Observability Stack for vcon-server
+
+This directory contains the configuration for SigNoz, a self-hosted observability platform that collects traces, metrics, and logs from vcon-server (conserver and api) via OpenTelemetry.
+
+## Architecture
+
+```
+┌─────────────────┐ OTLP/HTTP ┌──────────────────────┐
+│ conserver / api │ ─────────────────► │ signoz-otel-collector│
+│ (instrumented) │ :4318 │ (OTLP receiver) │
+└─────────────────┘ └──────────┬───────────┘
+ │
+ ▼
+┌─────────────────┐ ┌──────────────────────┐
+│ signoz (UI) │ ◄────────────────► │ signoz-clickhouse │
+│ :3301 │ TCP :9000 │ (time-series DB) │
+└─────────────────┘ └──────────┬───────────┘
+ │
+ ▼
+ ┌──────────────────────┐
+ │ signoz-zookeeper │
+ │ (coordination) │
+ └──────────────────────┘
+```
+
+## Components
+
+| Service | Image | Purpose | Ports |
+|---------|-------|---------|-------|
+| signoz | `signoz/query-service:latest` | Query API + Web UI | 3301 (mapped from 8080) |
+| signoz-otel-collector | `signoz/signoz-otel-collector:latest` | OTLP ingestion | 4317 (gRPC), 4318 (HTTP) |
+| signoz-clickhouse | `clickhouse/clickhouse-server:24.1.2-alpine` | Time-series storage | 8123, 9000 (internal) |
+| signoz-zookeeper | `zookeeper:3.9` | ClickHouse coordination | 2181 (internal) |
+
+## Configuration Files
+
+### otel-collector-config.yaml
+OpenTelemetry Collector pipeline configuration:
+- **Receivers**: OTLP gRPC (4317) and HTTP (4318)
+- **Processors**: Batch processing
+- **Exporters**: ClickHouse for traces, metrics, and logs
+
+### zz-clickhouse-config.xml
+ClickHouse server configuration:
+- IPv4 listening (0.0.0.0)
+- Single-node cluster named "cluster" (required by SigNoz schema migrator)
+- ZooKeeper integration for distributed DDL
+
+### clickhouse-users.xml
+ClickHouse user permissions with default user having full access.
+
+### alertmanager.yml
+Basic alertmanager configuration (not currently active).
+
+## Usage
+
+### Start with SigNoz
+
+When you use `docker-compose.signoz.yml`, the **conserver** and **api** services are overridden to run with `opentelemetry-instrument` and OTEL environment variables so traces and metrics are sent to the SignOz collector (HTTP OTLP on port 4318). Service names appear in SignOz as `conserver` and `conserver.api`.
+
+```bash
+cd /home/thomas/bds/vcon-dev/vcon-server
+docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d
+```
+
+### Start without SigNoz (normal operation)
+
+```bash
+cd /home/thomas/bds/vcon-dev/vcon-server
+docker compose -f docker-compose.yml -f docker-compose.override.yml up -d
+```
+
+### Stop SigNoz only
+
+```bash
+docker compose -f docker-compose.signoz.yml down
+```
+
+### Access the UI
+
+Open http://localhost:3301 in your browser.
+
+## First-Time Setup
+
+After starting SigNoz for the first time, run the schema migrations (required for Traces, Metrics, and **Logs**):
+
+```bash
+docker run --rm --network conserver \
+ signoz/signoz-schema-migrator:latest \
+ sync --dsn='tcp://signoz-clickhouse:9000'
+```
+
+Note: Some migrations may fail due to JSON type syntax incompatibility with ClickHouse 24.1. Core functionality still works.
+
+Verify that logs schema exists (needed for the Logs tab):
+
+```bash
+docker exec signoz-clickhouse clickhouse-client --query "SHOW TABLES FROM signoz_logs"
+```
+
+You should see tables such as `logs_v2`, `distributed_logs_v2`, etc. If the database or tables are missing, the Logs tab will show "Aw snap" when you open it.
+
+## vcon-mcp Integration
+
+The vcon-mcp service is configured with these environment variables in `docker-compose.override.yml`:
+
+```yaml
+environment:
+ OTEL_ENABLED: "true"
+ OTEL_EXPORTER_TYPE: otlp
+ OTEL_ENDPOINT: http://signoz-otel-collector:4318
+ OTEL_SERVICE_NAME: vcon-mcp-server
+```
+
+## Verification
+
+1. Check service health:
+ ```bash
+ curl http://localhost:3301/api/v1/health
+ # Returns: {"status":"ok"}
+ ```
+
+2. Check container status:
+ ```bash
+ docker ps | grep signoz
+ ```
+
+3. View collector logs:
+ ```bash
+ docker logs signoz-otel-collector
+ ```
+
+## Troubleshooting
+
+### ClickHouse won't start
+- Check if port 9000 is in use
+- Verify zookeeper is healthy first
+- Check logs: `docker logs signoz-clickhouse`
+
+### OTEL Collector errors
+- Ensure ClickHouse is healthy before starting collector
+- Verify schema migrations have run
+- Check config syntax: `docker logs signoz-otel-collector`
+
+### No data in UI
+- Verify vcon-mcp is sending data (check its logs for OTEL export messages)
+- Ensure collector is receiving data: check collector metrics at port 8888
+- Verify ClickHouse tables exist: `docker exec signoz-clickhouse clickhouse-client --query "SHOW TABLES FROM signoz_traces"`
+
+### Logs tab shows "Aw snap" or "Something went wrong"
+Two possible causes:
+
+1. **Logs schema missing**
+ Run the schema migrator (see **First-Time Setup** above), verify `signoz_logs` tables exist, then restart `signoz` and refresh the Logs tab.
+
+2. **Query service panic (telemetry TTL check)**
+ The query service runs a telemetry cron that checks TTL for `signoz_logs.logs`. The schema migrator only creates `logs_v2`, so that table doesn't exist and the cron can panic (nil pointer), crashing the service. **Fix:** set `TELEMETRY_ENABLED=false` for the `signoz` service in `docker-compose.signoz.yml` (already set in this repo), then recreate the container:
+ `docker compose -f docker-compose.yml -f docker-compose.override.yml -f docker-compose.signoz.yml up -d signoz --force-recreate`
+
+### Port conflicts
+- Default ports: 3301 (UI), 4317 (gRPC), 4318 (HTTP)
+- Change in docker-compose.signoz.yml if needed
+
+## Known Issues
+
+1. **Schema Migration Failures**: Some newer SigNoz migrations use JSON column types with syntax not supported in ClickHouse 24.1.2. Core observability works but some advanced features may be limited.
+
+2. **Alertmanager**: Not configured for this deployment. Would require additional setup for alerts.
+
+3. **Health Check Timing**: The OTEL collector health check may show "starting" for extended periods but the service is functional.
+
+## Future Improvements
+
+- Upgrade ClickHouse to latest version for full schema compatibility
+- Add alertmanager configuration for alerts
+- Configure data retention policies
+- Add authentication to SigNoz UI
+- Set up dashboards for vcon-mcp metrics
+
+## Data Persistence
+
+Data is stored in Docker volumes:
+- `signoz_clickhouse_data` - Traces, metrics, logs
+- `signoz_zookeeper_data` - ZooKeeper state
+- `signoz_data` - SigNoz query service state
+
+To reset all data:
+```bash
+docker compose -f docker-compose.signoz.yml down -v
+```
+
+## Resources
+
+- [SigNoz Documentation](https://signoz.io/docs/)
+- [OpenTelemetry Documentation](https://opentelemetry.io/docs/)
+- [ClickHouse Documentation](https://clickhouse.com/docs/)
diff --git a/signoz/alertmanager.yml b/signoz/alertmanager.yml
new file mode 100644
index 0000000..89b0125
--- /dev/null
+++ b/signoz/alertmanager.yml
@@ -0,0 +1,19 @@
+global:
+ resolve_timeout: 5m
+
+route:
+ group_by: ['alertname']
+ group_wait: 10s
+ group_interval: 10s
+ repeat_interval: 1h
+ receiver: 'default-receiver'
+
+receivers:
+ - name: 'default-receiver'
+
+inhibit_rules:
+ - source_match:
+ severity: 'critical'
+ target_match:
+ severity: 'warning'
+ equal: ['alertname', 'dev', 'instance']
diff --git a/signoz/clickhouse-users.xml b/signoz/clickhouse-users.xml
new file mode 100644
index 0000000..c545475
--- /dev/null
+++ b/signoz/clickhouse-users.xml
@@ -0,0 +1,38 @@
+
+
+
+ 10000000000
+ 0
+ random
+ 100
+
+
+ 1
+
+
+
+
+
+
+
+ ::/0
+
+ default
+ default
+ 1
+
+
+
+
+
+
+ 3600
+ 0
+ 0
+ 0
+ 0
+ 0
+
+
+
+
diff --git a/signoz/dashboards/.gitkeep b/signoz/dashboards/.gitkeep
new file mode 100644
index 0000000..989d6cc
--- /dev/null
+++ b/signoz/dashboards/.gitkeep
@@ -0,0 +1,2 @@
+# Dashboards directory for SigNoz query service (DASHBOARDS_PATH).
+# Add JSON dashboard definitions here if needed.
diff --git a/signoz/otel-collector-config.yaml b/signoz/otel-collector-config.yaml
new file mode 100644
index 0000000..3766f7f
--- /dev/null
+++ b/signoz/otel-collector-config.yaml
@@ -0,0 +1,52 @@
+receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+ http:
+ endpoint: 0.0.0.0:4318
+ # HTTP log receiver for logspout-signoz and curl/API log ingestion
+ httplogreceiver/json:
+ endpoint: 0.0.0.0:8082
+ source: json
+
+processors:
+ batch:
+ send_batch_size: 10000
+ timeout: 10s
+
+exporters:
+ clickhousetraces:
+ datasource: tcp://signoz-clickhouse:9000/signoz_traces
+ clickhouselogsexporter:
+ dsn: tcp://signoz-clickhouse:9000/signoz_logs
+ timeout: 5s
+ sending_queue:
+ queue_size: 100
+ retry_on_failure:
+ enabled: true
+ initial_interval: 5s
+ max_interval: 30s
+ max_elapsed_time: 300s
+ signozclickhousemetrics:
+ dsn: tcp://signoz-clickhouse:9000/signoz_metrics
+
+extensions:
+ health_check:
+ endpoint: 0.0.0.0:13133
+
+service:
+ extensions: [health_check]
+ pipelines:
+ traces:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [clickhousetraces]
+ metrics:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [signozclickhousemetrics]
+ logs:
+ receivers: [otlp, httplogreceiver/json]
+ processors: [batch]
+ exporters: [clickhouselogsexporter]
diff --git a/signoz/zz-clickhouse-config.xml b/signoz/zz-clickhouse-config.xml
new file mode 100644
index 0000000..d50368e
--- /dev/null
+++ b/signoz/zz-clickhouse-config.xml
@@ -0,0 +1,53 @@
+
+
+ warning
+ true
+
+
+ 0.0.0.0
+
+ 4096
+ 3
+ 100
+
+ /var/lib/clickhouse/
+ /var/lib/clickhouse/tmp/
+ /var/lib/clickhouse/user_files/
+
+ users.xml
+ default
+ default
+
+ UTC
+
+ true
+
+ 3600
+
+ 3600
+ 60
+
+
+
+
+
+ signoz-clickhouse
+ 9000
+
+
+
+
+
+
+ cluster
+ 01
+ signoz-clickhouse
+
+
+
+
+ signoz-zookeeper
+ 2181
+
+
+