Skip to content

Commit 720feba

Browse files
committed
fix: harden SDK — observer leak + callback isolation, surfaced errors
- observer: detach the sleep() abort-listener on normal timeout (was leaking one listener per tick onto a long-lived signal); wrap onLoop/onReport/onTick so a throwing user callback routes to onError instead of killing the loop
- collect / planUpload: add opt-in onError so per-adapter locate/parse failures are observable instead of silently swallowed (matches the observer)
- consolidate writeOtlp → writeOtlpFile (single temp-writer, single prefix)
- document the ms→nanos number-typed timestamp (ordering + ms resolution kept)
- test: observer routes a throwing onLoop to onError (regression)
1 parent 41eb8a0 commit 720feba

8 files changed

Lines changed: 40 additions & 21 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@tangle-network/traces",
3-
"version": "0.2.0",
3+
"version": "0.2.1",
44
"description": "Point it at your coding-agent session traces (Claude Code, Codex, OpenCode, Gemini, Pi, …) and get failure-mode + efficiency findings. CLI + SDK over the @tangle-network/agent-eval analyst suite — observe live sessions, run your own analysts, redact, and upload to the Tangle Intelligence Platform.",
55
"type": "module",
66
"license": "MIT",

src/analyze.ts

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,11 @@
1010
* (`halo <file> -p "diagnose"`), so analysis is never locked to one engine.
1111
*/
1212

13-
import { mkdtemp, writeFile } from 'node:fs/promises'
14-
import { tmpdir } from 'node:os'
15-
import { join } from 'node:path'
1613
import type { AxAIService } from '@ax-llm/ax'
1714
import { type AnalystRegistry, buildDefaultAnalystRegistry } from '@tangle-network/agent-eval/analyst'
1815
import { OtlpFileTraceStore } from '@tangle-network/agent-eval/traces'
1916
import type { OtlpSpan } from './otlp.js'
20-
import { serializeSpans } from './otlp.js'
17+
import { writeOtlpFile } from './otlp.js'
2118

