Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/openhuman/agent/harness/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(crate) mod memory_context;
pub(crate) mod memory_context_safety;
pub(crate) mod parse;
pub(crate) mod payload_summarizer;
pub(crate) mod required_output;
pub mod run_queue;
pub mod sandbox_context;
pub mod session;
Expand Down
194 changes: 194 additions & 0 deletions src/openhuman/agent/harness/required_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
//! Required structured-output validation & repair (issue #4117).
//!
//! Some agent contracts mandate a JSON block — e.g. a `thoughts` block like
//! `{"thoughts": "…", "next_action": "…"}` — on **every** turn, because
//! downstream parsing/routing depends on it. Models frequently omit the block
//! entirely, leaving those consumers with nothing.
//!
//! This module supplies the pure primitives the turn engine uses to guarantee a
//! well-formed block on every accepted turn:
//!
//! * [`output_satisfies_contract`] — validate presence + shape of the block,
//! * [`repair_instruction`] — the corrective re-prompt that asks the model to
//! re-emit its reply with the block, and
//! * [`synthesize_block`] — a minimal, schema-valid block used as a deterministic
//! fallback when the re-prompt also omits it.
//!
//! The orchestration that ties these together (validate → re-prompt → synthesize)
//! lives on the session in `session::turn::session_io` so it can drive the extra
//! provider call; keeping the logic here pure keeps it unit-testable without a
//! provider.

use crate::openhuman::config::RequiredOutputContract;

/// Reports whether `text` meets `contract`.
///
/// A contract that is not active (per `contract.is_active()`) is vacuously
/// satisfied, which turns enforcement into a no-op. For an active contract the
/// verdict is delegated to [`find_required_block`]: the reply passes only when
/// that lookup yields a block.
pub(crate) fn output_satisfies_contract(text: &str, contract: &RequiredOutputContract) -> bool {
    !contract.is_active() || find_required_block(text, contract).is_some()
}

/// Returns the required block when it sits in the expected **leading
/// position**, else `None`.
///
/// "Leading position" means the *first* JSON value extracted from `text` must
/// be an object that carries every key of `contract` with a non-null value.
///
/// Issue #4117 mandates the block in a predictable position so downstream
/// parsing/routing can rely on it. Prose before the block is fine (prose is not
/// JSON, so the block is still the first extracted value), but a block buried
/// after *another* JSON object is rejected and gets repaired rather than
/// silently accepted. Reuses the harness JSON extractor
/// (`super::parse::extract_json_values`) so fenced, inline, and whole-object
/// replies are all recognised.
///
/// An inert contract (`!contract.is_active()`) never has a required block, so
/// the lookup returns `None` for it. This keeps the function consistent with
/// [`output_satisfies_contract`], which uses the same activity check, and
/// prevents a contract with a blank block key from matching an object that
/// only carries the sibling keys.
pub(crate) fn find_required_block(
    text: &str,
    contract: &RequiredOutputContract,
) -> Option<serde_json::Value> {
    if !contract.is_active() {
        return None;
    }
    let keys = contract.all_keys();
    // Guard against a vacuous `all(..)` below: with no keys, every JSON object
    // would otherwise be accepted as "the block".
    if keys.is_empty() {
        return None;
    }
    // Only the first extracted JSON value is eligible; later values are ignored.
    let first = super::parse::extract_json_values(text).into_iter().next()?;
    let obj = first.as_object()?;
    let has_all = keys
        .iter()
        .all(|key| obj.get(key).is_some_and(|v| !v.is_null()));
    has_all.then_some(first)
}

/// Builds the deterministic fallback block used when the model omitted the
/// required block and the corrective re-prompt did not recover it.
///
/// Each key reported by `contract.all_keys()` is mapped to an empty string and
/// the resulting object is serialised to a JSON string, so downstream parsing
/// always has a well-formed object to consume. A contract with no keys yields
/// `"{}"`; the same literal is the fallback should serialisation ever fail.
pub(crate) fn synthesize_block(contract: &RequiredOutputContract) -> String {
    let placeholders: serde_json::Map<String, serde_json::Value> = contract
        .all_keys()
        .into_iter()
        .map(|key| (key, serde_json::Value::String(String::new())))
        .collect();
    let block = serde_json::Value::Object(placeholders);
    match serde_json::to_string(&block) {
        Ok(json) => json,
        Err(_) => "{}".to_string(),
    }
}

