From d6ebaa8bdc5b55824c70b23404f494659533fb6f Mon Sep 17 00:00:00 2001 From: John Ericson Date: Thu, 26 Mar 2026 19:59:17 -0400 Subject: [PATCH] hydra-evaluator: Rewrite in Rust Port hydra-evaluator from C++ to Rust, removing the last C++ executable from the project. The new implementation uses sqlx and tokio, matching the async patterns already established by hydra-queue-runner and hydra-builder. Build/packaging changes: - Remove all C++ dependencies from `subprojects/hydra/meson.build` - Move Rust packaging from `hydra-queue-runner/package.nix` to `subprojects/rust-package.nix` with named outputs (`queue_runner`, `builder`, `evaluator`) - Add `evaluatorExecutable` option to the NixOS web-app module, matching how the queue runner and builder are discovered via their own modules - Clean up stale references in `hydra-tests/meson.build` and `dev-shell.nix` Co-authored-by: Claude --- Cargo.lock | 19 +- Cargo.toml | 2 +- flake.nix | 5 +- nixos-modules/default.nix | 6 +- nixos-modules/web-app.nix | 11 +- packaging/dev-shell.nix | 2 +- subprojects/crates/db/src/lib.rs | 17 + subprojects/hydra-evaluator/Cargo.toml | 22 + subprojects/hydra-evaluator/src/config.rs | 117 ++++ subprojects/hydra-evaluator/src/evaluator.rs | 596 +++++++++++++++++ subprojects/hydra-evaluator/src/main.rs | 68 ++ subprojects/hydra-queue-runner/package.nix | 57 -- subprojects/hydra-tests/meson.build | 7 +- subprojects/hydra-tests/package.nix | 4 + .../hydra/hydra-evaluator/hydra-evaluator.cc | 612 ------------------ subprojects/hydra/hydra-evaluator/meson.build | 9 - subprojects/hydra/meson.build | 18 +- subprojects/hydra/package.nix | 17 - subprojects/rust-package.nix | 78 +++ 19 files changed, 942 insertions(+), 725 deletions(-) create mode 100644 subprojects/hydra-evaluator/Cargo.toml create mode 100644 subprojects/hydra-evaluator/src/config.rs create mode 100644 subprojects/hydra-evaluator/src/evaluator.rs create mode 100644 subprojects/hydra-evaluator/src/main.rs delete mode 100644 subprojects/hydra-queue-runner/package.nix delete mode 100644 subprojects/hydra/hydra-evaluator/hydra-evaluator.cc delete mode 100644 subprojects/hydra/hydra-evaluator/meson.build create mode 100644 subprojects/rust-package.nix diff --git a/Cargo.lock b/Cargo.lock index d970b05c8..a422052a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1485,6 +1485,21 @@ dependencies = [ "uuid", ] +[[package]] +name = "hydra-evaluator" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "db", + "futures", + "hydra-tracing", + "sqlx", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "hydra-queue-runner" version = "0.1.0" @@ -4379,9 +4394,9 @@ checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" [[package]] name = "unicode-segmentation" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da36089a805484bcccfffe0739803392c8298778a2d2f09febf76fac5ad9025b" +checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" [[package]] name = "unicode-width" diff --git a/Cargo.toml b/Cargo.toml index eea7fb1dd..9dc6edd16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["subprojects/hydra-builder", "subprojects/hydra-queue-runner", "subprojects/crates/*"] +members = ["subprojects/hydra-builder", "subprojects/hydra-evaluator", "subprojects/hydra-queue-runner", "subprojects/crates/*"] [workspace.package] version = "0.1.0" diff --git a/flake.nix b/flake.nix index a21db7c33..a6e62aa6a 100644 --- a/flake.nix +++ b/flake.nix @@ -41,9 +41,12 @@ }; hydra-linters = self'.callPackage ./subprojects/hydra-linters/package.nix { }; - hydra-queue-runner = self'.callPackage ./subprojects/hydra-queue-runner/package.nix { + hydra-rust = self'.callPackage ./subprojects/rust-package.nix { inherit nixComponents; }; + hydra-queue-runner = self'.hydra-rust.queue_runner; + hydra-builder = self'.hydra-rust.builder; + hydra-evaluator = self'.hydra-rust.evaluator; }); in rec { diff --git a/nixos-modules/default.nix b/nixos-modules/default.nix index 246cfa93c..3a93010c1 100644 --- a/nixos-modules/default.nix +++ b/nixos-modules/default.nix @@ -6,6 +6,8 @@ rec { imports = [ ./web-app.nix ]; services.hydra-dev.package = lib.mkDefault flakePackages.${pkgs.stdenv.hostPlatform.system}.hydra; + services.hydra-dev.evaluatorExecutable = + lib.mkDefault "${flakePackages.${pkgs.stdenv.hostPlatform.system}.hydra-evaluator}/bin/hydra-evaluator"; }; postgresql = ./postgresql.nix; @@ -21,14 +23,14 @@ rec { _file = ./default.nix; imports = [ ./linux-builder-module.nix ]; services.hydra-queue-builder-dev.package = - lib.mkDefault flakePackages.${pkgs.stdenv.hostPlatform.system}.hydra-queue-runner; + lib.mkDefault flakePackages.${pkgs.stdenv.hostPlatform.system}.hydra-builder; }; darwin-builder = { pkgs, lib, ... }: { _file = ./default.nix; imports = [ ./darwin-builder-module.nix ]; services.hydra-queue-builder-dev.package = - lib.mkDefault flakePackages.${pkgs.stdenv.hostPlatform.system}.hydra-queue-runner; + lib.mkDefault flakePackages.${pkgs.stdenv.hostPlatform.system}.hydra-builder; }; hydra = { ... }: { diff --git a/nixos-modules/web-app.nix b/nixos-modules/web-app.nix index 3ef26c544..7a8750d48 100644 --- a/nixos-modules/web-app.nix +++ b/nixos-modules/web-app.nix @@ -72,6 +72,11 @@ in description = "The Hydra package."; }; + evaluatorExecutable = mkOption { + type = types.path; + description = "Path to the hydra-evaluator executable."; + }; + hydraURL = mkOption { type = types.str; description = '' @@ -261,13 +266,13 @@ in requires = [ "hydra-init.service" ]; restartTriggers = [ hydraConf ]; after = [ "hydra-init.service" "network.target" ]; - path = with pkgs; [ hostname-debian cfg.package ]; + path = with pkgs; [ hostname-debian ]; environment = env // { HYDRA_DBI = "${env.HYDRA_DBI};application_name=hydra-evaluator"; }; serviceConfig = - { ExecStart = "@${cfg.package}/bin/hydra-evaluator hydra-evaluator"; - ExecStopPost = "${cfg.package}/bin/hydra-evaluator --unlock"; + { ExecStart = "@${cfg.evaluatorExecutable} hydra-evaluator"; + ExecStopPost = "${cfg.evaluatorExecutable} --unlock"; User = "hydra"; Restart = "always"; WorkingDirectory = baseDir; diff --git a/packaging/dev-shell.nix b/packaging/dev-shell.nix index a24119e64..8d588d509 100644 --- a/packaging/dev-shell.nix +++ b/packaging/dev-shell.nix @@ -47,7 +47,7 @@ hydra.overrideAttrs (finalAttrs: prevAttrs: { shellHook = '' pushd $(git rev-parse --show-toplevel) >/dev/null - PATH=$(pwd)/build/subprojects/hydra/hydra-evaluator:$(pwd)/subprojects/hydra/script:$PATH + PATH=$(pwd)/subprojects/hydra/script:$PATH PERL5LIB=$(pwd)/subprojects/hydra/lib:$PERL5LIB export HYDRA_HOME="$(pwd)/subprojects/hydra/" mkdir -p .hydra-data diff --git a/subprojects/crates/db/src/lib.rs b/subprojects/crates/db/src/lib.rs index 25c8ebaf2..e9c59f798 100644 --- a/subprojects/crates/db/src/lib.rs +++ b/subprojects/crates/db/src/lib.rs @@ -36,6 +36,23 @@ impl Database { }) } + pub async fn new_with_options( + options: sqlx::postgres::PgConnectOptions, + max_connections: u32, + ) -> Result { + Ok(Self { + pool: sqlx::postgres::PgPoolOptions::new() + .max_connections(max_connections) + .connect_with(options) + .await?, + }) + } + + #[must_use] + pub fn pool(&self) -> &sqlx::PgPool { + &self.pool + } + pub async fn get(&self) -> Result { let conn = self.pool.acquire().await?; Ok(Connection::new(conn)) diff --git a/subprojects/hydra-evaluator/Cargo.toml b/subprojects/hydra-evaluator/Cargo.toml new file mode 100644 index 000000000..0dd89a488 --- /dev/null +++ b/subprojects/hydra-evaluator/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "hydra-evaluator" +version.workspace = true +edition = "2024" +license = "GPL-3.0" +rust-version.workspace = true + +[dependencies] +tracing.workspace = true +anyhow.workspace = true +clap = { workspace = true, features = ["derive"] } +tokio = { workspace = true, features = ["full"] } +tokio-stream.workspace = true +futures.workspace = true +sqlx = { workspace = true, features = [ + "runtime-tokio", + "tls-rustls-ring-webpki", + "postgres", +] } + +db = { path = "../crates/db" } +hydra-tracing = { path = "../crates/tracing" } diff --git a/subprojects/hydra-evaluator/src/config.rs b/subprojects/hydra-evaluator/src/config.rs new file mode 100644 index 000000000..6aac2a6ea --- /dev/null +++ b/subprojects/hydra-evaluator/src/config.rs @@ -0,0 +1,117 @@ +use std::collections::HashMap; + +use anyhow::{Context as _, bail}; +use sqlx::postgres::PgConnectOptions; + +#[derive(Debug)] +pub(crate) struct HydraConfig { + options: HashMap, +} + +impl HydraConfig { + pub(crate) fn load() -> Self { + let mut options = HashMap::new(); + + let path = match std::env::var("HYDRA_CONFIG") { + Ok(p) if !p.is_empty() => p, + _ => return Self { options }, + }; + + let contents = match std::fs::read_to_string(&path) { + Ok(c) => c, + Err(e) => { + tracing::warn!("could not read HYDRA_CONFIG at {path}: {e}"); + return Self { options }; + } + }; + + for line in contents.lines() { + // Strip comments + let line = match line.find('#') { + Some(pos) => &line[..pos], + None => line, + }; + let line = line.trim(); + + let Some(eq) = line.find('=') else { + continue; + }; + + let key = line[..eq].trim(); + let value = line[eq + 1..].trim(); + + if key.is_empty() { + continue; + } + + options.insert(key.to_owned(), value.to_owned()); + } + + Self { options } + } + + pub(crate) fn get_int(&self, key: &str, default: u64) -> u64 { + self.options + .get(key) + .and_then(|v| v.parse().ok()) + .unwrap_or(default) + } +} + +/// Parse a `HYDRA_DBI` environment variable into `PgConnectOptions`. +/// +/// Accepts strings like `dbi:Pg:dbname=hydra;host=localhost;port=5432`. +pub(crate) fn parse_hydra_dbi() -> anyhow::Result { + let dbi = std::env::var("HYDRA_DBI").unwrap_or_else(|_| "dbi:Pg:dbname=hydra;".to_owned()); + parse_dbi(&dbi) +} + +fn parse_dbi(dbi: &str) -> anyhow::Result { + let params = dbi + .strip_prefix("dbi:Pg:") + .or_else(|| dbi.strip_prefix("DBI:Pg:")) + .context("$HYDRA_DBI does not denote a PostgreSQL database")?; + + let mut opts = PgConnectOptions::new(); + + for pair in params.split(';').filter(|s| !s.is_empty()) { + let (key, value) = pair + .split_once('=') + .with_context(|| format!("invalid DBI parameter: {pair}"))?; + match key.trim() { + "dbname" => opts = opts.database(value.trim()), + "host" => opts = opts.host(value.trim()), + "port" => { + opts = opts.port( + value + .trim() + .parse() + .with_context(|| format!("invalid port: {value}"))?, + ); + } + "user" => opts = opts.username(value.trim()), + "password" => opts = opts.password(value.trim()), + "application_name" => opts = opts.application_name(value.trim()), + other => { + bail!("unknown DBI parameter: {other}"); + } + } + } + + Ok(opts) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_simple_dbi() { + parse_dbi("dbi:Pg:dbname=hydra;host=localhost;port=5432").unwrap(); + } + + #[test] + fn parse_dbi_uppercase() { + parse_dbi("DBI:Pg:dbname=testdb;").unwrap(); + } +} diff --git a/subprojects/hydra-evaluator/src/evaluator.rs b/subprojects/hydra-evaluator/src/evaluator.rs new file mode 100644 index 000000000..e57323d26 --- /dev/null +++ b/subprojects/hydra-evaluator/src/evaluator.rs @@ -0,0 +1,596 @@ +use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context as _; +use futures::StreamExt as _; +use tokio::process::Command; +use tokio::sync::{Mutex, Notify}; + +use crate::config::HydraConfig; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(i32)] +enum EvaluationStyle { + Schedule = 1, + Oneshot = 2, + OneAtATime = 3, +} + +impl EvaluationStyle { + fn from_i32(v: i32) -> Option { + match v { + 1 => Some(Self::Schedule), + 2 => Some(Self::Oneshot), + 3 => Some(Self::OneAtATime), + _ => None, + } + } +} + +#[derive(Debug)] +struct Jobset { + id: i32, + project: String, + name: String, + evaluation_style: Option, + last_checked_time: i64, + trigger_time: Option, + check_interval: i64, +} + +impl Jobset { + fn display(&self) -> String { + format!("{}:{} (jobset#{})", self.project, self.name, self.id) + } +} + +#[derive(Debug, Default)] +struct State { + running_evals: usize, + running_ids: HashSet, + jobsets: BTreeMap, +} + +pub(crate) struct Evaluator { + db: db::Database, + max_evals: usize, + eval_one: Option<(String, String)>, + state: Arc>, + notify_work: Arc, +} + +impl Evaluator { + pub(crate) fn new( + db: db::Database, + config: &HydraConfig, + eval_one: Option<(String, String)>, + ) -> Self { + let max_evals = config.get_int("max_concurrent_evals", 4).max(1) as usize; + Self { + db, + max_evals, + eval_one, + state: Arc::new(Mutex::new(State::default())), + notify_work: Arc::new(Notify::new()), + } + } + + pub(crate) async fn run(self) -> anyhow::Result<()> { + self.unlock().await?; + + let this = Arc::new(self); + + let monitor = { + let this = Arc::clone(&this); + tokio::spawn(async move { + this.db_monitor_task().await; + }) + }; + + let main_loop = { + let this = Arc::clone(&this); + tokio::spawn(async move { + this.main_loop_task().await; + }) + }; + + let mut sigint = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?; + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; + + tokio::select! { + _ = sigint.recv() => { + tracing::info!("received SIGINT, exiting"); + std::process::exit(1); + } + _ = sigterm.recv() => { + tracing::info!("received SIGTERM, exiting"); + std::process::exit(1); + } + _ = monitor => { + tracing::error!("database monitor exited unexpectedly"); + std::process::exit(1); + } + _ = main_loop => { + tracing::error!("main loop exited unexpectedly"); + std::process::exit(1); + } + } + } + + async fn main_loop_task(&self) { + loop { + if let Err(e) = self.main_loop_iteration().await { + tracing::error!("exception in main loop: {e:#}"); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + + async fn main_loop_iteration(&self) -> anyhow::Result<()> { + loop { + let sleep_duration = { + let state = self.state.lock().await; + self.compute_sleep_duration(&state) + }; + + tracing::debug!("waiting for {} s", sleep_duration.as_secs()); + + if sleep_duration == Duration::MAX { + self.notify_work.notified().await; + } else { + tokio::select! { + () = tokio::time::sleep(sleep_duration) => {} + () = self.notify_work.notified() => {} + } + } + + let mut state = self.state.lock().await; + self.start_evals(&mut state).await?; + } + } + + fn compute_sleep_duration(&self, state: &State) -> Duration { + if state.running_evals >= self.max_evals { + return Duration::MAX; + } + + let now = now_epoch(); + let mut sleep_secs = i64::MAX; + + for jobset in state.jobsets.values() { + if state.running_ids.contains(&jobset.id) { + continue; + } + if jobset.check_interval > 0 { + let next = jobset.last_checked_time + jobset.check_interval - now; + sleep_secs = sleep_secs.min(next.max(1)); + } + } + + if sleep_secs == i64::MAX { + Duration::MAX + } else { + Duration::from_secs(sleep_secs as u64) + } + } + + async fn db_monitor_task(&self) { + loop { + if let Err(e) = self.db_monitor_loop().await { + tracing::error!("exception in database monitor: {e:#}"); + if is_broken_connection(&e) { + tracing::error!("database connection broken, exiting"); + std::process::exit(1); + } + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + + async fn db_monitor_loop(&self) -> anyhow::Result<()> { + let mut stream = self + .db + .listener(vec![ + "jobsets_added", + "jobsets_deleted", + "jobset_scheduling_changed", + ]) + .await?; + + // Read initial state before waiting for notifications + self.read_jobsets().await?; + self.notify_work.notify_one(); + + loop { + match stream.next().await { + Some(Ok(_notification)) => { + tracing::info!("received jobset event"); + self.read_jobsets().await?; + self.notify_work.notify_one(); + } + Some(Err(e)) => { + return Err(e.into()); + } + None => { + anyhow::bail!("notification stream ended"); + } + } + } + } + + async fn read_jobsets(&self) -> anyhow::Result<()> { + let rows = sqlx::query_as::<_, JobsetRow>( + "SELECT j.id, project, j.name, \ + lastCheckedTime, triggerTime, checkInterval, \ + j.enabled as jobset_enabled \ + FROM Jobsets j \ + JOIN Projects p ON j.project = p.name \ + WHERE j.enabled != 0 AND p.enabled != 0", + ) + .fetch_all(self.db.pool()) + .await?; + + let mut state = self.state.lock().await; + let mut seen = HashSet::new(); + + for row in &rows { + if let Some((ref proj, ref js)) = self.eval_one { + if row.project != *proj || row.name != *js { + continue; + } + } + + seen.insert(row.id); + + let evaluation_style = EvaluationStyle::from_i32(row.jobset_enabled); + + let jobset = state.jobsets.entry(row.id).or_insert_with(|| Jobset { + id: row.id, + project: row.project.clone(), + name: row.name.clone(), + evaluation_style: None, + last_checked_time: 0, + trigger_time: None, + check_interval: 0, + }); + + jobset.project.clone_from(&row.project); + jobset.name.clone_from(&row.name); + jobset.last_checked_time = row.lastcheckedtime.unwrap_or(0); + jobset.trigger_time = row.triggertime; + jobset.check_interval = row.checkinterval.into(); + jobset.evaluation_style = evaluation_style; + } + + if self.eval_one.is_some() && seen.is_empty() { + tracing::error!("the specified jobset does not exist or is disabled"); + std::process::exit(1); + } + + state.jobsets.retain(|id, jobset| { + if seen.contains(id) { + true + } else { + tracing::info!("forgetting jobset '{}'", jobset.display()); + false + } + }); + + Ok(()) + } + + async fn should_evaluate(&self, jobset: &Jobset, state: &State) -> bool { + if state.running_ids.contains(&jobset.id) { + tracing::debug!( + "shouldEvaluate {}? no: already running", + jobset.display() + ); + return false; + } + + if jobset.trigger_time.is_some() { + tracing::debug!( + "shouldEvaluate {}? yes: requested", + jobset.display() + ); + return true; + } + + if jobset.check_interval <= 0 { + tracing::debug!( + "shouldEvaluate {}? no: checkInterval <= 0", + jobset.display() + ); + return false; + } + + let now = now_epoch(); + if jobset.last_checked_time + jobset.check_interval <= now { + if jobset.evaluation_style == Some(EvaluationStyle::OneAtATime) { + return self.should_evaluate_one_at_a_time(jobset).await; + } + tracing::debug!( + "shouldEvaluate(oneshot/scheduled) {}? yes: checkInterval elapsed", + jobset.display() + ); + return true; + } + + false + } + + async fn should_evaluate_one_at_a_time(&self, jobset: &Jobset) -> bool { + let eval_id: Option = match sqlx::query_scalar( + "SELECT id FROM JobsetEvals WHERE jobset_id = $1 ORDER BY id DESC LIMIT 1", + ) + .bind(jobset.id) + .fetch_optional(self.db.pool()) + .await + { + Ok(v) => v, + Err(e) => { + tracing::error!( + "error checking one-at-a-time for {}: {e}", + jobset.display() + ); + return false; + } + }; + + let Some(eval_id) = eval_id else { + tracing::debug!( + "shouldEvaluate(one-at-a-time) {}? yes: no prior eval", + jobset.display() + ); + return true; + }; + + let unfinished: Option = match sqlx::query_scalar( + "SELECT id FROM Builds \ + JOIN JobsetEvalMembers ON (JobsetEvalMembers.build = Builds.id) \ + WHERE JobsetEvalMembers.eval = $1 AND builds.finished = 0 \ + LIMIT 1", + ) + .bind(eval_id) + .fetch_optional(self.db.pool()) + .await + { + Ok(v) => v, + Err(e) => { + tracing::error!( + "error checking unfinished builds for {}: {e}", + jobset.display() + ); + return false; + } + }; + + if unfinished.is_none() { + tracing::debug!( + "shouldEvaluate(one-at-a-time) {}? yes: no unfinished builds", + jobset.display() + ); + true + } else { + tracing::debug!( + "shouldEvaluate(one-at-a-time) {}? no: at least one unfinished build", + jobset.display() + ); + false + } + } + + async fn start_evals(&self, state: &mut State) -> anyhow::Result<()> { + // Collect eligible jobset IDs with their sort keys + let mut candidates: Vec<(i32, Option, i64)> = Vec::new(); + + for jobset in state.jobsets.values() { + if self.eval_one.is_some() || (jobset.evaluation_style.is_some() && self.should_evaluate(jobset, state).await) { + candidates.push(( + jobset.id, + jobset.trigger_time, + jobset.last_checked_time, + )); + } + } + + // Sort by (trigger_time, last_checked_time, id) + // None trigger_time sorts after Some (not triggered = lowest priority) + candidates.sort_by(|a, b| { + let ta = a.1.unwrap_or(i64::MAX); + let tb = b.1.unwrap_or(i64::MAX); + ta.cmp(&tb) + .then(a.2.cmp(&b.2)) + .then(a.0.cmp(&b.0)) + }); + + for (jobset_id, _, _) in candidates { + if state.running_evals >= self.max_evals { + break; + } + // Re-borrow the jobset from state + if let Some(jobset) = state.jobsets.get(&jobset_id) { + self.start_eval(state, jobset_id, jobset.project.clone(), jobset.name.clone()) + .await?; + } + } + + Ok(()) + } + + async fn start_eval( + &self, + state: &mut State, + jobset_id: i32, + project: String, + jobset_name: String, + ) -> anyhow::Result<()> { + let now = now_epoch(); + let jobset_display = format!("{project}:{jobset_name} (jobset#{jobset_id})"); + + let last_checked = state + .jobsets + .get(&jobset_id) + .map_or(0, |j| j.last_checked_time); + + tracing::info!( + "starting evaluation of jobset '{}' (last checked {} s ago)", + jobset_display, + now - last_checked, + ); + + sqlx::query("UPDATE Jobsets SET startTime = $1 WHERE id = $2") + .bind(now as i32) + .bind(jobset_id) + .execute(self.db.pool()) + .await + .context("failed to set startTime")?; + + let child = Command::new("hydra-eval-jobset") + .arg(&project) + .arg(&jobset_name) + .spawn() + .with_context(|| format!("failed to spawn hydra-eval-jobset for {jobset_display}"))?; + + state.running_evals += 1; + state.running_ids.insert(jobset_id); + + let eval_one = self.eval_one.is_some(); + let db = self.db.clone(); + let state_arc = Arc::clone(&self.state); + let notify_work = Arc::clone(&self.notify_work); + + tokio::spawn(async move { + reap_child(child, jobset_id, jobset_display, eval_one, db, state_arc, notify_work).await; + }); + + Ok(()) + } + + async fn unlock(&self) -> anyhow::Result<()> { + sqlx::query("UPDATE Jobsets SET startTime = null") + .execute(self.db.pool()) + .await + .context("failed to unlock jobsets")?; + Ok(()) + } +} + +async fn reap_child( + mut child: tokio::process::Child, + jobset_id: i32, + jobset_display: String, + eval_one: bool, + db: db::Database, + state: Arc>, + notify_work: Arc, +) { + let status = child.wait().await; + + let (exit_ok, status_str) = match &status { + Ok(s) => { + let code = s.code(); + let ok = code == Some(0) || code == Some(1); + (ok, format!("{s}")) + } + Err(e) => (false, format!("error: {e}")), + }; + + tracing::info!("evaluation of jobset '{}' {}", jobset_display, status_str); + + let now = now_epoch(); + + { + let mut st = state.lock().await; + if st.running_evals > 0 { + st.running_evals -= 1; + } + st.running_ids.remove(&jobset_id); + + if let Some(jobset) = st.jobsets.get_mut(&jobset_id) { + jobset.trigger_time = None; + jobset.last_checked_time = now; + } + } + + if let Err(e) = update_db_after_eval(&db, jobset_id, exit_ok, &status_str, now).await { + tracing::error!("exception setting jobset error: {e:#}"); + } + + notify_work.notify_one(); + + if eval_one { + std::process::exit(0); + } +} + +async fn update_db_after_eval( + db: &db::Database, + jobset_id: i32, + exit_ok: bool, + status_str: &str, + now: i64, +) -> anyhow::Result<()> { + let pool = db.pool(); + + // Clear trigger time to prevent stuck eval loop + sqlx::query( + "UPDATE Jobsets SET triggerTime = null \ + WHERE id = $1 AND startTime IS NOT NULL AND triggerTime <= startTime", + ) + .bind(jobset_id) + .execute(pool) + .await?; + + // Clear start time + sqlx::query("UPDATE Jobsets SET startTime = null WHERE id = $1") + .bind(jobset_id) + .execute(pool) + .await?; + + if !exit_ok { + sqlx::query( + "UPDATE Jobsets SET errorMsg = $1, lastCheckedTime = $2, \ + errorTime = $2, fetchErrorMsg = null WHERE id = $3", + ) + .bind(format!("evaluation {status_str}")) + .bind(now as i32) + .bind(jobset_id) + .execute(pool) + .await?; + } + + Ok(()) +} + +fn now_epoch() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_or(0, |d| d.as_secs() as i64) +} + +fn is_broken_connection(e: &anyhow::Error) -> bool { + for cause in e.chain() { + if let Some(sqlx_err) = cause.downcast_ref::() { + if matches!(sqlx_err, sqlx::Error::Io(_) | sqlx::Error::PoolClosed) { + return true; + } + } + } + false +} + +#[derive(sqlx::FromRow)] +#[allow(non_snake_case)] +struct JobsetRow { + id: i32, + project: String, + name: String, + lastcheckedtime: Option, + triggertime: Option, + checkinterval: i32, + jobset_enabled: i32, +} diff --git a/subprojects/hydra-evaluator/src/main.rs b/subprojects/hydra-evaluator/src/main.rs new file mode 100644 index 000000000..bfa9d10ae --- /dev/null +++ b/subprojects/hydra-evaluator/src/main.rs @@ -0,0 +1,68 @@ +#![forbid(unsafe_code)] +#![deny( + clippy::all, + clippy::pedantic, + clippy::expect_used, + clippy::unwrap_used, + future_incompatible, + missing_debug_implementations, + nonstandard_style, + unreachable_pub, + missing_copy_implementations, + unused_qualifications +)] +#![allow(clippy::missing_errors_doc)] + +mod config; +mod evaluator; + +use anyhow::Context as _; +use clap::Parser; + +use config::{HydraConfig, parse_hydra_dbi}; +use evaluator::Evaluator; + +#[derive(clap::Parser, Debug)] +#[command(name = "hydra-evaluator", about = "Hydra jobset evaluation scheduler")] +struct Cli { + /// Clear startTime on all jobsets and exit + #[arg(long)] + unlock: bool, + + /// Project name (requires jobset) + project: Option, + + /// Jobset name (requires project) + jobset: Option, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let _tracing_guard = hydra_tracing::init()?; + let cli = Cli::parse(); + + let db_opts = parse_hydra_dbi()?; + let db = db::Database::new_with_options(db_opts, 4) + .await + .context("failed to connect to database")?; + + if cli.unlock { + sqlx::query("UPDATE Jobsets SET startTime = null") + .execute(db.pool()) + .await + .context("failed to unlock jobsets")?; + tracing::info!("unlocked all jobsets"); + return Ok(()); + } + + let config = HydraConfig::load(); + + let eval_one = match (cli.project, cli.jobset) { + (Some(p), Some(j)) => Some((p, j)), + (None, None) => None, + _ => anyhow::bail!("Syntax: hydra-evaluator [ ]"), + }; + + let evaluator = Evaluator::new(db, &config, eval_one); + evaluator.run().await +} diff --git a/subprojects/hydra-queue-runner/package.nix b/subprojects/hydra-queue-runner/package.nix deleted file mode 100644 index 0e699e483..000000000 --- a/subprojects/hydra-queue-runner/package.nix +++ /dev/null @@ -1,57 +0,0 @@ -{ lib -, version - -, rustPlatform - -, nixComponents -, protobuf -, pkg-config -, rust-jemalloc-sys -}: - -rustPlatform.buildRustPackage { - pname = "hydra-queue-runner"; - inherit version; - - src = lib.fileset.toSource { - root = ../..; - fileset = lib.fileset.unions [ - ../../Cargo.toml - ../../Cargo.lock - ../../.cargo - ../../.sqlx - ../../subprojects/hydra-queue-runner/Cargo.toml - ../../subprojects/hydra-queue-runner/build.rs - ../../subprojects/hydra-queue-runner/src - ../../subprojects/hydra-queue-runner/examples - ../../subprojects/hydra-builder/Cargo.toml - ../../subprojects/hydra-builder/build.rs - ../../subprojects/hydra-builder/src - ../../subprojects/crates - ../../subprojects/proto - ]; - }; - - cargoLock = { - lockFile = ../../Cargo.lock; - outputHashes = { - "harmonia-store-core-0.0.0-alpha.0" = "sha256-FDL2xxAFOYw21VhGYake2fFC9S7jK5kBSM4OfU12VmQ="; - }; - }; - - nativeBuildInputs = [ - pkg-config - protobuf - ]; - - buildInputs = [ - nixComponents.nix-main - protobuf - rust-jemalloc-sys - ]; - - # FIXME: get these passing in a prod build - doCheck = false; - - meta.description = "Hydra queue runner and builder (Rust)"; -} diff --git a/subprojects/hydra-tests/meson.build b/subprojects/hydra-tests/meson.build index f3cd18a3a..fc480d245 100644 --- a/subprojects/hydra-tests/meson.build +++ b/subprojects/hydra-tests/meson.build @@ -7,12 +7,10 @@ fs = import('fs') perl = find_program('perl', native: true) nix = find_program('nix') -hydra_evaluator = find_program('hydra-evaluator') curl = find_program('curl') exe_path = [ fs.parent(nix.full_path()), - fs.parent(hydra_evaluator.full_path()), fs.parent(curl.full_path()), meson.current_source_dir() / 'scripts', ] @@ -33,7 +31,8 @@ if meson.is_subproject() exe_path += hydra_source_root / 'target' / cargo_profile else hydra_queue_runner = find_program('hydra-queue-runner') - hydra_queue_runner = find_program('hydra-builder') + hydra_builder = find_program('hydra-builder') + hydra_evaluator = find_program('hydra-evaluator') hydra_init = find_program('hydra-init') hydra_notify = find_program('hydra-notify') @@ -41,6 +40,8 @@ else hydra_perl5lib = hydra_home / 'lib' exe_path += fs.parent(hydra_queue_runner.full_path()) + exe_path += fs.parent(hydra_builder.full_path()) + exe_path += fs.parent(hydra_evaluator.full_path()) exe_path += fs.parent(hydra_init.full_path()) exe_path += fs.parent(hydra_notify.full_path()) endif diff --git a/subprojects/hydra-tests/package.nix b/subprojects/hydra-tests/package.nix index e066b54cb..909e59715 100644 --- a/subprojects/hydra-tests/package.nix +++ b/subprojects/hydra-tests/package.nix @@ -4,6 +4,8 @@ , hydra , hydra-queue-runner +, hydra-builder +, hydra-evaluator , meson , ninja @@ -59,6 +61,8 @@ stdenv.mkDerivation (finalAttrs: { ninja hydra hydra-queue-runner + hydra-builder + hydra-evaluator hydra.perlDeps perl nixComponents.nix-cli diff --git a/subprojects/hydra/hydra-evaluator/hydra-evaluator.cc b/subprojects/hydra/hydra-evaluator/hydra-evaluator.cc deleted file mode 100644 index e1d55165a..000000000 --- a/subprojects/hydra/hydra-evaluator/hydra-evaluator.cc +++ /dev/null @@ -1,612 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include - -using namespace nix; - -struct Connection : pqxx::connection -{ - Connection() : pqxx::connection(getFlags()) { }; - - std::string getFlags() - { - auto s = getEnv("HYDRA_DBI").value_or("dbi:Pg:dbname=hydra;"); - - std::string lower_prefix = "dbi:Pg:"; - std::string upper_prefix = "DBI:Pg:"; - - if (hasPrefix(s, lower_prefix) || hasPrefix(s, upper_prefix)) { - return concatStringsSep(" ", tokenizeString(std::string(s, lower_prefix.size()), ";")); - } - - throw Error("$HYDRA_DBI does not denote a PostgreSQL database"); - } -}; - - -class receiver -{ - std::optional status; - pqxx::connection & conn; - -public: - - receiver(pqxx::connection_base & c, const std::string & channel) - : conn(static_cast(c)) - { - conn.listen(channel, [this](pqxx::notification n) { - status = std::string(n.payload); - }); - } - - std::optional get() { - auto s = status; - status = std::nullopt; - return s; - } -}; - - -struct HydraConfig -{ - std::map options; - - HydraConfig() - { - /* Read hydra.conf. */ - auto hydraConfigFile = getEnv("HYDRA_CONFIG"); - if (hydraConfigFile && pathExists(*hydraConfigFile)) { - - for (auto line : tokenizeString(readFile(*hydraConfigFile), "\n")) { - line = trim(std::string(line, 0, line.find('#'))); - - auto eq = line.find('='); - if (eq == std::string::npos) continue; - - auto key = trim(std::string(line, 0, eq)); - auto value = trim(std::string(line, eq + 1)); - - if (key == "") continue; - - options[key] = value; - } - } - } - - std::string getStrOption(const std::string & key, const std::string & def = "") - { - auto i = options.find(key); - return i == options.end() ? def : i->second; - } - - uint64_t getIntOption(const std::string & key, uint64_t def = 0) - { - auto i = options.find(key); - return i == options.end() ? def : std::stoll(i->second); - } - - bool getBoolOption(const std::string & key, bool def = false) - { - auto i = options.find(key); - return i == options.end() ? def : (i->second == "true" || i->second == "1"); - } -}; - -typedef std::pair JobsetName; - -class JobsetId { - public: - - std::string project; - std::string jobset; - int id; - - - JobsetId(const std::string & project, const std::string & jobset, int id) - : project{ project }, jobset{ jobset }, id{ id } - { - } - - friend bool operator== (const JobsetId & lhs, const JobsetId & rhs); - friend bool operator!= (const JobsetId & lhs, const JobsetId & rhs); - friend bool operator< (const JobsetId & lhs, const JobsetId & rhs); - - - friend bool operator== (const JobsetId & lhs, const JobsetName & rhs); - friend bool operator!= (const JobsetId & lhs, const JobsetName & rhs); - - std::string display() const { - return boost::str(boost::format("%1%:%2% (jobset#%3%)") % project % jobset % id); - } -}; -bool operator==(const JobsetId & lhs, const JobsetId & rhs) -{ - return lhs.id == rhs.id; -} - -bool operator!=(const JobsetId & lhs, const JobsetId & rhs) -{ - return lhs.id != rhs.id; -} - -bool operator<(const JobsetId & lhs, const JobsetId & rhs) -{ - return lhs.id < rhs.id; -} - -bool operator==(const JobsetId & lhs, const JobsetName & rhs) -{ - return lhs.project == rhs.first && lhs.jobset == rhs.second; -} - -bool operator!=(const JobsetId & lhs, const JobsetName & rhs) -{ - return ! (lhs == rhs); -} - -enum class EvaluationStyle -{ - SCHEDULE = 1, - ONESHOT = 2, - ONE_AT_A_TIME = 3, -}; - -struct Evaluator -{ - std::unique_ptr config; - - nix::Pool dbPool; - - struct Jobset - { - JobsetId name; - std::optional evaluation_style; - time_t lastCheckedTime, triggerTime; - int checkInterval; - Pid pid; - }; - - typedef std::map Jobsets; - - std::optional evalOne; - - const size_t maxEvals; - - struct State - { - size_t runningEvals = 0; - Jobsets jobsets; - }; - - Sync state_; - - std::condition_variable childStarted; - std::condition_variable maybeDoWork; - - const time_t notTriggered = std::numeric_limits::max(); - - Evaluator() - : config(std::make_unique()) - , maxEvals(std::max((size_t) 1, (size_t) config->getIntOption("max_concurrent_evals", 4))) - { } - - void readJobsets() - { - auto conn(dbPool.get()); - - pqxx::work txn(*conn); - - auto res = txn.exec - ("select j.id as id, project, j.name, lastCheckedTime, triggerTime, checkInterval, j.enabled as jobset_enabled " - "from Jobsets j " - "join Projects p on j.project = p.name " - "where j.enabled != 0 and p.enabled != 0"); - - - auto state(state_.lock()); - - std::set seen; - - for (auto const & row : res) { - auto name = JobsetId{row["project"].as(), row["name"].as(), row["id"].as()}; - - if (evalOne && name != *evalOne) continue; - - auto res = state->jobsets.try_emplace(name, Jobset{name}); - - auto & jobset = res.first->second; - jobset.lastCheckedTime = row["lastCheckedTime"].as(0); - jobset.triggerTime = row["triggerTime"].as(notTriggered); - jobset.checkInterval = row["checkInterval"].as(); - switch (row["jobset_enabled"].as(0)) { - case 1: - jobset.evaluation_style = EvaluationStyle::SCHEDULE; - break; - case 2: - jobset.evaluation_style = EvaluationStyle::ONESHOT; - break; - case 3: - jobset.evaluation_style = EvaluationStyle::ONE_AT_A_TIME; - break; - } - - seen.insert(name); - } - - if (evalOne && seen.empty()) { - printError("the specified jobset does not exist or is disabled"); - std::_Exit(1); - } - - for (auto i = state->jobsets.begin(); i != state->jobsets.end(); ) - if (seen.count(i->first)) - ++i; - else { - printInfo("forgetting jobset ‘%s’", i->first.display()); - i = state->jobsets.erase(i); - } - } - - void startEval(State & state, Jobset & jobset) - { - time_t now = time(0); - - printInfo("starting evaluation of jobset ‘%s’ (last checked %d s ago)", - jobset.name.display(), - now - jobset.lastCheckedTime); - - { - auto conn(dbPool.get()); - pqxx::work txn(*conn); - txn.exec("update Jobsets set startTime = $1 where id = $2", - pqxx::params{now, jobset.name.id}).no_rows(); - txn.commit(); - } - - assert(jobset.pid == -1); - - jobset.pid = startProcess([&]() { - Strings args = { "hydra-eval-jobset", jobset.name.project, jobset.name.jobset }; - execvp(args.front().c_str(), stringsToCharPtrs(args).data()); - throw SysError("executing ‘%1%’", args.front()); - }); - - state.runningEvals++; - - childStarted.notify_one(); - } - - bool shouldEvaluate(Jobset & jobset) - { - if (jobset.pid != -1) { - // Already running. - debug("shouldEvaluate %s? no: already running", - jobset.name.display()); - return false; - } - - if (jobset.triggerTime != std::numeric_limits::max()) { - // An evaluation of this Jobset is requested - debug("shouldEvaluate %s? yes: requested", - jobset.name.display()); - return true; - } - - if (jobset.checkInterval <= 0) { - // Automatic scheduling is disabled. We allow requested - // evaluations, but never schedule start one. - debug("shouldEvaluate %s? no: checkInterval <= 0", - jobset.name.display()); - return false; - } - - if (jobset.lastCheckedTime + jobset.checkInterval <= time(0)) { - // Time to schedule a fresh evaluation. If the jobset - // is a ONE_AT_A_TIME jobset, ensure the previous jobset - // has no remaining, unfinished work. - - auto conn(dbPool.get()); - - pqxx::work txn(*conn); - - if (jobset.evaluation_style == EvaluationStyle::ONE_AT_A_TIME) { - auto evaluation_res = txn.exec - ("select id from JobsetEvals " - "where jobset_id = $1 " - "order by id desc limit 1" - ,jobset.name.id - ); - - if (evaluation_res.empty()) { - // First evaluation, so allow scheduling. - debug("shouldEvaluate(one-at-a-time) %s? yes: no prior eval", - jobset.name.display()); - return true; - } - - auto evaluation_id = evaluation_res[0][0].as(); - - auto unfinished_build_res = txn.exec - ("select id from Builds " - "join JobsetEvalMembers " - " on (JobsetEvalMembers.build = Builds.id) " - "where JobsetEvalMembers.eval = $1 " - " and builds.finished = 0 " - " limit 1" - ,evaluation_id - ); - - // If the previous evaluation has no unfinished builds - // schedule! - if (unfinished_build_res.empty()) { - debug("shouldEvaluate(one-at-a-time) %s? yes: no unfinished builds", - jobset.name.display()); - return true; - } else { - debug("shouldEvaluate(one-at-a-time) %s:%s? no: at least one unfinished build", - jobset.name.display()); - return false; - } - - - } else { - // EvaluationStyle::ONESHOT, EvaluationStyle::SCHEDULED - debug("shouldEvaluate(oneshot/scheduled) %s? yes: checkInterval elapsed", - jobset.name.display()); - return true; - } - } - - return false; - } - - void startEvals(State & state) - { - std::vector sorted; - - /* Filter out jobsets that have been evaluated recently and have - not been triggered. */ - for (auto i = state.jobsets.begin(); i != state.jobsets.end(); ++i) - if (evalOne || - (i->second.evaluation_style && shouldEvaluate(i->second))) - sorted.push_back(i); - - /* Put jobsets in order of ascending trigger time, last checked - time, and name. */ - std::sort(sorted.begin(), sorted.end(), - [](const Jobsets::iterator & a, const Jobsets::iterator & b) { - return - a->second.triggerTime != b->second.triggerTime - ? a->second.triggerTime < b->second.triggerTime - : a->second.lastCheckedTime != b->second.lastCheckedTime - ? a->second.lastCheckedTime < b->second.lastCheckedTime - : a->first < b->first; - }); - - /* Start jobset evaluations up to the concurrency limit.*/ - for (auto & i : sorted) { - if (state.runningEvals >= maxEvals) break; - startEval(state, i->second); - } - } - - void loop() - { - auto state(state_.lock()); - - while (true) { - - time_t now = time(0); - - std::chrono::seconds sleepTime = std::chrono::seconds::max(); - - if (state->runningEvals < maxEvals) { - for (auto & i : state->jobsets) - if (i.second.pid == -1 && - i.second.checkInterval > 0) - sleepTime = std::min(sleepTime, std::chrono::seconds( - std::max((time_t) 1, i.second.lastCheckedTime - now + i.second.checkInterval))); - } - - debug("waiting for %d s", sleepTime.count()); - if (sleepTime == std::chrono::seconds::max()) - state.wait(maybeDoWork); - else - state.wait_for(maybeDoWork, sleepTime); - - startEvals(*state); - } - } - - /* A thread that listens to PostgreSQL notifications about jobset - changes, updates the jobsets map, and signals the main thread - to start evaluations. */ - void databaseMonitor() - { - while (true) { - - try { - - auto conn(dbPool.get()); - - receiver jobsetsAdded(*conn, "jobsets_added"); - receiver jobsetsDeleted(*conn, "jobsets_deleted"); - receiver jobsetsChanged(*conn, "jobset_scheduling_changed"); - - while (true) { - /* Note: we read/notify before - await_notification() to ensure we don't miss a - state change. */ - readJobsets(); - maybeDoWork.notify_one(); - conn->await_notification(); - printInfo("received jobset event"); - } - - } catch (pqxx::broken_connection & e) { - printError("Database connection broken: %s", e.what()); - std::_Exit(1); - } catch (std::exception & e) { - printError("exception in database monitor thread: %s", e.what()); - sleep(30); - } - } - } - - /* A thread that reaps child processes.*/ - void reaper() - { - while (true) { - { - auto state(state_.lock()); - while (!state->runningEvals) - state.wait(childStarted); - } - - int status; - pid_t pid = waitpid(-1, &status, 0); - if (pid == -1) { - if (errno == EINTR) continue; - throw SysError("waiting for children"); - } - - { - auto state(state_.lock()); - assert(state->runningEvals); - state->runningEvals--; - - // FIXME: should use a map. - for (auto & i : state->jobsets) { - auto & jobset(i.second); - - if (jobset.pid == pid) { - printInfo("evaluation of jobset ‘%s’ %s", - jobset.name.display(), statusToString(status)); - - auto now = time(0); - - jobset.triggerTime = notTriggered; - jobset.lastCheckedTime = now; - - try { - - auto conn(dbPool.get()); - pqxx::work txn(*conn); - - /* Clear the trigger time to prevent this - jobset from getting stuck in an endless - failing eval loop. */ - txn.exec - ("update Jobsets set triggerTime = null where id = $1 and startTime is not null and triggerTime <= startTime", - jobset.name.id).no_rows(); - - /* Clear the start time. */ - txn.exec - ("update Jobsets set startTime = null where id = $1", - jobset.name.id).no_rows(); - - if (!WIFEXITED(status) || WEXITSTATUS(status) > 1) { - txn.exec("update Jobsets set errorMsg = $1, lastCheckedTime = $2, errorTime = $2, fetchErrorMsg = null where id = $3", - pqxx::params{fmt("evaluation %s", statusToString(status)), now, jobset.name.id}).no_rows(); - } - - txn.commit(); - - } catch (std::exception & e) { - printError("exception setting jobset error: %s", e.what()); - } - - jobset.pid.release(); - maybeDoWork.notify_one(); - - if (evalOne) std::_Exit(0); - - break; - } - } - } - } - } - - void unlock() - { - auto conn(dbPool.get()); - pqxx::work txn(*conn); - txn.exec("update Jobsets set startTime = null").no_rows(); - txn.commit(); - } - - void run() - { - unlock(); - - /* Can't be bothered to shut down cleanly. Goodbye! */ - auto callback = createInterruptCallback([&]() { std::_Exit(1); }); - - std::thread reaperThread([&]() { reaper(); }); - - std::thread monitorThread([&]() { databaseMonitor(); }); - - while (true) { - try { - loop(); - } catch (pqxx::broken_connection & e) { - printError("Database connection broken: %s", e.what()); - std::_Exit(1); - } catch (std::exception & e) { - printError("exception in main loop: %s", e.what()); - sleep(30); - } - } - } -}; - -int main(int argc, char * * argv) -{ - return handleExceptions(argv[0], [&]() { - initNix(); - - signal(SIGINT, SIG_DFL); - signal(SIGTERM, SIG_DFL); - signal(SIGHUP, SIG_DFL); - - bool unlock = false; - - Evaluator evaluator; - - std::vector args; - - parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) { - if (*arg == "--unlock") - unlock = true; - else if (hasPrefix(*arg, "-")) - return false; - args.push_back(*arg); - return true; - }); - - - if (unlock) - evaluator.unlock(); - else { - if (!args.empty()) { - if (args.size() != 2) throw UsageError("Syntax: hydra-evaluator [ ]"); - evaluator.evalOne = JobsetName(args[0], args[1]); - } - evaluator.run(); - } - }); -} diff --git a/subprojects/hydra/hydra-evaluator/meson.build b/subprojects/hydra/hydra-evaluator/meson.build deleted file mode 100644 index 76561071c..000000000 --- a/subprojects/hydra/hydra-evaluator/meson.build +++ /dev/null @@ -1,9 +0,0 @@ -hydra_evaluator = executable('hydra-evaluator', - 'hydra-evaluator.cc', - dependencies: [ - nix_util_dep, - nix_main_dep, - pqxx_dep, - ], - install: true, -) diff --git a/subprojects/hydra/meson.build b/subprojects/hydra/meson.build index 6a0f46a87..c6bb71085 100644 --- a/subprojects/hydra/meson.build +++ b/subprojects/hydra/meson.build @@ -1,28 +1,12 @@ -project('hydra', 'cpp', +project('hydra', version: files('version.txt'), license: 'GPL-3.0', default_options: [ 'debug=true', 'optimization=2', - 'cpp_std=c++23', ], ) -nix_util_dep = dependency('nix-util', required: true) -nix_store_dep = dependency('nix-store', required: true) -nix_main_dep = dependency('nix-main', required: true) - -pqxx_dep = dependency('libpqxx', required: true) - -prom_cpp_core_dep = dependency('prometheus-cpp-core', required: true) -prom_cpp_pull_dep = dependency('prometheus-cpp-pull', required: true) - -# Native code -subdir('hydra-evaluator') - -# Make executables available to sibling subprojects (e.g. hydra-tests) -meson.override_find_program('hydra-evaluator', hydra_evaluator) - hydra_libexecdir = get_option('libexecdir') / 'hydra' # Data and interpreted diff --git a/subprojects/hydra/package.nix b/subprojects/hydra/package.nix index e700762f5..684a76925 100644 --- a/subprojects/hydra/package.nix +++ b/subprojects/hydra/package.nix @@ -16,18 +16,11 @@ , meson , ninja , nukeReferences -, pkg-config , unzip -, libpqxx -, openssl , bzip2 -, libxslt , perl , pixz -, boost -, nlohmann_json -, prometheus-cpp , openssh , coreutils @@ -150,24 +143,14 @@ stdenv.mkDerivation (finalAttrs: { meson ninja nukeReferences - pkg-config perlDeps perl unzip ]; buildInputs = [ - libpqxx - openssl - libxslt - nixComponents.nix-util - nixComponents.nix-store - nixComponents.nix-main perlDeps perl - boost - nlohmann_json - prometheus-cpp ]; hydraPath = lib.makeBinPath ( diff --git a/subprojects/rust-package.nix b/subprojects/rust-package.nix new file mode 100644 index 000000000..1f60061c4 --- /dev/null +++ b/subprojects/rust-package.nix @@ -0,0 +1,78 @@ +{ lib +, version + +, rustPlatform + +, nixComponents +, protobuf +, pkg-config +, rust-jemalloc-sys +}: + +rustPlatform.buildRustPackage { + pname = "hydra-rust"; + inherit version; + + src = lib.fileset.toSource { + root = ../.; + fileset = lib.fileset.unions [ + ../Cargo.toml + ../Cargo.lock + ../.cargo + ../.sqlx + ../subprojects/hydra-queue-runner/Cargo.toml + ../subprojects/hydra-queue-runner/build.rs + ../subprojects/hydra-queue-runner/src + ../subprojects/hydra-queue-runner/examples + ../subprojects/hydra-builder/Cargo.toml + ../subprojects/hydra-builder/build.rs + ../subprojects/hydra-builder/src + ../subprojects/hydra-evaluator/Cargo.toml + ../subprojects/hydra-evaluator/src + ../subprojects/crates + ../subprojects/proto + ]; + }; + + cargoLock = { + lockFile = ../Cargo.lock; + outputHashes = { + "harmonia-store-core-0.0.0-alpha.0" = "sha256-FDL2xxAFOYw21VhGYake2fFC9S7jK5kBSM4OfU12VmQ="; + }; + }; + + outputs = [ + # TODO: build crates separately so each is its own derivation, or at the + # very least drop "out" + "out" + "queue_runner" + "builder" + "evaluator" + ]; + + nativeBuildInputs = [ + pkg-config + protobuf + ]; + + buildInputs = [ + nixComponents.nix-main + protobuf + rust-jemalloc-sys + ]; + + postInstall = '' + mkdir -p $queue_runner/bin $builder/bin $evaluator/bin + mv $out/bin/hydra-queue-runner $queue_runner/bin/ + mv $out/bin/hydra-builder $builder/bin/ + mv $out/bin/hydra-evaluator $evaluator/bin/ + ln -s $queue_runner/bin/hydra-queue-runner $out/bin/hydra-queue-runner + ln -s $builder/bin/hydra-builder $out/bin/hydra-builder + ln -s $evaluator/bin/hydra-evaluator $out/bin/hydra-evaluator + ''; + + # FIXME: get these passing in a prod build + doCheck = false; + + meta.description = "Hydra Rust binaries (queue runner, builder, evaluator)"; +}