Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions sucoder/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1797,3 +1797,64 @@ def _collect_directory_preview(path: Path, executor: CommandExecutor, limit: int
trimmed.append(f"...(+{extras})")
return trimmed
return entries


@app.command("mcp-suggest")
def mcp_suggest(
ctx: typer.Context,
mirror: Optional[str] = typer.Argument(
None, help="Mirror name (auto-detected when omitted).",
shell_complete=_mirror_completion,
),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Increase console logging."),
apply: bool = typer.Option(False, "--apply", help="Save accepted servers to mirror prefs."),
) -> None:
"""Scan mirror for tech stack indicators and suggest MCP servers."""
import json as _json

from .config import McpServerConfig
from .mcp_discovery import detect_suggestions
from .workspace_prefs import WorkspacePrefs

mirror_name = _resolve_mirror_name(ctx, mirror)
config = _get_config(ctx)
logger = setup_logger(f"sucoder.{mirror_name}", config.log_dir, verbose)
manager = _build_manager_for_mirror(config, logger, False, mirror_name)
mirror_ctx = manager.context_for(mirror_name)
mirror_path = mirror_ctx.mirror_path

if not mirror_path.exists():
typer.secho(f"Mirror path does not exist: {mirror_path}", fg="red")
raise typer.Exit(code=1)

existing: Dict[str, McpServerConfig] = dict(mirror_ctx.settings.mcp_servers)
repo_mcp = mirror_path / ".mcp.json"
if repo_mcp.exists():
try:
repo_data = _json.loads(repo_mcp.read_text(encoding="utf-8"))
for name in repo_data.get("mcpServers", {}):
existing[name] = McpServerConfig(command="")
except (_json.JSONDecodeError, OSError):
pass

suggestions = detect_suggestions(mirror_path, existing)

if not suggestions:
typer.echo("No additional MCP servers suggested for this repo.")
raise typer.Exit()

typer.echo(f"Suggested MCP servers for {mirror_name}:\n")
for s in suggestions:
env_note = f" (requires: {', '.join(s.required_env)})" if s.required_env else ""
if s.required_env and not all(os.environ.get(v) for v in s.required_env):
status = typer.style("missing env", fg="yellow")
else:
status = typer.style("ready", fg="green")
typer.echo(f" {s.name}: {s.description}{env_note} [{status}]")

if apply:
prefs = WorkspacePrefs.load(mirror_path)
decisions = {s.name: True for s in suggestions}
prefs.set_mcp_discovery(decisions)
prefs.save()
typer.echo(f"\nSaved {len(suggestions)} server(s) to mirror prefs.")
112 changes: 112 additions & 0 deletions sucoder/mcp_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Automatic MCP server discovery based on repository tech stack."""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List

from .config import McpServerConfig


@dataclass
class McpSuggestion:
"""A single discoverable MCP server tied to file-based indicators."""

name: str
indicator_patterns: List[str]
server: McpServerConfig
required_env: List[str] = field(default_factory=list)
description: str = ""


# Static mapping of file indicators to MCP server suggestions.
# Order: specific indicators first, always-on suggestions last.
MCP_SUGGESTIONS: List[McpSuggestion] = [
McpSuggestion(
name="github",
indicator_patterns=[".github/"],
server=McpServerConfig(
command="npx",
args=["-y", "@modelcontextprotocol/server-github"],
env={"GITHUB_TOKEN": ""},
),
required_env=["GITHUB_TOKEN"],
description="GitHub API (issues, PRs, actions)",
),
McpSuggestion(
name="postgres",
indicator_patterns=["docker-compose.yml", "docker-compose.yaml"],
server=McpServerConfig(
command="npx",
args=["-y", "@modelcontextprotocol/server-postgres"],
env={"DATABASE_URL": ""},
),
required_env=["DATABASE_URL"],
description="PostgreSQL database access",
),
McpSuggestion(
name="fetch",
indicator_patterns=["package.json", "openapi.yaml", "openapi.json", "swagger.json"],
server=McpServerConfig(
command="npx",
args=["-y", "@anthropic-ai/mcp-server-fetch"],
),
description="Fetch web pages and URLs",
),
McpSuggestion(
name="memory",
indicator_patterns=[], # Always suggested
server=McpServerConfig(
command="npx",
args=["-y", "@modelcontextprotocol/server-memory"],
),
description="Persistent memory across sessions",
),
]


def _pattern_matches(mirror_path: Path, pattern: str) -> bool:
"""Check whether an indicator pattern matches something in mirror_path."""
if pattern.endswith("/"):
return (mirror_path / pattern.rstrip("/")).is_dir()
return (mirror_path / pattern).exists()


def _read_repo_mcp_server_names(mirror_path: Path) -> set[str]:
"""Return server names already defined in the repo's ``.mcp.json``."""
mcp_file = mirror_path / ".mcp.json"
if not mcp_file.exists():
return set()
try:
data = json.loads(mcp_file.read_text(encoding="utf-8"))
return set(data.get("mcpServers", {}).keys())
except (json.JSONDecodeError, OSError):
return set()


def detect_suggestions(
mirror_path: Path,
existing_servers: Dict[str, McpServerConfig],
) -> List[McpSuggestion]:
"""Return MCP suggestions for *mirror_path*, excluding already-configured servers.

*existing_servers* should include servers from both the sucoder config
(``mcp_servers``) and any other source the caller wants to deduplicate against.
The repo's ``.mcp.json`` is also read for deduplication.
"""
skip_names = set(existing_servers.keys()) | _read_repo_mcp_server_names(mirror_path)

results: List[McpSuggestion] = []
for suggestion in MCP_SUGGESTIONS:
if suggestion.name in skip_names:
continue
# Empty indicator_patterns means "always suggest"
if not suggestion.indicator_patterns:
results.append(suggestion)
continue
if any(_pattern_matches(mirror_path, p) for p in suggestion.indicator_patterns):
results.append(suggestion)

return results
62 changes: 62 additions & 0 deletions sucoder/mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,7 @@ def launch_agent(
)

self._maybe_run_poetry_auto_install(ctx, mirror_path)
self._maybe_suggest_mcp_servers(ctx, mirror_path)

# Get merged templates (per-mirror > global > agent profile)
templates = self._get_merged_templates(command, launcher)
Expand Down Expand Up @@ -1300,6 +1301,67 @@ def _poetry_error_highlights(self, error: CommandError) -> Sequence[str]:
highlights.append(line)
return highlights

def _maybe_suggest_mcp_servers(self, ctx: MirrorContext, mirror_path: Path) -> None:
"""Scan the mirror for tech-stack indicators and offer relevant MCP servers."""
if ctx.is_remote:
return

from .mcp_discovery import detect_suggestions

prefs = WorkspacePrefs.load(mirror_path)
previous = prefs.mcp_discovery()

existing = dict(ctx.settings.mcp_servers)
suggestions = detect_suggestions(mirror_path, existing)
if not suggestions:
return

# Filter out servers the user already decided on.
if previous is not None:
suggestions = [s for s in suggestions if s.name not in previous]
if not suggestions:
return

if self._prompt_handler is None:
self.logger.info("Skipping MCP discovery (no prompt handler).")
return

lines = ["Detected tech stack suggests these MCP servers:"]
for s in suggestions:
env_note = f" (requires: {', '.join(s.required_env)})" if s.required_env else ""
lines.append(f" - {s.name}: {s.description}{env_note}")
lines.append("Enable discovered servers?")
message = "\n".join(lines)

try:
accepted = bool(self._prompt_handler(message))
except Exception:
accepted = False

new_decisions: Dict[str, bool] = {}
for s in suggestions:
new_decisions[s.name] = accepted
if not accepted:
continue
missing = [v for v in s.required_env if not os.environ.get(v)]
if missing:
self.logger.info(
"Skipping MCP server %s: missing env var(s) %s",
s.name,
", ".join(missing),
)
continue
from .config import McpServerConfig
server = McpServerConfig(
command=s.server.command,
args=list(s.server.args),
env={k: os.environ.get(k, "") for k in s.required_env} if s.required_env else {},
)
ctx.settings.mcp_servers[s.name] = server

prefs.set_mcp_discovery(new_decisions)
prefs.save()

def bootstrap(
self,
ctx: MirrorContext,
Expand Down
21 changes: 21 additions & 0 deletions sucoder/workspace_prefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,24 @@ def set_poetry_auto_install(self, enabled: bool) -> None:
"enabled": enabled,
"decided_at": _utc_now_iso(),
}

def mcp_discovery(self) -> Optional[Dict[str, bool]]:
"""Return per-server accept/decline decisions, or ``None`` if never asked."""
raw = self.data.get("mcp_discovery")
if isinstance(raw, dict) and "servers" in raw:
servers = raw["servers"]
if isinstance(servers, dict):
return servers
return None

def set_mcp_discovery(self, servers: Dict[str, bool]) -> None:
"""Record which discovered servers were accepted or declined."""
existing = self.data.get("mcp_discovery", {})
if not isinstance(existing, dict):
existing = {}
merged = dict(existing.get("servers", {}))
merged.update(servers)
self.data["mcp_discovery"] = {
"servers": merged,
"decided_at": _utc_now_iso(),
}
Loading
Loading