diff --git a/SPEC.md b/SPEC.md index d5b7d68dd9..ff5cebdd9f 100644 --- a/SPEC.md +++ b/SPEC.md @@ -451,6 +451,9 @@ fields locally if they want stricter startup checks. - Default: implementation-defined. - `turn_sandbox_policy` (Codex `SandboxPolicy` value) - Default: implementation-defined. + - Runtime note: when the policy type is `workspaceWrite`, implementations should ensure the + current issue workspace remains writable even when callers add extra `writableRoots` for + linked-worktree metadata or similar adjunct paths. - `turn_timeout_ms` (integer) - Default: `3600000` (1 hour) - `read_timeout_ms` (integer) @@ -760,6 +763,9 @@ Retry entry creation: - Cancel any existing retry timer for the same issue. - Store `attempt`, `identifier`, `error`, `due_at_ms`, and new timer handle. +- Keep the issue claimed while retrying so a queued handoff cannot be dispatched twice. +- If a claim lease exists, update it to `retrying` with retry due/backoff metadata and the last + seen worker/workspace details. Backoff formula: @@ -784,6 +790,23 @@ Note: - Retry handling mainly operates on active candidates and releases claims when the issue is absent, rather than performing terminal cleanup itself. +Claim lease behavior: + +- When an issue is claimed for a worker, create a visible tracker comment marker headed + `## Symphony Claim Lease`. +- Persist these lease fields in runtime state and tracker-visible marker text: + `worker_id`, `worker_host`, `workspace_path`, `attempt`, `last_seen_at`, and + `lease_expires_at`. +- The default lease TTL is derived from polling cadence: at least 60 seconds and at least three poll + intervals. +- Active workers refresh the lease during poll reconciliation and when runtime/Codex activity is + observed. +- Retry and blocked transitions update the lease marker with `retrying` or `blocked` state and + relevant error/backoff details. +- Expired leases are recovered only when no live running or blocked worker exists for the issue. + Recovery logs the expiration, records an `expired` claim entry for observability, and requeues the + issue for retry handoff. + ### 8.5 Active Run Reconciliation Reconciliation runs every tick and has two parts. @@ -1300,6 +1323,12 @@ SHOULD return: - `output_tokens` - `total_tokens` - `seconds_running` (aggregate runtime seconds as of snapshot time, including active sessions) +- `token_usage` (optional durable token summary across completed and active sessions) + - `input_tokens` + - `output_tokens` + - `total_tokens` + - `issue_count` + - `session_count` - `rate_limits` (latest coding-agent rate limit payload, if available) RECOMMENDED snapshot error modes: @@ -1330,6 +1359,10 @@ Token accounting rules: - Do not treat generic `usage` maps as cumulative totals unless the event type defines them that way. - Accumulate aggregate totals in orchestrator state. +- Implementations may also persist append-only token observations for durable per-issue + observability. If they do, summarize by taking the high-water cumulative totals per + `(issue_identifier, session_id)` and then summing those session totals; do not sum every observed + event. Runtime accounting: @@ -1408,7 +1441,9 @@ Minimum endpoints: "generated_at": "2026-02-24T20:15:30Z", "counts": { "running": 2, - "retrying": 1 + "retrying": 1, + "blocked": 0, + "expired": 0 }, "running": [ { @@ -1439,12 +1474,32 @@ Minimum endpoints: "error": "no available orchestrator slots" } ], + "claim_leases": [ + { + "issue_id": "abc123", + "issue_identifier": "MT-649", + "state": "active", + "worker_id": "local:#PID<0.123.0>", + "workspace_path": "/tmp/symphony_workspaces/MT-649", + "attempt": 1, + "last_seen_at": "2026-02-24T20:14:59Z", + "lease_expires_at": "2026-02-24T20:16:30Z" + } + ], + "expired": [], "codex_totals": { "input_tokens": 5000, "output_tokens": 2400, "total_tokens": 7400, "seconds_running": 1834.2 }, + "token_usage": { + "input_tokens": 5000, + "output_tokens": 2400, + "total_tokens": 7400, + "issue_count": 2, + "session_count": 3 + }, "rate_limits": null } ``` @@ -1498,12 +1553,21 @@ Minimum endpoints: } ], "last_error": null, - "tracked": {} + "tracked": {}, + "token_usage": { + "input_tokens": 1200, + "output_tokens": 800, + "total_tokens": 2000, + "session_count": 1 + } } ``` - - If the issue is unknown to the current in-memory state, return `404` with an error response (for - example `{\"error\":{\"code\":\"issue_not_found\",\"message\":\"...\"}}`). + - If the issue is unknown to the current in-memory state but exists in a durable token ledger, an + implementation may return an inactive issue payload with token usage. + - If the issue is unknown to both the current in-memory state and durable observability state, + return `404` with an error response (for example + `{\"error\":{\"code\":\"issue_not_found\",\"message\":\"...\"}}`). - `POST /api/v1/refresh` - Queues an immediate tracker poll + reconciliation cycle (best-effort trigger; implementations diff --git a/elixir/README.md b/elixir/README.md index e31925b37b..753e2e7740 100644 --- a/elixir/README.md +++ b/elixir/README.md @@ -31,6 +31,12 @@ issue claimed and exposes it as blocked in the runtime state, JSON API, and dash entries are in memory only; restarting the orchestrator clears that blocked map, so any still-active Linear issue can become a dispatch candidate again after restart. +Claimed issues also get a Symphony claim lease marker through the tracker comment API. The lease +records the last-seen worker id, workspace path, attempt number, last heartbeat time, and expiry. +Active workers refresh the lease during poll and Codex activity; retry and blocked transitions +update the same lease state. If a non-live claim lease expires, Symphony logs the recovery and +requeues the issue without starting a duplicate worker for a still-running claim. + ## How to use it 1. Make sure your codebase is set up to work well with agents: see @@ -85,6 +91,12 @@ Optional flags: - `--logs-root` tells Symphony to write logs under a different directory (default: `./log`) - `--port` also starts the Phoenix observability service (default: disabled) +Symphony also writes durable Codex token usage observations to `token_usage.jsonl` next to the +configured log file. With the default log path, this is `./log/token_usage.jsonl`; with +`--logs-root`, it follows the same log root. The ledger stores cumulative high-water token totals +per issue/session so completed tickets can still be inspected after the in-memory dashboard state +has moved on. + The `WORKFLOW.md` file uses YAML front matter for configuration, plus a Markdown body used as the Codex session prompt. @@ -124,9 +136,11 @@ Notes: - `codex.turn_sandbox_policy` defaults to a `workspaceWrite` policy rooted at the current issue workspace - Supported `codex.approval_policy` values depend on the targeted Codex app-server version. In the current local Codex schema, string values include `untrusted`, `on-failure`, `on-request`, and `never`, and object-form `reject` is also supported. - Supported `codex.thread_sandbox` values: `read-only`, `workspace-write`, `danger-full-access`. -- When `codex.turn_sandbox_policy` is set explicitly, Symphony passes the map through to Codex - unchanged. Compatibility then depends on the targeted Codex app-server version rather than local - Symphony validation. +- When `codex.turn_sandbox_policy` is set explicitly, Symphony forwards the configured map to + Codex, but for `workspaceWrite` policies it ensures the current issue workspace stays in + `writableRoots` at runtime. This allows adding extra writable paths without granting access to + sibling workspaces by default. Compatibility for the remaining fields still depends on the + targeted Codex app-server version rather than local Symphony validation. - Workflows that run package managers or other commands that resolve external hosts should set `networkAccess: true` in `codex.turn_sandbox_policy`; otherwise DNS/network access may be denied by the Codex turn sandbox. @@ -168,10 +182,17 @@ The observability UI now runs on a minimal Phoenix stack: - LiveView for the dashboard at `/` - JSON API for operational debugging under `/api/v1/*` +- Active, retrying, blocked, and expired claim lease visibility - Bandit as the HTTP server - Phoenix dependency static assets for the LiveView client bootstrap - Tracker issue identifiers link to the tracker-provided URL when it uses `http` or `https` +The JSON API includes durable token summaries from `token_usage.jsonl`: + +- `/api/v1/state` includes `token_usage` totals plus issue/session counts. +- `/api/v1/` can return `status: "inactive"` with `token_usage` for a completed + or otherwise inactive issue that is no longer present in the live running/retry state. + ## Project Layout - `lib/`: application code and Mix tasks diff --git a/elixir/docs/token_accounting.md b/elixir/docs/token_accounting.md index 2c6e107be2..dbcb6e1d77 100644 --- a/elixir/docs/token_accounting.md +++ b/elixir/docs/token_accounting.md @@ -285,11 +285,23 @@ That is a strong signal for Symphony: - use absolute totals as the main accounting surface - ignore last/delta values for totals +## Durable Per-Issue Ledger + +The Elixir reference implementation persists token observations to `token_usage.jsonl` next to the +configured log file. Each line is an append-only JSON object containing the issue identifier, Codex +session id, source event, final/non-final marker, and cumulative input/output/total token values. + +The ledger is summarized by taking the maximum observed totals per `(issue_identifier, session_id)` +and then summing those session high-water marks. This makes repeated live updates, retries, and +final snapshots safe to append without double-counting. The ledger remains observability data only: +it is not a billing surface and does not apply pricing or model-specific cost rules. + ## Recommended Symphony Documentation Contract If Symphony documents token reporting externally, the contract should be: - Live token totals come from Codex thread-scoped cumulative usage. +- Durable per-issue totals come from high-water cumulative totals per Codex session. - Incremental usage may also be emitted, but Symphony does not use it for totals. - Turn-completed usage is event-specific and should not be assumed to be a fresh additive increment. - Reporting is thread-based, and multiple turns can occur on one thread. @@ -300,5 +312,6 @@ If Symphony documents token reporting externally, the contract should be: - Fallback to `info.total_token_usage` - Ignore `last` for totals - Key totals by `thread_id` +- Persist high-water totals by `(issue_identifier, session_id)` - Do not classify generic `usage` by field name alone - Do not double-count turn-completed usage after live updates diff --git a/elixir/lib/symphony_elixir/agent_runner.ex b/elixir/lib/symphony_elixir/agent_runner.ex index 8e199f4da3..65ac6b4cc7 100644 --- a/elixir/lib/symphony_elixir/agent_runner.ex +++ b/elixir/lib/symphony_elixir/agent_runner.ex @@ -382,14 +382,10 @@ defmodule SymphonyElixir.AgentRunner do Issue.stop_continue_labeled?(issue, Config.settings!().agent.stop_continue_labels) end - defp stop_continue_label?(_issue), do: false - defp issue_routable?(%Issue{} = issue) do Issue.routable?(issue, Config.settings!().tracker.required_labels) end - defp issue_routable?(_issue), do: false - defp active_issue_state?(state_name) when is_binary(state_name) do normalized_state = normalize_issue_state(state_name) diff --git a/elixir/lib/symphony_elixir/config/schema.ex b/elixir/lib/symphony_elixir/config/schema.ex index 2059c00b91..a763108cc6 100644 --- a/elixir/lib/symphony_elixir/config/schema.ex +++ b/elixir/lib/symphony_elixir/config/schema.ex @@ -353,7 +353,7 @@ defmodule SymphonyElixir.Config.Schema do def resolve_runtime_turn_sandbox_policy(settings, workspace \\ nil, opts \\ []) do case settings.codex.turn_sandbox_policy do %{} = policy -> - {:ok, policy} + {:ok, ensure_workspace_write_root(policy, workspace, opts)} _ -> workspace @@ -579,6 +579,35 @@ defmodule SymphonyElixir.Config.Schema do {:error, {:unsafe_turn_sandbox_policy, {:invalid_workspace_root, workspace_root}}} end + defp ensure_workspace_write_root(%{"type" => "workspaceWrite"} = policy, workspace, opts) do + case runtime_workspace_write_root(workspace, opts) do + nil -> + policy + + workspace_root -> + writable_roots = + policy + |> Map.get("writableRoots", []) + |> normalize_writable_roots() + |> List.insert_at(0, workspace_root) + |> Enum.uniq() + + Map.put(policy, "writableRoots", writable_roots) + end + end + + defp ensure_workspace_write_root(policy, _workspace, _opts), do: policy + + defp runtime_workspace_write_root(workspace, opts) + when is_binary(workspace) and workspace != "" do + if Keyword.get(opts, :remote, false), do: workspace, else: Path.expand(workspace) + end + + defp runtime_workspace_write_root(_workspace, _opts), do: nil + + defp normalize_writable_roots(roots) when is_list(roots), do: roots + defp normalize_writable_roots(_roots), do: [] + defp default_workspace_root(workspace, _fallback) when is_binary(workspace) and workspace != "", do: workspace diff --git a/elixir/lib/symphony_elixir/orchestrator.ex b/elixir/lib/symphony_elixir/orchestrator.ex index df0f38470b..270514589c 100644 --- a/elixir/lib/symphony_elixir/orchestrator.ex +++ b/elixir/lib/symphony_elixir/orchestrator.ex @@ -7,7 +7,7 @@ defmodule SymphonyElixir.Orchestrator do require Logger import Bitwise, only: [<<<: 2] - alias SymphonyElixir.{AgentRunner, Config, Ledger, StatusDashboard, Tracker, Workspace} + alias SymphonyElixir.{AgentRunner, Config, Ledger, StatusDashboard, TokenUsageLedger, Tracker, Workspace} alias SymphonyElixir.Linear.Issue @continuation_retry_delay_ms 1_000 @@ -18,6 +18,9 @@ defmodule SymphonyElixir.Orchestrator do # so it never recovers (self-perpetuating thrash). A 429/issue_state_refresh # failure means "stop hitting the API", so we wait minutes, not seconds. @rate_limit_retry_ms 300_000 + @minimum_claim_lease_ttl_ms 60_000 + @claim_lease_ttl_poll_multiplier 3 + @claim_lease_marker_interval_ms 60_000 # Slightly above the dashboard render interval so "checking now…" can render. @poll_transition_render_delay_ms 20 @empty_codex_totals %{ @@ -32,6 +35,8 @@ defmodule SymphonyElixir.Orchestrator do Runtime state for the orchestrator polling loop. """ + @type t :: %__MODULE__{} + defstruct [ :poll_interval_ms, :max_concurrent_agents, @@ -48,6 +53,8 @@ defmodule SymphonyElixir.Orchestrator do claimed: MapSet.new(), blocked: %{}, retry_attempts: %{}, + claim_leases: %{}, + expired_claims: %{}, codex_totals: nil, codex_rate_limits: nil ] @@ -168,8 +175,13 @@ defmodule SymphonyElixir.Orchestrator do |> maybe_put_runtime_value(:worker_host, runtime_info[:worker_host]) |> maybe_put_runtime_value(:workspace_path, runtime_info[:workspace_path]) + state = + state + |> Map.put(:running, Map.put(running, issue_id, updated_running_entry)) + |> refresh_claim_lease_from_running(issue_id, updated_running_entry) + notify_dashboard() - {:noreply, %{state | running: Map.put(running, issue_id, updated_running_entry)}} + {:noreply, state} end end @@ -183,6 +195,7 @@ defmodule SymphonyElixir.Orchestrator do running_entry -> {updated_running_entry, token_delta} = integrate_codex_update(running_entry, update) + :ok = append_token_usage_observation(issue_id, updated_running_entry, update, false) Ledger.add_tokens(issue_id, token_delta) maybe_record_codex_ledger_event(issue_id, update) @@ -191,7 +204,8 @@ defmodule SymphonyElixir.Orchestrator do state |> apply_codex_token_delta(token_delta) |> apply_codex_rate_limits(update) - |> then(fn state -> %{state | running: Map.put(running, issue_id, updated_running_entry)} end) + |> Map.put(:running, Map.put(running, issue_id, updated_running_entry)) + |> refresh_claim_lease_from_running(issue_id, updated_running_entry) |> enforce_issue_token_budget(issue_id, updated_running_entry) notify_dashboard() @@ -233,7 +247,8 @@ defmodule SymphonyElixir.Orchestrator do delay_type: :continuation, worker_host: Map.get(running_entry, :worker_host), workspace_path: Map.get(running_entry, :workspace_path), - previous_attempt: previous_attempt_from_running(running_entry) + previous_attempt: previous_attempt_from_running(running_entry), + worker_id: lease_worker_id(running_entry) }) end end @@ -298,7 +313,8 @@ defmodule SymphonyElixir.Orchestrator do error: "agent exited: #{inspect(reason)}", worker_host: Map.get(running_entry, :worker_host), workspace_path: Map.get(running_entry, :workspace_path), - previous_attempt: previous_attempt_from_running(running_entry) + previous_attempt: previous_attempt_from_running(running_entry), + worker_id: lease_worker_id(running_entry) }) end @@ -307,15 +323,16 @@ defmodule SymphonyElixir.Orchestrator do state |> reconcile_running_issues() |> reconcile_blocked_issues() - |> drain_slot_queue() + |> refresh_running_claim_leases() with :ok <- Config.validate!(), :ok <- ensure_workspace_mirror(), - {:ok, issues} <- fetch_candidate_issues_for_dispatch(state), - true <- available_slots(state) > 0 do + {:ok, issues} <- fetch_candidate_issues_for_dispatch(state) do state + |> recover_expired_claim_leases(issues) |> update_candidate_cache(issues) - |> choose_issues_from_cache() + |> drain_slot_queue() + |> dispatch_cached_issues() else {:error, :missing_linear_api_token} -> Logger.error("Linear API token missing in WORKFLOW.md") @@ -354,9 +371,14 @@ defmodule SymphonyElixir.Orchestrator do {:error, reason} -> Logger.error("Failed to fetch from Linear: #{inspect(reason)}") state + end + end - false -> - state + defp dispatch_cached_issues(%State{} = state) do + if available_slots(state) > 0 do + choose_issues_from_cache(state) + else + state end end @@ -500,6 +522,26 @@ defmodule SymphonyElixir.Orchestrator do select_worker_host(state, preferred_worker_host) end + @doc false + @spec start_claim_lease_for_test(State.t(), Issue.t(), map(), integer() | nil) :: State.t() + def start_claim_lease_for_test(%State{} = state, %Issue{} = issue, running_entry, attempt \\ nil) + when is_map(running_entry) do + start_claim_lease(state, issue, running_entry, attempt) + end + + @doc false + @spec refresh_claim_lease_from_running_for_test(State.t(), String.t(), map()) :: State.t() + def refresh_claim_lease_from_running_for_test(%State{} = state, issue_id, running_entry) + when is_binary(issue_id) and is_map(running_entry) do + refresh_claim_lease_from_running(state, issue_id, running_entry) + end + + @doc false + @spec recover_expired_claim_leases_for_test(State.t(), [Issue.t()]) :: State.t() + def recover_expired_claim_leases_for_test(%State{} = state, issues) when is_list(issues) do + recover_expired_claim_leases(state, issues) + end + defp reconcile_running_issue_states([], state, _active_states, _terminal_states), do: state defp reconcile_running_issue_states([issue | rest], state, active_states, terminal_states) do @@ -651,6 +693,401 @@ defmodule SymphonyElixir.Orchestrator do end end + defp refresh_running_claim_leases(%State{} = state) do + Enum.reduce(state.running, state, fn {issue_id, running_entry}, state_acc -> + refresh_claim_lease_from_running(state_acc, issue_id, running_entry) + end) + end + + defp start_claim_lease(%State{} = state, %Issue{} = issue, running_entry, attempt) + when is_map(running_entry) do + times = claim_lease_times() + + lease = + state.claim_leases + |> Map.get(issue.id, %{}) + |> Map.merge(%{ + issue_id: issue.id, + identifier: issue.identifier || issue.id, + state: :active, + worker_id: lease_worker_id(running_entry), + worker_host: Map.get(running_entry, :worker_host), + workspace_path: Map.get(running_entry, :workspace_path), + attempt: claim_attempt_number(attempt), + last_seen_at: times.now, + lease_started_at: times.now, + lease_expires_at: times.expires_at, + lease_expires_at_ms: times.expires_at_ms, + heartbeat_count: 1, + retry_due_at: nil, + retry_due_at_ms: nil, + retry_backoff_ms: nil, + error: nil + }) + + put_claim_lease(state, issue.id, lease, times.now_ms, force: true) + end + + defp refresh_claim_lease_from_running(%State{} = state, issue_id, running_entry) + when is_binary(issue_id) and is_map(running_entry) do + case Map.get(state.claim_leases, issue_id) do + nil -> + state + + lease -> + times = claim_lease_times() + + refreshed_lease = + lease + |> Map.merge(%{ + state: :active, + worker_id: lease_worker_id(running_entry), + worker_host: Map.get(running_entry, :worker_host), + workspace_path: Map.get(running_entry, :workspace_path), + attempt: Map.get(lease, :attempt, 1), + last_seen_at: times.now, + lease_expires_at: times.expires_at, + lease_expires_at_ms: times.expires_at_ms, + heartbeat_count: Map.get(lease, :heartbeat_count, 0) + 1, + retry_due_at: nil, + retry_due_at_ms: nil, + retry_backoff_ms: nil, + error: nil + }) + + put_claim_lease(state, issue_id, refreshed_lease, times.now_ms) + end + end + + defp mark_retry_claim_lease(%State{} = state, issue_id, retry_entry) when is_binary(issue_id) do + case Map.get(state.claim_leases, issue_id) do + nil -> + state + + lease -> + times = claim_lease_times() + retry_due_at_ms = Map.get(retry_entry, :due_at_ms) + retry_expires_at_ms = retry_lease_expires_at_ms(times, retry_due_at_ms) + + retry_lease = + lease + |> Map.merge(%{ + state: :retrying, + identifier: retry_entry_value(retry_entry, lease, :identifier, issue_id), + worker_id: retry_entry_value(retry_entry, lease, :worker_id), + worker_host: Map.get(retry_entry, :worker_host), + workspace_path: Map.get(retry_entry, :workspace_path), + attempt: retry_entry_value(retry_entry, lease, :attempt, 1), + lease_expires_at: DateTime.add(times.now, retry_expires_at_ms - times.now_ms, :millisecond), + lease_expires_at_ms: retry_expires_at_ms, + retry_due_at: retry_due_at(times, retry_due_at_ms), + retry_due_at_ms: retry_due_at_ms, + retry_backoff_ms: retry_backoff_ms(times, retry_due_at_ms), + error: Map.get(retry_entry, :error) + }) + + put_claim_lease(state, issue_id, retry_lease, times.now_ms, force: true) + end + end + + defp retry_entry_value(retry_entry, lease, key, fallback \\ nil) do + Map.get(retry_entry, key) || Map.get(lease, key) || fallback + end + + defp retry_lease_expires_at_ms(times, retry_due_at_ms) do + max(times.expires_at_ms, (retry_due_at_ms || times.now_ms) + claim_lease_ttl_ms()) + end + + defp retry_due_at(_times, nil), do: nil + + defp retry_due_at(times, retry_due_at_ms) when is_integer(retry_due_at_ms) do + DateTime.add(times.now, retry_due_at_ms - times.now_ms, :millisecond) + end + + defp retry_backoff_ms(_times, nil), do: nil + + defp retry_backoff_ms(times, retry_due_at_ms) when is_integer(retry_due_at_ms) do + max(0, retry_due_at_ms - times.now_ms) + end + + defp mark_blocked_claim_lease(%State{} = state, issue_id, blocked_entry) when is_binary(issue_id) do + case Map.get(state.claim_leases, issue_id) do + nil -> + state + + lease -> + times = claim_lease_times() + + blocked_lease = + lease + |> Map.merge(%{ + state: :blocked, + identifier: Map.get(blocked_entry, :identifier) || Map.get(lease, :identifier) || issue_id, + worker_id: Map.get(lease, :worker_id), + worker_host: Map.get(blocked_entry, :worker_host), + workspace_path: Map.get(blocked_entry, :workspace_path), + session_id: Map.get(blocked_entry, :session_id), + lease_expires_at: times.expires_at, + lease_expires_at_ms: times.expires_at_ms, + error: Map.get(blocked_entry, :error) + }) + + put_claim_lease(state, issue_id, blocked_lease, times.now_ms, force: true) + end + end + + defp recover_expired_claim_leases(%State{} = state, issues) when is_list(issues) do + now_ms = System.monotonic_time(:millisecond) + issues_by_id = Map.new(issues, &{&1.id, &1}) + + state.claim_leases + |> Enum.filter(fn {issue_id, lease} -> + claim_lease_expired?(lease, now_ms) and not live_claim?(state, issue_id) + end) + |> Enum.reduce(state, fn {issue_id, lease}, state_acc -> + recover_expired_claim_lease(state_acc, issue_id, lease, Map.get(issues_by_id, issue_id), now_ms) + end) + end + + defp recover_expired_claim_leases(state, _issues), do: state + + defp recover_expired_claim_lease(%State{} = state, issue_id, lease, %Issue{} = issue, _now_ms) do + if retry_candidate_issue?(issue, terminal_state_set()) do + expired_at = DateTime.utc_now() + error = "claim lease expired at #{iso8601(expired_at)}; requeueing" + attempt = expired_retry_attempt(state, issue_id, lease) + + Logger.warning("Claim lease expired; requeueing issue_id=#{issue_id} issue_identifier=#{issue.identifier} attempt=#{attempt}") + + state + |> put_expired_claim(issue_id, lease, expired_at, error) + |> schedule_issue_retry(issue_id, attempt, %{ + identifier: issue.identifier, + error: error, + worker_host: Map.get(lease, :worker_host), + workspace_path: Map.get(lease, :workspace_path), + worker_id: Map.get(lease, :worker_id), + delay_ms: 0 + }) + else + Logger.warning("Claim lease expired for non-candidate issue_id=#{issue_id}; releasing claim") + release_issue_claim(state, issue_id) + end + end + + defp recover_expired_claim_lease(%State{} = state, issue_id, _lease, _issue, _now_ms) do + Logger.warning("Claim lease expired for invisible issue_id=#{issue_id}; releasing claim") + release_issue_claim(state, issue_id) + end + + defp put_expired_claim(%State{} = state, issue_id, lease, expired_at, error) do + expired_claim = + lease + |> Map.merge(%{ + state: :expired, + expired_at: expired_at, + requeued_at: DateTime.utc_now(), + error: error + }) + + %{state | expired_claims: Map.put(state.expired_claims, issue_id, expired_claim)} + end + + defp put_claim_lease(%State{} = state, issue_id, lease, now_ms, opts \\ []) do + previous = Map.get(state.claim_leases, issue_id) + lease = maybe_publish_claim_lease_marker(issue_id, previous, lease, now_ms, opts) + + %{state | claim_leases: Map.put(state.claim_leases, issue_id, lease)} + end + + defp maybe_publish_claim_lease_marker(issue_id, previous, lease, now_ms, opts) do + if claim_lease_marker_due?(previous, lease, now_ms, opts) do + body = claim_lease_marker_body(lease) + + case safe_create_tracker_comment(issue_id, body) do + :ok -> Map.put(lease, :last_marker_at_ms, now_ms) + {:error, _reason} -> inherit_last_marker_at(lease, previous) + end + else + inherit_last_marker_at(lease, previous) + end + end + + defp claim_lease_marker_due?(nil, _lease, _now_ms, _opts), do: true + + defp claim_lease_marker_due?(previous, lease, now_ms, opts) do + Keyword.get(opts, :force, false) or + claim_lease_material_change?(previous, lease) or + claim_lease_marker_interval_due?(previous, now_ms) + end + + defp claim_lease_material_change?(previous, lease) do + fields = [:state, :worker_id, :worker_host, :workspace_path, :attempt, :retry_due_at_ms, :error] + + Enum.any?(fields, fn field -> + Map.get(previous, field) != Map.get(lease, field) + end) + end + + defp claim_lease_marker_interval_due?(previous, now_ms) do + case Map.get(previous, :last_marker_at_ms) do + marker_at_ms when is_integer(marker_at_ms) -> now_ms - marker_at_ms >= @claim_lease_marker_interval_ms + _ -> true + end + end + + defp inherit_last_marker_at(lease, nil), do: lease + + defp inherit_last_marker_at(lease, previous) do + Map.put(lease, :last_marker_at_ms, Map.get(previous, :last_marker_at_ms)) + end + + defp safe_create_tracker_comment(issue_id, body) do + Tracker.create_comment(issue_id, body) + rescue + error -> + Logger.warning("Failed to write claim lease marker for issue_id=#{issue_id}: #{Exception.message(error)}") + {:error, error} + catch + kind, reason -> + Logger.warning("Failed to write claim lease marker for issue_id=#{issue_id}: #{inspect({kind, reason})}") + {:error, reason} + end + + defp claim_lease_marker_body(lease) do + ["## Symphony Claim Lease", "" | claim_lease_marker_lines(lease)] + |> Enum.join("\n") + end + + defp claim_lease_marker_lines(lease) do + [ + {:state, Map.get(lease, :state), "n/a"}, + {:worker_id, Map.get(lease, :worker_id), "n/a"}, + {:worker_host, Map.get(lease, :worker_host), "local"}, + {:workspace_path, Map.get(lease, :workspace_path), "pending"}, + {:attempt, Map.get(lease, :attempt), 1}, + {:last_seen_at, iso8601(Map.get(lease, :last_seen_at)), "n/a"}, + {:lease_expires_at, iso8601(Map.get(lease, :lease_expires_at)), "n/a"}, + {:retry_due_at, iso8601(Map.get(lease, :retry_due_at)), "n/a"}, + {:retry_backoff_ms, Map.get(lease, :retry_backoff_ms), "n/a"}, + {:error, Map.get(lease, :error), "n/a"} + ] + |> Enum.map(fn {key, value, fallback} -> "- #{key}: #{value || fallback}" end) + end + + defp claim_lease_expired?(lease, now_ms) when is_map(lease) and is_integer(now_ms) do + case Map.get(lease, :lease_expires_at_ms) do + expires_at_ms when is_integer(expires_at_ms) -> expires_at_ms <= now_ms + _ -> false + end + end + + defp live_claim?(%State{} = state, issue_id) do + Map.has_key?(state.running, issue_id) or Map.has_key?(state.blocked, issue_id) + end + + defp expired_retry_attempt(%State{} = state, issue_id, lease) do + lease_attempt = Map.get(lease, :attempt, 1) + retry_attempt = state.retry_attempts |> Map.get(issue_id, %{}) |> Map.get(:attempt, 0) + + max(lease_attempt, retry_attempt) + 1 + end + + defp claim_lease_times do + now = DateTime.utc_now() + now_ms = System.monotonic_time(:millisecond) + ttl_ms = claim_lease_ttl_ms() + + %{ + now: now, + now_ms: now_ms, + expires_at: DateTime.add(now, ttl_ms, :millisecond), + expires_at_ms: now_ms + ttl_ms + } + end + + defp claim_lease_ttl_ms do + Config.settings!().polling.interval_ms + |> Kernel.*(@claim_lease_ttl_poll_multiplier) + |> max(@minimum_claim_lease_ttl_ms) + end + + defp claim_attempt_number(attempt) when is_integer(attempt) and attempt > 0, do: attempt + defp claim_attempt_number(_attempt), do: 1 + + defp lease_worker_id(running_entry) when is_map(running_entry) do + case Map.get(running_entry, :worker_id) do + worker_id when is_binary(worker_id) and worker_id != "" -> + worker_id + + _ -> + host = Map.get(running_entry, :worker_host) || "local" + pid = Map.get(running_entry, :pid) + "#{host}:#{if(is_pid(pid), do: inspect(pid), else: "unknown")}" + end + end + + defp restore_claim_lease(state, _issue_id, nil), do: state + + defp restore_claim_lease(%State{} = state, issue_id, lease) when is_binary(issue_id) and is_map(lease) do + %{state | claim_leases: Map.put(state.claim_leases, issue_id, lease)} + end + + defp claim_lease_snapshot_entry(lease, now_ms) do + %{ + issue_id: Map.get(lease, :issue_id), + identifier: Map.get(lease, :identifier), + state: lease_state_string(Map.get(lease, :state)), + worker_id: Map.get(lease, :worker_id), + worker_host: Map.get(lease, :worker_host), + workspace_path: Map.get(lease, :workspace_path), + attempt: Map.get(lease, :attempt), + last_seen_at: Map.get(lease, :last_seen_at), + lease_expires_at: Map.get(lease, :lease_expires_at), + lease_expires_in_ms: lease_expires_in_ms(lease, now_ms), + retry_due_at: Map.get(lease, :retry_due_at), + retry_due_in_ms: retry_due_in_ms(lease, now_ms), + retry_backoff_ms: Map.get(lease, :retry_backoff_ms), + error: Map.get(lease, :error) + } + end + + defp expired_claim_snapshot_entry(lease, now_ms) do + lease + |> claim_lease_snapshot_entry(now_ms) + |> Map.merge(%{ + state: "expired", + expired_at: Map.get(lease, :expired_at), + requeued_at: Map.get(lease, :requeued_at) + }) + end + + defp lease_expires_in_ms(lease, now_ms) do + case Map.get(lease, :lease_expires_at_ms) do + expires_at_ms when is_integer(expires_at_ms) -> expires_at_ms - now_ms + _ -> nil + end + end + + defp retry_due_in_ms(lease, now_ms) do + case Map.get(lease, :retry_due_at_ms) do + due_at_ms when is_integer(due_at_ms) -> max(0, due_at_ms - now_ms) + _ -> nil + end + end + + defp lease_state_string(state) when is_atom(state), do: Atom.to_string(state) + defp lease_state_string(state) when is_binary(state), do: state + defp lease_state_string(_state), do: nil + + defp iso8601(%DateTime{} = datetime) do + datetime + |> DateTime.truncate(:second) + |> DateTime.to_iso8601() + end + + defp iso8601(_datetime), do: nil + defp terminate_running_issue(%State{} = state, issue_id, cleanup_workspace) do case Map.get(state.running, issue_id) do nil -> @@ -671,7 +1108,9 @@ defmodule SymphonyElixir.Orchestrator do | running: Map.delete(state.running, issue_id), claimed: MapSet.delete(state.claimed, issue_id), blocked: Map.delete(state.blocked, issue_id), - retry_attempts: Map.delete(state.retry_attempts, issue_id) + retry_attempts: Map.delete(state.retry_attempts, issue_id), + claim_leases: Map.delete(state.claim_leases, issue_id), + expired_claims: Map.delete(state.expired_claims, issue_id) } _ -> @@ -727,14 +1166,19 @@ defmodule SymphonyElixir.Orchestrator do Logger.warning("Issue stalled: issue_id=#{issue_id} issue_identifier=#{identifier} session_id=#{session_id} elapsed_ms=#{elapsed_ms}; restarting with backoff") next_attempt = next_retry_attempt_from_running(running_entry) + previous_lease = Map.get(state.claim_leases, issue_id) state |> terminate_running_issue(issue_id, false) + |> restore_claim_lease(issue_id, previous_lease) |> schedule_issue_retry(issue_id, next_attempt, %{ identifier: identifier, issue_url: running_entry.issue.url, error: "stalled for #{elapsed_ms}ms without codex activity", - previous_attempt: previous_attempt_from_running(running_entry) + worker_host: Map.get(running_entry, :worker_host), + workspace_path: Map.get(running_entry, :workspace_path), + previous_attempt: previous_attempt_from_running(running_entry), + worker_id: lease_worker_id(running_entry) }) end else @@ -873,13 +1317,15 @@ defmodule SymphonyElixir.Orchestrator do last_codex_timestamp: Map.get(running_entry, :last_codex_timestamp) } - %{ + state = %{ state | running: Map.delete(state.running, issue_id), retry_attempts: Map.delete(state.retry_attempts, issue_id), claimed: MapSet.put(state.claimed, issue_id), blocked: Map.put(state.blocked, issue_id, blocked_entry) } + + mark_blocked_claim_lease(state, issue_id, blocked_entry) end defp block_issue_without_running(%State{} = state, %Issue{} = issue, reason) do @@ -1226,39 +1672,43 @@ defmodule SymphonyElixir.Orchestrator do Logger.info("Dispatching issue to agent: #{issue_context(issue)} pid=#{inspect(pid)} attempt=#{inspect(attempt)} worker_host=#{worker_host || "local"}") ledger_entry = record_dispatch_in_ledger(issue, worker_host) - running = - Map.put(state.running, issue.id, %{ - pid: pid, - ref: ref, - identifier: issue.identifier, - issue: issue, - worker_host: worker_host, - workspace_path: nil, - session_id: nil, - last_codex_message: nil, - last_codex_timestamp: nil, - last_codex_event: nil, - codex_app_server_pid: nil, - codex_input_tokens: 0, - codex_output_tokens: 0, - codex_total_tokens: 0, - codex_last_reported_input_tokens: 0, - codex_last_reported_output_tokens: 0, - codex_last_reported_total_tokens: 0, - turn_count: 0, - retry_attempt: normalize_retry_attempt(attempt), - ledger_entry: ledger_entry, - previous_attempt: previous_attempt, - started_at: DateTime.utc_now() - }) + running_entry = %{ + pid: pid, + ref: ref, + identifier: issue.identifier, + issue: issue, + worker_host: worker_host, + workspace_path: nil, + session_id: nil, + last_codex_message: nil, + last_codex_timestamp: nil, + last_codex_event: nil, + codex_app_server_pid: nil, + codex_input_tokens: 0, + codex_output_tokens: 0, + codex_total_tokens: 0, + codex_last_reported_input_tokens: 0, + codex_last_reported_output_tokens: 0, + codex_last_reported_total_tokens: 0, + turn_count: 0, + retry_attempt: normalize_retry_attempt(attempt), + ledger_entry: ledger_entry, + previous_attempt: previous_attempt, + started_at: DateTime.utc_now() + } - %{ + running = Map.put(state.running, issue.id, running_entry) + + state = %{ state | running: running, claimed: MapSet.put(state.claimed, issue.id), - retry_attempts: Map.delete(state.retry_attempts, issue.id) + retry_attempts: Map.delete(state.retry_attempts, issue.id), + expired_claims: Map.delete(state.expired_claims, issue.id) } + start_claim_lease(state, issue, running_entry, attempt) + {:error, reason} -> Logger.error("Unable to spawn agent for #{issue_context(issue)}: #{inspect(reason)}") next_attempt = if is_integer(attempt), do: attempt + 1, else: nil @@ -1314,6 +1764,7 @@ defmodule SymphonyElixir.Orchestrator do worker_host = pick_retry_worker_host(previous_retry, metadata) workspace_path = pick_retry_workspace_path(previous_retry, metadata) previous_attempt = pick_retry_previous_attempt(previous_retry, metadata) + worker_id = pick_retry_worker_id(previous_retry, metadata) Ledger.put(issue_id, %{ retries: next_attempt, @@ -1330,22 +1781,27 @@ defmodule SymphonyElixir.Orchestrator do Logger.warning("Retrying issue_id=#{issue_id} issue_identifier=#{identifier} in #{delay_ms}ms (attempt #{next_attempt})#{error_suffix}") - %{ + retry_entry = %{ + attempt: next_attempt, + timer_ref: timer_ref, + retry_token: retry_token, + due_at_ms: due_at_ms, + identifier: identifier, + issue_url: issue_url, + error: error, + worker_host: worker_host, + workspace_path: workspace_path, + previous_attempt: previous_attempt, + worker_id: worker_id + } + + state = %{ state - | retry_attempts: - Map.put(state.retry_attempts, issue_id, %{ - attempt: next_attempt, - timer_ref: timer_ref, - retry_token: retry_token, - due_at_ms: due_at_ms, - identifier: identifier, - issue_url: issue_url, - error: error, - worker_host: worker_host, - workspace_path: workspace_path, - previous_attempt: previous_attempt - }) + | claimed: MapSet.put(state.claimed, issue_id), + retry_attempts: Map.put(state.retry_attempts, issue_id, retry_entry) } + + mark_retry_claim_lease(state, issue_id, retry_entry) end defp pop_retry_attempt_state(%State{} = state, issue_id, retry_token) when is_reference(retry_token) do @@ -1357,7 +1813,8 @@ defmodule SymphonyElixir.Orchestrator do error: Map.get(retry_entry, :error), worker_host: Map.get(retry_entry, :worker_host), workspace_path: Map.get(retry_entry, :workspace_path), - previous_attempt: Map.get(retry_entry, :previous_attempt) + previous_attempt: Map.get(retry_entry, :previous_attempt), + worker_id: Map.get(retry_entry, :worker_id) } {:ok, attempt, metadata, %{state | retry_attempts: Map.delete(state.retry_attempts, issue_id)}} @@ -1542,12 +1999,17 @@ defmodule SymphonyElixir.Orchestrator do | claimed: MapSet.delete(state.claimed, issue_id), blocked: Map.delete(state.blocked, issue_id), retry_attempts: Map.delete(state.retry_attempts, issue_id), - slot_queue: reject_slot_queue_issue(state.slot_queue, issue_id) + slot_queue: reject_slot_queue_issue(state.slot_queue, issue_id), + claim_leases: Map.delete(state.claim_leases, issue_id), + expired_claims: Map.delete(state.expired_claims, issue_id) } end defp retry_delay(attempt, metadata) when is_integer(attempt) and attempt > 0 and is_map(metadata) do cond do + is_integer(metadata[:delay_ms]) and metadata[:delay_ms] >= 0 -> + metadata[:delay_ms] + rate_limited_error?(metadata[:error]) -> max(rate_limit_retry_delay(metadata[:error]), failure_retry_delay(attempt)) @@ -1700,6 +2162,10 @@ defmodule SymphonyElixir.Orchestrator do metadata[:previous_attempt] || Map.get(previous_retry, :previous_attempt) end + defp pick_retry_worker_id(previous_retry, metadata) do + metadata[:worker_id] || Map.get(previous_retry, :worker_id) + end + defp maybe_put_runtime_value(running_entry, _key, nil), do: running_entry defp maybe_put_runtime_value(running_entry, key, value) when is_map(running_entry) do @@ -1897,11 +2363,21 @@ defmodule SymphonyElixir.Orchestrator do } end) + claim_leases = + state.claim_leases + |> Enum.map(fn {_issue_id, lease} -> claim_lease_snapshot_entry(lease, now_ms) end) + + expired = + state.expired_claims + |> Enum.map(fn {_issue_id, lease} -> expired_claim_snapshot_entry(lease, now_ms) end) + {:reply, %{ running: running, retrying: retrying, blocked: blocked, + claim_leases: claim_leases, + expired: expired, codex_totals: state.codex_totals, ledger: Ledger.all(), rate_limits: Map.get(state, :codex_rate_limits), @@ -2046,6 +2522,7 @@ defmodule SymphonyElixir.Orchestrator do end defp record_session_completion_totals(state, running_entry) when is_map(running_entry) do + :ok = append_token_usage_observation(running_entry_issue_id(running_entry), running_entry, nil, true) runtime_seconds = running_seconds(running_entry.started_at, DateTime.utc_now()) maybe_record_previous_attempt(running_entry) @@ -2079,6 +2556,51 @@ defmodule SymphonyElixir.Orchestrator do defp maybe_record_previous_attempt(_running_entry), do: :ok + defp append_token_usage_observation(issue_id, running_entry, update, final?) when is_map(running_entry) do + if token_usage_observation?(running_entry) do + TokenUsageLedger.append_observation(%{ + observed_at: DateTime.utc_now(), + final: final?, + issue_id: issue_id, + issue_identifier: Map.get(running_entry, :identifier), + session_id: Map.get(running_entry, :session_id), + worker_host: Map.get(running_entry, :worker_host), + workspace_path: Map.get(running_entry, :workspace_path), + turn_count: Map.get(running_entry, :turn_count, 0), + input_tokens: Map.get(running_entry, :codex_input_tokens, 0), + output_tokens: Map.get(running_entry, :codex_output_tokens, 0), + total_tokens: Map.get(running_entry, :codex_total_tokens, 0), + source_event: token_usage_source_event(update, final?) + }) + end + + :ok + end + + defp token_usage_observation?(running_entry) do + is_binary(Map.get(running_entry, :session_id)) and + Enum.any?( + [ + Map.get(running_entry, :codex_input_tokens, 0), + Map.get(running_entry, :codex_output_tokens, 0), + Map.get(running_entry, :codex_total_tokens, 0) + ], + &(&1 > 0) + ) + end + + defp token_usage_source_event(_update, true), do: :session_final + + defp token_usage_source_event(%{event: event}, _final?), do: event + + defp token_usage_source_event(_update, _final?), do: nil + + defp running_entry_issue_id(%{issue: %Issue{id: issue_id}}) when is_binary(issue_id), do: issue_id + + defp running_entry_issue_id(%{issue_id: issue_id}) when is_binary(issue_id), do: issue_id + + defp running_entry_issue_id(_running_entry), do: nil + defp refresh_runtime_config(%State{} = state) do config = Config.settings!() diff --git a/elixir/lib/symphony_elixir/token_usage_ledger.ex b/elixir/lib/symphony_elixir/token_usage_ledger.ex new file mode 100644 index 0000000000..95d05acc10 --- /dev/null +++ b/elixir/lib/symphony_elixir/token_usage_ledger.ex @@ -0,0 +1,232 @@ +defmodule SymphonyElixir.TokenUsageLedger do + @moduledoc """ + Durable JSONL ledger for per-issue Codex token usage observations. + """ + + require Logger + + alias SymphonyElixir.LogFile + + @schema_version 1 + @ledger_filename "token_usage.jsonl" + + @type token_summary :: %{ + input_tokens: non_neg_integer(), + output_tokens: non_neg_integer(), + total_tokens: non_neg_integer(), + issue_count: non_neg_integer(), + session_count: non_neg_integer() + } + + @type issue_summary :: %{ + issue_id: String.t() | nil, + issue_identifier: String.t(), + worker_host: String.t() | nil, + workspace_path: String.t() | nil, + input_tokens: non_neg_integer(), + output_tokens: non_neg_integer(), + total_tokens: non_neg_integer(), + session_count: non_neg_integer() + } + + @spec file_path() :: Path.t() + def file_path do + case Application.get_env(:symphony_elixir, :token_usage_ledger_file) do + path when is_binary(path) and path != "" -> + path + + _ -> + default_file_path() + end + end + + @spec append_observation(map(), keyword()) :: :ok + def append_observation(attrs, opts \\ []) when is_map(attrs) do + file = Keyword.get(opts, :file, file_path()) + record = observation_record(attrs) + line = Jason.encode!(record) <> "\n" + + with :ok <- File.mkdir_p(Path.dirname(file)), + :ok <- File.write(file, line, [:append]) do + :ok + else + {:error, reason} -> + Logger.warning("Failed to append token usage ledger record file=#{file} reason=#{inspect(reason)}") + :ok + end + rescue + error -> + Logger.warning("Failed to append token usage ledger record reason=#{Exception.message(error)}") + :ok + end + + @spec summary(keyword()) :: token_summary() + def summary(opts \\ []) do + high_water = opts |> read_records() |> high_water_records() + + %{ + input_tokens: sum_token(high_water, :input_tokens), + output_tokens: sum_token(high_water, :output_tokens), + total_tokens: sum_token(high_water, :total_tokens), + issue_count: high_water |> Enum.map(& &1.issue_identifier) |> Enum.uniq() |> length(), + session_count: length(high_water) + } + end + + @spec issue_summary(String.t(), keyword()) :: issue_summary() | nil + def issue_summary(issue_identifier, opts \\ []) when is_binary(issue_identifier) do + records = + opts + |> read_records() + |> Enum.filter(&(&1.issue_identifier == issue_identifier)) + + case high_water_records(records) do + [] -> + nil + + high_water -> + latest = Enum.max_by(records, & &1.observed_at, fn -> nil end) + + %{ + issue_id: latest && latest.issue_id, + issue_identifier: issue_identifier, + worker_host: latest && latest.worker_host, + workspace_path: latest && latest.workspace_path, + input_tokens: sum_token(high_water, :input_tokens), + output_tokens: sum_token(high_water, :output_tokens), + total_tokens: sum_token(high_water, :total_tokens), + session_count: length(high_water) + } + end + end + + @spec read_records(keyword()) :: [map()] + def read_records(opts \\ []) do + file = Keyword.get(opts, :file, file_path()) + + case File.read(file) do + {:ok, contents} -> + contents + |> String.split("\n", trim: true) + |> Enum.flat_map(&decode_record/1) + + {:error, _reason} -> + [] + end + end + + defp default_file_path do + log_file = Application.get_env(:symphony_elixir, :log_file, LogFile.default_log_file()) + Path.join(Path.dirname(log_file), @ledger_filename) + end + + defp observation_record(attrs) do + %{ + "schema_version" => @schema_version, + "observed_at" => observed_at(attrs), + "final" => boolean_value(value(attrs, :final), false), + "issue_id" => string_value(value(attrs, :issue_id)), + "issue_identifier" => string_value(value(attrs, :issue_identifier)), + "session_id" => string_value(value(attrs, :session_id)), + "worker_host" => string_value(value(attrs, :worker_host)), + "workspace_path" => string_value(value(attrs, :workspace_path)), + "turn_count" => integer_value(value(attrs, :turn_count), 0), + "input_tokens" => integer_value(value(attrs, :input_tokens), 0), + "output_tokens" => integer_value(value(attrs, :output_tokens), 0), + "total_tokens" => integer_value(value(attrs, :total_tokens), 0), + "source_event" => string_value(value(attrs, :source_event)) + } + end + + defp observed_at(attrs) do + case value(attrs, :observed_at) do + %DateTime{} = observed_at -> + observed_at |> DateTime.truncate(:second) |> DateTime.to_iso8601() + + observed_at when is_binary(observed_at) and observed_at != "" -> + observed_at + + _ -> + DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601() + end + end + + defp decode_record(line) do + case Jason.decode(line) do + {:ok, %{} = raw} -> + case normalize_record(raw) do + nil -> [] + record -> [record] + end + + _ -> + [] + end + end + + defp normalize_record(raw) do + with @schema_version <- integer_value(value(raw, :schema_version), 0), + issue_identifier when is_binary(issue_identifier) <- string_value(value(raw, :issue_identifier)), + session_id when is_binary(session_id) <- string_value(value(raw, :session_id)) do + %{ + observed_at: string_value(value(raw, :observed_at)) || "", + final: boolean_value(value(raw, :final), false), + issue_id: string_value(value(raw, :issue_id)), + issue_identifier: issue_identifier, + session_id: session_id, + worker_host: string_value(value(raw, :worker_host)), + workspace_path: string_value(value(raw, :workspace_path)), + turn_count: integer_value(value(raw, :turn_count), 0), + input_tokens: integer_value(value(raw, :input_tokens), 0), + output_tokens: integer_value(value(raw, :output_tokens), 0), + total_tokens: integer_value(value(raw, :total_tokens), 0), + source_event: string_value(value(raw, :source_event)) + } + else + _ -> nil + end + end + + defp high_water_records(records) do + records + |> Enum.group_by(&{&1.issue_identifier, &1.session_id}) + |> Enum.map(fn {_key, session_records} -> + Enum.max_by(session_records, & &1.total_tokens) + end) + end + + defp sum_token(records, key) do + Enum.reduce(records, 0, fn record, total -> + total + Map.get(record, key, 0) + end) + end + + defp value(map, key) when is_map(map) do + Map.get(map, key) || Map.get(map, Atom.to_string(key)) + end + + defp integer_value(value, _default) when is_integer(value), do: max(value, 0) + + defp integer_value(value, default) when is_binary(value) do + case Integer.parse(value) do + {parsed, ""} -> max(parsed, 0) + _ -> default + end + end + + defp integer_value(_value, default), do: default + + defp boolean_value(value, _default) when is_boolean(value), do: value + defp boolean_value(_value, default), do: default + + defp string_value(nil), do: nil + + defp string_value(value) when is_binary(value) do + trimmed = String.trim(value) + if trimmed == "", do: nil, else: trimmed + end + + defp string_value(value) when is_atom(value), do: Atom.to_string(value) + defp string_value(value) when is_integer(value), do: Integer.to_string(value) + defp string_value(_value), do: nil +end diff --git a/elixir/lib/symphony_elixir_web/live/dashboard_live.ex b/elixir/lib/symphony_elixir_web/live/dashboard_live.ex index 7439e3a09f..7466c44cec 100644 --- a/elixir/lib/symphony_elixir_web/live/dashboard_live.ex +++ b/elixir/lib/symphony_elixir_web/live/dashboard_live.ex @@ -97,6 +97,12 @@ defmodule SymphonyElixirWeb.DashboardLive do

