Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 70 additions & 13 deletions src/agent-client-protocol-core/src/jsonrpc/incoming_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ use crate::RoleId;
use crate::UntypedMessage;
use crate::jsonrpc::ConnectionTo;
use crate::jsonrpc::HandleDispatchFrom;
use crate::jsonrpc::OutgoingMessage;
use crate::jsonrpc::ReplyMessage;
use crate::jsonrpc::Responder;
use crate::jsonrpc::ResponseRouter;
use crate::jsonrpc::dynamic_handler::DynHandleDispatchFrom;
use crate::jsonrpc::dynamic_handler::DynamicHandlerMessage;
use crate::jsonrpc::outgoing_actor::send_raw_message;

use crate::role::Role;

Expand Down Expand Up @@ -93,20 +95,25 @@ pub(super) async fn incoming_protocol_actor<Counterpart: Role>(
let mut new_pending_messages = vec![];
for pending_message in pending_messages {
tracing::trace!(method = pending_message.method(), handler = ?handler.dyn_describe_chain(), "Retrying message");
let id = pending_message.id();
match handler
.dyn_handle_dispatch_from(pending_message, connection.clone())
.await?
.await
{
Handled::Yes => {
Ok(Handled::Yes) => {
tracing::trace!("Message handled");
}
Handled::No {
Ok(Handled::No {
message: m,
retry: _,
} => {
}) => {
tracing::trace!(method = m.method(), handler = ?handler.dyn_describe_chain(), "Message not handled");
new_pending_messages.push(m);
}
Err(err) => {
tracing::warn!(?err, handler = ?handler.dyn_describe_chain(), "Dynamic handler errored on pending message, reporting back");
report_handler_error(connection, id, err)?;
}
}
}
pending_messages = new_pending_messages;
Expand Down Expand Up @@ -253,55 +260,75 @@ async fn dispatch_dispatch<Counterpart: Role>(
tracing::trace!(handler = ?handler.describe_chain(), "Attempting handler chain");
match handler
.handle_dispatch_from(dispatch, connection.clone())
.await?
.await
{
Handled::Yes => {
Ok(Handled::Yes) => {
tracing::trace!(?method, ?id, handler = ?handler.describe_chain(), "Handler accepted message");
return Ok(());
}

Handled::No { message: m, retry } => {
Ok(Handled::No { message: m, retry }) => {
tracing::trace!(?method, ?id, handler = ?handler.describe_chain(), "Handler declined message");
dispatch = m;
retry_any |= retry;
}

Err(err) => {
tracing::warn!(?method, ?id, ?err, handler = ?handler.describe_chain(), "Handler errored, reporting back to remote");
return report_handler_error(connection, id, err);
}
}

// Next, apply any dynamic handlers.
for dynamic_handler in dynamic_handlers.values_mut() {
tracing::trace!(handler = ?dynamic_handler.dyn_describe_chain(), "Attempting dynamic handler");
match dynamic_handler
.dyn_handle_dispatch_from(dispatch, connection.clone())
.await?
.await
{
Handled::Yes => {
Ok(Handled::Yes) => {
tracing::trace!(?method, ?id, handler = ?dynamic_handler.dyn_describe_chain(), "Dynamic handler accepted message");
return Ok(());
}

Handled::No { message: m, retry } => {
Ok(Handled::No { message: m, retry }) => {
tracing::trace!(?method, ?id, handler = ?dynamic_handler.dyn_describe_chain(), "Dynamic handler declined message");
retry_any |= retry;
dispatch = m;
}

Err(err) => {
tracing::warn!(?method, ?id, ?err, handler = ?dynamic_handler.dyn_describe_chain(), "Dynamic handler errored, reporting back to remote");
return report_handler_error(connection, id, err);
}
}
}

// Finally, apply the default handler for the role.
tracing::trace!(role = ?counterpart, "Attempting default handler");
match counterpart
.default_handle_dispatch_from(dispatch, connection.clone())
.await?
.await
{
Handled::Yes => {
Ok(Handled::Yes) => {
tracing::trace!(?method, handler = "default", "Role accepted message");
return Ok(());
}
Handled::No { message: m, retry } => {
Ok(Handled::No { message: m, retry }) => {
tracing::trace!(?method, handler = "default", "Role declined message");
dispatch = m;
retry_any |= retry;
}
Err(err) => {
tracing::warn!(
?method,
?id,
?err,
handler = "default",
"Default handler errored, reporting back to remote"
);
return report_handler_error(connection, id, err);
}
}

// If the message was never handled, check whether the retry flag was set.
Expand Down Expand Up @@ -330,3 +357,33 @@ async fn dispatch_dispatch<Counterpart: Role>(
}
}
}

/// When a handler returns an error, report it back to the remote side instead
/// of propagating it and tearing down the connection.
///
/// For requests (which have an id), sends a JSON-RPC error response.
/// For notifications (no id), sends an out-of-band error notification.
/// For responses, forwards the error to the local awaiter.
fn report_handler_error<Counterpart: Role>(
connection: &ConnectionTo<Counterpart>,
id: Option<serde_json::Value>,
error: crate::Error,
) -> Result<(), crate::Error> {
match id {
Some(id) => {
// Request: send error response with the original request id
let jsonrpc_id = serde_json::from_value(id).unwrap_or(jsonrpcmsg::Id::Null);
send_raw_message(
&connection.message_tx,
OutgoingMessage::Response {
id: jsonrpc_id,
response: Err(error),
},
)
}
None => {
// Notification or response without id: send error notification
connection.send_error_notification(error)
}
}
}
12 changes: 6 additions & 6 deletions src/agent-client-protocol-core/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ macro_rules! impl_jsonrpc_request {
if method != $method {
return Err($crate::Error::method_not_found());
}
$crate::util::json_cast(params)
$crate::util::json_cast_params(params)
}
}

Expand Down Expand Up @@ -84,7 +84,7 @@ macro_rules! impl_jsonrpc_notification {
if method != $method {
return Err($crate::Error::method_not_found());
}
$crate::util::json_cast(params)
$crate::util::json_cast_params(params)
}
}

Expand Down Expand Up @@ -133,10 +133,10 @@ macro_rules! impl_jsonrpc_request_enum {
params: &impl serde::Serialize,
) -> Result<Self, $crate::Error> {
match method {
$( $(#[$meta])* $method => $crate::util::json_cast(params).map(Self::$variant), )*
$( $(#[$meta])* $method => $crate::util::json_cast_params(params).map(Self::$variant), )*
_ => {
if let Some(custom_method) = method.strip_prefix('_') {
$crate::util::json_cast(params).map(
$crate::util::json_cast_params(params).map(
|ext_req: $crate::schema::ExtRequest| {
Self::$ext_variant($crate::schema::ExtRequest::new(
custom_method.to_string(),
Expand Down Expand Up @@ -196,10 +196,10 @@ macro_rules! impl_jsonrpc_notification_enum {
params: &impl serde::Serialize,
) -> Result<Self, $crate::Error> {
match method {
$( $(#[$meta])* $method => $crate::util::json_cast(params).map(Self::$variant), )*
$( $(#[$meta])* $method => $crate::util::json_cast_params(params).map(Self::$variant), )*
_ => {
if let Some(custom_method) = method.strip_prefix('_') {
$crate::util::json_cast(params).map(
$crate::util::json_cast_params(params).map(
|ext_notif: $crate::schema::ExtNotification| {
Self::$ext_variant($crate::schema::ExtNotification::new(
custom_method.to_string(),
Expand Down
4 changes: 2 additions & 2 deletions src/agent-client-protocol-core/src/schema/proxy_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl<M: JsonRpcMessage> JsonRpcMessage for SuccessorMessage<M> {
if method != METHOD_SUCCESSOR_MESSAGE {
return Err(crate::Error::method_not_found());
}
let outer = crate::util::json_cast::<_, SuccessorMessage<UntypedMessage>>(params)?;
let outer = crate::util::json_cast_params::<_, SuccessorMessage<UntypedMessage>>(params)?;
if !M::matches_method(&outer.message.method) {
return Err(crate::Error::method_not_found());
}
Expand Down Expand Up @@ -161,7 +161,7 @@ impl<M: JsonRpcMessage> JsonRpcMessage for McpOverAcpMessage<M> {
if method != METHOD_MCP_MESSAGE {
return Err(crate::Error::method_not_found());
}
let outer = crate::util::json_cast::<_, McpOverAcpMessage<UntypedMessage>>(params)?;
let outer = crate::util::json_cast_params::<_, McpOverAcpMessage<UntypedMessage>>(params)?;
if !M::matches_method(&outer.message.method) {
return Err(crate::Error::method_not_found());
}
Expand Down
27 changes: 27 additions & 0 deletions src/agent-client-protocol-core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,33 @@ where
Ok(m)
}

/// Cast incoming request/notification params into a typed payload.
///
/// Like [`json_cast`], but deserialization failures become
/// [`Error::invalid_params`](`crate::Error::invalid_params`) (`-32602`)
/// instead of a parse error, which is the correct JSON-RPC error code for
/// malformed method parameters.
pub fn json_cast_params<N, M>(params: N) -> Result<M, crate::Error>
where
N: serde::Serialize,
M: serde::de::DeserializeOwned,
{
let json = serde_json::to_value(params).map_err(|e| {
crate::Error::internal_error().data(serde_json::json!({
"error": e.to_string(),
"phase": "serialization"
}))
})?;
let m = serde_json::from_value(json.clone()).map_err(|e| {
crate::Error::invalid_params().data(serde_json::json!({
"error": e.to_string(),
"json": json,
"phase": "deserialization"
}))
})?;
Ok(m)
}

/// Creates an internal error with the given message
pub fn internal_error(message: impl ToString) -> crate::Error {
crate::Error::internal_error().data(message.to_string())
Expand Down
Loading