/// Renders the corrective user message that asks the model to answer again,
/// this time including the required block.
///
/// The message names the contract's `block_key` and lists every key from
/// `contract.all_keys()`, each wrapped in double quotes and separated by
/// commas. It is written as a self-contained user turn to be appended after the
/// reply that omitted the block, mirroring the iteration-cap checkpoint
/// re-prompt, and it tells the model not to call tools.
pub(crate) fn repair_instruction(contract: &RequiredOutputContract) -> String {
    let block = &contract.block_key;
    // Joining with `", "` plus the surrounding quotes in the template yields
    // `"a", "b", "c"`.
    let quoted_keys = contract.all_keys().join("\", \"");
    format!(
        "Your previous reply omitted the required JSON `{block}` block that every turn must include. \
         Reply again with the same answer, but this time emit a single valid JSON object containing the \
         keys \"{quoted_keys}\" — all present and non-null. Do not call any tools."
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture shared by most tests: a `thoughts` block that must also carry a
    /// `next_action` sibling key.
    fn thoughts_contract() -> RequiredOutputContract {
        RequiredOutputContract {
            block_key: "thoughts".into(),
            required_keys: vec!["next_action".into()],
        }
    }

    #[test]
    fn present_well_formed_block_satisfies_contract() {
        let contract = thoughts_contract();
        let reply = r#"Sure! {"thoughts": "planning", "next_action": "call tool"}"#;
        assert!(output_satisfies_contract(reply, &contract));
        assert!(find_required_block(reply, &contract).is_some());
    }

    #[test]
    fn prose_only_reply_fails_validation() {
        let contract = thoughts_contract();
        let reply = "Sure, I'll handle that.";
        assert!(!output_satisfies_contract(reply, &contract));
    }

    #[test]
    fn block_missing_a_required_sibling_key_fails() {
        let contract = thoughts_contract();
        // `thoughts` is present, `next_action` is not.
        let reply = r#"{"thoughts": "planning"}"#;
        assert!(!output_satisfies_contract(reply, &contract));
    }

    #[test]
    fn null_valued_required_key_fails() {
        let contract = RequiredOutputContract::new("thoughts");
        let reply = r#"{"thoughts": null}"#;
        assert!(!output_satisfies_contract(reply, &contract));
    }

    #[test]
    fn synthesized_block_satisfies_its_own_contract() {
        let contract = thoughts_contract();
        let synthesized = synthesize_block(&contract);
        let accepted = output_satisfies_contract(&synthesized, &contract);
        assert!(
            accepted,
            "synthesized fallback must satisfy the contract it was built from: {synthesized}"
        );
    }

    #[test]
    fn leading_block_after_prose_is_accepted() {
        let contract = thoughts_contract();
        // Leading prose is not JSON, so the block remains the first extracted
        // value and the reply is accepted.
        let reply = "Here is my plan.\n{\"thoughts\": \"x\", \"next_action\": \"y\"}";
        assert!(output_satisfies_contract(reply, &contract));
    }

    #[test]
    fn block_buried_after_another_json_object_is_rejected() {
        let contract = thoughts_contract();
        // An unrelated JSON object comes first and the required block second,
        // so the reply is rejected and repaired instead of silently accepted
        // (issue #4117).
        let reply = "{\"foo\": 1}\n{\"thoughts\": \"x\", \"next_action\": \"y\"}";
        assert!(!output_satisfies_contract(reply, &contract));
    }

    #[test]
    fn blank_block_key_makes_contract_inert() {
        // Listing sibling keys does not rescue a contract whose block key is
        // blank: its defining key can never be enforced, so enforcement is
        // skipped rather than accepting a block that lacks that key.
        let contract = RequiredOutputContract {
            block_key: " ".into(),
            required_keys: vec!["next_action".into()],
        };
        assert!(!contract.is_active());
        let reply = r#"{"next_action": "y"}"#;
        assert!(output_satisfies_contract(reply, &contract));
    }

    #[test]
    fn inert_contract_is_always_satisfied() {
        let contract = RequiredOutputContract::default();
        let reply = "no block here";
        assert!(!contract.is_active());
        assert!(output_satisfies_contract(reply, &contract));
        assert!(find_required_block(reply, &contract).is_none());
    }

    #[test]
    fn repair_instruction_names_every_required_key() {
        let contract = thoughts_contract();
        let prompt = repair_instruction(&contract);
        for key in ["thoughts", "next_action"] {
            assert!(prompt.contains(key));
        }
    }
}
46 changes: 46 additions & 0 deletions src/openhuman/agent/harness/session/turn/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@ fn tool_records_from_conversation(
records
}

/// Makes the tail of `history` an assistant `Chat` message whose content is
/// `text`, so the stored transcript matches a repaired required-output reply
/// (issue #4117).
///
/// * If the last entry is already an assistant `Chat`, its content is
///   overwritten in place.
/// * Otherwise (including an empty history) a new assistant message is
///   appended. This is a defensive path; older entries are never modified.
fn replace_last_assistant_reply(history: &mut Vec<ConversationMessage>, text: &str) {
    if let Some(ConversationMessage::Chat(tail)) = history.last_mut() {
        if tail.role == "assistant" {
            tail.content = text.to_string();
            return;
        }
    }
    // The tail is missing or is not an assistant chat row: append instead of
    // touching an earlier message.
    let appended = ChatMessage::assistant(text.to_string());
    history.push(ConversationMessage::Chat(appended));
}

fn render_agent_context_status_note(sources: &[harness::AgentContextPreparedSource]) -> String {
let sources = if sources.is_empty() {
"the OpenHuman harness".to_string()
Expand Down Expand Up @@ -1010,6 +1027,35 @@ impl Agent {
} else {
outcome.text.clone()
};

// Enforce the required structured-output contract (issue #4117) on the
// accepted reply — for BOTH the normal-finish and cap-checkpoint paths,
// since a capped turn also delivers a reply downstream parsing depends
// on. When this agent must emit a JSON block every turn and the reply
// omitted it, validate-and-repair (one corrective re-prompt, else a
// synthesized minimal block) before the turn is accepted. The trailing
// assistant message (final answer or pushed checkpoint) is rewritten to
// match, and the repair call's usage is folded into the turn accounting.
let reply = if let Some(contract) = self.config.required_output.clone() {
match self
.enforce_required_output(&reply, &contract, effective_model)
.await
{
Some((repaired, repair_usage)) => {
if let Some(u) = repair_usage {
input_tokens += u.input_tokens;
output_tokens += u.output_tokens;
cached_input_tokens += u.cached_input_tokens;
charged_amount_usd += u.charged_amount_usd;
}
replace_last_assistant_reply(&mut self.history, &repaired);
repaired
}
None => reply,
}
} else {
reply
};
self.trim_history();

// Fold this turn's sub-agent spend into the cumulative meters and capture
Expand Down
116 changes: 116 additions & 0 deletions src/openhuman/agent/harness/session/turn/session_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,122 @@ impl Agent {
}
}

/// Validates `reply` against the agent's required structured-output contract
/// and repairs it when the block is missing (issue #4117).
///
/// Returns `None` when `reply` already satisfies `contract` (which includes
/// the inert-contract case), meaning the caller should keep `reply` as is.
/// Otherwise returns `Some((repaired_text, usage))`, produced by one of two
/// tiers:
///
/// 1. **Re-prompt.** The current history, converted to provider messages, is
///    extended with a user turn containing [`repair_instruction`] and sent
///    once via `reprompt_for_required_block` (tools disabled). If the answer
///    satisfies the contract it becomes the repaired text.
/// 2. **Synthesis.** Otherwise a minimal [`synthesize_block`] is prepended to
///    the original `reply`, separated by a blank line, so the accepted turn is
///    guaranteed to contain a valid block.
///
/// `usage` is whatever the re-prompt call reported (in both tiers), so the
/// caller can fold the extra call into the turn accounting; it is `None` when
/// that call failed or reported no usage.
///
/// [`repair_instruction`]: harness::required_output::repair_instruction
/// [`synthesize_block`]: harness::required_output::synthesize_block
pub(in super::super) async fn enforce_required_output(
    &self,
    reply: &str,
    contract: &crate::openhuman::config::RequiredOutputContract,
    effective_model: &str,
) -> Option<(String, Option<UsageInfo>)> {
    let already_valid = harness::required_output::output_satisfies_contract(reply, contract);
    if already_valid {
        return None;
    }
    log::warn!(
        "[agent_loop] required output block `{}` omitted from turn reply — repairing",
        contract.block_key
    );

    // Tier 1: a single corrective re-prompt. The history is expected to end
    // with the reply that omitted the block, so the model can see what it
    // left out before reading the correction.
    let mut prompt = self.tool_dispatcher.to_provider_messages(&self.history);
    let correction = harness::required_output::repair_instruction(contract);
    prompt.push(ChatMessage::user(correction));
    let (retry_text, retry_usage) = self
        .reprompt_for_required_block(&prompt, effective_model)
        .await;

    let recovered = harness::required_output::output_satisfies_contract(&retry_text, contract);
    let repaired = if recovered {
        log::info!(
            "[agent_loop] required output block `{}` recovered via re-prompt",
            contract.block_key
        );
        retry_text
    } else {
        // Tier 2: deterministic fallback. The failed re-prompt's usage is
        // still returned below so the extra call stays accounted for.
        log::warn!(
            "[agent_loop] required output block `{}` still missing after re-prompt — synthesizing a minimal block",
            contract.block_key
        );
        let fallback_block = harness::required_output::synthesize_block(contract);
        format!("{}\n\n{}", fallback_block, reply)
    };
    Some((repaired, retry_usage))
}

/// Issues one provider call asking for a reply that carries the required
/// structured-output block.
///
/// The request is sent with `tools: None` so the model is steered towards
/// text rather than another tool call, and output is capped at
/// `AGENT_TURN_MAX_OUTPUT_TOKENS`. The response is passed through the tool
/// dispatcher's parser and the returned text is chosen as follows:
///
/// * non-blank parsed prose → that prose;
/// * blank prose and no parsed tool calls → the raw response text (or empty);
/// * blank prose but tool calls present → empty string (no usable prose).
///
/// The second tuple element is the usage reported by the response. A failed
/// call is logged and yields `(String::new(), None)`. The shape mirrors
/// [`summarize_iteration_checkpoint`](Self::summarize_iteration_checkpoint).
async fn reprompt_for_required_block(
    &self,
    messages: &[ChatMessage],
    effective_model: &str,
) -> (String, Option<UsageInfo>) {
    let request = ChatRequest {
        messages,
        tools: None,
        stream: None,
        max_tokens: Some(AGENT_TURN_MAX_OUTPUT_TOKENS),
    };
    let resp = match self
        .provider
        .chat(request, effective_model, self.temperature)
        .await
    {
        Ok(resp) => resp,
        Err(e) => {
            log::warn!("[agent_loop] required-output re-prompt call failed: {e:#}");
            return (String::new(), None);
        }
    };

    let usage = resp.usage.clone();
    // Parsing separates prose from any stray tool-call markup a text-mode
    // model may still emit; only the prose can carry the JSON block.
    let (prose, tool_calls) = self.tool_dispatcher.parse_response(&resp);
    let text = match (prose.trim().is_empty(), tool_calls.is_empty()) {
        (false, _) => prose,
        (true, true) => resp.text.unwrap_or_default(),
        // Only tool-call markup came back, so there is nothing usable.
        (true, false) => String::new(),
    };
    (text, usage)
}

/// Persist the exact provider messages as a session transcript.
///
/// Writes JSONL as source of truth and re-renders the companion `.md`
Expand Down
Loading
Loading