Issues paused for operator input or approval.

+
+

Expired

+

<%= @payload.counts.expired %>

+

Claim leases recovered and handed back to retry.

+
+

Total tokens

<%= format_int(@payload.codex_totals.total_tokens) %>

@@ -123,6 +129,112 @@ defmodule SymphonyElixirWeb.DashboardLive do
<%= pretty_value(@payload.rate_limits) %>
+
+
+
+

Claim leases

+

Durable worker claim heartbeat, retry, and lease expiry state.

+
+
+ + <%= if @payload.claim_leases == [] do %> +

No active claim leases.

+ <% else %> +
+ + + + + + + + + + + + + + + + + + + + + + + + + +
IssueLease stateAttemptWorkerWorkspaceLast seenExpiresRetry/backoff
+
+ <%= entry.issue_identifier %> + JSON details +
+
+ + <%= entry.state || "claimed" %> + + <%= entry.attempt || "n/a" %> +
+ <%= entry.worker_id || "n/a" %> + <%= entry.worker_host || "local" %> +
+
<%= entry.workspace_path || "pending" %><%= entry.last_seen_at || "n/a" %><%= entry.lease_expires_at || "n/a" %> +
+ <%= entry.retry_due_at || "n/a" %> + + backoff=<%= entry.retry_backoff_ms || "n/a" %> error=<%= entry.error || "n/a" %> + +
+
+
+ <% end %> +
+ +
+
+
+

