feat(agentcore): add AgentCoreRuntime adapter for deploying CrewAI crews #4887

Open

kevin-orellana wants to merge 1 commit into crewAIInc:main from kevin-orellana:feat/agentcore-runtime

Conversation

kevin-orellana commented Mar 15, 2026

Summary

Add AgentCoreRuntime adapter to crewai-tools for deploying CrewAI Crew instances to AWS Bedrock AgentCore Runtime.

  • Wraps BedrockAgentCoreApp from the bedrock-agentcore SDK, providing POST /invocations and GET /ping endpoints
  • One-liner deployment: AgentCoreRuntime.serve(crew)
  • Streaming support: maps CrewAI StreamChunk events (text + tool calls) to SSE
  • Thread-safe: uses crew.copy() per streaming request to avoid mutating shared state
  • Accepts both CrewAI inputs dict and simple prompt/message/input payloads
  • Full BedrockAgentCoreApp constructor passthrough: debug, lifespan, middleware
  • Rich response including tasks_output, token_usage, and json_dict when available
  • Exported from crewai_tools.aws.bedrock for top-level access

Target API

from crewai import Crew, Agent, Task
from crewai_tools.aws.bedrock import AgentCoreRuntime

crew = Crew(agents=[...], tasks=[...])

# One-liner
AgentCoreRuntime.serve(crew)

# With options
runtime = AgentCoreRuntime(
    crew=crew,
    stream=True,        # SSE streaming (default)
    port=8080,          # default
    debug=False,
    lifespan=lifespan,  # Starlette lifespan handler
    middleware=[...],    # Starlette middleware stack
)
runtime.run()

# Or access the ASGI app directly for testing/mounting
app = runtime.app

SSE Event Mapping

CrewAI Event            SSE Event                Fields
StreamChunk(TEXT)       {"event": "text"}        content, agent_role, task_name
StreamChunk(TOOL_CALL)  {"event": "tool_call"}   tool_name, arguments, agent_role, task_name
Final CrewOutput        {"event": "done"}        response, tasks_output, token_usage, json
Exception               {"event": "error"}       message
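For illustration, the event shapes in this table can be exercised with a small client-side parser. This is a hypothetical helper, not part of the PR; it assumes each SSE event arrives as a single `data: {...}` line:

```python
import json

def parse_sse_events(body: str) -> list[dict]:
    """Parse an SSE response body into a list of event payload dicts.

    Assumes each event is serialized as one `data: {...}` line, matching
    the event shapes in the table above.
    """
    events = []
    for line in body.splitlines():
        line = line.strip()
        if line.startswith("data:"):
            events.append(json.loads(line[len("data:"):].strip()))
    return events

# Example body using the "text" and "done" event shapes from the table
body = (
    'data: {"event": "text", "content": "Hello", '
    '"agent_role": "Researcher", "task_name": "research"}\n\n'
    'data: {"event": "done", "response": "Hello world"}\n\n'
)
for ev in parse_sse_events(body):
    print(ev["event"])
```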

Files changed

File                                        Change
bedrock/__init__.py                         Export AgentCoreRuntime
runtime/__init__.py                         New submodule init
runtime/base.py                             AgentCoreRuntime class (~245 lines)
tests/tools/test_agentcore_runtime.py       49 unit tests
tests/tools/test_agentcore_runtime_e2e.py   9 ASGI integration tests

Test plan

58 tests total — all passing.

Unit tests (49)

  • Input extraction (17): standard inputs dict, prompt/message/input keys, nested dict, priority order, missing/empty/whitespace/list raises 400, numeric coercion, float coercion, whitespace stripping, empty prompt falls to message, non-dict inputs falls to prompt
  • Non-streaming handler (5): JSON response, prompt-style input, json_dict, tasks_output, token_usage
  • Streaming handler (3): text + done events, tool call events, does not mutate shared crew on error
  • Streaming fallback (2): RuntimeError on .result falls back to get_full_text(), error during chunk iteration yields error event
  • _stream_chunk_to_dict (4): text chunk, tool call chunk, TOOL_CALL with None tool_call falls to text, None tool_name defaults to empty string
  • _crew_output_to_dict (7): basic output, json_dict included, empty json_dict excluded, empty tasks excluded, None token_usage excluded, missing task attrs default to empty, missing usage fields default to 0
  • Constructor/app (7): serve calls run, streaming entrypoint is async gen, non-streaming entrypoint is coroutine, app property exposed, lifespan/middleware/all params passed through
  • run() passthrough (2): port/host forwarded to app, extra kwargs forwarded
  • Export (1): importable from crewai_tools.aws.bedrock
  • Top-level export (1): from crewai_tools.aws.bedrock import AgentCoreRuntime
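The input-extraction rules exercised above can be sketched roughly as follows. This is a simplified stand-in, not the adapter's actual `_extract_inputs`; the priority order and coercion rules are taken from the test descriptions as assumptions:

```python
def extract_inputs(payload):
    """Return a CrewAI-style inputs dict from an invocation payload.

    Priority (per the unit-test list above): an explicit non-empty
    `inputs` dict wins; otherwise the first non-empty of `prompt`,
    `message`, `input` becomes {"prompt": value}. Numeric values are
    coerced to strings; whitespace is stripped. Raises ValueError
    (which the handler would map to HTTP 400) when nothing usable
    is found.
    """
    if isinstance(payload, dict):
        inputs = payload.get("inputs")
        if isinstance(inputs, dict) and inputs:
            return inputs
        for key in ("prompt", "message", "input"):
            value = payload.get(key)
            if isinstance(value, (int, float)):  # numeric/float coercion
                value = str(value)
            if isinstance(value, str) and value.strip():
                return {"prompt": value.strip()}
    raise ValueError("No usable input found in payload")  # -> HTTP 400
```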

ASGI integration tests (9)

Full HTTP round-trip via httpx.ASGITransport — no real server or AWS credentials needed:

  • Ping: GET /ping returns 200
  • Non-streaming (6): prompt, message, input, inputs dict keys all return 200 with response; missing prompt returns 400; empty payload returns 400
  • Streaming (3): returns SSE content-type, text events have correct content/agent_role, done event has final response
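The same kind of in-process round-trip can be illustrated by driving the raw ASGI interface directly with only the standard library. This is a hypothetical toy app standing in for the runtime's app; the PR's actual tests use httpx.ASGITransport against the real BedrockAgentCoreApp:

```python
import asyncio

async def ping_app(scope, receive, send):
    """Toy ASGI app exposing GET /ping, standing in for runtime.app."""
    assert scope["type"] == "http"
    ok = (scope["method"], scope["path"]) == ("GET", "/ping")
    await send({
        "type": "http.response.start",
        "status": 200 if ok else 404,
        "headers": [(b"content-type", b"application/json")],
    })
    await send({"type": "http.response.body", "body": b'{"status": "ok"}'})

async def call(app, method, path):
    """Minimal in-process ASGI client: returns (status, body)."""
    scope = {"type": "http", "method": method, "path": path, "headers": []}
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app(scope, receive, send)
    status = sent[0]["status"]
    body = b"".join(m.get("body", b"") for m in sent[1:])
    return status, body

status, body = asyncio.run(call(ping_app, "GET", "/ping"))
print(status, body)
```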

    mock_type.name = chunk_type_name
    # Make the == comparison work against the real enum
    chunk.chunk_type = mock_type
    return chunk
Unused make_stream_chunk test helper is dead code

Low Severity

The make_stream_chunk helper function is defined but never called anywhere. All streaming tests construct their mock chunks directly with MagicMock() and the real StreamChunkType enum instead.


kevin-orellana (Author)

Code review

Found 1 issue:

  1. Thread-safety bug: _streaming_handler mutates self._crew.stream on the shared Crew instance (sets it to True, restores original in finally). Under concurrent requests, one request's finally block will restore stream to its original value while another request is still mid-stream, causing that request to produce incorrect output or fail. Consider creating a per-request copy of the crew, or passing the stream flag through a different mechanism that does not mutate shared state.
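A minimal illustration of the suggested fix, using a dummy class rather than the PR's code: give each request its own copy of the crew so the stream flag is never toggled on the shared instance, and concurrent finally blocks have nothing shared to race on.

```python
import copy
from dataclasses import dataclass

@dataclass
class DummyCrew:
    """Stand-in for a CrewAI Crew with a mutable stream flag."""
    stream: bool = False

shared = DummyCrew(stream=False)

# Fixed pattern: per-request copy. The shared instance is never
# mutated, so one request's cleanup cannot clobber another's state.
request_crew = copy.deepcopy(shared)
request_crew.stream = True

print(shared.stream, request_crew.stream)
```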

async def _non_streaming_handler(
    self, payload: dict, context: RequestContext
) -> dict:
    """Handle non-streaming invocation. Returns JSON response."""
    inputs = self._extract_inputs(payload)
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        None, lambda: self._crew.kickoff(inputs=inputs)
    )
    return self._crew_output_to_dict(result)

async def _streaming_handler(

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

kevin-orellana force-pushed the feat/agentcore-runtime branch from 5233ce2 to 51bd19d on March 15, 2026 at 05:21
…ews to AWS Bedrock AgentCore

Add runtime adapter that wraps BedrockAgentCoreApp to serve CrewAI Crew
instances via POST /invocations and GET /ping endpoints.

- One-liner deployment: AgentCoreRuntime.serve(crew)
- Streaming support via CrewAI's StreamChunk → SSE events
- Accepts both CrewAI inputs dict and simple prompt/message/input payloads
- Full BedrockAgentCoreApp constructor passthrough (debug, lifespan, middleware)
- Rich response with tasks_output, token_usage, and json_dict when available
kevin-orellana force-pushed the feat/agentcore-runtime branch from 51bd19d to 48d324d on March 15, 2026 at 05:28
cursor bot left a comment
Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).


    result = await loop.run_in_executor(
        None, lambda: self._crew.kickoff(inputs=inputs)
    )
    return self._crew_output_to_dict(result)

Non-streaming handler has race condition on shared crew

High Severity

_non_streaming_handler calls self._crew.kickoff() directly on the shared Crew instance via run_in_executor, but kickoff() extensively mutates the crew — prepare_kickoff sets _inputs, _kickoff_event_id, interpolates inputs into tasks/agents in-place, resets the task output handler, and more. Under concurrent HTTP requests, multiple thread pool workers will mutate the same instance simultaneously, causing data corruption. The streaming handler correctly avoids this with self._crew.copy(), but the non-streaming handler lacks the same protection.
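A hedged sketch of how the non-streaming handler could gain the same protection, using dummy classes rather than the PR's code: deep-copy the crew per request before handing `kickoff()` to the thread pool, so the mutation stays request-local.

```python
import asyncio
import copy

class DummyCrew:
    """Stand-in for a Crew whose kickoff() mutates internal state."""
    def __init__(self):
        self._inputs = None

    def kickoff(self, inputs):
        self._inputs = inputs  # mutation that must stay request-local
        return {"response": f"ran with {inputs}"}

async def non_streaming_handler(shared_crew, inputs):
    """Run kickoff on a per-request copy in a worker thread."""
    crew = copy.deepcopy(shared_crew)  # shared instance is never mutated
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: crew.kickoff(inputs=inputs))

shared = DummyCrew()
result = asyncio.run(non_streaming_handler(shared, {"prompt": "hi"}))
print(result["response"])
```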