2219
export interface AnalyzeOptions {
2320
/** Ax service enabling the agentic RLM kinds. Omit → deterministic only. */
@@ -44,13 +41,6 @@ export interface AnalyzeResult {
4441
result: Awaited<ReturnType<ReturnType<typeof buildDefaultAnalystRegistry>['run']>>
4542
}
4643

47-
/** Write spans to an OTLP-JSONL file and return its path. */
48-
export async function writeOtlp(spans: readonly OtlpSpan[], outPath?: string): Promise<string> {
49-
const path = outPath ?? join(await mkdtemp(join(tmpdir(), 'traces-')), 'spans.otlp.jsonl')
50-
await writeFile(path, serializeSpans(spans), 'utf8')
51-
return path
52-
}
53-
5444
/**
5545
* `viewTrace` ceiling for the deterministic pass. The default 150KB cap
5646
* exists to protect an LLM's context window — the deterministic behavioral
@@ -62,7 +52,7 @@ const DETERMINISTIC_VIEW_CEILING = 256 * 1024 * 1024
6252

6353
export async function analyzeSpans(spans: readonly OtlpSpan[], opts: AnalyzeOptions = {}): Promise<AnalyzeResult> {
6454
if (spans.length === 0) throw new Error('analyzeSpans: no spans to analyze')
65-
const otlpPath = await writeOtlp(spans, opts.otlpOutPath)
55+
const otlpPath = await writeOtlpFile(spans, opts.otlpOutPath)
6656
const runId = opts.runId ?? `traces-${Date.now()}`
6757

6858
// Deterministic pass — high ceiling so the behavioral analyst sees the whole

src/cli.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
*/
1616

1717
import { stat, writeFile } from 'node:fs/promises'
18-
import { analyzeSpans, writeOtlp } from './analyze.js'
18+
import { analyzeSpans } from './analyze.js'
1919
import type { OtlpSpan } from './otlp.js'
20-
import { runPipelines } from './pipelines.js'
20+
import { writeOtlpFile } from './otlp.js'
2121
import { watchSessions } from './observer.js'
22+
import { runPipelines } from './pipelines.js'
2223
import { knownHarnesses, listAdapters, resolveAdapter } from './registry.js'
2324
import { renderPipelines, renderReport } from './report.js'
2425
import type { HarnessTraceAdapter, SessionRef } from './types.js'
@@ -184,7 +185,7 @@ async function collectSpans(args: Args): Promise<{ spans: OtlpSpan[]; harness: s
184185
async function cmdConvert(args: Args): Promise<void> {
185186
const { spans } = await collectSpans(args)
186187
if (spans.length === 0) throw new Error('no spans found for the given selection')
187-
const path = await writeOtlp(spans, args.otlp)
188+
const path = await writeOtlpFile(spans, args.otlp)
188189
console.log(`wrote ${spans.length} spans → ${path}`)
189190
}
190191

src/collect.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ export interface CollectOptions {
2727
last?: number
2828
/** Redact PII/secrets (default true). Set false to get raw spans. */
2929
redact?: boolean
30+
/** Called on a per-adapter locate/parse failure (collection continues). */
31+
onError?: (error: unknown, ref?: SessionRef) => void
3032
}
3133

3234
export interface SessionBatch {
@@ -54,15 +56,17 @@ export async function collectSessions(opts: CollectOptions = {}): Promise<Sessio
5456
let refs: SessionRef[]
5557
try {
5658
refs = await adapter.locate({ cwd: opts.cwd, sinceMs: opts.sinceMs })
57-
} catch {
59+
} catch (err) {
60+
opts.onError?.(err)
5861
continue
5962
}
6063
if (opts.last && opts.last > 0) refs = refs.slice(0, opts.last)
6164
for (const ref of refs) {
6265
let spans: OtlpSpan[]
6366
try {
6467
spans = await adapter.parse(ref)
65-
} catch {
68+
} catch (err) {
69+
opts.onError?.(err, ref)
6670
continue
6771
}
6872
if (spans.length === 0) continue

src/observer.ts

556 Bytes
Binary file not shown.

src/otlp.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ export async function writeOtlpFile(spans: readonly OtlpSpan[], outPath?: string
102102
const { mkdtemp, writeFile } = await import('node:fs/promises')
103103
const { tmpdir } = await import('node:os')
104104
const { join } = await import('node:path')
105-
const path = outPath ?? join(await mkdtemp(join(tmpdir(), 'tangle-traces-')), 'spans.otlp.jsonl')
105+
const path = outPath ?? join(await mkdtemp(join(tmpdir(), 'traces-')), 'spans.otlp.jsonl')
106106
await writeFile(path, serializeSpans(spans), 'utf8')
107107
return path
108108
}

src/upload.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ export interface PlanOptions {
4949
all?: boolean
5050
cwd?: string
5151
sinceMs?: number
52+
/** Called on a per-adapter locate/parse failure (planning continues). */
53+
onError?: (error: unknown, ref?: SessionRef) => void
5254
}
5355

5456
function adaptersFor(opts: PlanOptions): HarnessTraceAdapter[] {
@@ -66,14 +68,16 @@ export async function planUpload(opts: PlanOptions): Promise<UploadPlan> {
6668
let refs: SessionRef[]
6769
try {
6870
refs = await adapter.locate({ cwd: opts.cwd, sinceMs: opts.sinceMs })
69-
} catch {
71+
} catch (err) {
72+
opts.onError?.(err)
7073
continue
7174
}
7275
for (const ref of refs) {
7376
let raw: OtlpSpan[]
7477
try {
7578
raw = await adapter.parse(ref)
76-
} catch {
79+
} catch (err) {
80+
opts.onError?.(err, ref)
7781
continue
7882
}
7983
if (raw.length === 0) continue
@@ -128,6 +132,10 @@ function epochMs(ts: string): number {
128132
return Number.isNaN(n) ? 0 : n
129133
}
130134

135+
// TraceSpanEvent.*UnixNano is typed `number`, and our source resolution is
136+
// milliseconds. ms × 1e6 exceeds MAX_SAFE_INTEGER, so the low ~256ns are not
137+
// representable — but that's below our input resolution, and since both ends
138+
// are integer-ms × 1e6 the rounding preserves ordering (start ≤ end always).
131139
const msToNano = (iso: string): number => epochMs(iso) * 1_000_000
132140

133141
/** Map redacted OtlpSpan[] → the hosted TraceSpanEvent[] wire shape, attaching

tests/sdk.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,22 @@ describe('watchSessions (observer event API)', () => {
7272
expect(loops[0]!.toolName).toBe('bash')
7373
expect(loops[0]!.occurrences).toBeGreaterThanOrEqual(3)
7474
})
75+
76+
it('routes a throwing onLoop to onError instead of crashing the loop', async () => {
77+
const controller = new AbortController()
78+
const errors: unknown[] = []
79+
await watchSessions({
80+
adapters: [adapterOf(loopSpans(4))],
81+
intervalMs: 250,
82+
minLoopOccurrences: 3,
83+
signal: controller.signal,
84+
onLoop: () => { throw new Error('boom') },
85+
onError: (e) => errors.push(e),
86+
onTick: () => controller.abort(),
87+
})
88+
expect(errors).toHaveLength(1)
89+
expect((errors[0] as Error).message).toBe('boom')
90+
})
7591
})
7692

7793
describe('collectSessions (batch seam)', () => {

0 commit comments

Comments
 (0)