Expired leases

+

Recovered stale claims that were requeued without duplicating live workers.

+
+
+ + <%= if @payload.expired == [] do %> +

No expired leases recovered.

+ <% else %> +
+ + + + + + + + + + + + + + + + + + + + + +
IssueAttemptWorkerExpired atRequeued atError
+
+ <%= entry.issue_identifier %> + JSON details +
+
<%= entry.attempt || "n/a" %><%= entry.worker_id || "n/a" %><%= entry.expired_at || "n/a" %><%= entry.requeued_at || "n/a" %><%= entry.error || "n/a" %>
+
+ <% end %> +
+
@@ -431,7 +543,7 @@ defmodule SymphonyElixirWeb.DashboardLive do cond do String.contains?(normalized, ["progress", "running", "active"]) -> "#{base} state-badge-active" - String.contains?(normalized, ["blocked", "error", "failed"]) -> "#{base} state-badge-danger" + String.contains?(normalized, ["blocked", "error", "failed", "expired"]) -> "#{base} state-badge-danger" String.contains?(normalized, ["todo", "queued", "pending", "retry"]) -> "#{base} state-badge-warning" true -> base end diff --git a/elixir/lib/symphony_elixir_web/presenter.ex b/elixir/lib/symphony_elixir_web/presenter.ex index 1dc06fc164..e6b7e795f0 100644 --- a/elixir/lib/symphony_elixir_web/presenter.ex +++ b/elixir/lib/symphony_elixir_web/presenter.ex @@ -3,7 +3,7 @@ defmodule SymphonyElixirWeb.Presenter do Shared projections for the observability API and dashboard. """ - alias SymphonyElixir.{Config, Orchestrator, StatusDashboard} + alias SymphonyElixir.{Config, Orchestrator, StatusDashboard, TokenUsageLedger} @spec state_payload(GenServer.name(), timeout()) :: map() def state_payload(orchestrator, snapshot_timeout_ms) do @@ -16,12 +16,16 @@ defmodule SymphonyElixirWeb.Presenter do counts: %{ running: length(snapshot.running), retrying: length(snapshot.retrying), - blocked: length(Map.get(snapshot, :blocked, [])) + blocked: length(Map.get(snapshot, :blocked, [])), + expired: length(Map.get(snapshot, :expired, [])) }, running: Enum.map(snapshot.running, &running_entry_payload/1), retrying: Enum.map(snapshot.retrying, &retry_entry_payload/1), blocked: Enum.map(Map.get(snapshot, :blocked, []), &blocked_entry_payload/1), + claim_leases: Enum.map(Map.get(snapshot, :claim_leases, []), &claim_lease_payload/1), + expired: Enum.map(Map.get(snapshot, :expired, []), &expired_claim_payload/1), codex_totals: snapshot.codex_totals, + token_usage: TokenUsageLedger.summary(), rate_limits: snapshot.rate_limits } @@ -40,11 +44,14 @@ defmodule SymphonyElixirWeb.Presenter do running = Enum.find(snapshot.running, &(&1.identifier == issue_identifier)) retry = Enum.find(snapshot.retrying, &(&1.identifier == issue_identifier)) blocked = Enum.find(Map.get(snapshot, :blocked, []), &(&1.identifier == issue_identifier)) + token_usage = TokenUsageLedger.issue_summary(issue_identifier) + claim_lease = Enum.find(Map.get(snapshot, :claim_leases, []), &(&1.identifier == issue_identifier)) + expired = Enum.find(Map.get(snapshot, :expired, []), &(&1.identifier == issue_identifier)) - if is_nil(running) and is_nil(retry) and is_nil(blocked) do + if issue_entries_empty?([running, retry, blocked, token_usage, claim_lease, expired]) do {:error, :issue_not_found} else - {:ok, issue_payload_body(issue_identifier, running, retry, blocked)} + {:ok, issue_payload_body(issue_identifier, running, retry, blocked, token_usage, claim_lease, expired)} end _ -> @@ -52,6 +59,8 @@ defmodule SymphonyElixirWeb.Presenter do end end + defp issue_entries_empty?(entries), do: Enum.all?(entries, &is_nil/1) + @spec refresh_payload(GenServer.name()) :: {:ok, map()} | {:error, :unavailable} def refresh_payload(orchestrator) do case Orchestrator.request_refresh(orchestrator) do @@ -63,14 +72,14 @@ defmodule SymphonyElixirWeb.Presenter do end end - defp issue_payload_body(issue_identifier, running, retry, blocked) do + defp issue_payload_body(issue_identifier, running, retry, blocked, token_usage, claim_lease, expired) do %{ issue_identifier: issue_identifier, - issue_id: issue_id_from_entries(running, retry, blocked), - status: issue_status(running, retry, blocked), + issue_id: issue_id_from_entries(running, retry, blocked, token_usage, claim_lease, expired), + status: issue_status(running, retry, blocked, token_usage, expired, claim_lease), workspace: %{ - path: workspace_path(issue_identifier, running, retry, blocked), - host: workspace_host(running, retry, blocked) + path: workspace_path(issue_identifier, running, retry, blocked, token_usage, claim_lease, expired), + host: workspace_host(running, retry, blocked, token_usage, claim_lease, expired) }, attempts: %{ restart_count: restart_count(retry), @@ -84,20 +93,26 @@ defmodule SymphonyElixirWeb.Presenter do }, recent_events: recent_events_payload(running || blocked), last_error: (blocked && blocked.error) || (retry && retry.error), - tracked: %{} + tracked: tracked_payload(claim_lease, expired), + token_usage: token_usage && token_usage_payload(token_usage) } end - defp issue_id_from_entries(running, retry, blocked), - do: (running && running.issue_id) || (retry && retry.issue_id) || (blocked && blocked.issue_id) + defp issue_id_from_entries(running, retry, blocked, token_usage, claim_lease, expired) do + first_entry_value([running, retry, blocked, claim_lease, expired, token_usage], :issue_id) + end defp restart_count(retry), do: max(retry_attempt(retry) - 1, 0) defp retry_attempt(nil), do: 0 defp retry_attempt(retry), do: retry.attempt || 0 - defp issue_status(running, _retry, _blocked) when not is_nil(running), do: "running" - defp issue_status(nil, retry, _blocked) when not is_nil(retry), do: "retrying" - defp issue_status(nil, nil, _blocked), do: "blocked" + defp issue_status(running, _retry, _blocked, _token_usage, _expired, _claim_lease) when not is_nil(running), do: "running" + defp issue_status(nil, retry, _blocked, _token_usage, _expired, _claim_lease) when not is_nil(retry), do: "retrying" + defp issue_status(nil, nil, blocked, _token_usage, _expired, _claim_lease) when not is_nil(blocked), do: "blocked" + defp issue_status(nil, nil, nil, _token_usage, expired, _claim_lease) when not is_nil(expired), do: "expired" + defp issue_status(nil, nil, nil, _token_usage, nil, claim_lease) when not is_nil(claim_lease), do: claim_lease.state || "claimed" + defp issue_status(nil, nil, nil, %{}, nil, nil), do: "inactive" + defp issue_status(nil, nil, nil, nil, nil, nil), do: "unknown" defp running_entry_payload(entry) do %{ @@ -151,6 +166,35 @@ defmodule SymphonyElixirWeb.Presenter do } end + defp claim_lease_payload(entry) do + %{ + issue_id: entry.issue_id, + issue_identifier: entry.identifier, + state: entry.state, + worker_id: entry.worker_id, + worker_host: Map.get(entry, :worker_host), + workspace_path: Map.get(entry, :workspace_path), + attempt: entry.attempt, + last_seen_at: iso8601(entry.last_seen_at), + lease_expires_at: iso8601(entry.lease_expires_at), + lease_expires_in_ms: entry.lease_expires_in_ms, + retry_due_at: iso8601(entry.retry_due_at), + retry_due_in_ms: entry.retry_due_in_ms, + retry_backoff_ms: entry.retry_backoff_ms, + error: entry.error + } + end + + defp expired_claim_payload(entry) do + entry + |> claim_lease_payload() + |> Map.merge(%{ + state: "expired", + expired_at: iso8601(entry.expired_at), + requeued_at: iso8601(entry.requeued_at) + }) + end + defp running_issue_payload(running) do %{ worker_host: Map.get(running, :worker_host), @@ -197,17 +241,40 @@ defmodule SymphonyElixirWeb.Presenter do } end - defp workspace_path(issue_identifier, running, retry, blocked) do - (running && Map.get(running, :workspace_path)) || - (retry && Map.get(retry, :workspace_path)) || - (blocked && Map.get(blocked, :workspace_path)) || + defp tracked_payload(nil, nil), do: %{} + + defp tracked_payload(claim_lease, expired) do + %{} + |> maybe_put(:claim_lease, claim_lease && claim_lease_payload(claim_lease)) + |> maybe_put(:expired_claim, expired && expired_claim_payload(expired)) + end + + defp maybe_put(map, _key, nil), do: map + defp maybe_put(map, key, value), do: Map.put(map, key, value) + + defp workspace_path(issue_identifier, running, retry, blocked, token_usage, claim_lease, expired) do + first_entry_value([running, retry, blocked, claim_lease, expired, token_usage], :workspace_path) || Path.join(Config.settings!().workspace.root, issue_identifier) end - defp workspace_host(running, retry, blocked) do - (running && Map.get(running, :worker_host)) || - (retry && Map.get(retry, :worker_host)) || - (blocked && Map.get(blocked, :worker_host)) + defp workspace_host(running, retry, blocked, token_usage, claim_lease, expired) do + first_entry_value([running, retry, blocked, claim_lease, expired, token_usage], :worker_host) + end + + defp first_entry_value(entries, key) when is_list(entries) do + Enum.find_value(entries, fn + entry when is_map(entry) -> Map.get(entry, key) + _ -> nil + end) + end + + defp token_usage_payload(token_usage) do + %{ + input_tokens: token_usage.input_tokens, + output_tokens: token_usage.output_tokens, + total_tokens: token_usage.total_tokens, + session_count: token_usage.session_count + } end defp issue_url_from_entry(%{issue: %{url: url}}), do: url @@ -282,12 +349,7 @@ defmodule SymphonyElixirWeb.Presenter do defp ledger_payload(_ledger), do: %{} - defp ledger_entry(issue_id) when is_binary(issue_id) do - case SymphonyElixir.Ledger.get(issue_id) do - entry when is_map(entry) -> entry - _ -> %{} - end - end + defp ledger_entry(issue_id) when is_binary(issue_id), do: SymphonyElixir.Ledger.get(issue_id) defp ledger_entry(_issue_id), do: %{} diff --git a/elixir/test/support/test_support.exs b/elixir/test/support/test_support.exs index 00e651a47b..a0c35439dc 100644 --- a/elixir/test/support/test_support.exs +++ b/elixir/test/support/test_support.exs @@ -33,8 +33,10 @@ defmodule SymphonyElixir.TestSupport do File.mkdir_p!(workflow_root) workflow_file = Path.join(workflow_root, "WORKFLOW.md") + token_usage_ledger_file = Path.join(workflow_root, "token_usage.jsonl") write_workflow_file!(workflow_file) Workflow.set_workflow_file_path(workflow_file) + Application.put_env(:symphony_elixir, :token_usage_ledger_file, token_usage_ledger_file) if Process.whereis(SymphonyElixir.WorkflowStore), do: SymphonyElixir.WorkflowStore.force_reload() if Process.whereis(SymphonyElixir.Ledger), do: SymphonyElixir.Ledger.reset!() stop_default_http_server() @@ -46,6 +48,7 @@ defmodule SymphonyElixir.TestSupport do Application.delete_env(:symphony_elixir, :memory_tracker_rework_counts) Application.delete_env(:symphony_elixir, :memory_tracker_recipient) if Process.whereis(SymphonyElixir.Ledger), do: SymphonyElixir.Ledger.reset!() + Application.delete_env(:symphony_elixir, :token_usage_ledger_file) File.rm_rf(workflow_root) end) diff --git a/elixir/test/symphony_elixir/app_server_test.exs b/elixir/test/symphony_elixir/app_server_test.exs index 87e464fe3a..1f49a09d14 100644 --- a/elixir/test/symphony_elixir/app_server_test.exs +++ b/elixir/test/symphony_elixir/app_server_test.exs @@ -164,6 +164,18 @@ defmodule SymphonyElixir.AppServerTest do trace = File.read!(trace_file) lines = String.split(trace, "\n", trim: true) + {:ok, canonical_workspace} = + SymphonyElixir.PathSafety.canonicalize(Path.expand(workspace)) + + expected_policy = + case configured_policy do + %{"type" => "workspaceWrite"} -> + Map.put(configured_policy, "writableRoots", [canonical_workspace, "relative/path"]) + + _ -> + configured_policy + end + assert Enum.any?(lines, fn line -> if String.starts_with?(line, "JSON:") do line @@ -171,7 +183,7 @@ defmodule SymphonyElixir.AppServerTest do |> Jason.decode!() |> then(fn payload -> payload["method"] == "turn/start" && - get_in(payload, ["params", "sandboxPolicy"]) == configured_policy + get_in(payload, ["params", "sandboxPolicy"]) == expected_policy end) else false diff --git a/elixir/test/symphony_elixir/core_test.exs b/elixir/test/symphony_elixir/core_test.exs index ca76f49a25..9018a03e8f 100644 --- a/elixir/test/symphony_elixir/core_test.exs +++ b/elixir/test/symphony_elixir/core_test.exs @@ -2209,7 +2209,7 @@ defmodule SymphonyElixir.CoreTest do codex_thread_sandbox: "workspace-write", codex_turn_sandbox_policy: %{ type: "workspaceWrite", - writableRoots: [Path.expand(workspace), workspace_cache] + writableRoots: [workspace_cache] } ) @@ -2242,9 +2242,12 @@ defmodule SymphonyElixir.CoreTest do end end) + assert {:ok, canonical_workspace} = + SymphonyElixir.PathSafety.canonicalize(Path.expand(workspace)) + expected_turn_policy = %{ "type" => "workspaceWrite", - "writableRoots" => [Path.expand(workspace), workspace_cache] + "writableRoots" => [canonical_workspace, workspace_cache] } assert Enum.any?(lines, fn line -> diff --git a/elixir/test/symphony_elixir/extensions_test.exs b/elixir/test/symphony_elixir/extensions_test.exs index e1df7eac3d..0f712a0792 100644 --- a/elixir/test/symphony_elixir/extensions_test.exs +++ b/elixir/test/symphony_elixir/extensions_test.exs @@ -496,7 +496,7 @@ defmodule SymphonyElixir.ExtensionsTest do assert state_payload == %{ "generated_at" => state_payload["generated_at"], - "counts" => %{"running" => 1, "retrying" => 1, "blocked" => 1}, + "counts" => %{"running" => 1, "retrying" => 1, "blocked" => 1, "expired" => 1}, "running" => [ %{ "issue_id" => "issue-http", @@ -542,12 +542,89 @@ defmodule SymphonyElixir.ExtensionsTest do "last_event_at" => state_payload["blocked"] |> List.first() |> Map.fetch!("last_event_at") } ], + "claim_leases" => [ + %{ + "issue_id" => "issue-http", + "issue_identifier" => "MT-HTTP", + "state" => "active", + "worker_id" => "local:#PID<0.1.0>", + "worker_host" => nil, + "workspace_path" => "/workspaces/MT-HTTP", + "attempt" => 1, + "last_seen_at" => state_payload["claim_leases"] |> Enum.at(0) |> Map.fetch!("last_seen_at"), + "lease_expires_at" => state_payload["claim_leases"] |> Enum.at(0) |> Map.fetch!("lease_expires_at"), + "lease_expires_in_ms" => 90_000, + "retry_due_at" => nil, + "retry_due_in_ms" => nil, + "retry_backoff_ms" => nil, + "error" => nil + }, + %{ + "issue_id" => "issue-retry", + "issue_identifier" => "MT-RETRY", + "state" => "retrying", + "worker_id" => "local:#PID<0.2.0>", + "worker_host" => nil, + "workspace_path" => "/workspaces/MT-RETRY", + "attempt" => 2, + "last_seen_at" => state_payload["claim_leases"] |> Enum.at(1) |> Map.fetch!("last_seen_at"), + "lease_expires_at" => state_payload["claim_leases"] |> Enum.at(1) |> Map.fetch!("lease_expires_at"), + "lease_expires_in_ms" => 120_000, + "retry_due_at" => state_payload["claim_leases"] |> Enum.at(1) |> Map.fetch!("retry_due_at"), + "retry_due_in_ms" => 2_000, + "retry_backoff_ms" => 2_000, + "error" => "boom" + }, + %{ + "issue_id" => "issue-blocked", + "issue_identifier" => "MT-BLOCKED", + "state" => "blocked", + "worker_id" => "dm-dev2:#PID<0.3.0>", + "worker_host" => "dm-dev2", + "workspace_path" => "/workspaces/MT-BLOCKED", + "attempt" => 1, + "last_seen_at" => state_payload["claim_leases"] |> Enum.at(2) |> Map.fetch!("last_seen_at"), + "lease_expires_at" => state_payload["claim_leases"] |> Enum.at(2) |> Map.fetch!("lease_expires_at"), + "lease_expires_in_ms" => 90_000, + "retry_due_at" => nil, + "retry_due_in_ms" => nil, + "retry_backoff_ms" => nil, + "error" => "codex turn requires operator input" + } + ], + "expired" => [ + %{ + "issue_id" => "issue-expired", + "issue_identifier" => "MT-EXPIRED", + "state" => "expired", + "worker_id" => "local:#PID<0.4.0>", + "worker_host" => nil, + "workspace_path" => "/workspaces/MT-EXPIRED", + "attempt" => 3, + "last_seen_at" => state_payload["expired"] |> List.first() |> Map.fetch!("last_seen_at"), + "lease_expires_at" => state_payload["expired"] |> List.first() |> Map.fetch!("lease_expires_at"), + "lease_expires_in_ms" => -1_000, + "retry_due_at" => nil, + "retry_due_in_ms" => nil, + "retry_backoff_ms" => nil, + "error" => "claim lease expired at 2026-05-29T17:00:00Z; requeueing", + "expired_at" => state_payload["expired"] |> List.first() |> Map.fetch!("expired_at"), + "requeued_at" => state_payload["expired"] |> List.first() |> Map.fetch!("requeued_at") + } + ], "codex_totals" => %{ "input_tokens" => 4, "output_tokens" => 8, "total_tokens" => 12, "seconds_running" => 42.5 }, + "token_usage" => %{ + "input_tokens" => 0, + "output_tokens" => 0, + "total_tokens" => 0, + "issue_count" => 0, + "session_count" => 0 + }, "rate_limits" => %{"primary" => %{"remaining" => 11}} } @@ -559,7 +636,7 @@ defmodule SymphonyElixir.ExtensionsTest do "issue_id" => "issue-http", "status" => "running", "workspace" => %{ - "path" => Path.join(Config.settings!().workspace.root, "MT-HTTP"), + "path" => "/workspaces/MT-HTTP", "host" => nil }, "attempts" => %{"restart_count" => 0, "current_retry_attempt" => 0}, @@ -581,7 +658,25 @@ defmodule SymphonyElixir.ExtensionsTest do "logs" => %{"codex_session_logs" => []}, "recent_events" => [], "last_error" => nil, - "tracked" => %{} + "tracked" => %{ + "claim_lease" => %{ + "issue_id" => "issue-http", + "issue_identifier" => "MT-HTTP", + "state" => "active", + "worker_id" => "local:#PID<0.1.0>", + "worker_host" => nil, + "workspace_path" => "/workspaces/MT-HTTP", + "attempt" => 1, + "last_seen_at" => issue_payload["tracked"]["claim_lease"]["last_seen_at"], + "lease_expires_at" => issue_payload["tracked"]["claim_lease"]["lease_expires_at"], + "lease_expires_in_ms" => 90_000, + "retry_due_at" => nil, + "retry_due_in_ms" => nil, + "retry_backoff_ms" => nil, + "error" => nil + } + }, + "token_usage" => nil } conn = get(build_conn(), "/api/v1/MT-RETRY") @@ -601,6 +696,15 @@ defmodule SymphonyElixir.ExtensionsTest do } } = json_response(conn, 200) + conn = get(build_conn(), "/api/v1/MT-EXPIRED") + + assert %{ + "status" => "expired", + "tracked" => %{ + "expired_claim" => %{"state" => "expired", "error" => "claim lease expired" <> _} + } + } = json_response(conn, 200) + conn = get(build_conn(), "/api/v1/MT-MISSING") assert json_response(conn, 404) == %{ @@ -613,6 +717,73 @@ defmodule SymphonyElixir.ExtensionsTest do json_response(conn, 202) end + test "phoenix observability api serves inactive ledger-backed issue details" do + orchestrator_name = Module.concat(__MODULE__, :LedgerBackedIssueOrchestrator) + snapshot = static_snapshot() |> Map.put(:running, []) |> Map.put(:retrying, []) + + {:ok, _pid} = + StaticOrchestrator.start_link( + name: orchestrator_name, + snapshot: snapshot, + refresh: :unavailable + ) + + ledger_file = Application.fetch_env!(:symphony_elixir, :token_usage_ledger_file) + + SymphonyElixir.TokenUsageLedger.append_observation( + %{ + observed_at: "2026-04-20T10:00:00Z", + final: true, + issue_id: "issue-done", + issue_identifier: "MT-DONE", + session_id: "thread-done-turn-done", + worker_host: "worker-a", + workspace_path: "/tmp/MT-DONE", + turn_count: 3, + input_tokens: 40, + output_tokens: 6, + total_tokens: 46, + source_event: :session_final + }, + file: ledger_file + ) + + start_test_endpoint(orchestrator: orchestrator_name, snapshot_timeout_ms: 50) + + state_payload = json_response(get(build_conn(), "/api/v1/state"), 200) + + assert state_payload["token_usage"] == %{ + "input_tokens" => 40, + "output_tokens" => 6, + "total_tokens" => 46, + "issue_count" => 1, + "session_count" => 1 + } + + issue_payload = json_response(get(build_conn(), "/api/v1/MT-DONE"), 200) + + assert issue_payload == %{ + "issue_identifier" => "MT-DONE", + "issue_id" => "issue-done", + "status" => "inactive", + "workspace" => %{"path" => "/tmp/MT-DONE", "host" => "worker-a"}, + "attempts" => %{"restart_count" => 0, "current_retry_attempt" => 0}, + "running" => nil, + "retry" => nil, + "blocked" => nil, + "logs" => %{"codex_session_logs" => []}, + "recent_events" => [], + "last_error" => nil, + "tracked" => %{}, + "token_usage" => %{ + "input_tokens" => 40, + "output_tokens" => 6, + "total_tokens" => 46, + "session_count" => 1 + } + } + end + test "phoenix observability api preserves 405, 404, and unavailable behavior" do unavailable_orchestrator = Module.concat(__MODULE__, :UnavailableOrchestrator) start_test_endpoint(orchestrator: unavailable_orchestrator, snapshot_timeout_ms: 5) @@ -735,6 +906,9 @@ defmodule SymphonyElixir.ExtensionsTest do assert html =~ "MT-HTTP" assert html =~ "MT-RETRY" assert html =~ "MT-BLOCKED" + assert html =~ "MT-EXPIRED" + assert html =~ "Claim leases" + assert html =~ "Expired leases" assert html =~ "rendered" assert html =~ "turn blocked: waiting for user input" assert html =~ "Runtime" @@ -835,7 +1009,7 @@ defmodule SymphonyElixir.ExtensionsTest do response = Req.get!("http://127.0.0.1:#{port}/api/v1/state") assert response.status == 200 - assert response.body["counts"] == %{"running" => 1, "retrying" => 1, "blocked" => 1} + assert response.body["counts"] == %{"running" => 1, "retrying" => 1, "blocked" => 1, "expired" => 1} dashboard_css = Req.get!("http://127.0.0.1:#{port}/dashboard.css") assert dashboard_css.status == 200 @@ -878,6 +1052,8 @@ defmodule SymphonyElixir.ExtensionsTest do end defp static_snapshot do + now = DateTime.utc_now() + %{ running: [ %{ @@ -893,7 +1069,7 @@ defmodule SymphonyElixir.ExtensionsTest do codex_input_tokens: 4, codex_output_tokens: 8, codex_total_tokens: 12, - started_at: DateTime.utc_now() + started_at: now } ], retrying: [ @@ -914,14 +1090,84 @@ defmodule SymphonyElixir.ExtensionsTest do worker_host: "dm-dev2", workspace_path: "/workspaces/MT-BLOCKED", session_id: "thread-blocked", - blocked_at: DateTime.utc_now(), + blocked_at: now, last_codex_event: :turn_input_required, last_codex_message: %{ event: :turn_input_required, message: %{"method" => "turn/input_required"}, - timestamp: DateTime.utc_now() + timestamp: now }, - last_codex_timestamp: DateTime.utc_now() + last_codex_timestamp: now + } + ], + claim_leases: [ + %{ + issue_id: "issue-http", + identifier: "MT-HTTP", + state: "active", + worker_id: "local:#PID<0.1.0>", + worker_host: nil, + workspace_path: "/workspaces/MT-HTTP", + attempt: 1, + last_seen_at: now, + lease_expires_at: DateTime.add(now, 90_000, :millisecond), + lease_expires_in_ms: 90_000, + retry_due_at: nil, + retry_due_in_ms: nil, + retry_backoff_ms: nil, + error: nil + }, + %{ + issue_id: "issue-retry", + identifier: "MT-RETRY", + state: "retrying", + worker_id: "local:#PID<0.2.0>", + worker_host: nil, + workspace_path: "/workspaces/MT-RETRY", + attempt: 2, + last_seen_at: now, + lease_expires_at: DateTime.add(now, 120_000, :millisecond), + lease_expires_in_ms: 120_000, + retry_due_at: DateTime.add(now, 2_000, :millisecond), + retry_due_in_ms: 2_000, + retry_backoff_ms: 2_000, + error: "boom" + }, + %{ + issue_id: "issue-blocked", + identifier: "MT-BLOCKED", + state: "blocked", + worker_id: "dm-dev2:#PID<0.3.0>", + worker_host: "dm-dev2", + workspace_path: "/workspaces/MT-BLOCKED", + attempt: 1, + last_seen_at: now, + lease_expires_at: DateTime.add(now, 90_000, :millisecond), + lease_expires_in_ms: 90_000, + retry_due_at: nil, + retry_due_in_ms: nil, + retry_backoff_ms: nil, + error: "codex turn requires operator input" + } + ], + expired: [ + %{ + issue_id: "issue-expired", + identifier: "MT-EXPIRED", + state: "expired", + worker_id: "local:#PID<0.4.0>", + worker_host: nil, + workspace_path: "/workspaces/MT-EXPIRED", + attempt: 3, + last_seen_at: DateTime.add(now, -90_000, :millisecond), + lease_expires_at: DateTime.add(now, -1_000, :millisecond), + lease_expires_in_ms: -1_000, + retry_due_at: nil, + retry_due_in_ms: nil, + retry_backoff_ms: nil, + error: "claim lease expired at 2026-05-29T17:00:00Z; requeueing", + expired_at: DateTime.add(now, -1_000, :millisecond), + requeued_at: now } ], codex_totals: %{input_tokens: 4, output_tokens: 8, total_tokens: 12, seconds_running: 42.5}, diff --git a/elixir/test/symphony_elixir/orchestrator_status_test.exs b/elixir/test/symphony_elixir/orchestrator_status_test.exs index 5d56b5d9f2..9f2f819bae 100644 --- a/elixir/test/symphony_elixir/orchestrator_status_test.exs +++ b/elixir/test/symphony_elixir/orchestrator_status_test.exs @@ -191,6 +191,21 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert snapshot_entry.turn_count == 1 assert is_integer(snapshot_entry.runtime_seconds) + ledger_file = Application.fetch_env!(:symphony_elixir, :token_usage_ledger_file) + + assert [ + %{ + final: false, + issue_id: ^issue_id, + issue_identifier: "MT-201", + session_id: "thread-usage-turn-usage", + input_tokens: 12, + output_tokens: 4, + total_tokens: 16, + source_event: "notification" + } + ] = SymphonyElixir.TokenUsageLedger.read_records(file: ledger_file) + send(pid, {:DOWN, process_ref, :process, self(), :normal}) completed_state = :sys.get_state(pid) @@ -198,6 +213,20 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert completed_state.codex_totals.output_tokens == 4 assert completed_state.codex_totals.total_tokens == 16 assert is_integer(completed_state.codex_totals.seconds_running) + + assert [ + %{final: false}, + %{ + final: true, + issue_id: ^issue_id, + issue_identifier: "MT-201", + session_id: "thread-usage-turn-usage", + input_tokens: 12, + output_tokens: 4, + total_tokens: 16, + source_event: "session_final" + } + ] = SymphonyElixir.TokenUsageLedger.read_records(file: ledger_file) end test "token budget counts only uncached spend, excluding cached prefix reads" do @@ -811,6 +840,16 @@ defmodule SymphonyElixir.OrchestratorStatusTest do |> Map.put(:claimed, MapSet.put(initial_state.claimed, issue_id)) end) + send( + pid, + {:codex_worker_update, issue_id, + %{ + event: :session_started, + session_id: "thread-usage-turn-usage", + timestamp: DateTime.utc_now() + }} + ) + for usage <- [ %{"input_tokens" => 8, "output_tokens" => 3, "total_tokens" => 11}, %{"input_tokens" => 10, "output_tokens" => 4, "total_tokens" => 14} @@ -834,6 +873,16 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert snapshot_entry.codex_input_tokens == 10 assert snapshot_entry.codex_output_tokens == 4 assert snapshot_entry.codex_total_tokens == 14 + + ledger_file = Application.fetch_env!(:symphony_elixir, :token_usage_ledger_file) + + assert SymphonyElixir.TokenUsageLedger.summary(file: ledger_file) == %{ + input_tokens: 10, + output_tokens: 4, + total_tokens: 14, + issue_count: 1, + session_count: 1 + } end test "orchestrator token accounting ignores last_token_usage without cumulative totals" do @@ -1145,6 +1194,9 @@ defmodule SymphonyElixir.OrchestratorStatusTest do last_codex_message: nil, last_codex_timestamp: stale_activity_at, last_codex_event: :notification, + codex_input_tokens: 20, + codex_output_tokens: 3, + codex_total_tokens: 23, started_at: stale_activity_at } @@ -1171,8 +1223,21 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert is_integer(due_at_ms) remaining_ms = due_at_ms - System.monotonic_time(:millisecond) - assert remaining_ms >= 9_500 + assert remaining_ms >= 8_000 assert remaining_ms <= 10_500 + + ledger_file = Application.fetch_env!(:symphony_elixir, :token_usage_ledger_file) + + assert [ + %{ + final: true, + issue_id: ^issue_id, + issue_identifier: "MT-STALL", + session_id: "thread-stall-turn-stall", + total_tokens: 23, + source_event: "session_final" + } + ] = SymphonyElixir.TokenUsageLedger.read_records(file: ledger_file) end test "orchestrator blocks stalled workers that are waiting on MCP elicitation" do @@ -1361,6 +1426,201 @@ defmodule SymphonyElixir.OrchestratorStatusTest do } = state.blocked[issue_id] end + test "claim lease marker is persisted when an issue is claimed" do + write_workflow_file!(Workflow.workflow_file_path(), tracker_kind: "memory") + Application.put_env(:symphony_elixir, :memory_tracker_recipient, self()) + + issue = %Issue{ + id: "issue-lease", + identifier: "MT-LEASE", + title: "Lease marker", + state: "In Progress" + } + + running_entry = %{ + pid: self(), + ref: make_ref(), + identifier: issue.identifier, + issue: issue, + worker_host: nil, + workspace_path: "/tmp/symphony_workspaces/MT-LEASE", + started_at: DateTime.utc_now() + } + + state = + %Orchestrator.State{poll_interval_ms: 30_000, max_concurrent_agents: 1} + |> Orchestrator.start_claim_lease_for_test(issue, running_entry) + + assert %{ + state: :active, + worker_id: "local:" <> _, + workspace_path: "/tmp/symphony_workspaces/MT-LEASE", + attempt: 1, + lease_expires_at: %DateTime{} + } = state.claim_leases[issue.id] + + assert_receive {:memory_tracker_comment, "issue-lease", body} + assert body =~ "## Symphony Claim Lease" + assert body =~ "- state: active" + assert body =~ "- worker_id: local:" + assert body =~ "- workspace_path: /tmp/symphony_workspaces/MT-LEASE" + assert body =~ "- attempt: 1" + assert body =~ "- lease_expires_at:" + end + + test "claim lease heartbeat refresh updates last seen and lease expiry" do + write_workflow_file!(Workflow.workflow_file_path(), tracker_kind: "memory") + Application.put_env(:symphony_elixir, :memory_tracker_recipient, self()) + + issue = %Issue{id: "issue-heartbeat", identifier: "MT-HB", title: "Heartbeat", state: "In Progress"} + + running_entry = %{ + pid: self(), + ref: make_ref(), + identifier: issue.identifier, + issue: issue, + worker_host: "worker-a", + workspace_path: "/workspaces/MT-HB", + started_at: DateTime.utc_now() + } + + state = + %Orchestrator.State{poll_interval_ms: 30_000, max_concurrent_agents: 1} + |> Orchestrator.start_claim_lease_for_test(issue, running_entry) + + assert_receive {:memory_tracker_comment, "issue-heartbeat", _body} + + stale_seen_at = DateTime.add(DateTime.utc_now(), -120, :second) + stale_marker_at_ms = System.monotonic_time(:millisecond) - 90_000 + stale_expires_at_ms = System.monotonic_time(:millisecond) + 1_000 + old_lease = state.claim_leases[issue.id] + + state = + put_in(state.claim_leases[issue.id], %{ + old_lease + | last_seen_at: stale_seen_at, + lease_expires_at_ms: stale_expires_at_ms, + last_marker_at_ms: stale_marker_at_ms + }) + + refreshed_state = Orchestrator.refresh_claim_lease_from_running_for_test(state, issue.id, running_entry) + refreshed_lease = refreshed_state.claim_leases[issue.id] + + assert DateTime.compare(refreshed_lease.last_seen_at, stale_seen_at) == :gt + assert refreshed_lease.lease_expires_at_ms > stale_expires_at_ms + assert refreshed_lease.heartbeat_count == old_lease.heartbeat_count + 1 + + assert_receive {:memory_tracker_comment, "issue-heartbeat", body} + assert body =~ "- state: active" + assert body =~ "- worker_host: worker-a" + end + + test "expired claim leases are requeued and logged" do + write_workflow_file!(Workflow.workflow_file_path(), tracker_kind: "memory") + Application.put_env(:symphony_elixir, :memory_tracker_recipient, self()) + + issue = %Issue{ + id: "issue-expired-lease", + identifier: "MT-EXPIRED-LEASE", + title: "Expired lease", + state: "In Progress" + } + + expired_lease = %{ + issue_id: issue.id, + identifier: issue.identifier, + state: :active, + worker_id: "local:#PID<0.1.0>", + worker_host: nil, + workspace_path: "/tmp/symphony_workspaces/MT-EXPIRED-LEASE", + attempt: 1, + last_seen_at: DateTime.add(DateTime.utc_now(), -120, :second), + lease_expires_at: DateTime.add(DateTime.utc_now(), -1, :second), + lease_expires_at_ms: System.monotonic_time(:millisecond) - 1, + last_marker_at_ms: System.monotonic_time(:millisecond) - 120_000 + } + + state = %Orchestrator.State{ + poll_interval_ms: 30_000, + max_concurrent_agents: 1, + claimed: MapSet.new([issue.id]), + claim_leases: %{issue.id => expired_lease} + } + + log = + capture_log(fn -> + recovered_state = Orchestrator.recover_expired_claim_leases_for_test(state, [issue]) + send(self(), {:recovered_state, recovered_state}) + end) + + assert_receive {:recovered_state, recovered_state} + assert log =~ "Claim lease expired; requeueing" + + assert %{attempt: 2, error: "claim lease expired" <> _} = recovered_state.retry_attempts[issue.id] + assert %{state: :expired, error: "claim lease expired" <> _} = recovered_state.expired_claims[issue.id] + assert %{state: :retrying, attempt: 2, retry_backoff_ms: 0} = recovered_state.claim_leases[issue.id] + assert MapSet.member?(recovered_state.claimed, issue.id) + + assert_receive {:memory_tracker_comment, "issue-expired-lease", body} + assert body =~ "- state: retrying" + assert body =~ "- retry_backoff_ms: 0" + end + + test "expired claim lease recovery does not duplicate live workers" do + issue = %Issue{ + id: "issue-live-expired", + identifier: "MT-LIVE-EXPIRED", + title: "Live expired lease", + state: "In Progress" + } + + worker_pid = + spawn(fn -> + receive do + :done -> :ok + end + end) + + on_exit(fn -> + if Process.alive?(worker_pid), do: Process.exit(worker_pid, :normal) + end) + + expired_lease = %{ + issue_id: issue.id, + identifier: issue.identifier, + state: :active, + worker_id: "local:#PID<0.2.0>", + worker_host: nil, + workspace_path: "/tmp/symphony_workspaces/MT-LIVE-EXPIRED", + attempt: 1, + last_seen_at: DateTime.add(DateTime.utc_now(), -120, :second), + lease_expires_at: DateTime.add(DateTime.utc_now(), -1, :second), + lease_expires_at_ms: System.monotonic_time(:millisecond) - 1 + } + + state = %Orchestrator.State{ + poll_interval_ms: 30_000, + max_concurrent_agents: 1, + claimed: MapSet.new([issue.id]), + running: %{ + issue.id => %{ + pid: worker_pid, + ref: make_ref(), + identifier: issue.identifier, + issue: issue, + started_at: DateTime.utc_now() + } + }, + claim_leases: %{issue.id => expired_lease} + } + + recovered_state = Orchestrator.recover_expired_claim_leases_for_test(state, [issue]) + + assert recovered_state.retry_attempts == %{} + assert recovered_state.expired_claims == %{} + assert recovered_state.running[issue.id].pid == worker_pid + end + test "status dashboard renders offline marker to terminal" do rendered = ExUnit.CaptureIO.capture_io(fn -> diff --git a/elixir/test/symphony_elixir/token_usage_ledger_test.exs b/elixir/test/symphony_elixir/token_usage_ledger_test.exs new file mode 100644 index 0000000000..7c5069262e --- /dev/null +++ b/elixir/test/symphony_elixir/token_usage_ledger_test.exs @@ -0,0 +1,246 @@ +defmodule SymphonyElixir.TokenUsageLedgerTest do + use ExUnit.Case + + import ExUnit.CaptureLog + + alias SymphonyElixir.TokenUsageLedger + + setup do + previous_ledger_file = Application.get_env(:symphony_elixir, :token_usage_ledger_file) + previous_log_file = Application.get_env(:symphony_elixir, :log_file) + + test_root = + Path.join( + System.tmp_dir!(), + "symphony-token-usage-ledger-#{System.unique_integer([:positive])}" + ) + + File.mkdir_p!(test_root) + ledger_file = Path.join(test_root, "token_usage.jsonl") + + on_exit(fn -> + restore_app_env(:token_usage_ledger_file, previous_ledger_file) + restore_app_env(:log_file, previous_log_file) + File.rm_rf(test_root) + end) + + {:ok, ledger_file: ledger_file, test_root: test_root} + end + + test "default ledger path is next to the configured log file", %{test_root: test_root} do + Application.delete_env(:symphony_elixir, :token_usage_ledger_file) + Application.put_env(:symphony_elixir, :log_file, Path.join(test_root, "log/symphony.log")) + + assert TokenUsageLedger.file_path() == Path.join(test_root, "log/token_usage.jsonl") + + Application.put_env(:symphony_elixir, :token_usage_ledger_file, Path.join(test_root, "custom.jsonl")) + assert TokenUsageLedger.file_path() == Path.join(test_root, "custom.jsonl") + end + + test "appends valid JSONL records", %{ledger_file: ledger_file} do + assert :ok = + TokenUsageLedger.append_observation( + %{ + observed_at: ~U[2026-04-20 10:00:00Z], + final: false, + issue_id: "issue-1", + issue_identifier: "MT-1", + session_id: "thread-1-turn-1", + worker_host: "worker-a", + workspace_path: "/tmp/MT-1", + turn_count: 1, + input_tokens: 10, + output_tokens: 5, + total_tokens: 15, + source_event: :notification + }, + file: ledger_file + ) + + assert [ + %{ + "schema_version" => 1, + "observed_at" => "2026-04-20T10:00:00Z", + "final" => false, + "issue_id" => "issue-1", + "issue_identifier" => "MT-1", + "session_id" => "thread-1-turn-1", + "worker_host" => "worker-a", + "workspace_path" => "/tmp/MT-1", + "turn_count" => 1, + "input_tokens" => 10, + "output_tokens" => 5, + "total_tokens" => 15, + "source_event" => "notification" + } + ] = + ledger_file + |> File.read!() + |> String.split("\n", trim: true) + |> Enum.map(&Jason.decode!/1) + end + + test "summarizes max token totals per issue and session", %{ledger_file: ledger_file} do + append!(ledger_file, "MT-1", "session-a", 100, 20, 120) + append!(ledger_file, "MT-1", "session-a", 90, 10, 100) + append!(ledger_file, "MT-1", "session-b", 30, 5, 35) + append!(ledger_file, "MT-2", "session-c", 7, 3, 10) + + assert TokenUsageLedger.summary(file: ledger_file) == %{ + input_tokens: 137, + output_tokens: 28, + total_tokens: 165, + issue_count: 2, + session_count: 3 + } + + assert TokenUsageLedger.issue_summary("MT-1", file: ledger_file) == %{ + issue_id: "issue-MT-1", + issue_identifier: "MT-1", + worker_host: nil, + workspace_path: "/tmp/MT-1", + input_tokens: 130, + output_tokens: 25, + total_tokens: 155, + session_count: 2 + } + end + + test "ignores malformed and unreadable ledger records", %{ledger_file: ledger_file} do + File.write!( + ledger_file, + [ + "not json", + "[]", + Jason.encode!(%{"schema_version" => 2, "issue_identifier" => "MT-0"}), + Jason.encode!(%{"schema_version" => 1, "issue_identifier" => "MT-0"}), + "" + ] + |> Enum.join("\n") + ) + + append!(ledger_file, "MT-3", "session-d", 1, 2, 3) + + assert TokenUsageLedger.summary(file: ledger_file) == %{ + input_tokens: 1, + output_tokens: 2, + total_tokens: 3, + issue_count: 1, + session_count: 1 + } + + missing_file = Path.join(Path.dirname(ledger_file), "missing.jsonl") + assert TokenUsageLedger.read_records(file: missing_file) == [] + assert TokenUsageLedger.issue_summary("MT-MISSING", file: ledger_file) == nil + end + + test "normalizes invalid token field values", %{ledger_file: ledger_file} do + File.write!( + ledger_file, + Jason.encode!(%{ + "schema_version" => 1, + "issue_identifier" => "MT-0", + "session_id" => "session-zero", + "final" => "not-a-bool", + "input_tokens" => "bad", + "output_tokens" => -2, + "total_tokens" => 0 + }) + ) + + assert [ + %{ + final: false, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0 + } + ] = TokenUsageLedger.read_records(file: ledger_file) + end + + test "reads configured ledger path and normalizes string token values", %{ledger_file: ledger_file} do + Application.put_env(:symphony_elixir, :token_usage_ledger_file, ledger_file) + + File.write!( + ledger_file, + Jason.encode!(%{ + "schema_version" => 1, + "issue_identifier" => "MT-STRING", + "session_id" => "session-string", + "input_tokens" => "4", + "output_tokens" => "1", + "total_tokens" => "5", + "source_event" => [] + }) + ) + + assert [ + %{ + issue_identifier: "MT-STRING", + session_id: "session-string", + input_tokens: 4, + output_tokens: 1, + total_tokens: 5, + source_event: nil + } + ] = TokenUsageLedger.read_records() + end + + test "write failures are logged but do not crash", %{test_root: test_root} do + log = + capture_log(fn -> + assert :ok = + TokenUsageLedger.append_observation( + %{ + issue_identifier: "MT-ERR", + session_id: "session-error", + input_tokens: 1, + output_tokens: 0, + total_tokens: 1 + }, + file: test_root + ) + end) + + assert log =~ "Failed to append token usage ledger record" + + log = + capture_log(fn -> + assert :ok = + TokenUsageLedger.append_observation( + %{ + issue_id: 123, + issue_identifier: "MT-ERR", + session_id: "session-error", + source_event: 456, + input_tokens: 1, + output_tokens: 0, + total_tokens: 1 + }, + file: nil + ) + end) + + assert log =~ "Failed to append token usage ledger record" + end + + defp append!(ledger_file, issue_identifier, session_id, input_tokens, output_tokens, total_tokens) do + TokenUsageLedger.append_observation( + %{ + observed_at: "2026-04-20T10:00:00Z", + final: false, + issue_id: "issue-#{issue_identifier}", + issue_identifier: issue_identifier, + session_id: session_id, + workspace_path: "/tmp/#{issue_identifier}", + input_tokens: input_tokens, + output_tokens: output_tokens, + total_tokens: total_tokens + }, + file: ledger_file + ) + end + + defp restore_app_env(key, nil), do: Application.delete_env(:symphony_elixir, key) + defp restore_app_env(key, value), do: Application.put_env(:symphony_elixir, key, value) +end diff --git a/elixir/test/symphony_elixir/workspace_and_config_test.exs b/elixir/test/symphony_elixir/workspace_and_config_test.exs index c893955109..3780ec9241 100644 --- a/elixir/test/symphony_elixir/workspace_and_config_test.exs +++ b/elixir/test/symphony_elixir/workspace_and_config_test.exs @@ -1195,7 +1195,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do codex_thread_sandbox: "workspace-write", codex_turn_sandbox_policy: %{ type: "workspaceWrite", - writableRoots: [explicit_workspace, explicit_cache] + writableRoots: [explicit_cache] } ) @@ -1540,7 +1540,7 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do assert runtime_settings.turn_sandbox_policy == %{ "type" => "workspaceWrite", - "writableRoots" => ["relative/path"], + "writableRoots" => [issue_workspace, "relative/path"], "networkAccess" => true } @@ -1563,6 +1563,74 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do end end + test "runtime sandbox policy resolution leaves explicit workspaceWrite roots unchanged without a workspace" do + settings = %Schema{ + codex: %Codex{ + turn_sandbox_policy: %{ + "type" => "workspaceWrite", + "writableRoots" => "not-a-list", + "networkAccess" => true + } + }, + workspace: %Schema.Workspace{root: "/tmp/ignored"} + } + + assert {:ok, policy} = Schema.resolve_runtime_turn_sandbox_policy(settings, nil) + + assert policy == %{ + "type" => "workspaceWrite", + "writableRoots" => "not-a-list", + "networkAccess" => true + } + end + + test "runtime sandbox policy resolution normalizes explicit workspaceWrite roots when needed" do + issue_workspace = "/tmp/MT-201" + + settings = %Schema{ + codex: %Codex{ + turn_sandbox_policy: %{ + "type" => "workspaceWrite", + "writableRoots" => "not-a-list", + "networkAccess" => true + } + }, + workspace: %Schema.Workspace{root: "/tmp/ignored"} + } + + assert {:ok, policy} = Schema.resolve_runtime_turn_sandbox_policy(settings, issue_workspace) + + assert policy == %{ + "type" => "workspaceWrite", + "writableRoots" => [issue_workspace], + "networkAccess" => true + } + end + + test "runtime sandbox policy resolution preserves explicit workspaceWrite roots for remote workers" do + remote_workspace = "/remote/workspaces/MT-200" + + settings = %Schema{ + codex: %Codex{ + turn_sandbox_policy: %{ + "type" => "workspaceWrite", + "writableRoots" => ["relative/path"], + "networkAccess" => true + } + }, + workspace: %Schema.Workspace{root: "/tmp/ignored"} + } + + assert {:ok, policy} = + Schema.resolve_runtime_turn_sandbox_policy(settings, remote_workspace, remote: true) + + assert policy == %{ + "type" => "workspaceWrite", + "writableRoots" => [remote_workspace, "relative/path"], + "networkAccess" => true + } + end + test "path safety returns errors for invalid path segments" do invalid_segment = String.duplicate("a", 300) path = Path.join(System.tmp_dir!(), invalid_segment)