diff --git a/sucoder/cli.py b/sucoder/cli.py index f9b3f5d..b20bcc6 100644 --- a/sucoder/cli.py +++ b/sucoder/cli.py @@ -1797,3 +1797,64 @@ def _collect_directory_preview(path: Path, executor: CommandExecutor, limit: int trimmed.append(f"...(+{extras})") return trimmed return entries + + +@app.command("mcp-suggest") +def mcp_suggest( + ctx: typer.Context, + mirror: Optional[str] = typer.Argument( + None, help="Mirror name (auto-detected when omitted).", + shell_complete=_mirror_completion, + ), + verbose: bool = typer.Option(False, "--verbose", "-v", help="Increase console logging."), + apply: bool = typer.Option(False, "--apply", help="Save accepted servers to mirror prefs."), +) -> None: + """Scan mirror for tech stack indicators and suggest MCP servers.""" + import json as _json + + from .config import McpServerConfig + from .mcp_discovery import detect_suggestions + from .workspace_prefs import WorkspacePrefs + + mirror_name = _resolve_mirror_name(ctx, mirror) + config = _get_config(ctx) + logger = setup_logger(f"sucoder.{mirror_name}", config.log_dir, verbose) + manager = _build_manager_for_mirror(config, logger, False, mirror_name) + mirror_ctx = manager.context_for(mirror_name) + mirror_path = mirror_ctx.mirror_path + + if not mirror_path.exists(): + typer.secho(f"Mirror path does not exist: {mirror_path}", fg="red") + raise typer.Exit(code=1) + + existing: Dict[str, McpServerConfig] = dict(mirror_ctx.settings.mcp_servers) + repo_mcp = mirror_path / ".mcp.json" + if repo_mcp.exists(): + try: + repo_data = _json.loads(repo_mcp.read_text(encoding="utf-8")) + for name in repo_data.get("mcpServers", {}): + existing[name] = McpServerConfig(command="") + except (_json.JSONDecodeError, OSError): + pass + + suggestions = detect_suggestions(mirror_path, existing) + + if not suggestions: + typer.echo("No additional MCP servers suggested for this repo.") + raise typer.Exit() + + typer.echo(f"Suggested MCP servers for {mirror_name}:\n") + for s in suggestions: + env_note = f" (requires: {', '.join(s.required_env)})" if s.required_env else "" + if s.required_env and not all(os.environ.get(v) for v in s.required_env): + status = typer.style("missing env", fg="yellow") + else: + status = typer.style("ready", fg="green") + typer.echo(f" {s.name}: {s.description}{env_note} [{status}]") + + if apply: + prefs = WorkspacePrefs.load(mirror_path) + decisions = {s.name: True for s in suggestions} + prefs.set_mcp_discovery(decisions) + prefs.save() + typer.echo(f"\nSaved {len(suggestions)} server(s) to mirror prefs.") diff --git a/sucoder/mcp_discovery.py b/sucoder/mcp_discovery.py new file mode 100644 index 0000000..7ed97f9 --- /dev/null +++ b/sucoder/mcp_discovery.py @@ -0,0 +1,112 @@ +"""Automatic MCP server discovery based on repository tech stack.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List + +from .config import McpServerConfig + + +@dataclass +class McpSuggestion: + """A single discoverable MCP server tied to file-based indicators.""" + + name: str + indicator_patterns: List[str] + server: McpServerConfig + required_env: List[str] = field(default_factory=list) + description: str = "" + + +# Static mapping of file indicators to MCP server suggestions. +# Order: specific indicators first, always-on suggestions last. +MCP_SUGGESTIONS: List[McpSuggestion] = [ + McpSuggestion( + name="github", + indicator_patterns=[".github/"], + server=McpServerConfig( + command="npx", + args=["-y", "@modelcontextprotocol/server-github"], + env={"GITHUB_TOKEN": ""}, + ), + required_env=["GITHUB_TOKEN"], + description="GitHub API (issues, PRs, actions)", + ), + McpSuggestion( + name="postgres", + indicator_patterns=["docker-compose.yml", "docker-compose.yaml"], + server=McpServerConfig( + command="npx", + args=["-y", "@modelcontextprotocol/server-postgres"], + env={"DATABASE_URL": ""}, + ), + required_env=["DATABASE_URL"], + description="PostgreSQL database access", + ), + McpSuggestion( + name="fetch", + indicator_patterns=["package.json", "openapi.yaml", "openapi.json", "swagger.json"], + server=McpServerConfig( + command="npx", + args=["-y", "@anthropic-ai/mcp-server-fetch"], + ), + description="Fetch web pages and URLs", + ), + McpSuggestion( + name="memory", + indicator_patterns=[], # Always suggested + server=McpServerConfig( + command="npx", + args=["-y", "@modelcontextprotocol/server-memory"], + ), + description="Persistent memory across sessions", + ), +] + + +def _pattern_matches(mirror_path: Path, pattern: str) -> bool: + """Check whether an indicator pattern matches something in mirror_path.""" + if pattern.endswith("/"): + return (mirror_path / pattern.rstrip("/")).is_dir() + return (mirror_path / pattern).exists() + + +def _read_repo_mcp_server_names(mirror_path: Path) -> set[str]: + """Return server names already defined in the repo's ``.mcp.json``.""" + mcp_file = mirror_path / ".mcp.json" + if not mcp_file.exists(): + return set() + try: + data = json.loads(mcp_file.read_text(encoding="utf-8")) + return set(data.get("mcpServers", {}).keys()) + except (json.JSONDecodeError, OSError): + return set() + + +def detect_suggestions( + mirror_path: Path, + existing_servers: Dict[str, McpServerConfig], +) -> List[McpSuggestion]: + """Return MCP suggestions for *mirror_path*, excluding already-configured servers. + + *existing_servers* should include servers from both the sucoder config + (``mcp_servers``) and any other source the caller wants to deduplicate against. + The repo's ``.mcp.json`` is also read for deduplication. + """ + skip_names = set(existing_servers.keys()) | _read_repo_mcp_server_names(mirror_path) + + results: List[McpSuggestion] = [] + for suggestion in MCP_SUGGESTIONS: + if suggestion.name in skip_names: + continue + # Empty indicator_patterns means "always suggest" + if not suggestion.indicator_patterns: + results.append(suggestion) + continue + if any(_pattern_matches(mirror_path, p) for p in suggestion.indicator_patterns): + results.append(suggestion) + + return results diff --git a/sucoder/mirror.py b/sucoder/mirror.py index 37ec968..5d1af93 100644 --- a/sucoder/mirror.py +++ b/sucoder/mirror.py @@ -1034,6 +1034,7 @@ def launch_agent( ) self._maybe_run_poetry_auto_install(ctx, mirror_path) + self._maybe_suggest_mcp_servers(ctx, mirror_path) # Get merged templates (per-mirror > global > agent profile) templates = self._get_merged_templates(command, launcher) @@ -1300,6 +1301,67 @@ def _poetry_error_highlights(self, error: CommandError) -> Sequence[str]: highlights.append(line) return highlights + def _maybe_suggest_mcp_servers(self, ctx: MirrorContext, mirror_path: Path) -> None: + """Scan the mirror for tech-stack indicators and offer relevant MCP servers.""" + if ctx.is_remote: + return + + from .mcp_discovery import detect_suggestions + + prefs = WorkspacePrefs.load(mirror_path) + previous = prefs.mcp_discovery() + + existing = dict(ctx.settings.mcp_servers) + suggestions = detect_suggestions(mirror_path, existing) + if not suggestions: + return + + # Filter out servers the user already decided on. + if previous is not None: + suggestions = [s for s in suggestions if s.name not in previous] + if not suggestions: + return + + if self._prompt_handler is None: + self.logger.info("Skipping MCP discovery (no prompt handler).") + return + + lines = ["Detected tech stack suggests these MCP servers:"] + for s in suggestions: + env_note = f" (requires: {', '.join(s.required_env)})" if s.required_env else "" + lines.append(f" - {s.name}: {s.description}{env_note}") + lines.append("Enable discovered servers?") + message = "\n".join(lines) + + try: + accepted = bool(self._prompt_handler(message)) + except Exception: + accepted = False + + new_decisions: Dict[str, bool] = {} + for s in suggestions: + new_decisions[s.name] = accepted + if not accepted: + continue + missing = [v for v in s.required_env if not os.environ.get(v)] + if missing: + self.logger.info( + "Skipping MCP server %s: missing env var(s) %s", + s.name, + ", ".join(missing), + ) + continue + from .config import McpServerConfig + server = McpServerConfig( + command=s.server.command, + args=list(s.server.args), + env={k: os.environ.get(k, "") for k in s.required_env} if s.required_env else {}, + ) + ctx.settings.mcp_servers[s.name] = server + + prefs.set_mcp_discovery(new_decisions) + prefs.save() + def bootstrap( self, ctx: MirrorContext, diff --git a/sucoder/workspace_prefs.py b/sucoder/workspace_prefs.py index 6b3a0b2..d1b902e 100644 --- a/sucoder/workspace_prefs.py +++ b/sucoder/workspace_prefs.py @@ -69,3 +69,24 @@ def set_poetry_auto_install(self, enabled: bool) -> None: "enabled": enabled, "decided_at": _utc_now_iso(), } + + def mcp_discovery(self) -> Optional[Dict[str, bool]]: + """Return per-server accept/decline decisions, or ``None`` if never asked.""" + raw = self.data.get("mcp_discovery") + if isinstance(raw, dict) and "servers" in raw: + servers = raw["servers"] + if isinstance(servers, dict): + return servers + return None + + def set_mcp_discovery(self, servers: Dict[str, bool]) -> None: + """Record which discovered servers were accepted or declined.""" + existing = self.data.get("mcp_discovery", {}) + if not isinstance(existing, dict): + existing = {} + merged = dict(existing.get("servers", {})) + merged.update(servers) + self.data["mcp_discovery"] = { + "servers": merged, + "decided_at": _utc_now_iso(), + } diff --git a/tests/test_mcp_discovery.py b/tests/test_mcp_discovery.py new file mode 100644 index 0000000..279ad50 --- /dev/null +++ b/tests/test_mcp_discovery.py @@ -0,0 +1,226 @@ +import json +import os +from pathlib import Path +from typing import Dict + +import pytest + +from sucoder.config import McpServerConfig +from sucoder.mcp_discovery import ( + MCP_SUGGESTIONS, + McpSuggestion, + detect_suggestions, +) +from sucoder.workspace_prefs import WorkspacePrefs + + +# -- detect_suggestions tests ------------------------------------------------- + + +def test_detect_suggestions_empty_dir(tmp_path: Path) -> None: + """A bare directory still gets always-on suggestions (memory).""" + results = detect_suggestions(tmp_path, {}) + names = [s.name for s in results] + assert "memory" in names + # No github suggestion without .github/ + assert "github" not in names + + +def test_detect_suggestions_github_dir(tmp_path: Path) -> None: + """A .github/ directory triggers the github suggestion.""" + (tmp_path / ".github").mkdir() + results = detect_suggestions(tmp_path, {}) + names = [s.name for s in results] + assert "github" in names + assert "memory" in names + + +def test_detect_suggestions_package_json(tmp_path: Path) -> None: + """A package.json triggers the fetch suggestion.""" + (tmp_path / "package.json").write_text("{}", encoding="utf-8") + results = detect_suggestions(tmp_path, {}) + names = [s.name for s in results] + assert "fetch" in names + + +def test_detect_suggestions_docker_compose(tmp_path: Path) -> None: + """docker-compose.yml triggers the postgres suggestion.""" + (tmp_path / "docker-compose.yml").write_text("version: '3'", encoding="utf-8") + results = detect_suggestions(tmp_path, {}) + names = [s.name for s in results] + assert "postgres" in names + + +def test_detect_suggestions_dedup_existing_config(tmp_path: Path) -> None: + """Servers already in config are excluded from suggestions.""" + (tmp_path / ".github").mkdir() + existing = {"github": McpServerConfig(command="npx")} + results = detect_suggestions(tmp_path, existing) + names = [s.name for s in results] + assert "github" not in names + # Memory should still be suggested + assert "memory" in names + + +def test_detect_suggestions_dedup_repo_mcp_json(tmp_path: Path) -> None: + """Servers defined in repo's .mcp.json are excluded from suggestions.""" + (tmp_path / ".github").mkdir() + mcp_data = {"mcpServers": {"github": {"command": "npx", "args": []}}} + (tmp_path / ".mcp.json").write_text(json.dumps(mcp_data), encoding="utf-8") + + results = detect_suggestions(tmp_path, {}) + names = [s.name for s in results] + assert "github" not in names + + +def test_detect_suggestions_invalid_mcp_json(tmp_path: Path) -> None: + """Invalid .mcp.json is handled gracefully.""" + (tmp_path / ".github").mkdir() + (tmp_path / ".mcp.json").write_text("not json", encoding="utf-8") + results = detect_suggestions(tmp_path, {}) + names = [s.name for s in results] + # github should still appear since .mcp.json couldn't be parsed + assert "github" in names + + +def test_detect_suggestions_openapi(tmp_path: Path) -> None: + """openapi.yaml triggers the fetch suggestion.""" + (tmp_path / "openapi.yaml").write_text("openapi: 3.0", encoding="utf-8") + results = detect_suggestions(tmp_path, {}) + names = [s.name for s in results] + assert "fetch" in names + + +# -- WorkspacePrefs MCP discovery tests --------------------------------------- + + +def test_workspace_prefs_mcp_discovery_default(tmp_path: Path) -> None: + """Default prefs have no MCP discovery decisions.""" + prefs = WorkspacePrefs.load(tmp_path) + assert prefs.mcp_discovery() is None + + +def test_workspace_prefs_mcp_discovery_roundtrip(tmp_path: Path) -> None: + """set_mcp_discovery / mcp_discovery round-trips correctly.""" + prefs = WorkspacePrefs.load(tmp_path) + prefs.set_mcp_discovery({"github": True, "docker": False}) + prefs.save() + + prefs2 = WorkspacePrefs.load(tmp_path) + decisions = prefs2.mcp_discovery() + assert decisions == {"github": True, "docker": False} + + +def test_workspace_prefs_mcp_discovery_merge(tmp_path: Path) -> None: + """Subsequent set_mcp_discovery calls merge into existing decisions.""" + prefs = WorkspacePrefs.load(tmp_path) + prefs.set_mcp_discovery({"github": True}) + prefs.set_mcp_discovery({"memory": True}) + prefs.save() + + prefs2 = WorkspacePrefs.load(tmp_path) + decisions = prefs2.mcp_discovery() + assert decisions == {"github": True, "memory": True} + + +# -- _maybe_suggest_mcp_servers integration tests ---------------------------- + + +def test_maybe_suggest_accepts_servers(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Accepted suggestions are injected into ctx.settings.mcp_servers.""" + from tests.test_mirror import build_manager, CommandResult + + manager = build_manager(tmp_path, prompt_handler=lambda _msg: True) + ctx = manager.context_for("sample") + manager.ensure_clone(ctx) + + mirror_path = ctx.mirror_path + (mirror_path / ".github").mkdir(exist_ok=True) + + # Set GITHUB_TOKEN so the server is not skipped + monkeypatch.setenv("GITHUB_TOKEN", "test-token") + + manager._maybe_suggest_mcp_servers(ctx, mirror_path) + + assert "github" in ctx.settings.mcp_servers + assert ctx.settings.mcp_servers["github"].env["GITHUB_TOKEN"] == "test-token" + + +def test_maybe_suggest_declines_servers(tmp_path: Path) -> None: + """Declined suggestions are not injected.""" + from tests.test_mirror import build_manager + + manager = build_manager(tmp_path, prompt_handler=lambda _msg: False) + ctx = manager.context_for("sample") + manager.ensure_clone(ctx) + + mirror_path = ctx.mirror_path + (mirror_path / ".github").mkdir(exist_ok=True) + + manager._maybe_suggest_mcp_servers(ctx, mirror_path) + + assert "github" not in ctx.settings.mcp_servers + + +def test_maybe_suggest_skips_missing_env(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Servers with missing required env vars are skipped even when accepted.""" + from tests.test_mirror import build_manager + + manager = build_manager(tmp_path, prompt_handler=lambda _msg: True) + ctx = manager.context_for("sample") + manager.ensure_clone(ctx) + + mirror_path = ctx.mirror_path + (mirror_path / ".github").mkdir(exist_ok=True) + + # Ensure GITHUB_TOKEN is NOT set + monkeypatch.delenv("GITHUB_TOKEN", raising=False) + + manager._maybe_suggest_mcp_servers(ctx, mirror_path) + + # github should be skipped (missing token), but memory should be added + assert "github" not in ctx.settings.mcp_servers + assert "memory" in ctx.settings.mcp_servers + + +def test_maybe_suggest_remembers_decisions(tmp_path: Path) -> None: + """Decisions are persisted and not re-prompted on second call.""" + from tests.test_mirror import build_manager + + call_count = 0 + + def counting_handler(msg: str) -> bool: + nonlocal call_count + call_count += 1 + return False + + manager = build_manager(tmp_path, prompt_handler=counting_handler) + ctx = manager.context_for("sample") + manager.ensure_clone(ctx) + + mirror_path = ctx.mirror_path + (mirror_path / ".github").mkdir(exist_ok=True) + + manager._maybe_suggest_mcp_servers(ctx, mirror_path) + assert call_count == 1 + + # Second call should not prompt again + manager._maybe_suggest_mcp_servers(ctx, mirror_path) + assert call_count == 1 + + +def test_maybe_suggest_no_prompt_handler(tmp_path: Path) -> None: + """Without a prompt handler, suggestions are silently skipped.""" + from tests.test_mirror import build_manager + + manager = build_manager(tmp_path, prompt_handler=None) + # Explicitly clear the prompt handler since build_manager may set a default + manager._prompt_handler = None + ctx = manager.context_for("sample") + manager.ensure_clone(ctx) + + mirror_path = ctx.mirror_path + (mirror_path / ".github").mkdir(exist_ok=True) + + manager._maybe_suggest_mcp_servers(ctx, mirror_path) + assert "github" not in ctx.settings.mcp_servers