diff --git a/.github/scripts/setup-repo-secrets.sh b/.github/scripts/setup-repo-secrets.sh new file mode 100644 index 000000000..9671964a8 --- /dev/null +++ b/.github/scripts/setup-repo-secrets.sh @@ -0,0 +1,106 @@ +#!/usr/bin/env bash +# --------------------------------------------------------------------------- +# Setup GitHub repo secrets for PR-Agent (DeepSeek) and Qodo Merge workflows. +# +# Reads API keys from local files in ~/Documents/ and pushes them as +# encrypted GitHub Actions secrets via `gh secret set`. +# +# Usage: +# bash .github/scripts/setup-repo-secrets.sh # current repo +# bash .github/scripts/setup-repo-secrets.sh --repo owner/repo +# bash .github/scripts/setup-repo-secrets.sh --dry-run # preview only +# +# Key sources (edit paths below to match your setup): +# ~/Documents/deepseek api key.txt → DEEPSEEK_API_KEY +# ~/Documents/qodo api key.txt → QODO_API_KEY +# +# Prerequisites: +# - gh CLI installed and authenticated (`gh auth status`) +# - Write / admin access to the target repo +# --------------------------------------------------------------------------- +set -euo pipefail + +REPO="" +DRY_RUN=false + +while [[ $# -gt 0 ]]; do + case "$1" in + --repo) REPO="$2"; shift 2 ;; + --dry-run) DRY_RUN=true; shift ;; + *) echo "Unknown: $1"; exit 1 ;; + esac +done + +# --- Config: key file paths ----------------------------------------------- +DEEPSEEK_KEY_FILE="$HOME/Documents/deepseek api key.txt" +QODO_KEY_FILE="$HOME/Documents/qodo api key.txt" + +# --- Checks ---------------------------------------------------------------- +if ! command -v gh &>/dev/null; then + echo "❌ gh CLI not found — install it from https://cli.github.com/" + exit 1 +fi + +if ! gh auth status &>/dev/null; then + echo "❌ gh CLI not authenticated — run 'gh auth login' first." + exit 1 +fi + +GH_ARGS=() +[[ -n "$REPO" ]] && GH_ARGS+=(--repo "$REPO") + +echo "🔍 Target: ${REPO:-$(gh repo view --json nameWithOwner -q .nameWithOwner 2>/dev/null || echo 'current repo')}" + +# --- Read keys ------------------------------------------------------------- +read_key() { + local path="$1" label="$2" + if [[ ! -f "$path" ]]; then + echo "⚠️ $label key file not found at: $path" + return 1 + fi + local val + val="$(tr -d '[:space:]' < "$path")" + if [[ -z "$val" ]]; then + echo "⚠️ $label key file is empty: $path" + return 1 + fi + echo "$val" +} + +echo "" +echo "📂 Reading keys from ~/Documents/ …" + +DEEPSEEK_KEY="$(read_key "$DEEPSEEK_KEY_FILE" "DeepSeek")" || true +QODO_KEY="$(read_key "$QODO_KEY_FILE" "Qodo")" || true + +# --- Set secrets ----------------------------------------------------------- +set_secret() { + local name="$1" value="$2" + if [[ -z "$value" ]]; then + echo " ⏭️ Skipping $name (no value)" + return + fi + if $DRY_RUN; then + echo " 🏁 [DRY-RUN] gh secret set $name ${GH_ARGS[*]}" + else + echo " 🔐 Setting $name …" + echo -n "$value" | gh secret set "$name" "${GH_ARGS[@]}" + echo " ✅ $name set" + fi +} + +echo "" +$DRY_RUN && echo "🏁 DRY RUN — no secrets will be written" || echo "🚀 Setting secrets …" +echo "" + +set_secret "DEEPSEEK_API_KEY" "$DEEPSEEK_KEY" +set_secret "QODO_API_KEY" "$QODO_KEY" + +# --- Summary --------------------------------------------------------------- +echo "" +if $DRY_RUN; then + echo "🏁 Dry run complete. Run without --dry-run to apply." +else + echo "✅ Done! Secrets are now available to GitHub Actions workflows." + echo " Verify: gh secret list ${GH_ARGS[*]}" +fi diff --git a/.github/workflows/pr-agent-review.yml b/.github/workflows/pr-agent-review.yml new file mode 100644 index 000000000..d755db87e --- /dev/null +++ b/.github/workflows/pr-agent-review.yml @@ -0,0 +1,37 @@ +# --------------------------------------------------------------------------- +# PR-Agent review powered by DeepSeek. +# +# Required repo secret: DEEPSEEK_API_KEY +# - Setup: bash .github/scripts/setup-repo-secrets.sh +# - Or set manually at: https://github.com///settings/secrets/actions +# --------------------------------------------------------------------------- +name: pr-agent-review +on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + issue_comment: + types: [created] +permissions: + contents: read + pull-requests: write + issues: write +jobs: + pr_agent_job: + name: PR-Agent (DeepSeek) + runs-on: ubuntu-latest + if: ${{ github.event.sender.type != 'Bot' && secrets.DEEPSEEK_API_KEY != '' }} + steps: + - name: PR Agent review + uses: the-pr-agent/pr-agent@main + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + DEEPSEEK_API_KEY: ${{ secrets.DEEPSEEK_API_KEY }} + config.model: "deepseek/deepseek-chat" + config.fallback_models: '["deepseek/deepseek-chat"]' + github_action_config.auto_review: "true" + github_action_config.auto_describe: "true" + github_action_config.auto_improve: "true" + pr_description.publish_labels: "true" + pr_description.publish_description_as: "suggestion" + pr_reviewer.require_score_review: "false" + pr_reviewer.num_code_suggestions: "4" diff --git a/.github/workflows/qodo-merge.yml b/.github/workflows/qodo-merge.yml new file mode 100644 index 000000000..4237eb61e --- /dev/null +++ b/.github/workflows/qodo-merge.yml @@ -0,0 +1,26 @@ +# --------------------------------------------------------------------------- +# Qodo Merge — automated PR reviews powered by Qodo. +# +# Required repo secret: QODO_API_KEY +# - Setup: bash .github/scripts/setup-repo-secrets.sh +# - Or set manually at: https://github.com///settings/secrets/actions +# --------------------------------------------------------------------------- +name: qodo-merge +on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review] +jobs: + qodo-merge: + if: ${{ secrets.QODO_API_KEY != "" }} + runs-on: ubuntu-latest + permissions: + issues: write + pull-requests: write + contents: read + steps: + - name: Qodo Merge Review + uses: qodo-ai/qodo-merge@main + env: + QODO_API_KEY: ${{ secrets.QODO_API_KEY }} + with: + github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/CMakeLists.txt b/CMakeLists.txt index 311b19e3a..f0cc11f75 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1834,3 +1834,25 @@ if(EXISTS "${_AUTO_TUNE_TEST_SRC}") include(CTest) add_test(NAME AutoTuneTest COMMAND test_auto_tune) endif() + +# Streaming proxy reasoning content normalization: ensures `content` field is +# always present alongside `reasoning_content` in SSE delta chunks. +set(_STREAMING_REASONING_TEST_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/test/cpp/test_streaming_proxy_reasoning_content.cpp" +) +if(EXISTS "${_STREAMING_REASONING_TEST_SRC}") + add_executable(test_streaming_proxy_reasoning_content + test/cpp/test_streaming_proxy_reasoning_content.cpp + ) + target_include_directories(test_streaming_proxy_reasoning_content PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/src/cpp/include + ${CMAKE_CURRENT_BINARY_DIR}/include + ) + target_link_libraries(test_streaming_proxy_reasoning_content PRIVATE + lemonade-server-core + nlohmann_json::nlohmann_json + ) + + include(CTest) + add_test(NAME StreamingProxyReasoningContentTest COMMAND test_streaming_proxy_reasoning_content) +endif() diff --git a/src/cpp/cli/lemonade_client.cpp b/src/cpp/cli/lemonade_client.cpp index 517140d14..8ceffc9f2 100644 --- a/src/cpp/cli/lemonade_client.cpp +++ b/src/cpp/cli/lemonade_client.cpp @@ -696,6 +696,15 @@ int LemonadeClient::pull_model(const json& model_data, const std::string& displa if (event_type == "complete") { std::cout << std::endl; state.success = true; + // Check for warnings (e.g. offline mode skipped the download) + try { + auto complete_json = json::parse(event_data); + if (complete_json.contains("warning") && complete_json["warning"].is_string()) { + state.warning_message = complete_json["warning"].get(); + } + } catch (...) { + // Ignore parse errors on the completion event + } } else if (event_type == "error") { try { auto error_json = json::parse(event_data); @@ -730,7 +739,12 @@ int LemonadeClient::pull_model(const json& model_data, const std::string& displa throw std::runtime_error("Model pull failed"); } - std::cout << "Model pulled successfully: " << output_name << std::endl; + if (!state.warning_message.empty()) { + std::cout << "Model pulled with warning: " << output_name << std::endl; + std::cout << " Warning: " << state.warning_message << std::endl; + } else { + std::cout << "Model pulled successfully: " << output_name << std::endl; + } return 0; } catch (const HttpError& e) { std::cerr << "Error pulling model: " << extract_server_error_message(e) << std::endl; diff --git a/src/cpp/include/lemon/model_manager.h b/src/cpp/include/lemon/model_manager.h index 4efc68ae4..424106873 100644 --- a/src/cpp/include/lemon/model_manager.h +++ b/src/cpp/include/lemon/model_manager.h @@ -296,8 +296,10 @@ class ModelManager { // Download from a JSON manifest void download_from_manifest(const json& manifest, std::map& headers, DownloadProgressCallback progress_callback); - // Download from Hugging Face + // Download from Hugging Face. When do_not_upgrade is true and a .completed + // sentinel exists in the snapshot directory, the HF API call is skipped. void download_from_huggingface(const ModelInfo& info, + bool do_not_upgrade = false, DownloadProgressCallback progress_callback = nullptr); // Download from FLM diff --git a/src/cpp/include/lemon/streaming_proxy.h b/src/cpp/include/lemon/streaming_proxy.h index 4d56b702a..070ffbc8e 100644 --- a/src/cpp/include/lemon/streaming_proxy.h +++ b/src/cpp/include/lemon/streaming_proxy.h @@ -51,8 +51,14 @@ class StreamingProxy { std::function on_chunk = nullptr ); + // Normalize streaming chat.completion.chunk SSE deltas for OpenAI API + // compatibility. Applies: + // 1. Injects `role: "assistant"` when null/missing on assistant deltas + // 2. Injects `content: ""` alongside `reasoning_content` when absent + static std::string normalize_chat_completion_chunk(const std::string& sse_chunk); + private: static TelemetryData parse_telemetry(const std::string& buffer); }; -} // namespace lemon +} // namespace lemon \ No newline at end of file diff --git a/src/cpp/include/lemon/system_info.h b/src/cpp/include/lemon/system_info.h index 9b143ae47..fa192d838 100644 --- a/src/cpp/include/lemon/system_info.h +++ b/src/cpp/include/lemon/system_info.h @@ -108,7 +108,14 @@ class SystemInfo { static std::string get_system_llamacpp_version(); // Device support detection + // Return the first (primary) ROCm architecture — typically the iGPU. + // Used for rocm_channel selection and single-arch contexts. static std::string get_rocm_arch(); + + // Return ALL detected AMD GPU ROCm architectures, ordered iGPU first + // then dGPUs. Used by backend download paths to install ROCm binaries + // for every GPU on the system, not just the first one detected. + static std::vector get_rocm_arches(); static std::string get_cuda_arch(); // CUDA release assets are architecture-specific (sm_89, sm_120, etc.). diff --git a/src/cpp/include/lemon_cli/lemonade_client.h b/src/cpp/include/lemon_cli/lemonade_client.h index 58339acbd..9e71d9c10 100644 --- a/src/cpp/include/lemon_cli/lemonade_client.h +++ b/src/cpp/include/lemon_cli/lemonade_client.h @@ -37,6 +37,7 @@ struct StreamingRequestState { bool success = false; std::string error_message; std::string error_code; + std::string warning_message; bool total_size_printed = false; uint64_t last_file_size = 0; std::chrono::steady_clock::time_point file_start_time; diff --git a/src/cpp/server/backend_manager.cpp b/src/cpp/server/backend_manager.cpp index 120b61428..21b8204c8 100644 --- a/src/cpp/server/backend_manager.cpp +++ b/src/cpp/server/backend_manager.cpp @@ -215,11 +215,28 @@ void install_therock_if_needed(const std::string& os, const json& backend_versio return; } - std::string rocm_arch = SystemInfo::get_rocm_arch(); + // Already installed — skip to avoid redundant 3 GB download + if (is_therock_installed_for_current_arch(backend_versions)) { + LOG(DEBUG, "BackendManager") << "TheRock already installed, skipping" << std::endl; + return; + } + + std::vector rocm_arches = SystemInfo::get_rocm_arches(); std::string version = backend_versions["therock"]["version"].get(); - // Install TheRock for this architecture - backends::BackendUtils::install_therock(rocm_arch, version, progress_cb); + if (rocm_arches.empty()) { + // Fall back to single-arch detection for backward compatibility + std::string single_arch = SystemInfo::get_rocm_arch(); + if (!single_arch.empty()) { + backends::BackendUtils::install_therock(single_arch, version, progress_cb); + } + return; + } + + // Install TheRock for each detected GPU architecture + for (const auto& rocm_arch : rocm_arches) { + backends::BackendUtils::install_therock(rocm_arch, version, progress_cb); + } } } // namespace diff --git a/src/cpp/server/model_manager.cpp b/src/cpp/server/model_manager.cpp index 6cd2a483c..8ab453d89 100644 --- a/src/cpp/server/model_manager.cpp +++ b/src/cpp/server/model_manager.cpp @@ -1025,9 +1025,16 @@ std::map ModelManager::discover_extra_models() const { std::map> dirs_with_gguf; // directory -> list of gguf files std::vector standalone_files; // GGUF files not in subdirectories - // Recursively find all .gguf files + // Recursively find all .gguf files. Use error_code to skip inaccessible + // entries (permission denied, broken symlinks, dangling temp files from + // interrupted downloads) instead of throwing. try { - for (const auto& entry : fs::recursive_directory_iterator(search_dir)) { + std::error_code ec; + for (const auto& entry : fs::recursive_directory_iterator(search_dir, fs::directory_options::skip_permission_denied, ec)) { + if (ec) { + ec.clear(); + continue; + } if (!entry.is_regular_file()) continue; std::string filename = entry.path().filename().string(); @@ -1761,18 +1768,27 @@ static bool is_checkpoint_path_complete(const std::string& path_str) { return !safe_exists(path_from_utf8(path_str + ".partial")); } - return !has_partial_files(resolved); + if (has_partial_files(resolved)) return false; + + // Check for .completed sentinel — this is the authoritative marker that a + // download finished successfully. Without it, a crash during download + // (between fs::rename and manifest removal) leaves a corrupt file that is + // indistinguishable from a complete download by other checks alone. + // The sentinel is written after all files are verified in + // download_from_huggingface(). + return safe_exists(marker_dir / ".completed"); } /** * Returns true if all files required by the model recipe are present and complete. * Note: npu_cache is skipped as it is managed lazily by the flm-npu backend. + * Note: draft is skipped as it is an optional MTP checkpoint. */ static bool are_required_checkpoints_complete(const ModelInfo& info) { for (const auto& [type, checkpoint] : info.checkpoints) { (void)checkpoint; - if (type == "npu_cache") continue; + if (type == "npu_cache" || type == "draft") continue; const std::string resolved_path = info.resolved_path(type); if (!is_checkpoint_path_complete(resolved_path)) { @@ -2957,7 +2973,7 @@ void ModelManager::download_registered_model(const ModelInfo& info, bool do_not_ if (info.recipe == "flm") { download_from_flm(info.checkpoint(), do_not_upgrade, progress_callback); } else { - download_from_huggingface(info, progress_callback); + download_from_huggingface(info, do_not_upgrade, progress_callback); } // Update cache after successful download @@ -3118,7 +3134,7 @@ json ModelManager::fetch_collection_manifest(const std::string& repo_id, bool do manifest_info.model_name = repo_id; manifest_info.checkpoints["main"] = repo_id; try { - download_from_huggingface(manifest_info, nullptr); + download_from_huggingface(manifest_info, false, nullptr); manifest = read_cached_collection_manifest(cache_dir); } catch (const std::exception& e) { if (!have_cache) throw; @@ -4107,8 +4123,62 @@ void ModelManager::download_from_manifest(const json& manifest, std::map headers; + const char* hf_token = std::getenv("HF_TOKEN"); + if (hf_token && hf_token[0]) { + headers["Authorization"] = "Bearer " + std::string(hf_token); + } + download_from_manifest(manifest, headers, progress_callback); + if (fs::exists(manifest_path)) { + fs::remove(manifest_path); + } + return; + } + } catch (...) { + // Stale manifest — fall through to fresh download + LOG(WARNING, "ModelManager") << "Failed to resume from manifest, starting fresh download" + << std::endl; + } + } + } + std::string main_variant = checkpoint_to_variant(info.checkpoint("main")); // Get Hugging Face cache directory @@ -4406,6 +4476,18 @@ void ModelManager::download_from_huggingface(const ModelInfo& info, LOG(INFO, "ModelManager") << "Removed download manifest (download complete)" << std::endl; } + // Write .completed sentinel — this is the authoritative marker that a + // download finished successfully. Unlike manifest removal (which leaves a + // window where a crash produces a corrupt file indistinguishable from a + // complete one), the sentinel is only created after all files are verified. + // is_checkpoint_path_complete() checks for this file. + const fs::path completed_path = snapshot_path / ".completed"; + if (!safe_exists(completed_path)) { + std::ofstream completed_file(path_to_utf8(completed_path)); + completed_file << "completed\n"; + completed_file.close(); + } + // Advance refs/main only after a successful pull that actually uses the // latest snapshot for the selected model artifacts. README-only commits keep // refs/main on the previous active snapshot. diff --git a/src/cpp/server/router.cpp b/src/cpp/server/router.cpp index b3ec22c3b..c05059484 100644 --- a/src/cpp/server/router.cpp +++ b/src/cpp/server/router.cpp @@ -486,6 +486,36 @@ void Router::load_model(const std::string& model_name, } } + // Pre-load OOM check: estimate if the model fits in available memory. + // Done AFTER eviction so freed VRAM/RAM is visible. Uses ModelInfo::size + // (file size in GB) as a rough proxy for load-time memory demand. + // When the model far exceeds available memory (more than 2x headroom), + // reject the load with a clear error rather than letting the OS OOM killer + // kill the entire process. Smaller overruns may still succeed due to + // page-mapped quantization, so those are logged as warnings only. + if (model_info.size > 0.0) { + constexpr double SAFETY_MARGIN = 0.9; + double available = get_available_memory_gb( + model_info.device & DEVICE_GPU ? DEVICE_GPU : + model_info.device & DEVICE_NPU ? DEVICE_NPU : + DEVICE_CPU); + double headroom = available * SAFETY_MARGIN; + if (model_info.size > headroom * 2.0) { + is_loading_ = false; + load_cv_.notify_all(); + throw std::runtime_error( + "Model '" + canonical_model_name + "' requires ~" + + std::to_string(model_info.size) + " GB but only " + + std::to_string(headroom) + " GB available. " + "Free memory by unloading other models or reducing ctx_size."); + } else if (model_info.size > headroom) { + LOG(WARNING, "Router") << "Model " << canonical_model_name + << " requires ~" << model_info.size << " GB but only " + << headroom << " GB available (safety margin " << SAFETY_MARGIN + << "). Consider freeing memory or reducing ctx_size." << std::endl; + } + } + // Auto-tune: resolve ctx_size = -1 → computed from memory + arch metadata // Done AFTER eviction so that freed VRAM/RAM is visible to the memory query. int64_t auto_ctx = resolve_auto_ctx_size(effective_options, model_info); diff --git a/src/cpp/server/server.cpp b/src/cpp/server/server.cpp index 10724bbfa..5416da11a 100644 --- a/src/cpp/server/server.cpp +++ b/src/cpp/server/server.cpp @@ -93,10 +93,33 @@ bool should_disable_thinking(const json& request_json) { return false; } -bool strip_handled_thinking_fields(json& request_json) { +bool normalize_thinking_fields(json& request_json) { bool modified = false; - modified = request_json.erase("enable_thinking") > 0 || modified; - modified = request_json.erase("thinking") > 0 || modified; + + // Rename OpenAI's `thinking` (boolean or object) to lemonade's canonical + // `enable_thinking` (boolean) so backends (FLM, llama.cpp, cloud, etc.) + // that understand the flag can act on it instead of having it stripped. + if (request_json.contains("thinking")) { + const auto& thinking = request_json["thinking"]; + bool enable = true; + if (thinking.is_boolean()) { + enable = thinking.get(); + request_json["enable_thinking"] = enable; + } else if (thinking.is_object()) { + std::string type = thinking.value("type", ""); + if (type == "disabled") { + request_json["enable_thinking"] = false; + } else if (type == "enabled") { + request_json["enable_thinking"] = true; + } + } + request_json.erase("thinking"); + modified = true; + } + + // Don't strip enable_thinking — forward to backends that support it. + // The /no_think prefix (injected earlier for llama.cpp) is an additional + // signal; backends like FLM/vLLM use the enable_thinking field directly. return modified; } @@ -2164,7 +2187,7 @@ void Server::handle_chat_completions(const httplib::Request& req, httplib::Respo if (should_disable_thinking(request_json)) { request_modified = prepend_no_think_to_last_user_message(request_json) || request_modified; } - request_modified = strip_handled_thinking_fields(request_json) || request_modified; + request_modified = normalize_thinking_fields(request_json) || request_modified; // If we modified the request (or normalized the model name earlier), serialize to string // The early normalize_client_model_name() call modifies request_json but doesn't set a flag, @@ -2227,6 +2250,15 @@ void Server::handle_chat_completions(const httplib::Request& req, httplib::Respo LOG(DEBUG, "Server") << "Message content: " << message["content"].get().substr(0, 200) << std::endl; } } + + // Ensure content field is present alongside reasoning_content + // Standard OpenAI-compatible clients expect content to always be present + const bool has_reasoning = message.contains("reasoning_content") && + message["reasoning_content"].is_string(); + const bool has_content = message.contains("content") && !message["content"].is_null(); + if (has_reasoning && !has_content) { + message["content"] = ""; + } } } @@ -3674,6 +3706,12 @@ void Server::handle_pull(const httplib::Request& req, httplib::Response& res) { model_manager_->download_model(model_name, request_json, do_not_upgrade); nlohmann::json response = {{"status", "success"}, {"model_name", model_name}}; + // Inform the user when offline mode prevented an actual download + if (auto* cfg = RuntimeConfig::global()) { + if (cfg->offline()) { + response["warning"] = "offline mode is enabled; model was not downloaded"; + } + } res.set_content(response.dump(), "application/json"); } @@ -5080,9 +5118,15 @@ void Server::stream_download_operation( operation(progress_cb); // If operation completed without sending a "complete" event - // (e.g. backend was already installed), send one now + // (e.g. backend was already installed, or offline mode skipped + // the download), send one now with an appropriate warning. if (!complete_sent) { nlohmann::json done_data = {{"status", "ok"}}; + if (auto* cfg = RuntimeConfig::global()) { + if (cfg->offline()) { + done_data["warning"] = "offline mode is enabled; model was not downloaded"; + } + } std::string event = "event: complete\ndata: " + done_data.dump() + "\n\n"; sink.write(event.c_str(), event.size()); } diff --git a/src/cpp/server/streaming_proxy.cpp b/src/cpp/server/streaming_proxy.cpp index abb22fc02..f688a3ed1 100644 --- a/src/cpp/server/streaming_proxy.cpp +++ b/src/cpp/server/streaming_proxy.cpp @@ -3,12 +3,128 @@ #include #include #include +#include +#include +#include #include #include #include namespace lemon { +namespace { + +// Normalize a single `data: {...}` SSE line for chat.completion.chunk objects. +// Applies two fixes for OpenAI API compliance: +// +// 1. Role normalization: some backends emit null or missing `delta.role` on +// content chunks. Injects `"role": "assistant"` when the delta contains +// assistant-type fields (content, reasoning_content, thinking, tool_calls, +// function_call) but role is absent or null. +// +// 2. Content normalization: backends that emit `reasoning_content` often omit +// the standard `content` field entirely. Injects `"content": ""` to prevent +// OpenAI-compatible clients (e.g. @ai-sdk/openai-compatible) from resetting +// the connection when content is expected but absent. +std::string normalize_data_line(const std::string& line) { + const std::string prefix = "data: "; + if (line.rfind(prefix, 0) != 0) { + return line; + } + + // Preserve trailing \r if present (some SSE implementations send \r\n) + std::string suffix; + std::string payload = line.substr(prefix.size()); + if (!payload.empty() && payload.back() == '\r') { + suffix = "\r"; + payload.pop_back(); + } + + if (payload.empty() || payload == "[DONE]") { + return line; + } + + try { + auto chunk = json::parse(payload); + // Only normalize chat.completion.chunk objects — leave text_completion, + // error frames, and other SSE events untouched. + if (!chunk.is_object() || + !chunk.contains("object") || + !chunk["object"].is_string() || + chunk["object"].get() != "chat.completion.chunk" || + !chunk.contains("choices") || + !chunk["choices"].is_array()) { + return line; + } + + bool changed = false; + for (auto& choice : chunk["choices"]) { + if (!choice.is_object() || !choice.contains("delta") || !choice["delta"].is_object()) { + continue; + } + + auto& delta = choice["delta"]; + + // --- Fix 1: Role normalization --- + const bool role_is_null = delta.contains("role") && delta["role"].is_null(); + const bool role_is_missing = !delta.contains("role"); + const bool has_assistant_delta = + delta.contains("content") || + delta.contains("reasoning_content") || + delta.contains("thinking") || + delta.contains("tool_calls") || + delta.contains("function_call"); + + if (role_is_null || (role_is_missing && has_assistant_delta)) { + delta["role"] = "assistant"; + changed = true; + } + + // --- Fix 2: Content normalization --- + // If delta has reasoning_content but content is missing or null, + // inject empty content string for OpenAI compatibility + const bool has_reasoning = delta.contains("reasoning_content") && + delta["reasoning_content"].is_string(); + const bool has_content = delta.contains("content") && !delta["content"].is_null(); + + if (has_reasoning && !has_content) { + delta["content"] = ""; + changed = true; + } + } + + if (!changed) { + return line; + } + + return prefix + chunk.dump() + suffix; + } catch (...) { + // Malformed JSON — pass through unchanged + return line; + } +} + +} // namespace + +std::string StreamingProxy::normalize_chat_completion_chunk(const std::string& sse_chunk) { + std::string output; + size_t pos = 0; + + while (pos < sse_chunk.size()) { + size_t newline = sse_chunk.find('\n', pos); + if (newline == std::string::npos) { + output += normalize_data_line(sse_chunk.substr(pos)); + break; + } + + output += normalize_data_line(sse_chunk.substr(pos, newline - pos)); + output.push_back('\n'); + pos = newline + 1; + } + + return output; +} + void StreamingProxy::forward_sse_stream( const std::string& backend_url, const std::string& request_body, @@ -17,37 +133,94 @@ void StreamingProxy::forward_sse_stream( long timeout_seconds, std::function on_chunk) { + // During long prefill phases (e.g., 15k-token prompts taking minutes), no + // bytes are sent to the client. This can trigger client-side read timeouts + // (e.g. httplib::Client's 300s default) or reverse-proxy idle timeouts. + // Send periodic SSE comment lines (`: keepalive`) to reset I/O timeouts + // on both sides — SSE parsers ignore comment lines. + constexpr auto KEEPALIVE_INTERVAL = std::chrono::seconds(10); + std::string telemetry_buffer; bool stream_error = false; bool has_done_marker = false; - bool has_first_token = false; + std::atomic has_first_token{false}; double time_to_first_token = 0.0; const auto start_time = std::chrono::steady_clock::now(); + // Mutex serialises sink.write() calls from the libcurl callback thread and + // the optional keepalive heartbeat thread below. + std::mutex sink_mutex; + + // Line buffer for SSE normalization: libcurl may deliver an SSE line split + // across multiple write callbacks, so we accumulate partial input and only + // normalize complete lines (terminated by '\n') before forwarding. + std::string line_buffer; + + // Start a keepalive heartbeat thread that sends SSE comment lines every + // KEEPALIVE_INTERVAL while waiting for the first token. The thread stops + // when has_first_token is set or the stream ends (detected via the shared + // flag being polled). This prevents client-side read timeouts during + // extended prefill phases where the backend sends no data for minutes. + std::atomic heartbeat_running{true}; + std::thread heartbeat_thread([&]() { + while (!has_first_token.load() && heartbeat_running.load()) { + std::this_thread::sleep_for(KEEPALIVE_INTERVAL); + if (has_first_token.load() || !heartbeat_running.load()) break; + std::lock_guard lock(sink_mutex); + const char* keepalive = ": keepalive\n\n"; + if (!sink.write(keepalive, strlen(keepalive))) { + // Client disconnected — stop heartbeat + heartbeat_running.store(false); + break; + } + } + }); + auto result = utils::HttpClient::post_stream( backend_url, request_body, - [&sink, &telemetry_buffer, &has_done_marker, &has_first_token, - &time_to_first_token, &start_time, &on_chunk](const char* data, size_t length) { + [&sink, &sink_mutex, &telemetry_buffer, &has_done_marker, &has_first_token, + &time_to_first_token, &start_time, &on_chunk, &line_buffer](const char* data, size_t length) { if (on_chunk) { on_chunk(); } + // Telemetry buffer — raw bytes, pre-normalization telemetry_buffer.append(data, length); std::string chunk(data, length); - if (!has_first_token && chunk.find("data: ") != std::string::npos) { - has_first_token = true; + + // First-token timing — also signals heartbeat to stop + if (!has_first_token.load() && chunk.find("data: ") != std::string::npos) { + has_first_token.store(true); time_to_first_token = std::chrono::duration( std::chrono::steady_clock::now() - start_time).count(); } + // [DONE] marker detection if (chunk.find("data: [DONE]") != std::string::npos) { has_done_marker = true; } - if (!sink.write(data, length)) { - return false; + // Accumulate bytes and flush only complete (newline-terminated) lines + // so normalization can safely parse each `data: {...}` payload. + line_buffer.append(chunk); + std::string output; + size_t pos = 0; + size_t newline; + while ((newline = line_buffer.find('\n', pos)) != std::string::npos) { + output.append( + StreamingProxy::normalize_chat_completion_chunk( + line_buffer.substr(pos, newline - pos + 1))); + pos = newline + 1; + } + line_buffer.erase(0, pos); + + if (!output.empty()) { + std::lock_guard lock(sink_mutex); + if (!sink.write(output.data(), output.size())) { + return false; // Client disconnected + } } return true; @@ -56,6 +229,13 @@ void StreamingProxy::forward_sse_stream( timeout_seconds ); + // Signal heartbeat thread to stop and wait for it. This must happen + // before any post-stream sink operations (flush, [DONE], sink.done()). + heartbeat_running.store(false); + if (heartbeat_thread.joinable()) { + heartbeat_thread.join(); + } + const bool transport_interrupted = result.curl_code == CURLE_PARTIAL_FILE || result.curl_code == CURLE_RECV_ERROR; @@ -76,6 +256,13 @@ void StreamingProxy::forward_sse_stream( } if (!stream_error) { + // Flush any trailing partial line before sending [DONE] + if (!line_buffer.empty()) { + std::string tail = StreamingProxy::normalize_chat_completion_chunk(line_buffer); + sink.write(tail.data(), tail.size()); + line_buffer.clear(); + } + // Ensure [DONE] marker is sent only for clean transports. If the transport // was interrupted before [DONE], the block above throws and recovery is // handled by WrappedServer/Router instead of pretending success. @@ -233,4 +420,4 @@ StreamingProxy::TelemetryData StreamingProxy::parse_telemetry(const std::string& return telemetry; } -} // namespace lemon +} // namespace lemon \ No newline at end of file diff --git a/src/cpp/server/system_info.cpp b/src/cpp/server/system_info.cpp index 056297bad..8c54c2489 100644 --- a/src/cpp/server/system_info.cpp +++ b/src/cpp/server/system_info.cpp @@ -2140,6 +2140,39 @@ std::string SystemInfo::get_rocm_arch() { return ""; // No supported architecture found } +std::vector SystemInfo::get_rocm_arches() { + // Returns ALL detected AMD GPU ROCm architectures. + // Ordered: iGPU first, then dGPUs (same order as the amd_gpu array). + // Used to ensure ROCm backends are downloaded for every GPU on the system. + std::vector arches; + try { + json system_info = SystemInfoCache::get_system_info_with_cache(); + if (!system_info.contains("devices")) { + return arches; + } + const auto& devices = system_info["devices"]; + if (devices.contains("amd_gpu") && devices["amd_gpu"].is_array()) { + for (const auto& gpu : devices["amd_gpu"]) { + if (gpu.value("available", false)) { + std::string name = gpu.value("name", ""); + if (!name.empty()) { + std::string arch = identify_rocm_arch_from_name(name); + if (!arch.empty()) { + // Avoid duplicates (e.g., identical iGPU/dGPU arch strings) + if (std::find(arches.begin(), arches.end(), arch) == arches.end()) { + arches.push_back(arch); + } + } + } + } + } + } + } catch (...) { + // Detection failed — return empty list + } + return arches; +} + static int cuda_sm_value(const std::string& arch) { if (arch.size() <= 3 || arch.substr(0, 3) != "sm_") { return 0; @@ -3960,7 +3993,7 @@ static bool s_hardware_computed = false; static bool s_recipes_computed = false; json SystemInfoCache::get_system_info_with_cache() { - std::lock_guard lock(s_system_info_mutex); + std::unique_lock lock(s_system_info_mutex); // Return fully cached result if both hardware and recipes are computed if (s_hardware_computed && s_recipes_computed) { @@ -4008,18 +4041,30 @@ json SystemInfoCache::get_system_info_with_cache() { } // Compute recipes if not cached (or invalidated) + // Mark s_recipes_computed early and unlock during recipe computation to + // prevent deadlock when build_recipes_info re-enters this function + // (e.g. via get_rocm_arch() → get_system_info_with_cache()). if (!s_recipes_computed) { + s_recipes_computed = true; + lock.unlock(); + try { auto sys_info = create_system_info(); json devices = s_cached_system_info.contains("devices") ? s_cached_system_info["devices"] : json::object(); - s_cached_system_info["recipes"] = sys_info->build_recipes_info(devices); + json recipes = sys_info->build_recipes_info(devices); + + lock.lock(); + s_cached_system_info["recipes"] = recipes; } catch (const std::exception& e) { + lock.lock(); + s_recipes_computed = false; // Allow retry on failure LOG(ERROR, "Server") << "Recipe detection failed: " << e.what() << std::endl; } catch (...) { + lock.lock(); + s_recipes_computed = false; // Allow retry on failure LOG(ERROR, "Server") << "Recipe detection failed with unknown error" << std::endl; } - s_recipes_computed = true; } return s_cached_system_info; diff --git a/test/cpp/test_streaming_proxy_reasoning_content.cpp b/test/cpp/test_streaming_proxy_reasoning_content.cpp new file mode 100644 index 000000000..3c4fc115c --- /dev/null +++ b/test/cpp/test_streaming_proxy_reasoning_content.cpp @@ -0,0 +1,193 @@ +#include "lemon/streaming_proxy.h" +#include +#include +#include +#include +#include + +using json = nlohmann::json; + +static json parse_first_data_json(const std::string& sse) { + const std::string prefix = "data: "; + auto start = sse.find(prefix); + assert(start != std::string::npos); + start += prefix.size(); + auto end = sse.find('\n', start); + assert(end != std::string::npos); + return json::parse(sse.substr(start, end - start)); +} + +// ===== Role normalization tests ===== + +static void test_null_role_is_normalized() { + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\",\"role\":null},\"finish_reason\":null}]}\n\n"; + + std::string output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + auto chunk = parse_first_data_json(output); + + assert(chunk["choices"][0]["delta"]["role"] == "assistant"); + assert(chunk["choices"][0]["delta"]["content"] == "hi"); +} + +static void test_missing_role_on_content_delta_is_added() { + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":null}]}\n\n"; + + std::string output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + auto chunk = parse_first_data_json(output); + + assert(chunk["choices"][0]["delta"]["role"] == "assistant"); +} + +static void test_empty_delta_chunk_role_is_not_mutated() { + // A finish-reason-only delta (no assistant payload) should not get a role added. + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n"; + assert(lemon::StreamingProxy::normalize_chat_completion_chunk(input) == input); +} + +static void test_done_marker_and_non_chat_chunks_are_preserved() { + std::string done = "data: [DONE]\n\n"; + assert(lemon::StreamingProxy::normalize_chat_completion_chunk(done) == done); + + std::string completion = + "data: {\"object\":\"text_completion\",\"choices\":[{\"index\":0,\"text\":\"hi\"}]}\n\n"; + assert(lemon::StreamingProxy::normalize_chat_completion_chunk(completion) == completion); +} + +// ===== Reasoning content normalization tests ===== + +static void test_reasoning_content_without_content_gets_empty_content() { + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_content\":\"Let me think...\"},\"finish_reason\":null}]}\n\n"; + + std::string output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + auto chunk = parse_first_data_json(output); + + assert(chunk["choices"][0]["delta"]["reasoning_content"] == "Let me think..."); + assert(chunk["choices"][0]["delta"]["content"] == ""); +} + +static void test_reasoning_content_with_null_content_gets_empty_content() { + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_content\":\"hmm\",\"content\":null},\"finish_reason\":null}]}\n\n"; + + std::string output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + auto chunk = parse_first_data_json(output); + + assert(chunk["choices"][0]["delta"]["reasoning_content"] == "hmm"); + assert(chunk["choices"][0]["delta"]["content"] == ""); +} + +static void test_normal_content_delta_gets_role_added() { + // Content with no role gets role: assistant injected (role normalization) + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n"; + std::string output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + auto chunk = parse_first_data_json(output); + assert(chunk["choices"][0]["delta"]["content"] == "Hello"); + assert(chunk["choices"][0]["delta"]["role"] == "assistant"); +} + +static void test_content_and_reasoning_together_is_not_content_mutated() { + // Both content and reasoning_content present — content should not be touched + // Role gets added (role normalization) since none was present + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"The\",\"reasoning_content\":\"thinking\"},\"finish_reason\":null}]}\n\n"; + auto output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + auto chunk = parse_first_data_json(output); + assert(chunk["choices"][0]["delta"]["content"] == "The"); + assert(chunk["choices"][0]["delta"]["reasoning_content"] == "thinking"); + assert(chunk["choices"][0]["delta"]["role"] == "assistant"); +} + +// ===== Combined edge cases ===== + +static void test_carriage_return_is_preserved() { + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_content\":\"hmm\"},\"finish_reason\":null}]}\r\n\r\n"; + std::string output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + auto chunk = parse_first_data_json(output); + assert(chunk["choices"][0]["delta"]["reasoning_content"] == "hmm"); + assert(chunk["choices"][0]["delta"]["content"] == ""); + assert(output.size() >= 2 && output[output.size() - 2] == '\r'); +} + +static void test_multi_choice_handling() { + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[" + "{\"index\":0,\"delta\":{\"reasoning_content\":\"think\"},\"finish_reason\":null}," + "{\"index\":1,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}" + "]}\n\n"; + std::string output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + auto chunk = parse_first_data_json(output); + // Choice 0: reasoning + content injection + role + assert(chunk["choices"][0]["delta"]["reasoning_content"] == "think"); + assert(chunk["choices"][0]["delta"]["content"] == ""); + assert(chunk["choices"][0]["delta"]["role"] == "assistant"); + // Choice 1: unchanged (already has content) + assert(!chunk["choices"][1]["delta"].contains("reasoning_content")); + assert(chunk["choices"][1]["delta"]["content"] == "Hello"); +} + +static void test_multiple_lines() { + std::string input = + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\"},\"finish_reason\":null}]}\n" + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_content\":\"Let me\"},\"finish_reason\":null}]}\n" + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_content\":\" think\"},\"finish_reason\":null}]}\n" + "data: {\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"The answer is\"},\"finish_reason\":null}]}\n" + "data: [DONE]\n"; + + std::string output = lemon::StreamingProxy::normalize_chat_completion_chunk(input); + + std::istringstream stream(output); + std::string line; + int line_num = 0; + while (std::getline(stream, line)) { + if (line.find("data: ") != 0) continue; + std::string payload = line.substr(6); + if (payload == "[DONE]") break; + + auto chunk = json::parse(payload); + auto& delta = chunk["choices"][0]["delta"]; + + if (line_num == 0) { + // Role delta — unchanged + assert(delta["role"] == "assistant"); + } else if (line_num >= 1 && line_num <= 2) { + // Reasoning deltas — should have content: "" AND role: assistant + assert(delta.contains("reasoning_content")); + assert(delta["content"] == ""); + assert(delta["role"] == "assistant"); + } else if (line_num == 3) { + // Content delta — unchanged + assert(delta["content"] == "The answer is"); + assert(!delta.contains("reasoning_content")); + } + line_num++; + } + assert(line_num >= 4); +} + +int main() { + // Role normalization + test_null_role_is_normalized(); + test_missing_role_on_content_delta_is_added(); + test_empty_delta_chunk_role_is_not_mutated(); + test_done_marker_and_non_chat_chunks_are_preserved(); + + // Reasoning content normalization + test_reasoning_content_without_content_gets_empty_content(); + test_reasoning_content_with_null_content_gets_empty_content(); + test_normal_content_delta_gets_role_added(); + test_content_and_reasoning_together_is_not_content_mutated(); + + // Combined edge cases + test_carriage_return_is_preserved(); + test_multi_choice_handling(); + test_multiple_lines(); + + std::cout << "all streaming proxy normalization tests passed\n"; + return 0; +} \ No newline at end of file