532 changes: 532 additions & 0 deletions braintrust/README.md

Large diffs are not rendered by default.

126 changes: 126 additions & 0 deletions braintrust/activities/invoke_model.py
@@ -0,0 +1,126 @@
from temporalio import activity
from openai import AsyncOpenAI
import braintrust
from braintrust import wrap_openai
from typing import Optional, List, cast, Any, TypeVar, Generic
from typing_extensions import Annotated
from pydantic import BaseModel
from pydantic.functional_validators import BeforeValidator
from pydantic.functional_serializers import PlainSerializer

import importlib
import os

T = TypeVar("T", bound=BaseModel)


def _coerce_class(v: Any) -> type[Any]:
    """Pydantic validator: convert string path to class during deserialization."""
    if isinstance(v, str):
        mod_path, sep, qual = v.partition(":")
        if not sep:  # support "package.module.Class"
            mod_path, _, qual = v.rpartition(".")
        module = importlib.import_module(mod_path)
        obj = module
        for attr in qual.split("."):
            obj = getattr(obj, attr)
        return cast(type[Any], obj)
    elif isinstance(v, type):
        return v
    else:
        raise ValueError(f"Cannot coerce {v} to class")


def _dump_class(t: type[Any]) -> str:
    """Pydantic serializer: convert class to string path during serialization."""
    return f"{t.__module__}:{t.__qualname__}"


# Custom type that automatically handles class <-> string conversion in Pydantic serialization
ClassReference = Annotated[
    type[T],
    BeforeValidator(_coerce_class),
    PlainSerializer(_dump_class, return_type=str),
]
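
# Usage sketch (illustrative only; "Envelope" and "ReportModel" are hypothetical
# classes, not defined in this PR). A field annotated with ClassReference serializes
# to an import-path string and is re-imported on validation, e.g.:
#
#   class Envelope(BaseModel):
#       payload_type: ClassReference[BaseModel]
#
#   Envelope(payload_type=ReportModel).model_dump()
#   # -> {"payload_type": "reports:ReportModel"}
#   Envelope.model_validate({"payload_type": "reports:ReportModel"}).payload_type
#   # -> <class 'reports.ReportModel'>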


class InvokeModelRequest(BaseModel, Generic[T]):
    model: str
    instructions: str  # Fallback if Braintrust prompt unavailable
    input: str
    prompt_slug: Optional[str] = None  # Braintrust prompt slug (e.g., "report-synthesis")
    response_format: Optional[ClassReference[T]] = None
    tools: Optional[List[dict]] = None


class InvokeModelResponse(BaseModel, Generic[T]):
    # response_format records the type of the response model
    response_format: Optional[ClassReference[T]] = None
    response_model: Any

    @property
    def response(self) -> T:
        """Reconstruct the original response type if response_format was provided."""
        if self.response_format:
            model_cls = self.response_format
            return model_cls.model_validate(self.response_model)
        return self.response_model


@activity.defn
async def invoke_model(request: InvokeModelRequest[T]) -> InvokeModelResponse[T]:
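    """Temporal activity: call the OpenAI Responses API, optionally loading the
    system prompt from Braintrust and parsing the output into the requested
    Pydantic model."""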
    instructions = request.instructions

    # Load prompt from Braintrust if slug provided
    if request.prompt_slug:
        try:
            prompt = braintrust.load_prompt(
                project=os.environ.get("BRAINTRUST_PROJECT", "deep-research"),
                slug=request.prompt_slug,
            )
            # Extract system message content only
            # NOTE: Other params (temperature, max_tokens, model) are NOT used
            built = prompt.build()
            for msg in built.get("messages", []):
                if msg.get("role") == "system":
                    instructions = msg["content"]
                    activity.logger.info(
                        f"Loaded prompt '{request.prompt_slug}' from Braintrust"
                    )
                    break
        except Exception as e:
            # Log a warning but continue with the hardcoded fallback instructions
            activity.logger.warning(
                f"Failed to load prompt '{request.prompt_slug}': {e}. "
                "Using hardcoded fallback."
            )

    # max_retries=0: retries are presumably left to Temporal's activity retry policy
    client = wrap_openai(AsyncOpenAI(max_retries=0))

    kwargs: dict[str, Any] = {
        "model": request.model,
        "instructions": instructions,
        "input": request.input,
    }

    if request.response_format:
        kwargs["text_format"] = request.response_format

    if request.tools:
        kwargs["tools"] = request.tools

    # Use the Responses API consistently for structured and free-form output
    resp = await client.responses.parse(**kwargs)

    if request.response_format:
        # Dump the structured response to a plain dict so it survives Temporal's
        # payload serialization; response_format records the original type so the
        # caller can reconstruct it via the `response` property.
        parsed_model = cast(BaseModel, resp.output_parsed)
        return InvokeModelResponse(
            response_model=parsed_model.model_dump(),
            response_format=request.response_format,
        )
    else:
        return InvokeModelResponse(
            response_model=resp.output_text, response_format=None
        )
10 changes: 10 additions & 0 deletions braintrust/agents/config.py
@@ -0,0 +1,10 @@
"""Configuration constants for the deep research system."""

# Model configuration constants
# Change these values to switch models globally
COMPLEX_REASONING_MODEL = "gpt-4o" # For planning and synthesis tasks
EFFICIENT_PROCESSING_MODEL = "gpt-4o-mini" # For query generation and search analysis

# Alternative model options (uncomment to use):
# COMPLEX_REASONING_MODEL = "gpt-5"
# EFFICIENT_PROCESSING_MODEL = "gpt-5-mini"
42 changes: 42 additions & 0 deletions braintrust/agents/research_planning.py
@@ -0,0 +1,42 @@
from .shared import ResearchPlan, today_str
from .config import COMPLEX_REASONING_MODEL
from activities.invoke_model import invoke_model, InvokeModelRequest
from temporalio import workflow
from datetime import timedelta

RESEARCH_PLANNING_INSTRUCTIONS = f"""
You are a research planning specialist who creates focused research strategies.

CORE RESPONSIBILITIES:
1. Decompose the user's question into 3-7 key research aspects
2. Identify required sources and evidence types
3. Design a practical search strategy
4. Set clear success criteria

OUTPUT REQUIREMENTS:
- research_question: Clarified version of the original query
- key_aspects: Specific areas requiring investigation, each with:
- aspect: The research area name
- priority: 1-5 ranking (5 highest priority)
- description: What needs to be investigated
- expected_sources: Types of sources likely to contain relevant information
- search_strategy: High-level approach for information gathering
- success_criteria: Specific indicators of research completeness

TODAY'S DATE: {today_str()}
"""


async def plan_research(query: str) -> ResearchPlan:
    result = await workflow.execute_activity(
        invoke_model,
        InvokeModelRequest(
            model=COMPLEX_REASONING_MODEL,
            instructions=RESEARCH_PLANNING_INSTRUCTIONS,
            input=f"Research query: {query}",
            response_format=ResearchPlan,
        ),
        start_to_close_timeout=timedelta(seconds=300),
        summary="Planning research",
    )
    return result.response
56 changes: 56 additions & 0 deletions braintrust/agents/research_query_generation.py
@@ -0,0 +1,56 @@
from .shared import QueryPlan, ResearchPlan, today_str
from .config import EFFICIENT_PROCESSING_MODEL
from activities.invoke_model import invoke_model, InvokeModelRequest
from temporalio import workflow
from datetime import timedelta

QUERY_GENERATION_INSTRUCTIONS = f"""
You are a search query specialist who crafts effective web searches.

CORE RESPONSIBILITIES:
1. Generate 3-5 diverse search queries based on the research plan
2. Balance specificity with discoverability
3. Target different information types (factual, analytical, recent, historical)

APPROACH:
- Vary query styles: direct questions, topic + keywords, source-specific searches
- Include temporal modifiers when relevant (recent, 2024, historical)
- Use domain-specific terminology appropriately

OUTPUT REQUIREMENTS:
- queries: Search queries, each with:
- query: The actual search string
- rationale: Why this query addresses research needs
- expected_info_type: One of "factual_data", "expert_analysis", "case_studies", "recent_news"
- priority: 1-5 (5 highest priority)

TODAY'S DATE: {today_str()}
"""


async def generate_queries(research_plan: ResearchPlan) -> QueryPlan:
    # Prepare input with research plan context
    plan_context = f"""
Research Question: {research_plan.research_question}

Key Aspects to Research:
{chr(10).join([f"- {aspect.aspect} (Priority: {aspect.priority}): {aspect.description}" for aspect in research_plan.key_aspects])}

Expected Sources: {", ".join(research_plan.expected_sources)}
Search Strategy: {research_plan.search_strategy}
Success Criteria: {", ".join(research_plan.success_criteria)}
"""

    result = await workflow.execute_activity(
        invoke_model,
        InvokeModelRequest(
            model=EFFICIENT_PROCESSING_MODEL,
            instructions=QUERY_GENERATION_INSTRUCTIONS,
            input=plan_context,
            response_format=QueryPlan,
        ),
        start_to_close_timeout=timedelta(seconds=300),
        summary="Generating search queries",
    )

    return result.response
84 changes: 84 additions & 0 deletions braintrust/agents/research_report_synthesis.py
@@ -0,0 +1,84 @@
from typing import List
from temporalio import workflow
from datetime import timedelta
from .shared import ResearchReport, ResearchPlan, SearchResult, today_str
from .config import COMPLEX_REASONING_MODEL
from activities.invoke_model import invoke_model, InvokeModelRequest

REPORT_SYNTHESIS_INSTRUCTIONS = f"""
You are a research synthesis expert who creates comprehensive research reports.

CORE RESPONSIBILITIES:
1. Synthesize all research into a coherent narrative
2. Structure information logically with evidence support
3. Provide comprehensive citations
4. Assess confidence levels and acknowledge limitations
5. Generate follow-up questions for deeper research

REPORT STRUCTURE:
1. **Executive Summary**: Core findings and conclusions (1-2 paragraphs)
2. **Detailed Analysis**: Examination organized by themes with evidence
3. **Key Findings**: Bullet-point list of important discoveries
4. **Confidence Assessment**: Rate findings as High/Medium/Low/Uncertain
5. **Citations**: Complete source list with URLs
6. **Follow-up Questions**: Up to 5 areas for additional research, as warranted

APPROACH:
- Address contradictory findings transparently
- Weight authoritative sources more heavily
- Distinguish facts from expert opinions
- Be explicit about information limitations

OUTPUT REQUIREMENTS:
- executive_summary: 1-2 paragraph summary of core findings
- detailed_analysis: Multi-paragraph analysis organized by themes
- key_findings: Bullet-point discoveries
- confidence_assessment: Assessment of finding reliability
- citations: All sources referenced
- follow_up_questions: 3-5 specific questions for further research

TODAY'S DATE: {today_str()}
"""


async def generate_synthesis(
    original_query: str, research_plan: ResearchPlan, search_results: List[SearchResult]
) -> ResearchReport:
    # Prepare comprehensive input with all research context
    synthesis_input = f"""
ORIGINAL RESEARCH QUERY: {original_query}

RESEARCH PLAN:
Research Question: {research_plan.research_question}
Key Aspects Investigated: {
        ", ".join([aspect.aspect for aspect in research_plan.key_aspects])
    }
Search Strategy Used: {research_plan.search_strategy}
Success Criteria: {", ".join(research_plan.success_criteria)}

SEARCH RESULTS TO SYNTHESIZE:
{
        chr(10).join(
            [
                f"Query: {result.query}{chr(10)}Findings: {result.key_findings}{chr(10)}Relevance: {result.relevance_score}{chr(10)}Sources: {', '.join(result.sources)}{chr(10)}Citations: {', '.join(result.citations)}{chr(10)}"
                for result in search_results
            ]
        )
    }

Please synthesize all this information into a comprehensive research report following the specified structure and quality standards.
"""
    result = await workflow.execute_activity(
        invoke_model,
        InvokeModelRequest(
            model=COMPLEX_REASONING_MODEL,
            instructions=REPORT_SYNTHESIS_INSTRUCTIONS,  # Fallback
            input=synthesis_input,
            prompt_slug="report-synthesis",  # Load from Braintrust if available
            response_format=ResearchReport,
        ),
        start_to_close_timeout=timedelta(seconds=300),
        summary="Generating research report synthesis",
    )

    return result.response
54 changes: 54 additions & 0 deletions braintrust/agents/research_web_search.py
@@ -0,0 +1,54 @@
from .shared import SearchResult, SearchQuery, today_str
from .config import EFFICIENT_PROCESSING_MODEL
from activities.invoke_model import invoke_model, InvokeModelRequest
from temporalio import workflow
from datetime import timedelta

WEB_SEARCH_INSTRUCTIONS = f"""
You are a web research specialist who finds and evaluates information from web sources.

CORE RESPONSIBILITIES:
1. Execute web searches using the web search tool
2. Prioritize authoritative sources: academic, government, established research organizations, prominent news outlets, primary sources
3. Extract key information relevant to the research question
4. Provide proper citations and assess reliability

APPROACH:
- Focus on information directly relevant to the research question
- Extract specific facts, data points, and evidence
- Note conflicting information and limitations
- Flag questionable or unverified claims

OUTPUT REQUIREMENTS:
- query: The search query that was executed
- sources: URLs and source descriptions consulted
- key_findings: Synthesized information relevant to research question (2-4 paragraphs)
- relevance_score: 0.0-1.0 assessment of how well results address the query
- citations: Formatted sources with URLs

TODAY'S DATE: {today_str()}
"""


async def search_web(query: SearchQuery) -> SearchResult:
    search_input = f"""
Search Query: {query.query}
Query Rationale: {query.rationale}
Expected Information Type: {query.expected_info_type}
Priority Level: {query.priority}

Please search for information using the provided query and analyze the results according to the instructions.
"""
    result = await workflow.execute_activity(
        invoke_model,
        InvokeModelRequest(
            model=EFFICIENT_PROCESSING_MODEL,
            instructions=WEB_SEARCH_INSTRUCTIONS,
            input=search_input,
            response_format=SearchResult,
            tools=[{"type": "web_search"}],
        ),
        start_to_close_timeout=timedelta(seconds=300),
        summary="Searching web for information",
    )
    return result.response
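
Not part of this diff: a minimal sketch of how these agent helpers might be composed inside a Temporal workflow. The workflow class name and the `QueryPlan.queries` field are assumptions inferred from the code above; the actual workflow definition is not shown in this section.

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from agents.shared import ResearchReport
    from agents.research_planning import plan_research
    from agents.research_query_generation import generate_queries
    from agents.research_web_search import search_web
    from agents.research_report_synthesis import generate_synthesis


@workflow.defn
class DeepResearchWorkflow:  # hypothetical name
    @workflow.run
    async def run(self, query: str) -> ResearchReport:
        plan = await plan_research(query)          # plan the research
        query_plan = await generate_queries(plan)  # derive search queries
        results = []
        for q in query_plan.queries:               # assumes QueryPlan exposes a `queries` list
            results.append(await search_web(q))    # execute each search in turn
        return await generate_synthesis(query, plan, results)  # synthesize the final report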