diff --git a/Cargo.lock b/Cargo.lock index d9bdce5b7..36e37aa8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,6 +317,7 @@ dependencies = [ "bytes", "configparser", "daemon-client-utils", + "eyre", "foldhash 0.2.0", "fs-err", "futures", @@ -1762,7 +1763,6 @@ dependencies = [ name = "hydra-builder" version = "0.1.0" dependencies = [ - "anyhow", "async-compression", "async-stream", "backon", @@ -1838,7 +1838,6 @@ dependencies = [ name = "hydra-queue-runner" version = "0.1.0" dependencies = [ - "anyhow", "arc-swap", "async-compression", "atomic_float", @@ -1847,6 +1846,7 @@ dependencies = [ "byte-unit", "bytes", "clap", + "color-eyre", "daemon-client-utils", "db", "fs-err", diff --git a/Cargo.toml b/Cargo.toml index 6e6c6ff70..6cbeaf9fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ rust-version = "1.91.0" version = "0.1.0" [workspace.dependencies] -anyhow = "1.0.98" arc-swap = "1.7" async-compression = { version = "0.4", default-features = false } async-stream = "0.3" diff --git a/subprojects/crates/binary-cache/Cargo.toml b/subprojects/crates/binary-cache/Cargo.toml index 91efb8056..3b96b79f9 100644 --- a/subprojects/crates/binary-cache/Cargo.toml +++ b/subprojects/crates/binary-cache/Cargo.toml @@ -51,5 +51,6 @@ harmonia-utils-io.workspace = true harmonia-utils-signature.workspace = true [dev-dependencies] +eyre = "0.6" hydra-tracing.workspace = true tempfile = "3.23.0" diff --git a/subprojects/crates/binary-cache/examples/download_file.rs b/subprojects/crates/binary-cache/examples/download_file.rs index 28c4c8f05..588b150d9 100644 --- a/subprojects/crates/binary-cache/examples/download_file.rs +++ b/subprojects/crates/binary-cache/examples/download_file.rs @@ -1,7 +1,7 @@ use binary_cache::S3BinaryCacheClient; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> eyre::Result<()> { let _tracing_guard = hydra_tracing::init()?; let client = S3BinaryCacheClient::new( 
"s3://store?region=unknown&endpoint=http://localhost:9000&scheme=http&write-nar-listing=1&ls-compression=br&log-compression=br&profile=local_nix_store".parse()?, diff --git a/subprojects/crates/binary-cache/examples/query_missing_paths.rs b/subprojects/crates/binary-cache/examples/query_missing_paths.rs index 58eceaa6d..9871eb2cd 100644 --- a/subprojects/crates/binary-cache/examples/query_missing_paths.rs +++ b/subprojects/crates/binary-cache/examples/query_missing_paths.rs @@ -2,7 +2,7 @@ use binary_cache::S3BinaryCacheClient; use harmonia_store_path::StorePath; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> eyre::Result<()> { let _tracing_guard = hydra_tracing::init()?; let nix_config = daemon_client_utils::parse_nix_remote().unwrap(); let store = harmonia_store_remote::ConnectionPool::new( diff --git a/subprojects/crates/binary-cache/examples/simple_presigned.rs b/subprojects/crates/binary-cache/examples/simple_presigned.rs index 44bcba7ae..7c22e79f4 100644 --- a/subprojects/crates/binary-cache/examples/simple_presigned.rs +++ b/subprojects/crates/binary-cache/examples/simple_presigned.rs @@ -4,7 +4,7 @@ use binary_cache::{PresignedUploadClient, S3BinaryCacheClient, path_to_narinfo}; use harmonia_store_path::StorePath; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> eyre::Result<()> { let now = std::time::Instant::now(); let _tracing_guard = hydra_tracing::init()?; @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box> { client .upload_narinfo_after_presigned_upload(&store, narinfo) .await?; - Ok::<(), Box>(()) + Ok::<(), eyre::Report>(()) } }) .buffered(10); diff --git a/subprojects/crates/binary-cache/examples/upload_file.rs b/subprojects/crates/binary-cache/examples/upload_file.rs index 6f5dc3801..8a2b85a87 100644 --- a/subprojects/crates/binary-cache/examples/upload_file.rs +++ b/subprojects/crates/binary-cache/examples/upload_file.rs @@ -2,7 +2,7 @@ use binary_cache::S3BinaryCacheClient; use 
harmonia_store_path::StorePath; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> eyre::Result<()> { let now = std::time::Instant::now(); let _tracing_guard = hydra_tracing::init()?; diff --git a/subprojects/crates/binary-cache/examples/upload_logs.rs b/subprojects/crates/binary-cache/examples/upload_logs.rs index 1084b640e..65d9ac4c3 100644 --- a/subprojects/crates/binary-cache/examples/upload_logs.rs +++ b/subprojects/crates/binary-cache/examples/upload_logs.rs @@ -1,7 +1,7 @@ use binary_cache::S3BinaryCacheClient; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> eyre::Result<()> { let _tracing_guard = hydra_tracing::init()?; let client = S3BinaryCacheClient::new( "s3://store?region=unknown&endpoint=http://localhost:9000&scheme=http&write-nar-listing=1&ls-compression=br&log-compression=br&profile=local_nix_store".parse()?, diff --git a/subprojects/crates/binary-cache/examples/upload_realisation.rs b/subprojects/crates/binary-cache/examples/upload_realisation.rs index 14fd277c7..cd9f8d041 100644 --- a/subprojects/crates/binary-cache/examples/upload_realisation.rs +++ b/subprojects/crates/binary-cache/examples/upload_realisation.rs @@ -1,7 +1,7 @@ use binary_cache::S3BinaryCacheClient; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> eyre::Result<()> { let _tracing_guard = hydra_tracing::init()?; let nix_config = daemon_client_utils::parse_nix_remote().unwrap(); let _local = harmonia_store_remote::ConnectionPool::new( diff --git a/subprojects/crates/db/src/error.rs b/subprojects/crates/db/src/error.rs index d10314b6e..d1e53282c 100644 --- a/subprojects/crates/db/src/error.rs +++ b/subprojects/crates/db/src/error.rs @@ -45,3 +45,9 @@ impl From for Error { } pub type Result = std::result::Result; + +/// Error from parsing a database connection URL. This is a config +/// error, not a connection error — no database was contacted. 
+#[derive(Debug, thiserror::Error)] +#[error("bad database configuration: {0}")] +pub struct DbConfigurationError(pub Box); diff --git a/subprojects/crates/db/src/lib.rs b/subprojects/crates/db/src/lib.rs index c6462822d..805cc2a6c 100644 --- a/subprojects/crates/db/src/lib.rs +++ b/subprojects/crates/db/src/lib.rs @@ -20,7 +20,7 @@ pub mod models; use std::str::FromStr as _; pub use connection::{Connection, Transaction}; -pub use error::{DataError, Error, Result}; +pub use error::{DataError, DbConfigurationError, Error, Result}; pub use harmonia_store_path::StoreDir; #[derive(Debug, Clone)] @@ -43,12 +43,26 @@ impl Database { Ok(Connection::new(conn)) } + /// Re-configure the connection pool with a new URL. + /// + /// This only parses and stores the new options — it does **not** + /// contact the database. + // TODO: ability to change max_connections by dropping the pool and recreating it #[tracing::instrument(skip(self, url), err)] - pub fn reconfigure_pool(&self, url: &str) -> Result<()> { - // TODO: ability to change max_connections by dropping the pool and recreating it - self.pool - .set_connect_options(sqlx::postgres::PgConnectOptions::from_str(url)?); - Ok(()) + pub fn reconfigure_pool(&self, url: &str) -> std::result::Result<(), DbConfigurationError> { + match sqlx::postgres::PgConnectOptions::from_str(url) { + Ok(options) => { + self.pool.set_connect_options(options); + Ok(()) + } + Err(sqlx::Error::Configuration(e)) => Err(DbConfigurationError(e)), + Err(e) => { + // PgConnectOptions::from_str only produces Configuration + // errors. If this changes in a future sqlx version, fail + // loudly rather than silently swallowing it. 
+ panic!("unexpected error from PgConnectOptions::from_str: {e}") + } + } } pub async fn listener( diff --git a/subprojects/crates/nix-support/src/lib.rs b/subprojects/crates/nix-support/src/lib.rs index a09544f88..20686d9bb 100644 --- a/subprojects/crates/nix-support/src/lib.rs +++ b/subprojects/crates/nix-support/src/lib.rs @@ -534,7 +534,7 @@ mod tests { let fs = FilesystemOperations { real_store_dir: store_dir.to_path().to_owned(), }; - let bp = Box::pin(parse_build_product(&store_dir, &fs, line)).await; + let bp = parse_build_product(&store_dir, &fs, line).await; assert!(bp.is_none()); } diff --git a/subprojects/hydra-builder/Cargo.toml b/subprojects/hydra-builder/Cargo.toml index 5a120489f..92ef5c319 100644 --- a/subprojects/hydra-builder/Cargo.toml +++ b/subprojects/hydra-builder/Cargo.toml @@ -9,7 +9,6 @@ rust-version.workspace = true sd-notify.workspace = true tracing.workspace = true -anyhow.workspace = true clap = { workspace = true, features = [ "derive" ] } color-eyre.workspace = true fs-err = { workspace = true, features = [ "tokio" ] } diff --git a/subprojects/hydra-builder/src/error.rs b/subprojects/hydra-builder/src/error.rs index a5cde2e86..81d2720b5 100644 --- a/subprojects/hydra-builder/src/error.rs +++ b/subprojects/hydra-builder/src/error.rs @@ -1,3 +1,5 @@ +use color_eyre::eyre; + #[derive(Debug, thiserror::Error)] pub enum BuilderError { #[error("environment variable {0} not set")] @@ -22,7 +24,7 @@ pub enum BuilderError { ParseNixStore(String), #[error("Loading Nix configuration")] - LoadNixConfig(#[source] anyhow::Error), + LoadNixConfig(#[source] eyre::Report), #[error("Gateway API missing host")] GatewayMissingHost, @@ -43,13 +45,13 @@ pub enum BuilderError { VersionIncompatible(String), #[error("Reading system information")] - ReadingSystemInfo(#[source] anyhow::Error), + ReadingSystemInfo(#[source] eyre::Report), #[error("Failed to communicate {0} times over the channel. 
Terminating the application.")] RepeatedFailure(u32), #[error("While handling request")] - HandlingRequest(#[source] anyhow::Error), + HandlingRequest(#[source] eyre::Report), #[error("Task failed")] Task(#[from] tokio::task::JoinError), diff --git a/subprojects/hydra-builder/src/grpc.rs b/subprojects/hydra-builder/src/grpc.rs index 970645844..595930200 100644 --- a/subprojects/hydra-builder/src/grpc.rs +++ b/subprojects/hydra-builder/src/grpc.rs @@ -5,6 +5,7 @@ use std::sync::atomic::Ordering; use tonic::{Request, service::interceptor::InterceptedService, transport::Channel}; use harmonia_store_path::StorePath; + use hydra_proto::{ BuilderRequest, VersionCheckRequest, builder_request, runner_request, runner_service_client::RunnerServiceClient, @@ -195,12 +196,12 @@ async fn handle_request( runner_request::Message::Build(m) => { state .schedule_build(m) - .map_err(BuilderError::HandlingRequest)?; + .map_err(|e| BuilderError::HandlingRequest(e.into()))?; } runner_request::Message::Abort(m) => { state .abort_build(&m) - .map_err(BuilderError::HandlingRequest)?; + .map_err(|e| BuilderError::HandlingRequest(e.into()))?; } } Ok(()) @@ -244,8 +245,11 @@ pub async fn start_bidirectional_stream( let join_msg = state .get_join_message() .await - .map_err(BuilderError::ReadingSystemInfo)?; + .map_err(|e| BuilderError::ReadingSystemInfo(e.into()))?; let state2 = state.clone(); + let ping_error: Arc>> = + Arc::new(std::sync::Mutex::new(None)); + let ping_error2 = ping_error.clone(); let ping_stream = async_stream::stream! 
{ yield BuilderRequest { message: Some(builder_request::Message::Join(join_msg)) @@ -256,16 +260,17 @@ pub async fn start_bidirectional_stream( interval.tick().await; let ping = match state.get_ping_message() { - Ok(v) => builder_request::Message::Ping(v), + Ok(v) => v, Err(e) => { - tracing::error!("Failed to construct ping message: {e}"); - continue + *ping_error2.lock().expect("ping error mutex poisoned") = + Some(BuilderError::ReadingSystemInfo(e.into())); + break; }, }; tracing::debug!("sending ping: {ping:?}"); yield BuilderRequest { - message: Some(ping) + message: Some(builder_request::Message::Ping(ping)) }; } }; @@ -283,17 +288,12 @@ pub async fn start_bidirectional_stream( } }; - let mut consecutive_failure_count = 0; + let mut consecutive_failure_count: u32 = 0; while let Some(item) = stream.next().await { - match item.map(|v| v.message) { - Ok(Some(v)) => { - consecutive_failure_count = 0; - if let Err(err) = handle_request(state2.clone(), v).await { - tracing::error!("Failed to correctly handle request: {err}"); - } - } - Ok(None) => { + let item = match item { + Ok(v) => { consecutive_failure_count = 0; + v } Err(e) => { consecutive_failure_count += 1; @@ -301,8 +301,15 @@ pub async fn start_bidirectional_stream( if consecutive_failure_count == 10 { return Err(BuilderError::RepeatedFailure(consecutive_failure_count)); } + continue; } + }; + if let Some(msg) = item.message { + handle_request(state2.clone(), msg).await?; } } + if let Some(e) = ping_error.lock().expect("ping error mutex poisoned").take() { + return Err(e); + } Ok(()) } diff --git a/subprojects/hydra-builder/src/nix_config.rs b/subprojects/hydra-builder/src/nix_config.rs index 94c43354e..f5456b9c4 100644 --- a/subprojects/hydra-builder/src/nix_config.rs +++ b/subprojects/hydra-builder/src/nix_config.rs @@ -2,6 +2,17 @@ use std::collections::HashMap; +/// Errors from loading nix configuration. 
+#[derive(Debug, thiserror::Error)] +pub enum NixConfigError { + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), + #[error("nix show-config failed: {0}")] + Command(String), +} + /// Cached nix configuration values. #[derive(Debug, Clone)] pub struct NixConfig { @@ -10,7 +21,7 @@ pub struct NixConfig { impl NixConfig { /// Read nix configuration by running `nix show-config --json`. - pub fn load() -> anyhow::Result { + pub fn load() -> Result { let output = std::process::Command::new("nix") .args([ "--extra-experimental-features", @@ -20,10 +31,11 @@ impl NixConfig { ]) .output()?; if !output.status.success() { - anyhow::bail!( - "nix show-config failed: {}", - str::from_utf8(&output.stderr).unwrap_or("Invalid UTF-8") - ); + return Err(NixConfigError::Command( + str::from_utf8(&output.stderr) + .unwrap_or("Invalid UTF-8") + .to_owned(), + )); } let values: HashMap = serde_json::from_slice(&output.stdout)?; Ok(Self { values }) diff --git a/subprojects/hydra-builder/src/state.rs b/subprojects/hydra-builder/src/state.rs index a9703a19d..b9b8a9b6a 100644 --- a/subprojects/hydra-builder/src/state.rs +++ b/subprojects/hydra-builder/src/state.rs @@ -3,8 +3,20 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::time::Instant; -use anyhow::Context as _; use backon::RetryableWithContext as _; + +/// Errors from builder state management (scheduling/aborting builds). 
+#[derive(Debug, thiserror::Error)] +pub enum StateError { + #[error("builder is halting")] + Halting, + + #[error("missing drv in build message")] + MissingDrv, + + #[error("invalid UUID: {0}")] + Uuid(#[from] uuid::Error), +} use futures::TryFutureExt as _; use harmonia_protocol::daemon_wire::types2::BuildResultSuccess; use harmonia_store_remote::DaemonStore as _; @@ -15,7 +27,7 @@ use crate::grpc::BuilderClient; use crate::types::BuildTimings; use binary_cache::{Compression, PresignedUpload, PresignedUploadClient}; use harmonia_store_derivation::derived_path::OutputName; -use harmonia_store_path::StorePath; +use harmonia_store_path::{ParseStorePathError, StorePath}; use hydra_proto::ProtoStorePath; use hydra_proto::{ AbortMessage, BuildMessage, BuildResultInfo, BuildResultState, JoinMessage, OutputInfo, @@ -31,28 +43,105 @@ fn retry_strategy() -> backon::ExponentialBuilder { .with_max_delay(RETRY_MAX_DELAY) } +// Per-phase error types are defined next to the functions that use +// them (see below). JobFailure ties them together. + +/// Errors from parsing build outputs after a successful build. +/// Used by `process_build`. +#[derive(Debug, thiserror::Error)] +pub enum BuildOutputParseError { + #[error(transparent)] + Elapsed(#[from] tokio_stream::Elapsed), + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), + #[error(transparent)] + StorePath(#[from] ParseStorePathError), + #[error(transparent)] + Store(#[from] harmonia_protocol::types::DaemonError), + #[error("child did not print outputs")] + NoOutputs, + #[error("nix built {got} derivations, expecting 1")] + UnexpectedDerivationCount { got: usize }, + #[error("nix returned outputs for {actual} when we expected {expected}")] + DrvPathMismatch { + actual: StorePath, + expected: StorePath, + }, + #[error("missing path info for output `{0}`")] + MissingPathInfo(String), +} + +/// Errors from the daemon build request. 
+#[derive(Debug, thiserror::Error)] +pub enum BuildError { + #[error(transparent)] + Daemon(#[from] harmonia_protocol::types::DaemonError), + #[error("build_paths_with_results returned {got} results, expected 1")] + UnexpectedResultCount { got: usize }, + #[error("build failed: {status:?}: {msg}")] + Failed { + status: harmonia_protocol::daemon_wire::types2::FailureStatus, + msg: String, + }, +} + +/// Submit a build result to the queue-runner with retries. +async fn submit_build_result( + client: &BuilderClient, + result: BuildResultInfo, + context: &'static str, +) -> Result<(), tonic::Status> { + let (_, res) = (|tuple: (BuilderClient, BuildResultInfo)| async { + let (mut client, body) = tuple; + let res = client.complete_build(body.clone()).await; + ((client, body), res) + }) + .retry(retry_strategy()) + .sleep(tokio::time::sleep) + .context((client.clone(), result)) + .notify(|err: &tonic::Status, dur: core::time::Duration| { + tracing::error!("{context}: err={err}, retrying in={dur:?}"); + }) + .await; + res.map(|_| ()) +} + + #[derive(thiserror::Error, Debug)] pub enum JobFailure { - #[error("Build failure")] - Build(#[source] anyhow::Error), - #[error("Preparing failure")] - Preparing(#[source] anyhow::Error), - #[error("Import failure")] - Import(#[source] anyhow::Error), - #[error("Upload failure")] - Upload(#[source] anyhow::Error), - #[error("Post processing failure")] - PostProcessing(#[source] anyhow::Error), + #[error("missing drv in build message")] + MissingDrv, + #[error("build failure")] + Build(#[source] BuildError), + #[error("build IO failure")] + BuildIo(#[source] std::io::Error), + #[error("preparing IO failure")] + PreparingIo(#[source] std::io::Error), + #[error("import failure")] + Import(#[source] ImportError), + #[error("upload failure")] + Upload(#[source] UploadError), + #[error("failed to stream build log to queue-runner")] + LogStream(#[source] tonic::Status), + #[error("post-processing failure")] + PostProcessing(#[source] 
BuildOutputParseError), + #[error("result construction failure")] + ResultInfo(#[source] ResultInfoError), } impl From<&JobFailure> for BuildResultState { fn from(item: &JobFailure) -> Self { match item { - JobFailure::Build(_) => Self::BuildFailure, - JobFailure::Preparing(_) => Self::PreparingFailure, + JobFailure::MissingDrv => Self::PreparingFailure, + JobFailure::Build(_) | JobFailure::BuildIo(_) => Self::BuildFailure, + JobFailure::PreparingIo(_) => Self::PreparingFailure, JobFailure::Import(_) => Self::ImportFailure, - JobFailure::Upload(_) => Self::UploadFailure, - JobFailure::PostProcessing(_) => Self::PostProcessingFailure, + JobFailure::Upload(_) | JobFailure::LogStream(_) => Self::UploadFailure, + JobFailure::PostProcessing(_) | JobFailure::ResultInfo(_) => { + Self::PostProcessingFailure + } } } } @@ -60,7 +149,7 @@ impl From<&JobFailure> for BuildResultState { #[derive(Debug)] pub struct BuildInfo { drv_path: StorePath, - handle: tokio::task::JoinHandle<()>, + handle: tokio::task::JoinHandle>, was_cancelled: Arc, } @@ -138,11 +227,12 @@ impl Drop for Gcroot { } } + impl State { #[tracing::instrument(err)] pub async fn new(cli: &super::config::Cli) -> Result, BuilderError> { let nix_config = - crate::nix_config::NixConfig::load().map_err(BuilderError::LoadNixConfig)?; + crate::nix_config::NixConfig::load().map_err(|e| BuilderError::LoadNixConfig(e.into()))?; let nix_remote = daemon_client_utils::parse_nix_remote().map_err(BuilderError::ParseNixStore)?; @@ -224,14 +314,16 @@ impl State { } #[tracing::instrument(skip(self), err)] - pub async fn get_join_message(&self) -> anyhow::Result { + pub async fn get_join_message( + &self, + ) -> Result { let sys = crate::system::BaseSystemInfo::new()?; Ok(JoinMessage { machine_id: self.id.to_string(), systems: self.config.systems.clone(), hostname: self.hostname.clone(), - cpu_count: u32::try_from(sys.cpu_count)?, + cpu_count: u32::try_from(sys.cpu_count).unwrap_or(u32::MAX), bogomips: sys.bogomips, speed_factor: 
self.config.speed_factor, max_jobs: self.config.max_jobs, @@ -252,7 +344,7 @@ impl State { } #[tracing::instrument(skip(self), err)] - pub fn get_ping_message(&self) -> anyhow::Result { + pub fn get_ping_message(&self) -> Result { let default_store = self.config.store_dir.to_string(); let store_path = self .config @@ -279,17 +371,13 @@ impl State { } #[tracing::instrument(skip(self, m), fields(drv=?m.drv))] - pub fn schedule_build(self: Arc, m: BuildMessage) -> anyhow::Result<()> { + pub fn schedule_build(self: Arc, m: BuildMessage) -> Result<(), StateError> { if self.halt.load(Ordering::SeqCst) { tracing::warn!("State is set to halt, will no longer accept new builds!"); - return Err(anyhow::anyhow!("State set to halt.")); + return Err(StateError::Halting); } - let drv = m - .drv - .clone() - .ok_or_else(|| anyhow::anyhow!("missing drv"))? - .0; + let drv = m.drv.clone().ok_or(StateError::MissingDrv)?.0; if self.contains_build(&drv) { return Ok(()); } @@ -311,15 +399,14 @@ impl State { Err(e) => { if was_cancelled.load(Ordering::SeqCst) { tracing::error!( - "Build of {drv} was cancelled, not reporting Error: {:?}", - anyhow::Error::new(e) + "Build of {drv} was cancelled, not reporting Error: {e:?}", ); - return; + return Err(e); } let result_state = BuildResultState::from(&e) as i32; - tracing::error!("Build of {drv} failed with: {:?}", anyhow::Error::new(e)); + tracing::error!("Build of {drv} failed with: {e:?}"); self_.remove_build(build_id); let failed_build = BuildResultInfo { build_id: build_id.to_string(), @@ -334,23 +421,16 @@ impl State { output_infos: std::collections::HashMap::new(), }; - if let (_, Err(e)) = (|tuple: (BuilderClient, BuildResultInfo)| async { - let (mut client, body) = tuple; - let res = client.complete_build(body.clone()).await; - ((client, body), res) - }) - .retry(retry_strategy()) - .sleep(tokio::time::sleep) - .context((self_.client.clone(), failed_build)) - .notify(|err: &tonic::Status, dur: core::time::Duration| { - 
tracing::error!("Failed to submit build failure info: err={err}, retrying in={dur:?}"); - }) + submit_build_result( + &self_.client, + failed_build, + "Failed to submit build failure info", + ) .await - { - tracing::error!("Failed to submit build failure info: {e}"); - } + .map_err(|e| JobFailure::Upload(UploadError::from(e)))?; } } + Ok(()) } }); @@ -388,7 +468,7 @@ impl State { } #[tracing::instrument(skip(self, m), fields(build_id=%m.build_id))] - pub fn abort_build(&self, m: &AbortMessage) -> anyhow::Result<()> { + pub fn abort_build(&self, m: &AbortMessage) -> Result<(), StateError> { tracing::info!("Try cancelling build"); let build_id = uuid::Uuid::parse_str(&m.build_id)?; if let Some(b) = self.remove_build(build_id) { @@ -414,9 +494,9 @@ impl State { max_log_size: u64, max_silent_time: i32, build_timeout: i32, - ) -> anyhow::Result { + ) -> Result { // Build the pre-resolved BasicDerivation; logs stream to the queue-runner. - let mut guard = pool.acquire().await.context("daemon connection failed")?; + let mut guard = pool.acquire().await?; let (log_tx, log_rx) = tokio::sync::mpsc::unbounded_channel::(); let log_stream = crate::utils::compressed_log_stream(drv, log_rx); @@ -475,7 +555,7 @@ impl State { } } drop(log_tx); - result_log.await.context("build_derivation failed")? + result_log.await? }; drop(guard); @@ -494,11 +574,12 @@ impl State { Ok(match build_result.inner { BuildResultInner::Success(s) => s, BuildResultInner::Failure(f) => { - return Err(anyhow::anyhow!( - "build failed: {:?}: {}", - f.status, - str::from_utf8(&f.error_msg).unwrap_or("Invalid UTF-8") - )); + return Err(BuildError::Failed { + status: f.status, + msg: str::from_utf8(&f.error_msg) + .unwrap_or("Invalid UTF-8") + .to_owned(), + }); } }) } @@ -511,16 +592,13 @@ impl State { timings: &mut BuildTimings, ) -> Result<(), JobFailure> { let machine_id = self.id; - let drv = m - .drv - .ok_or(JobFailure::Preparing(anyhow::anyhow!("missing drv")))? 
- .0; + let drv = m.drv.ok_or(JobFailure::MissingDrv)?.0; let before_import = Instant::now(); let gcroot_prefix = uuid::Uuid::new_v4().to_string(); let gcroot = self .get_gcroot(&gcroot_prefix) - .map_err(|e| JobFailure::Preparing(e.into()))?; + .map_err(JobFailure::PreparingIo)?; let mut client = self.client.clone(); let _ = client // we ignore the error here, as this step status has no prio @@ -534,12 +612,14 @@ impl State { // Decode the force-resolved BasicDerivation sent by the queue runner. let basic_drv: harmonia_store_derivation::derivation::BasicDerivation = m .resolved_drv - .ok_or(JobFailure::Preparing(anyhow::anyhow!( + .ok_or(JobFailure::PreparingIo(std::io::Error::other( "missing resolved_drv in BuildMessage", )))? .try_into() .map_err(|e: String| { - JobFailure::Import(anyhow::anyhow!("failed to decode resolved derivation: {e}")) + JobFailure::PreparingIo(std::io::Error::other( + format!("failed to decode resolved derivation: {e}"), + )) })?; // Fetch the transitive closure of the resolved inputs. @@ -548,7 +628,7 @@ impl State { paths: basic_drv.inputs.iter().map(ProtoStorePath::from).collect(), }) .await - .map_err(|e| JobFailure::Import(e.into()))? + .map_err(|e| JobFailure::Import(ImportError::from(e)))? .into_inner() .requisites; @@ -608,11 +688,10 @@ impl State { for (name, path) in &outputs { let info = daemon_client_utils::query_path_info(&self.pool, path) .await - .context("query_path_info failed") - .map_err(JobFailure::PostProcessing)? + .map_err(|e| JobFailure::PostProcessing(BuildOutputParseError::from(e)))? 
.ok_or_else(|| { - JobFailure::PostProcessing(anyhow::anyhow!( - "missing path info for output `{name}`" + JobFailure::PostProcessing(BuildOutputParseError::MissingPathInfo( + name.to_string(), )) })?; output_infos.insert( @@ -660,27 +739,17 @@ impl State { m.build_id.clone(), )) .await - .map_err(JobFailure::PostProcessing)?; + .map_err(JobFailure::ResultInfo)?; // This part is stupid, if writing doesnt work, we try to write a failure, maybe that works. // We retry to ensure that this almost never happens. - (|tuple: (BuilderClient, BuildResultInfo)| async { - let (mut client, body) = tuple; - let res = client.complete_build(body.clone()).await; - ((client, body), res) - }) - .retry(retry_strategy()) - .sleep(tokio::time::sleep) - .context((client.clone(), build_results)) - .notify(|err: &tonic::Status, dur: core::time::Duration| { - tracing::error!("Failed to submit build success info: err={err}, retrying in={dur:?}"); - }) + submit_build_result( + &client, + build_results, + "Failed to submit build success info", + ) .await - .1 - .map_err(|e| { - tracing::error!("Failed to submit build success info. Will fail build: err={e}"); - JobFailure::PostProcessing(e.into()) - })?; + .map_err(|e| JobFailure::Upload(UploadError::from(e)))?; Ok(()) } @@ -728,7 +797,7 @@ impl State { build_id: &str, machine_id: &str, presigned_url_opts: Option, - ) -> anyhow::Result<()> { + ) -> Result<(), UploadError> { if let Some(opts) = presigned_url_opts { upload_nars_presigned( self.client.clone(), @@ -751,7 +820,7 @@ async fn is_path_missing( pool: &harmonia_store_remote::ConnectionPool, gcroot: &Gcroot, path: StorePath, -) -> anyhow::Result> { +) -> Result, harmonia_protocol::types::DaemonError> { if daemon_client_utils::is_valid_path(pool, &path).await? 
{ add_gc_root(&gcroot.root, pool.store_dir(), &path); Ok(None) @@ -766,7 +835,7 @@ async fn filter_missing( gcroot: &Gcroot, paths: Vec, concurrency: usize, -) -> anyhow::Result> { +) -> Result, harmonia_protocol::types::DaemonError> { use futures::StreamExt as _; futures::StreamExt::map(tokio_stream::iter(paths), |p| { is_path_missing(pool, gcroot, p) @@ -775,7 +844,7 @@ async fn filter_missing( .collect::>() .await .into_iter() - .collect::>>() + .collect::, _>>() .map(|v| v.into_iter().flatten().collect()) } @@ -796,13 +865,24 @@ fn add_gc_root( async fn substitute_paths( pool: &harmonia_store_remote::ConnectionPool, paths: &[StorePath], -) -> anyhow::Result<()> { +) -> Result<(), harmonia_protocol::types::DaemonError> { for p in paths { daemon_client_utils::ensure_path(pool, p).await?; } Ok(()) } +/// Errors from importing paths from the queue-runner. +#[derive(Debug, thiserror::Error)] +pub enum ImportError { + #[error(transparent)] + Store(#[from] harmonia_protocol::types::DaemonError), + #[error(transparent)] + Grpc(#[from] tonic::Status), + #[error(transparent)] + Transfer(#[from] store_transfer::Error), +} + #[tracing::instrument(skip(client, pool, metrics), fields(%gcroot), err)] async fn import_paths( mut client: BuilderClient, @@ -812,7 +892,7 @@ async fn import_paths( paths: Vec, filter: bool, use_substitutes: bool, -) -> anyhow::Result<()> { +) -> Result<(), ImportError> { let paths = if filter { filter_missing(&pool, gcroot, paths, 10).await? } else { @@ -876,7 +956,7 @@ async fn import_requisites>( requisites: T, max_concurrent_downloads: usize, use_substitutes: bool, -) -> anyhow::Result<()> { +) -> Result<(), ImportError> { let requisites = filter_missing(&pool, gcroot, requisites.into_iter().collect(), 50).await?; let (input_drvs, input_srcs): (Vec<_>, Vec<_>) = @@ -911,19 +991,46 @@ async fn import_requisites>( Ok(()) } +/// Errors from uploading NARs to a binary cache. 
+#[derive(Debug, thiserror::Error)] +pub enum UploadError { + #[error(transparent)] + Store(#[from] harmonia_protocol::types::DaemonError), + #[error(transparent)] + Cache(#[from] binary_cache::CacheError), + #[error(transparent)] + Grpc(#[from] tonic::Status), + #[error(transparent)] + Transfer(#[from] store_transfer::Error), + #[error(transparent)] + Join(#[from] tokio::task::JoinError), + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + StorePath(#[from] ParseStorePathError), + #[error("mismatch between paths_to_upload ({upload}) and paths_with_narhash ({narhash})")] + NarHashMismatch { upload: usize, narhash: usize }, + #[error("mismatch between requested NARs ({requested}) and presigned URLs ({received})")] + PresignedUrlMismatch { requested: usize, received: usize }, + #[error("nar_upload information is missing")] + MissingNarUpload, + #[error("path not found in store: {0}")] + PathNotFound(StorePath), + #[error(transparent)] + Builder(#[from] BuilderError), +} + #[tracing::instrument(skip(client, pool, metrics), err)] async fn upload_nars_regular( mut client: BuilderClient, pool: harmonia_store_remote::ConnectionPool, metrics: Arc, nars: Vec, -) -> anyhow::Result<()> { +) -> Result<(), UploadError> { // Compute full closure by walking references via daemon protocol. // query_closure returns ValidPathInfos in dependency order with // path infos already populated, so we don't need to re-query. - let closure = daemon_client_utils::query_closure(&pool, &nars) - .await - .map_err(|e| anyhow::anyhow!("failed to compute closure: {e}"))?; + let closure = daemon_client_utils::query_closure(&pool, &nars).await?; // Filter out paths the queue-runner already has. 
let closure = { @@ -978,7 +1085,7 @@ async fn upload_nars_regular( tokio_stream::wrappers::UnboundedReceiverStream::new(rx), Result::ok, )) - .map_err(Into::::into); + .map_err(UploadError::from); let (upload_result, sender_result) = futures::future::join(upload, sender).await; upload_result?; @@ -1002,7 +1109,7 @@ async fn upload_nars_presigned( opts: hydra_proto::PresignedUploadOpts, build_id: &str, machine_id: &str, -) -> anyhow::Result<()> { +) -> Result<(), UploadError> { use futures::stream::StreamExt as _; tracing::info!("Start uploading paths using presigned urls"); @@ -1039,7 +1146,7 @@ async fn upload_nars_presigned( let Some(narhash) = path_infos.get(&path).map(|i| i.nar_hash) else { return Ok(None); }; - Ok::<_, anyhow::Error>(Some((path, narhash, debug_info_ids))) + Ok::<_, UploadError>(Some((path, narhash, debug_info_ids))) } }) .buffered(10); @@ -1051,11 +1158,10 @@ async fn upload_nars_presigned( } if nars.len() != paths_to_upload.len() { - return Err(anyhow::anyhow!( - "Mismatch between paths_to_upload ({}) and paths_with_narhash ({})", - paths_to_upload.len(), - nars.len(), - )); + return Err(UploadError::NarHashMismatch { + upload: paths_to_upload.len(), + narhash: nars.len(), + }); } let presigned_responses = client @@ -1063,18 +1169,16 @@ async fn upload_nars_presigned( .await?; if presigned_responses.len() != paths_to_upload.len() { - return Err(anyhow::anyhow!( - "Mismatch between requested NARs ({}) and presigned URLs ({})", - paths_to_upload.len(), - presigned_responses.len() - )); + return Err(UploadError::PresignedUrlMismatch { + requested: paths_to_upload.len(), + received: presigned_responses.len(), + }); } for presigned_response in presigned_responses { upload_single_nar_presigned( &pool, - &StorePath::from_base_path(&presigned_response.store_path) - .map_err(|e| anyhow::anyhow!("invalid store path in presigned response: {e}"))?, + &StorePath::from_base_path(&presigned_response.store_path)?, build_id, machine_id, &presigned_response, 
@@ -1100,18 +1204,18 @@ async fn upload_single_nar_presigned( presigned_response: &hydra_proto::PresignedNarResponse, client: &mut BuilderClient, upload_client: &PresignedUploadClient, -) -> anyhow::Result<()> { +) -> Result<(), UploadError> { // Presigned upload requires constructing NarInfo from daemon path info. let narinfo: binary_cache::NarInfo = { let info = daemon_client_utils::query_path_info(pool, nar_path) .await? - .ok_or_else(|| anyhow::anyhow!("path not found: {nar_path}"))?; + .ok_or_else(|| UploadError::PathNotFound(nar_path.clone()))?; binary_cache::narinfo_simple(nar_path, info, Compression::None) }; let nar_upload = presigned_response .nar_upload .as_ref() - .ok_or_else(|| anyhow::anyhow!("nar_upload information is missing"))?; + .ok_or(UploadError::MissingNarUpload)?; let presigned_request = binary_cache::PresignedUploadResponse { nar_url: presigned_response.nar_url.clone(), @@ -1169,6 +1273,17 @@ async fn upload_single_nar_presigned( Ok(()) } +/// Errors from constructing the success result message. 
+#[derive(Debug, thiserror::Error)] +pub enum ResultInfoError { + #[error(transparent)] + Store(#[from] harmonia_protocol::types::DaemonError), + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + IntConversion(#[from] std::num::TryFromIntError), +} + #[tracing::instrument(skip(pool, output_infos), fields(%drv), ret(level = tracing::Level::DEBUG), err)] async fn new_success_build_result_info( pool: harmonia_store_remote::ConnectionPool, @@ -1177,7 +1292,7 @@ async fn new_success_build_result_info( output_infos: &BTreeMap, timings: BuildTimings, build_id: String, -) -> anyhow::Result { +) -> Result { let outputs: BTreeMap<_, _> = output_infos .iter() .map(|(name, vpi)| (name.clone(), vpi.path.clone())) diff --git a/subprojects/hydra-builder/src/system.rs b/subprojects/hydra-builder/src/system.rs index d15b8e1a7..6bbed6606 100644 --- a/subprojects/hydra-builder/src/system.rs +++ b/subprojects/hydra-builder/src/system.rs @@ -1,6 +1,19 @@ use hashbrown::HashMap; use procfs_core::FromRead as _; +/// Errors from reading system information. 
+#[derive(Debug, thiserror::Error)] +pub enum SystemInfoError { + #[error(transparent)] + Proc(#[from] procfs_core::ProcError), + + #[error(transparent)] + Io(#[from] std::io::Error), + + #[error(transparent)] + Nix(#[from] nix::errno::Errno), +} + #[derive(Debug, Clone, Copy)] pub struct BaseSystemInfo { pub cpu_count: usize, @@ -11,7 +24,7 @@ pub struct BaseSystemInfo { impl BaseSystemInfo { #[cfg(target_os = "linux")] #[tracing::instrument(err)] - pub fn new() -> anyhow::Result { + pub fn new() -> Result { let cpuinfo = procfs_core::CpuInfo::from_file("/proc/cpuinfo")?; let meminfo = procfs_core::Meminfo::from_file("/proc/meminfo")?; let bogomips = cpuinfo @@ -29,7 +42,7 @@ impl BaseSystemInfo { #[cfg(target_os = "macos")] #[tracing::instrument(err)] - pub fn new() -> anyhow::Result { + pub fn new() -> Result { let mut sys = sysinfo::System::new_all(); sys.refresh_memory(); sys.refresh_cpu_all(); @@ -143,7 +156,7 @@ pub struct SystemLoad { } #[tracing::instrument(err)] -pub fn get_mount_free_percent(dest: &str) -> anyhow::Result { +pub fn get_mount_free_percent(dest: &str) -> Result { let stat = nix::sys::statvfs::statvfs(dest)?; let total_bytes = (stat.blocks() as u64) * stat.block_size(); @@ -155,7 +168,7 @@ pub fn get_mount_free_percent(dest: &str) -> anyhow::Result { impl SystemLoad { #[cfg(target_os = "linux")] #[tracing::instrument(err)] - pub fn new(build_dir: &str, store_dir: &str) -> anyhow::Result { + pub fn new(build_dir: &str, store_dir: &str) -> Result { let meminfo = procfs_core::Meminfo::from_file("/proc/meminfo")?; let load = procfs_core::LoadAverage::from_file("/proc/loadavg")?; @@ -172,7 +185,7 @@ impl SystemLoad { #[cfg(target_os = "macos")] #[tracing::instrument(err)] - pub fn new(build_dir: &str, store_dir: &str) -> anyhow::Result { + pub fn new(build_dir: &str, store_dir: &str) -> Result { let mut sys = sysinfo::System::new_all(); sys.refresh_memory(); let load = sysinfo::System::load_average(); diff --git 
a/subprojects/hydra-builder/src/utils.rs b/subprojects/hydra-builder/src/utils.rs index 87e50d267..ef3c18c79 100644 --- a/subprojects/hydra-builder/src/utils.rs +++ b/subprojects/hydra-builder/src/utils.rs @@ -48,8 +48,10 @@ pub(crate) fn compressed_log_stream( data: bytes.into(), }, Err(e) => { - tracing::error!("Failed to compress log chunk: {e}"); - break; + // Zstd encoding of in-memory data should never + // fail. The only input is the duplex pipe reader + // which returns EOF when the writer is dropped. + panic!("Failed to compress log chunk: {e}"); } } } diff --git a/subprojects/hydra-manual/src/architecture.md b/subprojects/hydra-manual/src/architecture.md index 9da091af6..ce87f3354 100644 --- a/subprojects/hydra-manual/src/architecture.md +++ b/subprojects/hydra-manual/src/architecture.md @@ -67,10 +67,12 @@ graph BT hydra-proto --> nix-support store-transfer --> hydra-proto hydra-builder --> binary-cache + hydra-builder --> error-context hydra-builder --> hydra-tracing hydra-builder --> store-transfer hydra-queue-runner --> binary-cache hydra-queue-runner --> db + hydra-queue-runner --> error-context hydra-queue-runner --> hydra-tracing hydra-queue-runner --> store-transfer binary-cache -.-> hydra-tracing diff --git a/subprojects/hydra-queue-runner/Cargo.toml b/subprojects/hydra-queue-runner/Cargo.toml index b7be2b7b0..177d22d04 100644 --- a/subprojects/hydra-queue-runner/Cargo.toml +++ b/subprojects/hydra-queue-runner/Cargo.toml @@ -18,10 +18,10 @@ serde_json.workspace = true smallvec = { workspace = true, features = [ "serde" ] } toml.workspace = true -anyhow.workspace = true atomic_float.workspace = true backon.workspace = true clap = { workspace = true, features = [ "derive", "env" ] } +color-eyre.workspace = true fs-err = { workspace = true, features = [ "tokio" ] } thiserror.workspace = true uuid = { workspace = true, features = [ "v4", "serde" ] } diff --git a/subprojects/hydra-queue-runner/examples/collect-fods.rs 
b/subprojects/hydra-queue-runner/examples/collect-fods.rs index 8c3ddc28b..86ba228c9 100644 --- a/subprojects/hydra-queue-runner/examples/collect-fods.rs +++ b/subprojects/hydra-queue-runner/examples/collect-fods.rs @@ -2,11 +2,12 @@ use harmonia_store_derivation::derivation::Derivation; use harmonia_store_path::StorePath; #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> color_eyre::eyre::Result<()> { let p: StorePath = "dzgpbp0vp7lj7lgj26rjgmnjicq2wf4k-hello-2.12.2.drv".parse()?; let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(4); - let nix_config = daemon_client_utils::parse_nix_remote().map_err(|e| anyhow::anyhow!(e))?; + let nix_config = + daemon_client_utils::parse_nix_remote().map_err(|e| color_eyre::eyre::eyre!(e))?; let pool = harmonia_store_remote::ConnectionPool::new( &nix_config.socket, harmonia_store_remote::PoolConfig::default(), diff --git a/subprojects/hydra-queue-runner/src/config.rs b/subprojects/hydra-queue-runner/src/config.rs index 017a9516c..065eba884 100644 --- a/subprojects/hydra-queue-runner/src/config.rs +++ b/subprojects/hydra-queue-runner/src/config.rs @@ -1,8 +1,16 @@ use std::{net::SocketAddr, sync::Arc}; -use anyhow::Context as _; use clap::Parser; +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + #[error("missing option: {0}")] + MissingOption(&'static str), + + #[error(transparent)] + Io(#[from] std::io::Error), +} + #[derive(Debug, Clone)] pub enum BindSocket { Tcp(SocketAddr), @@ -86,20 +94,20 @@ impl Cli { #[tracing::instrument(skip(self), err)] pub async fn get_mtls( &self, - ) -> anyhow::Result<(tonic::transport::Certificate, tonic::transport::Identity)> { + ) -> Result<(tonic::transport::Certificate, tonic::transport::Identity), ConfigError> { let server_cert_path = self .server_cert_path .as_deref() - .ok_or_else(|| anyhow::anyhow!("server_cert_path not provided"))?; + .ok_or(ConfigError::MissingOption("server_cert_path"))?; let server_key_path = self .server_key_path .as_deref() - 
.ok_or_else(|| anyhow::anyhow!("server_key_path not provided"))?; + .ok_or(ConfigError::MissingOption("server_key_path"))?; let client_ca_cert_path = self .client_ca_cert_path .as_deref() - .ok_or_else(|| anyhow::anyhow!("client_ca_cert_path not provided"))?; + .ok_or(ConfigError::MissingOption("client_ca_cert_path"))?; let client_ca_cert = fs_err::tokio::read_to_string(client_ca_cert_path).await?; let client_ca_cert = tonic::transport::Certificate::from_pem(client_ca_cert); @@ -287,8 +295,20 @@ pub struct PreparedApp { pub forced_substituters: Vec, } +#[derive(Debug, thiserror::Error)] +pub enum PrepareConfigError { + #[error("environment variable `{0}` is missing")] + MissingEnvVar(&'static str), + + #[error(transparent)] + Io(#[from] std::io::Error), + + #[error("failed to parse NIX_REMOTE: {0}")] + NixRemote(String), +} + impl TryFrom for PreparedApp { - type Error = anyhow::Error; + type Error = PrepareConfigError; fn try_from(val: AppConfig) -> Result { let remote_store_addr = val @@ -302,8 +322,10 @@ impl TryFrom for PreparedApp { }) .collect(); - let logname = std::env::var("LOGNAME").context("LOGNAME env var missing")?; - let nix_remote = daemon_client_utils::parse_nix_remote().map_err(|e| anyhow::anyhow!(e))?; + let logname = + std::env::var("LOGNAME").map_err(|_| PrepareConfigError::MissingEnvVar("LOGNAME"))?; + let nix_remote = + daemon_client_utils::parse_nix_remote().map_err(PrepareConfigError::NixRemote)?; let roots_dir = val.roots_dir.map_or_else( || { nix_remote @@ -386,20 +408,28 @@ impl TryFrom for PreparedApp { } } +#[derive(Debug, thiserror::Error)] +pub enum LoadConfigError { + #[error(transparent)] + Toml(#[from] toml::de::Error), + + #[error(transparent)] + Prepare(#[from] PrepareConfigError), +} + /// Loads the config from specified path #[tracing::instrument(err)] -fn load_config(filepath: &str) -> anyhow::Result { +fn load_config(filepath: &str) -> Result { tracing::info!("Trying to loading file: {filepath}"); let toml: AppConfig = if let 
Ok(content) = fs_err::read_to_string(filepath) { - toml::from_str(&content) - .with_context(|| format!("Failed to toml load from '{filepath}'"))? + toml::from_str(&content)? } else { tracing::warn!("no config file found! Using default config"); - toml::from_str("").context("Failed to parse empty string as config")? + toml::from_str("")? }; tracing::info!("Loaded config: {toml:?}"); - toml.try_into().context("Failed to prepare configuration") + Ok(toml.try_into()?) } #[derive(Debug, Clone)] @@ -409,7 +439,7 @@ pub struct App { impl App { #[tracing::instrument(err)] - pub fn init(filepath: &str) -> anyhow::Result { + pub fn init(filepath: &str) -> Result { Ok(Self { inner: Arc::new(arc_swap::ArcSwap::from(Arc::new(load_config(filepath)?))), }) @@ -565,6 +595,8 @@ pub async fn reload(current_config: &App, filepath: &str, state: &Arc c, Err(e) => { + // Non-fatal: `LoadConfigError` is toml parse / config + // validation — no DB. Old config remains in effect. tracing::warn!("Failed to load new config: {e}"); let _notify = sd_notify::notify(&[ sd_notify::NotifyState::Status("Reload failed"), @@ -576,6 +608,9 @@ pub async fn reload(current_config: &App, filepath: &str, state: &Arc anyhow::Result { + fn new(cgroups_path: &std::path::Path) -> Result { Ok(Self { current_bytes: fs_err::read_to_string(cgroups_path.join("memory.current"))? .trim() - .parse() - .context("memory current parsing failed")?, + .parse()?, peak_bytes: fs_err::read_to_string(cgroups_path.join("memory.peak"))? .trim() - .parse() - .context("memory peak parsing failed")?, + .parse()?, swap_current_bytes: fs_err::read_to_string(cgroups_path.join("memory.swap.current"))? .trim() - .parse() - .context("swap parsing failed")?, + .parse()?, zswap_current_bytes: fs_err::read_to_string(cgroups_path.join("memory.zswap.current"))? 
.trim() - .parse() - .context("zswap parsing failed")?, + .parse()?, }) } } @@ -67,7 +79,7 @@ pub struct IoStats { impl IoStats { #[tracing::instrument(err)] - fn new(cgroups_path: &std::path::Path) -> anyhow::Result { + fn new(cgroups_path: &std::path::Path) -> Result { let mut total_read_bytes: u64 = 0; let mut total_write_bytes: u64 = 0; @@ -108,7 +120,7 @@ pub struct CpuStats { impl CpuStats { #[tracing::instrument(err)] - fn new(cgroups_path: &std::path::Path) -> anyhow::Result { + fn new(cgroups_path: &std::path::Path) -> Result { let contents = fs_err::read_to_string(cgroups_path.join("cpu.stat"))?; let mut usage_usec: u128 = 0; @@ -154,18 +166,18 @@ pub struct CgroupStats { impl CgroupStats { #[tracing::instrument(err)] - fn new(me: &procfs::process::Process) -> anyhow::Result { + fn new(me: &procfs::process::Process) -> Result { let cgroups_pathname = format!( "/sys/fs/cgroup/{}", me.cgroups()? .0 .first() - .ok_or_else(|| anyhow::anyhow!("cgroup information is missing in process."))? + .ok_or(CgroupError::NoCgroup)? .pathname ); let cgroups_path = std::path::Path::new(&cgroups_pathname); if !cgroups_path.exists() { - return Err(anyhow::anyhow!("cgroups directory does not exists.")); + return Err(CgroupError::NoCgroupDir); } Ok(Self { @@ -202,6 +214,8 @@ impl Process { cgroup: match CgroupStats::new(&me) { Ok(v) => Some(v), Err(e) => { + // Non-fatal: `CgroupError` is procfs/IO/parse — no DB. + // Cgroup info is optional for metrics. 
tracing::error!("failed to cgroups stats: {e}"); None } diff --git a/subprojects/hydra-queue-runner/src/main.rs b/subprojects/hydra-queue-runner/src/main.rs index 3da421d42..5de1ee36f 100644 --- a/subprojects/hydra-queue-runner/src/main.rs +++ b/subprojects/hydra-queue-runner/src/main.rs @@ -23,31 +23,76 @@ pub mod utils; use std::future::Future; -use anyhow::Context as _; - use state::State; +#[derive(Debug, thiserror::Error)] +enum MainError { + #[error(transparent)] + State(#[from] state::StateError), + #[error(transparent)] + Config(#[from] config::ConfigError), + #[error(transparent)] + LoadConfig(#[from] config::LoadConfigError), + #[error(transparent)] + Db(#[from] db::Error), + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + Server(#[from] server::grpc::ServerError), + #[error(transparent)] + Join(#[from] tokio::task::JoinError), + #[error("another instance is already running (lock file: {path})")] + LockFile { + path: String, + #[source] + source: std::io::Error, + }, + #[error("tracing init failed: {0}")] + Tracing(String), + #[error("no listenfd TCP listener at index {0}")] + NoListenFd(usize), + #[error("HTTP server does not support Unix sockets")] + HttpUnixUnsupported, + #[error("server error: {0}")] + ServerTask(String), +} + +type GrpcServer = + std::pin::Pin> + Send>>; + #[cfg(not(target_env = "msvc"))] #[global_allocator] static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; -fn start_task_loops(state: &std::sync::Arc) -> Vec { +struct TaskLoops { + /// Background tasks that only need to be aborted on shutdown. + abort_only: Vec, + /// Critical tasks whose errors should halt the queue-runner. 
+ critical: Vec>>, +} + +fn start_task_loops(state: &std::sync::Arc) -> TaskLoops { tracing::info!("QueueRunner starting task loops"); - let mut service_list = vec![ + let mut abort_only = vec![ spawn_config_reloader(state.clone(), state.config.clone(), &state.cli.config_path), - state.clone().start_dispatch_loop(), state.clone().start_uploader_queue(), ]; + + let mut critical = vec![state.clone().start_dispatch_loop()]; + if !state.cli.disable_queue_monitor_loop { - service_list.push(state.clone().start_queue_monitor_loop()); + critical.push(state.clone().start_queue_monitor_loop()); } if let Some(fod_checker) = &state.fod_checker { - service_list.push(fod_checker.clone().start_traverse_loop()); + abort_only.push(fod_checker.clone().start_traverse_loop()); } - service_list + TaskLoops { + abort_only, + critical, + } } fn spawn_config_reloader( @@ -65,6 +110,8 @@ fn spawn_config_reloader( config::reload(&current_config, &filepath, &state).await; } Err(e) => { + // Non-fatal: `io::Error` — signal handling infra, no DB. + // Config reloads stop working but queue-runner continues.
tracing::error!("Failed to create signal listener for SIGHUP: {e}"); break; } @@ -76,8 +123,10 @@ fn spawn_config_reloader( #[tokio::main] #[allow(clippy::too_many_lines)] -async fn main() -> anyhow::Result<()> { - let _tracing_guard = hydra_tracing::init().map_err(|e| anyhow::anyhow!("{e}"))?; +async fn main() -> Result<(), MainError> { + color_eyre::install().map_err(|e| MainError::Tracing(e.to_string()))?; + let _tracing_guard = + hydra_tracing::init().map_err(|e| MainError::Tracing(e.to_string()))?; #[cfg(debug_assertions)] { @@ -93,19 +142,22 @@ async fn main() -> anyhow::Result<()> { let state = State::new().await?; let lockfile_path = state.config.get_lockfile(); - let _lock = lock_file::LockFile::acquire(&lockfile_path) - .context("Another instance is already running.")?; + let _lock = + lock_file::LockFile::acquire(&lockfile_path).map_err(|source| MainError::LockFile { + path: lockfile_path.display().to_string(), + source, + })?; state.clear_busy().await?; // clear busy once before starting the queue-runner if !state.cli.mtls_configured_correctly() { - tracing::error!( - "mtls configured inproperly, please pass all options: server_cert_path, server_key_path and client_ca_cert_path!" - ); - return Err(anyhow::anyhow!("Configuration issue")); + return Err(config::ConfigError::MissingOption( + "server_cert_path, server_key_path and client_ca_cert_path", + ) + .into()); } - let task_abort_handles = start_task_loops(&state); + let tasks = start_task_loops(&state); // Resolve listeners for both servers. When using socket activation // (ListenFd), we use LISTEN_FDNAMES to map names to fd indices. 
@@ -120,22 +172,19 @@ async fn main() -> anyhow::Result<()> { config::BindSocket::Tcp(s) => tokio::net::TcpListener::bind(s).await?, config::BindSocket::ListenFd => { let idx = fd_names.iter().position(|n| n == "rest").unwrap_or(0); - let std_listener = listenfd.take_tcp_listener(idx)?.ok_or_else(|| { - anyhow::anyhow!("No listenfd TCP listener at index {idx} for REST") - })?; + let std_listener = listenfd + .take_tcp_listener(idx)? + .ok_or(MainError::NoListenFd(idx))?; std_listener.set_nonblocking(true)?; tokio::net::TcpListener::from_std(std_listener)? } config::BindSocket::Unix(_) => { - anyhow::bail!("HTTP server does not support Unix sockets"); + return Err(MainError::HttpUnixUnsupported); } }; let http_addr = http_listener.local_addr()?; - let (srv1, grpc_info): ( - std::pin::Pin> + Send>>, - String, - ) = match &state.cli.grpc_bind { + let (srv1, grpc_info): (GrpcServer, String) = match &state.cli.grpc_bind { config::BindSocket::Tcp(s) => { let listener = tokio::net::TcpListener::bind(s).await?; let addr = listener.local_addr()?; @@ -147,9 +196,9 @@ async fn main() -> anyhow::Result<()> { } config::BindSocket::ListenFd => { let idx = fd_names.iter().position(|n| n == "grpc").unwrap_or(1); - let std_listener = listenfd.take_tcp_listener(idx)?.ok_or_else(|| { - anyhow::anyhow!("No listenfd TCP listener at index {idx} for gRPC") - })?; + let std_listener = listenfd + .take_tcp_listener(idx)? 
+ .ok_or(MainError::NoListenFd(idx))?; let addr = std_listener.local_addr()?; let info = addr.to_string(); std_listener.set_nonblocking(true)?; @@ -180,11 +229,9 @@ async fn main() -> anyhow::Result<()> { let task = tokio::spawn(async move { match futures_util::future::join(srv1, srv2).await { (Ok(()), Ok(())) => Ok(()), - (Ok(()), Err(e)) => Err(anyhow::anyhow!("hyper error while awaiting handle: {e}")), - (Err(e), Ok(())) => Err(anyhow::anyhow!("tonic error while awaiting handle: {e}")), - (Err(e1), Err(e2)) => Err(anyhow::anyhow!( - "tonic and hyper error while awaiting handle: {e1} | {e2}" - )), + (Ok(()), Err(e)) => Err(format!("HTTP server error: {e}")), + (Err(e), Ok(())) => Err(format!("gRPC server error: {e}")), + (Err(e1), Err(e2)) => Err(format!("gRPC and HTTP server errors: {e1} | {e2}")), } }); @@ -196,35 +243,37 @@ async fn main() -> anyhow::Result<()> { let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?; let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; - let abort_handle = task.abort_handle(); + let server_handle = task.abort_handle(); + let mut critical_tasks = + futures_util::future::try_join_all(tasks.critical.into_iter().map(|h| async { h.await? })); + tokio::select! 
{ _ = sigint.recv() => { tracing::info!("Received sigint - shutting down gracefully"); - let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]); - abort_handle.abort(); - for h in task_abort_handles { - h.abort(); - } - // removing all machines will also mark all currently running jobs as cancelled - state.remove_all_machines().await; - let _ = state.clear_busy().await; - Ok(()) } _ = sigterm.recv() => { tracing::info!("Received sigterm - shutting down gracefully"); - let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]); - abort_handle.abort(); - for h in task_abort_handles { - h.abort(); - } - // removing all machines will also mark all currently running jobs as cancelled - state.remove_all_machines().await; - let _ = state.clear_busy().await; - Ok(()) } r = task => { - r??; - Ok(()) + if let Err(e) = r? { + return Err(MainError::ServerTask(e)); + } + return Ok(()); + } + r = &mut critical_tasks => { + r?; + return Ok(()); } } + + // Shutdown path (signal received) + let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]); + server_handle.abort(); + for h in tasks.abort_only { + h.abort(); + } + // removing all machines will also mark all currently running jobs as cancelled + state.remove_all_machines().await?; + let _ = state.clear_busy().await; + Ok(()) } diff --git a/subprojects/hydra-queue-runner/src/server/grpc.rs b/subprojects/hydra-queue-runner/src/server/grpc.rs index 85c9f64f1..ac49ebdbd 100644 --- a/subprojects/hydra-queue-runner/src/server/grpc.rs +++ b/subprojects/hydra-queue-runner/src/server/grpc.rs @@ -1,7 +1,17 @@ use std::sync::Arc; -use anyhow::Context as _; use harmonia_store_path::StorePath; + +/// Errors from gRPC server setup and serving. 
+#[derive(Debug, thiserror::Error)] +pub enum ServerError { + #[error("gRPC transport error")] + Transport(#[from] tonic::transport::Error), + #[error("server configuration error")] + Config(#[from] crate::config::ConfigError), + #[error("reflection service: {0}")] + Reflection(#[from] tonic_reflection::server::Error), +} use tokio::sync::mpsc; use tonic::service::interceptor::InterceptedService; use tower::ServiceBuilder; @@ -17,6 +27,44 @@ use hydra_proto::{ }; type BuilderResult = Result, tonic::Status>; + +fn parse_uuid(field: &str, value: &str) -> Result { + uuid::Uuid::parse_str(value) + .map_err(|e| tonic::Status::invalid_argument(format!("{field} is not a valid uuid: {e}"))) +} + +impl State { + /// Handle a builder sending invalid data: log the error and + /// disconnect the machine. Returns `Ok(None)` if the builder was + /// rejected, propagates DB errors from `remove_machine`. + async fn reject_builder_on_error( + &self, + machine_id: uuid::Uuid, + result: Result, + ) -> Result, crate::state::StateError> { + match result { + Ok(v) => Ok(Some(v)), + Err(e) => { + tracing::error!("rejecting builder {machine_id}, disconnecting: {e}"); + self.remove_machine(machine_id).await?; + Ok(None) + } + } + } +} + +/// Spawn a task that panics on error. Use for fire-and-forget +/// operations where failure indicates an infrastructure problem +/// (e.g. DB down) that should bring down the queue-runner. NOTE(review): the `JoinHandle` is dropped and tokio catches panics in spawned tasks, so this presumably relies on `panic = "abort"` or an aborting panic hook to actually stop the process — confirm. +fn spawn_fatal( + fut: impl Future> + Send + 'static, +) { + tokio::spawn(async move { + if let Err(e) = fut.await { + panic!("fatal error in spawned task: {e:#}"); + } + }); +} type OpenTunnelResponseStream = std::pin::Pin> + Send>>; type FetchPathsResponseStream = std::pin::Pin< @@ -101,7 +149,10 @@ pub struct Server { impl Server { /// Serve on a pre-bound TCP listener.
#[tracing::instrument(skip(listener, state), err)] - pub async fn run(listener: tokio::net::TcpListener, state: Arc) -> anyhow::Result<()> { + pub async fn run( + listener: tokio::net::TcpListener, + state: Arc, + ) -> Result<(), ServerError> { let stream = tokio_stream::wrappers::TcpListenerStream::new(listener); Self::serve_incoming(stream, state).await } @@ -111,12 +162,12 @@ impl Server { pub async fn run_unix( listener: tokio::net::UnixListener, state: Arc, - ) -> anyhow::Result<()> { + ) -> Result<(), ServerError> { let stream = tokio_stream::wrappers::UnixListenerStream::new(listener); Self::serve_incoming(stream, state).await } - async fn serve_incoming(incoming: S, state: Arc) -> anyhow::Result<()> + async fn serve_incoming(incoming: S, state: Arc) -> Result<(), ServerError> where S: futures_util::Stream>, IO: tokio::io::AsyncRead @@ -125,6 +176,8 @@ impl Server { + Unpin + Send + 'static, + // Required by tonic's `serve_with_incoming` API. We cannot replace this bound with + // something else. 
IE: Into>, { let service = RunnerServiceServer::new(Self { @@ -153,11 +206,7 @@ impl Server { if state.cli.mtls_enabled() { tracing::info!("Using mtls"); - let (client_ca_cert, server_identity) = state - .cli - .get_mtls() - .await - .context("Failed to get_mtls Certificate and Identity")?; + let (client_ca_cert, server_identity) = state.cli.get_mtls().await?; let tls = tonic::transport::ServerTlsConfig::new() .identity(server_identity) @@ -241,20 +290,18 @@ impl RunnerService for Server { let forced_substituters = self.state.config.get_forced_substituters(); let machine = match stream.next().await { Some(Ok(m)) => match m.message { - Some(builder_request::Message::Join(v)) => { - match Machine::new(v, input_tx, use_presigned_uploads, &forced_substituters) { - Ok(m) => Some(m), - Err(e) => { - tracing::error!("Rejecting new machine creation: {e}"); - return Err(tonic::Status::invalid_argument("Machine is not valid")); - } - } - } + Some(builder_request::Message::Join(v)) => Some( + Machine::new(v, input_tx, use_presigned_uploads, &forced_substituters) + .map_err(|e| { + tonic::Status::invalid_argument(format!("invalid machine: {e}")) + })?, + ), _ => None, }, Some(Err(e)) => { - tracing::error!("Bad message in stream: {e}"); - None + return Err(tonic::Status::invalid_argument(format!( + "bad first message: {e}" + ))); } _ => None, }; @@ -267,7 +314,7 @@ impl RunnerService for Server { tracing::info!("Registered new machine: machine_id={machine_id} machine={machine}",); let (output_tx, output_rx) = mpsc::channel(128); - if let Err(e) = output_tx + output_tx .send(Ok(RunnerRequest { message: Some(hydra_proto::runner_request::Message::Join(JoinResponse { machine_id: machine_id.to_string(), @@ -275,14 +322,11 @@ impl RunnerService for Server { })), })) .await - { - tracing::error!("Failed to send join response machine_id={machine_id} e={e}"); - return Err(tonic::Status::internal("Failed to send join Response.")); - } + .map_err(|e| 
tonic::Status::internal(format!("failed to send join response: {e}")))?; let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(BACKWARDS_PING_INTERVAL)); - tokio::spawn(async move { + spawn_fatal(async move { loop { tokio::select! { _ = ping_interval.tick() => { @@ -292,49 +336,44 @@ impl RunnerService for Server { })) }; if let Err(e) = output_tx.send(Ok(msg)).await { - tracing::error!("Failed to send message to machine={machine_id} e={e}"); - state.remove_machine(machine_id).await; + // Non-fatal: `mpsc::SendError` — channel + // closed, no DB. Builder disconnected. + tracing::warn!("failed to send ping to machine={machine_id}: {e}"); break } }, msg = input_rx.recv() => { if let Some(msg) = msg { if let Err(e) = output_tx.send(Ok(msg.into_request())).await { - tracing::error!("Failed to send message to machine={machine_id} e={e}"); - state.remove_machine(machine_id).await; + // Non-fatal: same as above. + tracing::warn!("failed to send message to machine={machine_id}: {e}"); break } } else { - state.remove_machine(machine_id).await; break } }, msg = stream.next() => match msg.map(|v| v.map(|v| v.message)) { Some(Ok(Some(msg))) => handle_message(&state, msg), - Some(Ok(None)) => (), // empty meesage can be ignored + Some(Ok(None)) => (), // empty message can be ignored Some(Err(err)) => { if let Some(io_err) = match_for_io_error(&err) && io_err.kind() == std::io::ErrorKind::BrokenPipe { - tracing::error!("client disconnected: broken pipe: machine={machine_id} hostname={}", machine.hostname); - state.remove_machine(machine_id).await; + tracing::warn!("client disconnected: broken pipe: machine={machine_id} hostname={}", machine.hostname); break; } match output_tx.send(Err(err)).await { Ok(()) => (), - Err(_err) => { - state.remove_machine(machine_id).await; - break - } + Err(_err) => break, } }, - None => { - state.remove_machine(machine_id).await; - break - } + None => break, } } } + // Machine disconnected or channel broke — clean up. 
+ state.remove_machine(machine_id).await }); Ok(tonic::Response::new( @@ -412,32 +451,19 @@ impl RunnerService for Server { let state = self.state.clone(); let req = req.into_inner(); - let build_id = uuid::Uuid::parse_str(&req.build_id).map_err(|e| { - tracing::error!("Failed to parse build_id into uuid: {e}"); - tonic::Status::invalid_argument("build_id is not a valid uuid.") - })?; - let machine_id = uuid::Uuid::parse_str(&req.machine_id).map_err(|e| { - tracing::error!("Failed to parse machine_id into uuid: {e}"); - tonic::Status::invalid_argument("machine_id is not a valid uuid.") - })?; + let build_id = parse_uuid("build_id", &req.build_id)?; + let machine_id = parse_uuid("machine_id", &req.machine_id)?; let step_status = db::models::StepStatus::from(req.step_status()); - tokio::spawn({ + spawn_fatal( async move { - if let Err(e) = state - .update_build_step( - build_id, - machine_id, - step_status, - ) + state + .update_build_step(build_id, machine_id, step_status) .await - { - tracing::error!( - "Failed to update build step with build_id={build_id:?} step_status={step_status:?}: {e}" - ); - } - }.in_current_span() - }); + .map_err(|e| format!("update_build_step failed: {e}")) + } + .in_current_span(), + ); Ok(tonic::Response::new(hydra_proto::Empty {})) } @@ -450,51 +476,43 @@ impl RunnerService for Server { let state = self.state.clone(); let req = req.into_inner(); - let build_id = uuid::Uuid::parse_str(&req.build_id).map_err(|e| { - tracing::error!("Failed to parse build_id into uuid: {e}"); - tonic::Status::invalid_argument("build_id is not a valid uuid.") - })?; - let machine_id = uuid::Uuid::parse_str(&req.machine_id).map_err(|e| { - tracing::error!("Failed to parse machine_id into uuid: {e}"); - tonic::Status::invalid_argument("machine_id is not a valid uuid.") - })?; + let build_id = parse_uuid("build_id", &req.build_id)?; + let machine_id = parse_uuid("machine_id", &req.machine_id)?; - tokio::spawn({ + // Spawned so the gRPC response goes back 
immediately. + spawn_fatal( async move { if req.result_state() == hydra_proto::BuildResultState::Success { - let build_output = match crate::state::BuildOutput::from_grpc(req) { - Ok(output) => output, - Err(e) => { - tracing::error!("Failed to parse build output: {e}"); - return; - } + let Some(build_output) = state + .reject_builder_on_error( + machine_id, + crate::state::BuildOutput::from_grpc(req), + ) + .await? + else { + return Ok::<(), crate::state::StateError>(()); }; - if let Err(e) = state + state .succeed_step_by_uuid(build_id, machine_id, build_output) - .await - { - tracing::error!( - "Failed to mark step with build_id={build_id} as done: {e}" - ); - } - } else if let Err(e) = state - .fail_step_by_uuid( - build_id, - machine_id, - req.result_state().into(), - crate::state::BuildTimings::new( - req.import_time_ms, - req.build_time_ms, - req.upload_time_ms, - ), - ) - .await - { - tracing::error!("Failed to fail step with build_id={build_id}: {e}"); + .await?; + } else { + state + .fail_step_by_uuid( + build_id, + machine_id, + req.result_state().into(), + crate::state::BuildTimings::new( + req.import_time_ms, + req.build_time_ms, + req.upload_time_ms, + ), + ) + .await?; } + Ok(()) } - .in_current_span() - }); + .in_current_span(), + ); Ok(tonic::Response::new(hydra_proto::Empty {})) } @@ -510,10 +528,7 @@ impl RunnerService for Server { let requisites: Vec = daemon_client_utils::query_closure(&state.pool, &paths) .await - .map_err(|e| { - tracing::error!("failed to compute closure e={e}"); - tonic::Status::internal("failed to compute closure.") - })? + .map_err(|e| tonic::Status::internal(format!("failed to compute closure: {e}")))? 
.into_iter() .map(|vpi| ProtoStorePath(vpi.path)) .collect(); @@ -596,14 +611,8 @@ impl RunnerService for Server { let _state = self.state.clone(); let req = req.into_inner(); - let _build_id = uuid::Uuid::parse_str(&req.build_id).map_err(|e| { - tracing::error!("Failed to parse build_id into uuid: {e}"); - tonic::Status::invalid_argument("build_id is not a valid uuid.") - })?; - let _machine_id = uuid::Uuid::parse_str(&req.machine_id).map_err(|e| { - tracing::error!("Failed to parse machine_id into uuid: {e}"); - tonic::Status::invalid_argument("machine_id is not a valid uuid.") - })?; + let _build_id = parse_uuid("build_id", &req.build_id)?; + let _machine_id = parse_uuid("machine_id", &req.machine_id)?; let remote_store = { let remote_stores = _state.remote_stores.read(); @@ -638,8 +647,9 @@ impl RunnerService for Server { ) .await .map_err(|e| { - tracing::error!("Failed to generate presigned URL for {}: {e}", store_path); - tonic::Status::internal("Failed to generate presigned URL") + tonic::Status::internal(format!( + "failed to generate presigned URL for {store_path}: {e}" + )) })?; responses.push(hydra_proto::PresignedNarResponse { @@ -697,14 +707,8 @@ impl RunnerService for Server { let state = self.state.clone(); let req = req.into_inner(); - let build_id = uuid::Uuid::parse_str(&req.build_id).map_err(|e| { - tracing::error!("Failed to parse build_id into uuid: {e}"); - tonic::Status::invalid_argument("build_id is not a valid uuid.") - })?; - let machine_id = uuid::Uuid::parse_str(&req.machine_id).map_err(|e| { - tracing::error!("Failed to parse machine_id into uuid: {e}"); - tonic::Status::invalid_argument("machine_id is not a valid uuid.") - })?; + let build_id = parse_uuid("build_id", &req.build_id)?; + let machine_id = parse_uuid("machine_id", &req.machine_id)?; let machine = state .machines @@ -753,8 +757,7 @@ impl RunnerService for Server { .upload_narinfo_after_presigned_upload(&self.state.pool, narinfo) .await .map_err(|e| { - 
tracing::error!("Failed to upload narinfo for {}: {e}", store_path); - tonic::Status::internal("Failed to upload narinfo") + tonic::Status::internal(format!("failed to upload narinfo for {store_path}: {e}")) })?; tracing::debug!( diff --git a/subprojects/hydra-queue-runner/src/server/http.rs b/subprojects/hydra-queue-runner/src/server/http.rs index 55b8ed5aa..849f80ae0 100644 --- a/subprojects/hydra-queue-runner/src/server/http.rs +++ b/subprojects/hydra-queue-runner/src/server/http.rs @@ -22,12 +22,18 @@ pub enum Error { #[error("std io error: `{0}`")] Io(#[from] std::io::Error), - #[error("anyhow error: `{0}`")] - Anyhow(#[from] anyhow::Error), + #[error("prometheus error: `{0}`")] + Prometheus(#[from] prometheus::Error), + + #[error(transparent)] + State(#[from] crate::state::StateError), #[error("db error: `{0}`")] Sqlx(#[from] db::Error), + #[error("invalid store path: {0}")] + StorePath(#[from] harmonia_store_path::ParseStorePathError), + #[error("Not found")] NotFound, @@ -44,9 +50,11 @@ impl Error { | Self::HyperHttp(_) | Self::Hyper(_) | Self::Io(_) - | Self::Anyhow(_) + | Self::Prometheus(_) + | Self::State(_) | Self::Sqlx(_) | Self::Fatal => hyper::StatusCode::INTERNAL_SERVER_ERROR, + Self::StorePath(_) => hyper::StatusCode::BAD_REQUEST, Self::NotFound => hyper::StatusCode::NOT_FOUND, } } @@ -104,6 +112,9 @@ impl Server { .instrument(server_span.clone()) .await { + // Non-fatal: `hyper::Error` — per-connection HTTP + // error, no DB. A bad client shouldn't bring + // down the server. tracing::error!("Error serving connection: {err:?}"); } } @@ -354,10 +365,7 @@ mod handler { let data: io::BuildPayload = serde_json::from_reader(whole_body.reader())?; state - .queue_one_build( - data.jobset_id, - &StorePath::from_base_path(&data.drv).map_err(|e| anyhow::anyhow!("{e}"))?, - ) + .queue_one_build(data.jobset_id, &StorePath::from_base_path(&data.drv)?) 
.await?; construct_json_ok_response(&io::Empty {}) } diff --git a/subprojects/hydra-queue-runner/src/state/build.rs b/subprojects/hydra-queue-runner/src/state/build.rs index c553067ae..03498b4fb 100644 --- a/subprojects/hydra-queue-runner/src/state/build.rs +++ b/subprojects/hydra-queue-runner/src/state/build.rs @@ -68,7 +68,7 @@ impl Build { } #[tracing::instrument(skip(v, jobset), err)] - pub fn new(v: db::models::Build, jobset: Arc) -> anyhow::Result> { + pub fn new(v: db::models::Build, jobset: Arc) -> Result, jiff::Error> { Ok(Arc::new(Self { id: v.id, drv_path: v.drvpath, @@ -370,15 +370,38 @@ pub struct BuildOutput { pub metrics: BTreeMap, } +#[derive(Debug, thiserror::Error)] +pub enum BuildParseError { + #[error("buildstatus missing")] + BuildStatusMissing, + + #[error("buildstatus value did not map to a known status")] + BuildStatusUnknown, + + #[error("output missing path")] + OutputMissingPath, + + #[error(transparent)] + OutputName(#[from] harmonia_store_path::StorePathNameError), +} + +/// Errors from constructing a `BuildOutput` from store data. +#[derive(Debug, thiserror::Error)] +pub enum BuildOutputError { + #[error(transparent)] + Store(#[from] harmonia_store_remote::DaemonError), + + #[error(transparent)] + Io(#[from] std::io::Error), +} + impl TryFrom for BuildOutput { - type Error = anyhow::Error; + type Error = BuildParseError; - fn try_from(v: db::models::BuildOutput) -> anyhow::Result { - let build_status = BuildStatus::from_i32( - v.buildstatus - .ok_or_else(|| anyhow::anyhow!("buildstatus missing"))?, - ) - .ok_or_else(|| anyhow::anyhow!("buildstatus did not map"))?; + fn try_from(v: db::models::BuildOutput) -> Result { + let build_status = + BuildStatus::from_i32(v.buildstatus.ok_or(BuildParseError::BuildStatusMissing)?) 
+ .ok_or(BuildParseError::BuildStatusUnknown)?; Ok(Self { failed: build_status != BuildStatus::Success, timings: BuildTimings::default(), @@ -395,17 +418,14 @@ impl TryFrom for BuildOutput { } impl BuildOutput { - pub fn from_grpc(v: hydra_proto::BuildResultInfo) -> anyhow::Result { + pub fn from_grpc(v: hydra_proto::BuildResultInfo) -> Result { let mut outputs = BTreeMap::new(); let mut closure_size = 0; let mut nar_size = 0; let mut merged = nix_support::NixSupport::default(); for (name, info) in v.output_infos { - let path = info - .path - .ok_or_else(|| anyhow::anyhow!("output missing path"))? - .0; + let path = info.path.ok_or(BuildParseError::OutputMissingPath)?.0; closure_size += info.closure_size; nar_size += info.nar_size; outputs.insert(name.parse()?, path); @@ -434,7 +454,7 @@ impl BuildOutput { store: &harmonia_store_remote::ConnectionPool, real_store_dir: &std::path::Path, outputs: BTreeMap>, - ) -> anyhow::Result { + ) -> Result { let resolved: BTreeMap<_, _> = outputs .iter() .filter_map(|(name, path)| Some((name.clone(), path.as_ref()?.clone()))) diff --git a/subprojects/hydra-queue-runner/src/state/jobset.rs b/subprojects/hydra-queue-runner/src/state/jobset.rs index 047d1c534..643eb5c2e 100644 --- a/subprojects/hydra-queue-runner/src/state/jobset.rs +++ b/subprojects/hydra-queue-runner/src/state/jobset.rs @@ -3,9 +3,20 @@ use std::hash::Hash; use std::sync::Arc; use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; -use anyhow::Context; use hashbrown::HashMap; +#[derive(Debug, thiserror::Error)] +pub enum JobsetError { + #[error(transparent)] + Db(#[from] db::Error), + + #[error("scheduling shares not found for jobset {jobset_id}")] + MissingShares { jobset_id: i32 }, + + #[error("scheduling shares out of range: {0}")] + SharesOutOfRange(#[from] std::num::TryFromIntError), +} + pub type JobsetID = i32; pub(super) const SCHEDULING_WINDOW: i64 = 24 * 60 * 60; @@ -161,7 +172,7 @@ impl Jobsets { jobset_id: i32, project_name: &str, jobset_name: &str, 
- ) -> anyhow::Result> { + ) -> Result, JobsetError> { let key = (project_name.to_owned(), jobset_name.to_owned()); { let jobsets = self.inner.read(); @@ -173,7 +184,7 @@ impl Jobsets { let shares = conn .get_jobset_scheduling_shares(jobset_id) .await? - .ok_or_else(|| anyhow::anyhow!("Scheduling Shares not found for jobset not found."))?; + .ok_or(JobsetError::MissingShares { jobset_id })?; let jobset = Jobset::new(jobset_id, project_name, jobset_name); jobset.set_shares(shares); @@ -200,14 +211,13 @@ impl Jobsets { } #[tracing::instrument(skip(self, conn), err)] - pub async fn handle_change(&self, conn: &mut db::Connection) -> anyhow::Result<()> { + pub async fn handle_change(&self, conn: &mut db::Connection) -> Result<(), JobsetError> { let curr_jobsets_in_db = conn.get_jobsets().await?; let jobsets = self.inner.read(); for row in curr_jobsets_in_db { if let Some(i) = jobsets.get(&(row.project.clone(), row.name.clone())) { - let shares = u32::try_from(row.schedulingshares) - .context("scheduling shares out of range")?; + let shares = u32::try_from(row.schedulingshares)?; i.set_shares(shares); } } diff --git a/subprojects/hydra-queue-runner/src/state/machine.rs b/subprojects/hydra-queue-runner/src/state/machine.rs index 3f3073aa5..de31c7e77 100644 --- a/subprojects/hydra-queue-runner/src/state/machine.rs +++ b/subprojects/hydra-queue-runner/src/state/machine.rs @@ -11,6 +11,21 @@ use super::{RemoteBuild, System}; use crate::config::{MachineFreeFn, MachineSortFn}; use hydra_proto::{AbortMessage, BuildMessage, JoinMessage, PresignedUploadOpts, runner_request}; +/// Errors from builder registration (validating the join message). +#[derive(Debug, thiserror::Error)] +pub enum MachineRegistrationError { + #[error("{0}")] + ConfigIncompat(String), + + #[error(transparent)] + Uuid(#[from] uuid::Error), +} + +/// Error from sending a message to a builder (channel closed = disconnected). 
+#[derive(Debug, thiserror::Error)] +#[error("failed to send message to machine: channel closed")] +pub struct ChannelClosedError(#[from] mpsc::error::SendError); + pub use hydra_proto::Pressure; pub(crate) use hydra_proto::PressureState; @@ -583,21 +598,20 @@ impl Machine { tx: mpsc::Sender, use_presigned_uploads: bool, forced_substituters: &[String], - ) -> anyhow::Result { + ) -> Result { if use_presigned_uploads && !forced_substituters.is_empty() { if !msg.use_substitutes { - return Err(anyhow::anyhow!( - "Forced_substituters is configured but builder doesnt use substituters. This is an issue because presigned uploads are enabled", + return Err(MachineRegistrationError::ConfigIncompat( + "Forced_substituters is configured but builder doesnt use substituters. This is an issue because presigned uploads are enabled".into(), )); } for forced_sub in forced_substituters { if !msg.substituters.contains(forced_sub) { - return Err(anyhow::anyhow!( + return Err(MachineRegistrationError::ConfigIncompat(format!( "Builder missing required substituter '{}'. 
Available: {:?}", - forced_sub, - msg.substituters - )); + forced_sub, msg.substituters + ))); } } } @@ -649,7 +663,7 @@ impl Machine { build_timeout: i32, presigned_url_opts: Option, resolved_drv: hydra_proto::nix::store::derivation::v1::Basic, - ) -> anyhow::Result<()> { + ) -> Result<(), ChannelClosedError> { let drv = effective_drv; self.msg_queue .send(Message::BuildMessage { @@ -678,7 +692,7 @@ impl Machine { } #[tracing::instrument(skip(self), fields(build_id=%build_id), err)] - pub async fn abort_build(&self, build_id: uuid::Uuid) -> anyhow::Result<()> { + pub async fn abort_build(&self, build_id: uuid::Uuid) -> Result<(), ChannelClosedError> { self.msg_queue .send(Message::AbortMessage { build_id }) .await?; @@ -688,7 +702,7 @@ impl Machine { } #[tracing::instrument(skip(self), err)] - pub async fn publish_config_update(&self, change: ConfigUpdate) -> anyhow::Result<()> { + pub async fn publish_config_update(&self, change: ConfigUpdate) -> Result<(), ChannelClosedError> { self.msg_queue.send(Message::ConfigUpdate(change)).await?; Ok(()) } diff --git a/subprojects/hydra-queue-runner/src/state/metrics.rs b/subprojects/hydra-queue-runner/src/state/metrics.rs index 7b3bb27b2..aa6b4700b 100644 --- a/subprojects/hydra-queue-runner/src/state/metrics.rs +++ b/subprojects/hydra-queue-runner/src/state/metrics.rs @@ -120,7 +120,7 @@ pub struct PromMetrics { impl PromMetrics { #[allow(clippy::too_many_lines)] #[tracing::instrument(err)] - pub fn new() -> anyhow::Result { + pub fn new() -> Result { let queue_checks_started = prometheus::IntCounter::with_opts(prometheus::Opts::new( "hydraqueuerunner_queue_checks_started_total", "Number of times State::get_queued_builds() was started", @@ -1098,7 +1098,10 @@ impl PromMetrics { } #[tracing::instrument(skip(self, state), err)] - pub async fn gather_metrics(&self, state: &Arc) -> anyhow::Result> { + pub async fn gather_metrics( + &self, + state: &Arc, + ) -> Result, prometheus::Error> { 
self.refresh_dynamic_metrics(state).await; let mut buffer = Vec::new(); diff --git a/subprojects/hydra-queue-runner/src/state/mod.rs b/subprojects/hydra-queue-runner/src/state/mod.rs index 305911e4d..81925ba46 100644 --- a/subprojects/hydra-queue-runner/src/state/mod.rs +++ b/subprojects/hydra-queue-runner/src/state/mod.rs @@ -11,8 +11,161 @@ mod step; mod step_info; mod uploader; -use anyhow::Context as _; pub use atomic::AtomicDateTime; + +/// All state logic errors combined. Sub-enums are defined alongside +/// the `impl State` blocks that use them. +#[derive(Debug, thiserror::Error)] +pub enum StateLogicError { + #[error(transparent)] + Resolution(#[from] ResolutionError), + #[error(transparent)] + StepLookup(#[from] StepLookupError), + #[error(transparent)] + MachineLookup(#[from] MachineLookupError), + #[error(transparent)] + DrvLookup(#[from] DrvLookupError), +} + +/// Top-level state error — each variant wraps the narrowest error type +/// for that subsystem, using `#[source]` to preserve the error chain. 
+#[derive(Debug, thiserror::Error)] +pub enum StateError { + #[error("database operation")] + Db(#[source] db::Error), + + #[error("nix daemon")] + Daemon(#[source] harmonia_store_remote::DaemonError), + + #[error("I/O")] + Io(#[source] std::io::Error), + + #[error("jobset")] + Jobset(#[source] jobset::JobsetError), + + #[error("build output")] + BuildOutput(#[source] build::BuildOutputError), + + #[error("channel closed")] + ChannelClosed(#[source] machine::ChannelClosedError), + + #[error("metrics")] + Metrics(#[source] prometheus::Error), + + #[error("configuration")] + Config(#[source] crate::config::ConfigError), + + #[error("loading configuration")] + LoadConfig(#[source] crate::config::LoadConfigError), + + #[error("binary cache")] + Cache(#[source] binary_cache::CacheError), + + #[error("integer conversion")] + IntConversion(#[source] std::num::TryFromIntError), + + #[error("time")] + Jiff(#[source] jiff::Error), + + #[error("task join")] + Join(#[source] tokio::task::JoinError), + + #[error("invalid platform UTF-8: {0}")] + InvalidPlatformUtf8(std::str::Utf8Error), + + #[error("failed to construct log path string")] + LogPathNotUtf8, + + #[error("reading derivation")] + ReadDerivation(#[source] ReadDerivationError), + + #[error("logic error")] + Logic(#[source] StateLogicError), + + #[error("handling previous failure for resolved drv")] + PreviousFailureContext(#[source] Box), +} + +// Manual `From` impls — using `#[source]` (not `#[from]`) so we keep +// the contextual message on each variant. +macro_rules! impl_from_for_state_error { + ($($variant:ident($ty:ty)),* $(,)?) => { + $( + impl From<$ty> for StateError { + fn from(e: $ty) -> Self { + Self::$variant(e) + } + } + )* + } +} + +impl_from_for_state_error! 
{ + Db(db::Error), + Daemon(harmonia_store_remote::DaemonError), + Io(std::io::Error), + Jobset(jobset::JobsetError), + BuildOutput(build::BuildOutputError), + ChannelClosed(machine::ChannelClosedError), + Metrics(prometheus::Error), + Config(crate::config::ConfigError), + LoadConfig(crate::config::LoadConfigError), + Cache(binary_cache::CacheError), + IntConversion(std::num::TryFromIntError), + Jiff(jiff::Error), + Join(tokio::task::JoinError), + Logic(StateLogicError), + ReadDerivation(ReadDerivationError), +} + +/// Decompose `UtilError` into the appropriate `StateError` variant +/// so that `StateError::Db` is the single canonical DB error variant. +impl From for StateError { + fn from(e: crate::utils::UtilError) -> Self { + match e { + crate::utils::UtilError::Db(e) => Self::Db(e), + crate::utils::UtilError::IntConversion(e) => Self::IntConversion(e), + crate::utils::UtilError::NonUtf8LogPath => Self::Io( + std::io::Error::new(std::io::ErrorKind::InvalidData, "non UTF-8 log file path"), + ), + } + } +} + +impl From for StateError { + fn from(e: ResolutionError) -> Self { + Self::Logic(StateLogicError::Resolution(e)) + } +} +impl From for StateError { + fn from(e: StepLookupError) -> Self { + Self::Logic(StateLogicError::StepLookup(e)) + } +} +impl From for StateError { + fn from(e: MachineLookupError) -> Self { + Self::Logic(StateLogicError::MachineLookup(e)) + } +} +impl From for StateError { + fn from(e: DrvLookupError) -> Self { + Self::Logic(StateLogicError::DrvLookup(e)) + } +} + +/// Errors from reading and parsing a derivation file. 
+#[derive(Debug, thiserror::Error)] +pub enum ReadDerivationError { + #[error(transparent)] + Io(#[from] std::io::Error), + #[error("drv path does not end in .drv")] + NotDrv, + #[error("failed to parse derivation name: {0}")] + Name(harmonia_store_path::StorePathNameError), + #[error("failed to parse derivation ATerm: {0}")] + Aterm(harmonia_store_aterm::ParseError), +} + pub use build::{Build, BuildOutput, BuildResultState, BuildTimings, Builds, RemoteBuild}; use harmonia_store_derivation::derivation::Derivation; use harmonia_store_path::StorePath; @@ -127,25 +280,29 @@ impl State { } /// Read and parse a `.drv` file from the store. - async fn read_derivation(&self, drv_path: &StorePath) -> anyhow::Result { + async fn read_derivation( + &self, + drv_path: &StorePath, + ) -> Result { let content = fs_err::tokio::read_to_string(self.real_path(drv_path)).await?; let drv_name_str = drv_path.name().to_string(); let name = drv_name_str .strip_suffix(".drv") - .ok_or_else(|| anyhow::anyhow!("drv path does not end in .drv"))? + .ok_or(ReadDerivationError::NotDrv)? 
.parse() - .context("failed to parse derivation name")?; + .map_err(ReadDerivationError::Name)?; harmonia_store_aterm::parse_derivation_aterm( self.pool.store_dir(), content.as_bytes(), name, ) - .context("failed to parse derivation ATerm") + .map_err(ReadDerivationError::Aterm) } #[tracing::instrument(err)] - pub async fn new() -> anyhow::Result> { - let nix_config = daemon_client_utils::parse_nix_remote().map_err(|e| anyhow::anyhow!(e))?; + pub async fn new() -> Result, StateError> { + let nix_config = daemon_client_utils::parse_nix_remote() + .map_err(|e| StateError::Io(std::io::Error::other(format!("failed to parse NIX_REMOTE: {e}"))))?; let store_dir = nix_config.store_dir.clone(); let pool = harmonia_store_remote::ConnectionPool::with_store_dir( &nix_config.socket, @@ -165,10 +322,9 @@ impl State { ) .await?; - match fs_err::tokio::create_dir_all(&log_dir).await { - Ok(()) => tracing::info!("successfully created hydra log_dir={log_dir:?}"), - Err(e) => tracing::error!("Failed to create hydra log_dir={log_dir:?} e={e}"), - } + fs_err::tokio::create_dir_all(&log_dir) + .await + .map_err(StateError::from)?; let mut remote_stores = vec![]; for uri in config.get_remote_store_addrs() { @@ -213,11 +369,25 @@ impl State { })) } +} + +/// Errors from hot-reloading configuration. These are config +/// validation errors (bad URL, bad S3 config), not infrastructure +/// failures — the old config remains in effect. +#[derive(Debug, thiserror::Error)] +pub enum ReloadConfigError { + #[error(transparent)] + BadDbUrl(#[from] db::DbConfigurationError), + #[error(transparent)] + Cache(#[from] binary_cache::CacheError), +} + +impl State { #[tracing::instrument(skip(self, new_config), err)] pub async fn reload_config_callback( &self, new_config: &crate::config::PreparedApp, - ) -> anyhow::Result<()> { + ) -> Result<(), ReloadConfigError> { // IF this gets more complex we need a way to trap the state and revert. 
// right now it doesnt matter because only reconfigure_pool can fail and this is the first // thing we do. @@ -284,55 +454,77 @@ impl State { machine_id } - #[tracing::instrument(skip(self))] - pub async fn remove_machine(&self, machine_id: uuid::Uuid) { + #[tracing::instrument(skip(self), err)] + pub async fn remove_machine(&self, machine_id: uuid::Uuid) -> Result<(), StateError> { if let Some(m) = self.machines.remove_machine(machine_id) { let jobs = { let jobs = m.jobs.read(); jobs.clone() }; for job in &jobs { - if let Err(e) = self - .fail_step( - machine_id, - &job.path, - // we fail this with preparing because we kinda want to restart all jobs if - // a machine is removed - BuildResultState::Completed( - hydra_proto::BuildResultState::PreparingFailure, - ), - BuildTimings::default(), - ) - .await - { - tracing::error!( - "Failed to fail step machine_id={machine_id} drv={} e={e}", - job.path - ); - } + self.fail_step( + machine_id, + &job.path, + // we fail this with preparing because we kinda want to restart all jobs if + // a machine is removed + BuildResultState::Completed(hydra_proto::BuildResultState::PreparingFailure), + BuildTimings::default(), + ) + .await?; } } + Ok(()) } - pub async fn remove_all_machines(&self) { + pub async fn remove_all_machines(&self) -> Result<(), StateError> { for m in self.machines.get_all_machines() { - self.remove_machine(m.id).await; + self.remove_machine(m.id).await?; } + Ok(()) } #[tracing::instrument(skip(self), err)] - pub async fn clear_busy(&self) -> anyhow::Result<()> { + pub async fn clear_busy(&self) -> Result<(), db::Error> { let mut db = self.db.get().await?; db.clear_busy(0).await?; Ok(()) } +} + +/// Errors from derivation resolution (CA floating, dynamic deps). 
+#[derive(Debug, thiserror::Error)] +pub enum ResolutionError { + #[error("failed to resolve CAFloating derivation {0}")] + UnresolvedCAFloating(StorePath), + + #[error("failed to resolve derivation {0}")] + ResolveFailed(StorePath), + + #[error("failed to fill deferred outputs for {0}")] + FillDeferredOutputs(StorePath), + + #[error("could not create resolved build step")] + ResolvedStepCreationFailed, + + #[error("output path mismatch for output `{name}` of {drv}: expected {expected}, got {actual}")] + OutputPathMismatch { + name: String, + drv: StorePath, + expected: String, + actual: String, + }, + #[error("dynamic rdep references output `{output}` not produced by {drv}")] + DynRdepOutputMissing { output: String, drv: StorePath }, +} + +impl State { #[tracing::instrument(skip(self, constraint), err)] #[allow(clippy::too_many_lines)] async fn realise_drv_on_valid_machine( self: Arc, constraint: queue::JobConstraint, - ) -> anyhow::Result { + ) -> Result { let free_fn = self.config.get_machine_free_fn(); let Some((machine, step_info)) = constraint.resolve(&self.machines, free_fn) else { @@ -390,7 +582,7 @@ impl State { } self.construct_log_file_path(drv) - .await? 
+ .await .clone_into(&mut job.result.log_file); let mut db = self.db.get().await?; let step_nr = { @@ -444,7 +636,7 @@ impl State { &resolved_map, ) .await - .ok_or_else(|| anyhow::anyhow!("Failed to resolve derivation {drv}"))?; + .ok_or_else(|| ResolutionError::ResolveFailed(drv.clone()))?; // Input-addressed outputs that transitively depend on a CA // derivation come out of eval as `Deferred` because the IA @@ -467,7 +659,7 @@ impl State { self.pool.store_dir(), unfilled, ) - .map_err(|e| anyhow::anyhow!("Failed to fill deferred outputs for {drv}: {e}"))?; + .map_err(|_| ResolutionError::FillDeferredOutputs(drv.clone()))?; basic_drv = filled.map_outputs(DerivationOutput::InputAddressed); } (basic_drv, was_deferred) @@ -486,11 +678,7 @@ impl State { // Write the resolved derivation to the store via daemon // protocol so we can compare its path. let resolved_path = { - let mut guard = self - .pool - .acquire() - .await - .map_err(|e| anyhow::anyhow!("daemon connection failed: {e}"))?; + let mut guard = self.pool.acquire().await?; harmonia_protocol::daemon::write_derivation( guard.client(), self.pool.store_dir(), @@ -559,20 +747,18 @@ impl State { Arc::new(parking_lot::RwLock::new(HashSet::new())), Arc::new(parking_lot::RwLock::new(HashSet::new())), ) - .await + .await? 
{ CreateStepResult::None => { - return Err(anyhow::anyhow!("Could not create resolved build step")); + return Err(StateError::from( + ResolutionError::ResolvedStepCreationFailed, + )); } CreateStepResult::Valid(step) => step, CreateStepResult::PreviousFailure(step) => { self.handle_previous_failure(build.clone(), step.clone()) .await - .with_context(|| { - format!( - "Failed to handle previous failure in resolved version of {drv}" - ) - })?; + .map_err(|e| StateError::PreviousFailureContext(Box::new(e)))?; return Ok(RealiseStepResult::CachedFailure); } }; @@ -668,30 +854,37 @@ impl State { Ok(RealiseStepResult::Valid(machine)) } - #[tracing::instrument(skip(self), fields(%drv), err)] - async fn construct_log_file_path(&self, drv: &StorePath) -> anyhow::Result { + #[tracing::instrument(skip(self), fields(%drv))] + async fn construct_log_file_path(&self, drv: &StorePath) -> std::path::PathBuf { let mut log_file = self.log_dir.clone(); let base = drv.to_string(); let (dir, file) = base.split_at(2); log_file.push(format!("{dir}/")); - let _ = fs_err::tokio::create_dir_all(&log_file).await; // create dir + if let Err(e) = fs_err::tokio::create_dir_all(&log_file).await { + // Non-fatal: `io::Error` — no DB. Log dir creation is + // best-effort; the log file open will fail more visibly. + tracing::warn!("failed to create log directory {log_file:?}: {e}"); + } log_file.push(file); - Ok(log_file) + log_file } #[tracing::instrument(skip(self), fields(%drv), err)] - pub async fn new_log_file(&self, drv: &StorePath) -> anyhow::Result { - let log_file = self.construct_log_file_path(drv).await?; + pub async fn new_log_file( + &self, + drv: &StorePath, + ) -> Result { + let log_file = self.construct_log_file_path(drv).await; tracing::debug!("opening {log_file:?}"); - Ok(fs_err::tokio::File::options() + fs_err::tokio::File::options() .create(true) .truncate(true) .write(true) .read(false) .mode(0o666) .open(log_file) - .await?) 
+ .await } #[allow(clippy::cast_possible_truncation)] @@ -702,7 +895,7 @@ impl State { new_builds_by_id: Arc>>>, new_builds_by_path: Arc>>, finished_drvs: Arc>>, - ) -> anyhow::Result> { + ) -> Result, StateError> { let Some(build) = new_builds_by_id.read().get(&id).cloned() else { return Ok(None); }; @@ -739,7 +932,7 @@ impl State { new_ids: Vec, new_builds_by_id: Arc>>>, new_builds_by_path: HashMap>, - ) -> anyhow::Result { + ) -> Result { use futures::stream::StreamExt as _; let finished_drvs = Arc::new(parking_lot::RwLock::new(HashSet::::new())); @@ -830,7 +1023,7 @@ impl State { } #[tracing::instrument(skip(self), err)] - async fn process_queue_change(&self) -> anyhow::Result<()> { + async fn process_queue_change(&self) -> Result<(), StateError> { let mut db = self.db.get().await?; let curr_ids: HashMap<_, _> = db .get_not_finished_builds_fast() @@ -842,36 +1035,45 @@ impl State { let cancelled_steps = self.queues.kill_active_steps().await; for (drv_path, machine_id) in cancelled_steps { - if let Err(e) = self - .fail_step( - machine_id, - &drv_path, - BuildResultState::Cancelled, - BuildTimings::default(), - ) - .await - { - tracing::error!( - "Failed to abort step machine_id={machine_id} drv={drv_path} e={e}", - ); - } + self.fail_step( + machine_id, + &drv_path, + BuildResultState::Cancelled, + BuildTimings::default(), + ) + .await?; } Ok(()) } +} +/// Errors from looking up derivations. 
+#[derive(Debug, thiserror::Error)] +pub enum DrvLookupError { + #[error("drv not found")] + DrvNotFound, + + #[error("drv {drv} for build {build_id} is not valid — possible GC root bug")] + DrvInvalid { drv: String, build_id: BuildID }, + + #[error("derivation not found")] + DerivationNotFound, +} + +impl State { #[tracing::instrument(skip(self), fields(%drv_path))] pub async fn queue_one_build( &self, jobset_id: i32, drv_path: &StorePath, - ) -> anyhow::Result<()> { + ) -> Result<(), StateError> { let mut db = self.db.get().await?; let drv = self.read_derivation(drv_path).await?; db.insert_debug_build( self.pool.store_dir(), jobset_id, drv_path, - std::str::from_utf8(&drv.platform)?, + std::str::from_utf8(&drv.platform).map_err(|e| StateError::InvalidPlatformUtf8(e))?, ) .await?; @@ -882,7 +1084,10 @@ impl State { } #[tracing::instrument(skip(self), err)] - pub(crate) async fn manually_add_queue_build(&self, build_id: BuildID) -> anyhow::Result<()> { + pub(crate) async fn manually_add_queue_build( + &self, + build_id: BuildID, + ) -> Result<(), StateError> { let mut new_ids = Vec::::new(); let mut new_builds_by_id = HashMap::>::new(); let mut new_builds_by_path = HashMap::>::new(); @@ -924,7 +1129,7 @@ impl State { } #[tracing::instrument(skip(self), err)] - pub async fn get_queued_builds(&self) -> anyhow::Result { + pub async fn get_queued_builds(&self) -> Result { self.metrics.queue_checks_started.inc(); let mut new_ids = Vec::::with_capacity(1000); @@ -959,19 +1164,14 @@ impl State { } #[tracing::instrument(skip(self))] - pub fn start_queue_monitor_loop(self: Arc) -> tokio::task::AbortHandle { - let task = tokio::task::spawn({ - async move { - if let Err(e) = Box::pin(self.queue_monitor_loop()).await { - tracing::error!("Failed to spawn queue monitor loop. 
e={e}"); - } - } - }); - task.abort_handle() + pub fn start_queue_monitor_loop( + self: Arc, + ) -> tokio::task::JoinHandle> { + tokio::task::spawn(async move { Box::pin(self.queue_monitor_loop()).await }) } #[tracing::instrument(skip(self), err)] - async fn queue_monitor_loop(&self) -> anyhow::Result<()> { + async fn queue_monitor_loop(&self) -> Result<(), StateError> { let mut listener = self .db .listener(vec![ @@ -987,13 +1187,7 @@ impl State { loop { let before_work = Instant::now(); // no cache in daemon protocol - let early_exit = match self.get_queued_builds().await { - Ok(early_exit) => early_exit, - Err(e) => { - tracing::error!("get_queue_builds failed inside queue monitor loop: {e}"); - continue; - } - }; + let early_exit = self.get_queued_builds().await?; #[allow(clippy::cast_possible_truncation)] self.metrics @@ -1023,8 +1217,7 @@ impl State { Ok(Some(v)) => v.channel().to_owned(), Ok(None) => continue, Err(e) => { - tracing::warn!("PgListener failed with e={e}"); - continue; + return Err(StateError::from(db::Error::from(e))); } }, } @@ -1032,10 +1225,7 @@ impl State { match listener.try_next().await { Ok(Some(v)) => v.channel().to_owned(), Ok(None) => continue, - Err(e) => { - tracing::warn!("PgListener failed with e={e}"); - continue; - } + Err(e) => return Err(StateError::from(db::Error::from(e))), } }; self.metrics.nr_queue_wakeups.inc(); @@ -1048,24 +1238,12 @@ impl State { "builds_restarted" => tracing::debug!("got notification: builds restarted"), "builds_cancelled" | "builds_deleted" | "builds_bumped" => { tracing::info!("got notification: builds cancelled or bumped"); - if let Err(e) = self.process_queue_change().await { - tracing::error!("Failed to process queue change. 
e={e}"); - } + self.process_queue_change().await?; } "jobset_shares_changed" => { tracing::info!("got notification: jobset shares changed"); - match self.db.get().await { - Ok(mut conn) => { - if let Err(e) = self.jobsets.handle_change(&mut conn).await { - tracing::error!("Failed to handle jobset change. e={e}"); - } - } - Err(e) => { - tracing::error!( - "Failed to get db connection for event 'jobset_shares_changed'. e={e}" - ); - } - } + let mut conn = self.db.get().await?; + self.jobsets.handle_change(&mut conn).await?; } _ => (), } @@ -1078,46 +1256,43 @@ impl State { } #[tracing::instrument(skip(self))] - pub fn start_dispatch_loop(self: Arc) -> tokio::task::AbortHandle { - let task = tokio::task::spawn({ - async move { - loop { - let before_sleep = Instant::now(); - let dispatch_trigger_timer = self.config.get_dispatch_trigger_timer(); - if let Some(timer) = dispatch_trigger_timer { - tokio::select! { - () = self.notify_dispatch.notified() => {}, - () = tokio::time::sleep(timer) => {}, - }; - } else { - self.notify_dispatch.notified().await; - } - tracing::info!("starting dispatch"); + pub fn start_dispatch_loop(self: Arc) -> tokio::task::JoinHandle> { + tokio::task::spawn(async move { + loop { + let before_sleep = Instant::now(); + let dispatch_trigger_timer = self.config.get_dispatch_trigger_timer(); + if let Some(timer) = dispatch_trigger_timer { + tokio::select! 
{ + () = self.notify_dispatch.notified() => {}, + () = tokio::time::sleep(timer) => {}, + }; + } else { + self.notify_dispatch.notified().await; + } + tracing::info!("starting dispatch"); - #[allow(clippy::cast_possible_truncation)] - self.metrics - .dispatcher_time_spent_waiting - .inc_by(before_sleep.elapsed().as_micros() as u64); + #[allow(clippy::cast_possible_truncation)] + self.metrics + .dispatcher_time_spent_waiting + .inc_by(before_sleep.elapsed().as_micros() as u64); - self.metrics.nr_dispatcher_wakeups.inc(); - let before_work = Instant::now(); - self.clone().do_dispatch_once().await; + self.metrics.nr_dispatcher_wakeups.inc(); + let before_work = Instant::now(); + self.clone().do_dispatch_once().await?; - let elapsed = before_work.elapsed(); + let elapsed = before_work.elapsed(); - #[allow(clippy::cast_possible_truncation)] - self.metrics - .dispatcher_time_spent_running - .inc_by(elapsed.as_micros() as u64); + #[allow(clippy::cast_possible_truncation)] + self.metrics + .dispatcher_time_spent_running + .inc_by(elapsed.as_micros() as u64); - #[allow(clippy::cast_possible_truncation)] - self.metrics - .dispatch_time_ms - .inc_by(elapsed.as_millis() as u64); - } + #[allow(clippy::cast_possible_truncation)] + self.metrics + .dispatch_time_ms + .inc_by(elapsed.as_millis() as u64); } - }); - task.abort_handle() + }) } #[tracing::instrument(skip(self))] @@ -1155,7 +1330,7 @@ impl State { } #[tracing::instrument(skip(self))] - async fn do_dispatch_once(self: Arc) { + async fn do_dispatch_once(self: Arc) -> Result<(), StateError> { // Prune old historical build step info from the jobsets. 
self.jobsets.prune(); let new_runnable = self.steps.clone_runnable(); @@ -1201,12 +1376,12 @@ impl State { }, &self.metrics, ) - .await; + .await?; self.metrics .nr_steps_waiting .set(nr_steps_waiting_all_queues); - self.abort_unsupported().await; + self.abort_unsupported().await } #[tracing::instrument(skip(self, step_status), fields(%build_id, %machine_id), err)] @@ -1215,7 +1390,7 @@ impl State { build_id: uuid::Uuid, machine_id: uuid::Uuid, step_status: db::models::StepStatus, - ) -> anyhow::Result<()> { + ) -> Result<(), db::Error> { let build_id_and_step_nr = self.machines.get_machine_by_id(machine_id).and_then(|m| { tracing::debug!( "get job from machine by build_id: build_id={build_id} m={}", @@ -1241,7 +1416,19 @@ impl State { .await?; Ok(()) } +} + +/// Errors from looking up steps/jobs in the in-memory queues. +#[derive(Debug, thiserror::Error)] +pub enum StepLookupError { + #[error("step is missing in queues.scheduled")] + StepNotScheduled, + #[error("job is missing in machine.jobs m={0}")] + JobNotOnMachine(String), +} + +impl State { #[allow(clippy::too_many_lines)] #[tracing::instrument(skip(self, output), fields(%machine_id, %drv_path), err)] pub async fn succeed_step( @@ -1249,13 +1436,13 @@ impl State { machine_id: uuid::Uuid, drv_path: &StorePath, output: BuildOutput, - ) -> anyhow::Result<()> { + ) -> Result<(), StateError> { tracing::info!("marking job as done: drv_path={drv_path}"); let item = self .queues .remove_job_from_scheduled(drv_path) .await - .ok_or_else(|| anyhow::anyhow!("Step is missing in queues.scheduled"))?; + .ok_or(StateError::from(StepLookupError::StepNotScheduled))?; item.step_info.step.set_finished(true); @@ -1270,14 +1457,15 @@ impl State { let Some(expected_path) = expected_path else { continue; // path not statically known (Deferred/CAFloating/Impure) }; - if let Some(actual_path) = output.outputs.get(name) { - anyhow::ensure!( - expected_path == actual_path, - "output path mismatch for output `{name}` of {drv_path}: \ - 
expected {}, got {}", - self.pool.store_dir().display(expected_path), - self.pool.store_dir().display(actual_path), - ); + if let Some(actual_path) = output.outputs.get(name) + && expected_path != actual_path + { + return Err(StateError::from(ResolutionError::OutputPathMismatch { + name: name.to_string(), + drv: drv_path.clone(), + expected: self.pool.store_dir().display(expected_path).to_string(), + actual: self.pool.store_dir().display(actual_path).to_string(), + })); } } } @@ -1286,10 +1474,9 @@ impl State { "removing job from machine: drv_path={drv_path} m={}", item.machine.id ); - let mut job = item - .machine - .remove_job(drv_path) - .ok_or_else(|| anyhow::anyhow!("Job is missing in machine.jobs m={}", item.machine,))?; + let mut job = item.machine.remove_job(drv_path).ok_or_else(|| { + StateError::from(StepLookupError::JobNotOnMachine(item.machine.to_string())) + })?; self.queues .remove_job(&item.step_info, &item.build_queue) .await; @@ -1351,6 +1538,7 @@ impl State { tracing::info!("Copied {} paths to {dest_uri}", paths.len()); } Ok(out) => { + // Non-fatal: remote store copy is best-effort. tracing::error!( "nix copy to {dest_uri} failed: {}", str::from_utf8(&out.stderr).unwrap_or("Invalid UTF-8") @@ -1425,6 +1613,8 @@ impl State { }; for s3 in &s3_stores { if let Err(e) = s3.write_realisation(realisation.clone()).await { + // Non-fatal: `CacheError` — no DB. S3 + // realisation write is best-effort. tracing::warn!( "Failed to write realisation for {drv_path}^{output_name}: {e}" ); @@ -1486,9 +1676,10 @@ impl State { }; let resolved_drv = output.outputs.get(&output_name).cloned().ok_or_else(|| { - anyhow::anyhow!( - "Dynamic rdep references output `{output_name}` not produced by {drv_path}" - ) + StateError::from(ResolutionError::DynRdepOutputMissing { + output: output_name.to_string(), + drv: drv_path.clone(), + }) })?; // Find a build associated with this step. 
For intermediate steps @@ -1524,15 +1715,12 @@ impl State { Arc::default(), new_runnable.clone(), ) - .await + .await? { CreateStepResult::None => continue, CreateStepResult::Valid(step) => step, CreateStepResult::PreviousFailure(step) => { - if let Err(e) = self.handle_previous_failure(build.clone(), step).await { - tracing::error!("Failed to handle previous failure: {e}"); - } - // TODO: figure out what to do here + self.handle_previous_failure(build.clone(), step).await?; continue; } }; @@ -1560,13 +1748,14 @@ impl State { drv_path: &StorePath, state: BuildResultState, timings: BuildTimings, - ) -> anyhow::Result<()> { + ) -> Result<(), StateError> { tracing::info!("removing job from running in system queue: drv_path={drv_path}"); - let item = self - .queues - .remove_job_from_scheduled(drv_path) - .await - .ok_or_else(|| anyhow::anyhow!("Step is missing in queues.scheduled"))?; + let Some(item) = self.queues.remove_job_from_scheduled(drv_path).await else { + // Job may have already been cleaned up (e.g. machine + // disconnected concurrently). Not a real error. + tracing::warn!("step already gone from queues.scheduled: drv_path={drv_path}"); + return Ok(()); + }; item.step_info.step.set_finished(false); @@ -1574,10 +1763,14 @@ impl State { "removing job from machine: drv_path={drv_path} m={}", item.machine.id ); - let mut job = item - .machine - .remove_job(drv_path) - .ok_or_else(|| anyhow::anyhow!("Job is missing in machine.jobs m={}", item.machine))?; + let Some(mut job) = item.machine.remove_job(drv_path) else { + // Same race — job already removed from machine. + tracing::warn!( + "job already gone from machine.jobs: drv_path={drv_path} m={}", + item.machine + ); + return Ok(()); + }; job.result.step_status = BuildStatus::Failed; // this can override step_status to something more specific @@ -1647,21 +1840,33 @@ impl State { ) .await } +} + +/// Errors from looking up machines/jobs by UUID. 
+#[derive(Debug, Clone, Copy, thiserror::Error)] +pub enum MachineLookupError { + #[error("machine with machine_id not found")] + MachineNotFound, + + #[error("job with build_id not found")] + JobNotFound, +} +impl State { #[tracing::instrument(skip(self, output), fields(%machine_id, build_id=%build_id), err)] pub async fn succeed_step_by_uuid( &self, build_id: uuid::Uuid, machine_id: uuid::Uuid, output: BuildOutput, - ) -> anyhow::Result<()> { + ) -> Result<(), StateError> { let machine = self .machines .get_machine_by_id(machine_id) - .ok_or_else(|| anyhow::anyhow!("Machine with machine_id not found"))?; + .ok_or(StateError::from(MachineLookupError::MachineNotFound))?; let drv_path = machine .get_job_drv_for_build_id(build_id) - .ok_or_else(|| anyhow::anyhow!("Job with build_id not found"))?; + .ok_or(StateError::from(MachineLookupError::JobNotFound))?; self.succeed_step(machine_id, &drv_path, output).await } @@ -1673,14 +1878,14 @@ impl State { machine_id: uuid::Uuid, state: BuildResultState, timings: BuildTimings, - ) -> anyhow::Result<()> { + ) -> Result<(), StateError> { let machine = self .machines .get_machine_by_id(machine_id) - .ok_or_else(|| anyhow::anyhow!("Machine with machine_id not found"))?; + .ok_or(StateError::from(MachineLookupError::MachineNotFound))?; let drv_path = machine .get_job_drv_for_build_id(build_id) - .ok_or_else(|| anyhow::anyhow!("Job with build_id not found"))?; + .ok_or(StateError::from(MachineLookupError::JobNotFound))?; self.fail_step(machine_id, &drv_path, state, timings).await } @@ -1693,7 +1898,7 @@ impl State { machine: Option>, mut job: machine::Job, step: Arc, - ) -> anyhow::Result<()> { + ) -> Result<(), StateError> { if !job.result.has_stop_time() { job.result.set_stop_time_now(); } @@ -1851,7 +2056,7 @@ impl State { &self, build: Arc, step: Arc, - ) -> anyhow::Result<()> { + ) -> Result<(), StateError> { // Some step previously failed, so mark the build as failed right away. 
tracing::warn!( "marking build {} as cached failure due to '{}'", @@ -1951,7 +2156,7 @@ impl State { new_builds_by_path: Arc>>, finished_drvs: Arc>>, new_runnable: Arc>>>, - ) -> anyhow::Result<()> { + ) -> Result<(), StateError> { self.metrics.queue_build_loads.inc(); tracing::info!("loading build {} ({})", build.id, build.full_job_name()); nr_added.fetch_add(1, Ordering::Relaxed); @@ -1961,29 +2166,10 @@ impl State { } if !daemon_client_utils::is_valid_path(&self.pool, &build.drv_path).await? { - tracing::error!( - "aborting GC'ed build id={} path={}", - build.id, - self.pool.store_dir().display(&build.drv_path) - ); - if !build.get_finished_in_db() { - match self.db.get().await { - Ok(mut conn) => { - if let Err(e) = conn.abort_build(build.id).await { - tracing::error!("Failed to abort the build={} e={}", build.id, e); - } - } - Err(e) => tracing::error!( - "Failed to get database connection so we can abort the build={} e={}", - build.id, - e - ), - } - } - - build.set_finished_in_db(true); - self.metrics.nr_builds_done.inc(); - return Ok(()); + return Err(StateError::from(DrvLookupError::DrvInvalid { + drv: self.pool.store_dir().display(&build.drv_path).to_string(), + build_id: build.id, + })); } // Create steps for this derivation and its dependencies. @@ -1999,14 +2185,12 @@ impl State { new_steps.clone(), new_runnable.clone(), ) - .await + .await? 
{ CreateStepResult::None => None, CreateStepResult::Valid(dep) => Some(dep), CreateStepResult::PreviousFailure(step) => { - if let Err(e) = self.handle_previous_failure(build, step).await { - tracing::error!("Failed to handle previous failure: {e}"); - } + self.handle_previous_failure(build, step).await?; return Ok(()); } }; @@ -2029,12 +2213,8 @@ impl State { let finished_drvs = finished_drvs.clone(); let new_runnable = new_runnable.clone(); async move { - let j = { - if let Some(j) = new_builds_by_id.read().get(&b) { - j.clone() - } else { - return Ok(()); - } + let Some(j) = new_builds_by_id.read().get(&b).cloned() else { + return Ok(()); }; Box::pin(self.create_build( @@ -2071,9 +2251,7 @@ impl State { } else { // If we didn't get a step, it means the step's outputs are // all valid. So we mark this as a finished, cached build. - if let Err(e) = self.handle_cached_build(build).await { - tracing::error!("failed to handle cached build: {e}"); - } + self.handle_cached_build(build).await?; } Ok(()) } @@ -2097,13 +2275,13 @@ impl State { finished_drvs: Arc>>, new_steps: Arc>>>, new_runnable: Arc>>>, - ) -> CreateStepResult { + ) -> Result { use futures::stream::StreamExt as _; { let finished_drvs = finished_drvs.read(); if finished_drvs.contains(&drv_path) { - return CreateStepResult::None; + return Ok(CreateStepResult::None); } } @@ -2134,7 +2312,7 @@ impl State { // TODO once we properly feed IFD builds in to Hydra to be // distributed, remove this hack. 
if step.get_finished() { - return CreateStepResult::None; + return Ok(CreateStepResult::None); } if let Some(output_paths) = step.get_output_paths(self.pool.store_dir()) { // All output paths must be known (Some) and valid in @@ -2159,17 +2337,17 @@ impl State { if all_valid { finished_drvs.write().insert(drv_path.clone()); step.set_finished(true); - return CreateStepResult::None; + return Ok(CreateStepResult::None); } } - return CreateStepResult::Valid(step); + return Ok(CreateStepResult::Valid(step)); } self.metrics.queue_steps_created.inc(); tracing::debug!("considering derivation '{drv_path}'"); let Some(drv) = self.read_derivation(&drv_path).await.ok() else { tracing::warn!("create_step: could not query derivation {drv_path}, skipping"); - return CreateStepResult::None; + return Ok(CreateStepResult::None); }; if let Some(fod_checker) = &self.fod_checker { fod_checker.add_ca_drv_parsed(&drv_path, &drv); @@ -2248,8 +2426,8 @@ impl State { }) .map(|(name, path)| (name.clone(), path.clone())) .collect::>(); - if !unregistered_local_outputs.is_empty() - && let Err(e) = crate::utils::make_local_step( + if !unregistered_local_outputs.is_empty() { + match crate::utils::make_local_step( &self.db, self.pool.store_dir(), build.id, @@ -2257,8 +2435,16 @@ impl State { &unregistered_local_outputs, ) .await - { - tracing::warn!("Failed to mark outputs as already found, continuing: {e}"); + { + Ok(()) => {} + Err(crate::utils::UtilError::Db(e)) => return Err(StateError::from(e)), + Err(e @ (crate::utils::UtilError::IntConversion(_) + | crate::utils::UtilError::NonUtf8LogPath)) => { + // Non-fatal: no DB involvement. The local step + // just won't be recorded. 
+ tracing::warn!("Failed to record local step: {e}"); + } + } } // Handle paths that aren't in the remote store (for pushing) @@ -2268,7 +2454,8 @@ impl State { .await; if !missing.is_empty() && missing_local_outputs.is_empty() { // we have all paths locally, so we can just upload them to the remote_store - if let Ok(log_file) = self.construct_log_file_path(&drv_path).await { + { + let log_file = self.construct_log_file_path(&drv_path).await; let missing_paths: Vec = missing.values().filter_map(Clone::clone).collect(); self.uploader @@ -2290,7 +2477,7 @@ impl State { if self.check_cached_failure(step.clone()).await { step.set_previous_failure(true); - return CreateStepResult::PreviousFailure(step); + return Ok(CreateStepResult::PreviousFailure(step)); } tracing::debug!("missing outputs: {missing_outputs:?}"); @@ -2320,9 +2507,15 @@ impl State { Ok(_) => { self.metrics.nr_substitutes_failed.inc(); } - Err(e) => { + Err(crate::utils::UtilError::Db(e)) => { + self.metrics.nr_substitutes_failed.inc(); + return Err(StateError::from(e)); + } + Err(e @ (crate::utils::UtilError::IntConversion(_) + | crate::utils::UtilError::NonUtf8LogPath)) => { + // Non-fatal: no DB involvement. self.metrics.nr_substitutes_failed.inc(); - tracing::warn!("Failed to substitute path: {e}"); + tracing::warn!("Failed to record substitution step: {e}"); } } } @@ -2340,7 +2533,7 @@ impl State { finished_drvs.write().insert(drv_path.clone()); step.set_finished(true); - return CreateStepResult::None; + return Ok(CreateStepResult::None); } tracing::debug!("creating build step '{drv_path}"); @@ -2369,7 +2562,7 @@ impl State { }) .buffered(25); while let Some(result) = tokio_stream::StreamExt::next(&mut stream).await { - match result { + match result? 
{ CreateStepResult::None => (), CreateStepResult::Valid(dep) => { if !dep.get_finished() && !dep.get_previous_failure() { @@ -2379,7 +2572,7 @@ impl State { } } CreateStepResult::PreviousFailure(step) => { - return CreateStepResult::PreviousFailure(step); + return Ok(CreateStepResult::PreviousFailure(step)); } } } @@ -2396,19 +2589,18 @@ impl State { let mut new_steps = new_steps.write(); new_steps.insert(step.clone()); } - CreateStepResult::Valid(step) + Ok(CreateStepResult::Valid(step)) } #[tracing::instrument(skip(self))] async fn query_known_drv_outputs( &self, drv_path: &StorePath, - ) -> anyhow::Result> { + ) -> Result, db::Error> { let mut db = self.db.get().await?; let mut tx = db.begin_transaction().await?; - Ok(tx - .find_build_step_outputs(self.pool.store_dir(), drv_path) - .await?) + tx.find_build_step_outputs(self.pool.store_dir(), drv_path) + .await } #[tracing::instrument(skip(self, step), ret, level = "debug")] @@ -2433,7 +2625,7 @@ impl State { } #[tracing::instrument(skip(self, build), fields(build_id=build.id), err)] - async fn handle_cached_build(&self, build: Arc) -> anyhow::Result<()> { + async fn handle_cached_build(&self, build: Arc) -> Result<(), StateError> { let res = self.get_build_output_cached(&build.drv_path).await?; { @@ -2461,7 +2653,10 @@ impl State { } #[tracing::instrument(skip(self), err)] - async fn get_build_output_cached(&self, drv_path: &StorePath) -> anyhow::Result { + async fn get_build_output_cached( + &self, + drv_path: &StorePath, + ) -> Result { let drv = self.read_derivation(drv_path).await?; let output_paths: BTreeMap> = drv @@ -2490,7 +2685,7 @@ impl State { continue; }; let build_id = db_build_output.id; - let Ok(mut res): anyhow::Result = db_build_output.try_into() else { + let Ok(mut res): Result = db_build_output.try_into() else { continue; }; @@ -2529,7 +2724,7 @@ impl State { fs_err::os::unix::fs::symlink(&store_path_full, &link_path) } - async fn abort_unsupported(&self) { + async fn abort_unsupported(&self) 
-> Result<(), StateError> { let runnable = self.steps.clone_runnable(); let now = jiff::Timestamp::now(); @@ -2555,7 +2750,7 @@ impl State { let drv = step.get_drv_path(); let system = step.get_system(); - tracing::error!("aborting unsupported build step '{drv}' (type '{system:?}')",); + tracing::warn!("aborting unsupported build step '{drv}' (type '{system:?}')",); aborted.insert(step.clone()); @@ -2584,9 +2779,7 @@ impl State { "unsupported system type '{}'", system.unwrap_or(String::new()) )); - if let Err(e) = self.inner_fail_job(drv, None, job, step.clone()).await { - tracing::error!("Failed to fail step drv={drv} e={e}"); - } + self.inner_fail_job(drv, None, job, step.clone()).await?; } { @@ -2599,5 +2792,6 @@ impl State { self.metrics .nr_unsupported_steps_aborted .inc_by(aborted.len() as u64); + Ok(()) } } diff --git a/subprojects/hydra-queue-runner/src/state/queue.rs b/subprojects/hydra-queue-runner/src/state/queue.rs index be924fd35..1a6c8c08a 100644 --- a/subprojects/hydra-queue-runner/src/state/queue.rs +++ b/subprojects/hydra-queue-runner/src/state/queue.rs @@ -270,19 +270,19 @@ impl InnerQueues { } fn remove_job_by_path(&mut self, drv: &StorePath) { - if self.jobs.remove(drv).is_none() { - tracing::error!("Failed to remove stepinfo drv={drv} from jobs!"); - } + assert!( + self.jobs.remove(drv).is_some(), + "tried to remove non-existent job drv={drv}" + ); } #[tracing::instrument(skip(self, stepinfo, queue))] fn remove_job(&mut self, stepinfo: &Arc, queue: &Arc) { - if self.jobs.remove(stepinfo.step.get_drv_path()).is_none() { - tracing::error!( - "Failed to remove stepinfo drv={} from jobs!", - stepinfo.step.get_drv_path(), - ); - } + assert!( + self.jobs.remove(stepinfo.step.get_drv_path()).is_some(), + "tried to remove non-existent job drv={}", + stepinfo.step.get_drv_path(), + ); // active should be removed queue.scrube_jobs(); } @@ -318,6 +318,8 @@ impl InnerQueues { item.machine.get_internal_build_id_for_drv(drv_path) { if let Err(e) = 
item.machine.abort_build(internal_build_id).await { + // Non-fatal: `ChannelClosedError` — builder + // already disconnected. No DB involved. tracing::error!( "Failed to abort build drv_path={drv_path} build_id={internal_build_id} e={e}", ); @@ -452,9 +454,11 @@ impl Queues { &self, processor: F, metrics: &super::metrics::PromMetrics, - ) -> i64 + ) -> Result where - F: AsyncFn(JobConstraint) -> anyhow::Result, + F: AsyncFn( + JobConstraint, + ) -> Result, { let now = jiff::Timestamp::now(); let mut nr_steps_waiting_all_queues = 0; @@ -519,18 +523,26 @@ impl Queues { metrics.queue_aborted_jobs_total.inc(); } - Err(e) => { - tracing::warn!( - "Failed to realise drv on valid machine, will be skipped: drv={} e={e}", - job.step.get_drv_path(), - ); - } + Err(e) => match &e { + // Fatal: DB infrastructure error — halt the + // dispatch loop so the queue-runner can restart. + super::StateError::Db(_) => return Err(e), + // Non-fatal: store unavailable, logic error, + // timestamp overflow, etc. — skip this drv and + // continue dispatching. 
+ _ => { + tracing::warn!( + "Failed to realise drv on valid machine, skipping: drv={} e={e}", + job.step.get_drv_path(), + ); + } + }, } queue.set_nr_runnable_waiting(nr_waiting); queue.set_nr_runnable_disabled(nr_disabled); } } - nr_steps_waiting_all_queues + Ok(nr_steps_waiting_all_queues) } pub async fn clone_inner(&self) -> HashMap> { diff --git a/subprojects/hydra-queue-runner/src/state/uploader.rs b/subprojects/hydra-queue-runner/src/state/uploader.rs index 928be9447..37117762a 100644 --- a/subprojects/hydra-queue-runner/src/state/uploader.rs +++ b/subprojects/hydra-queue-runner/src/state/uploader.rs @@ -2,6 +2,22 @@ use backon::ExponentialBuilder; use backon::Retryable as _; use harmonia_store_path::StorePath; +#[derive(Debug, thiserror::Error)] +pub(crate) enum PersistError { + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum UploadError { + #[error(transparent)] + Cache(#[from] binary_cache::CacheError), + #[error(transparent)] + Io(#[from] std::io::Error), +} + #[allow(clippy::unnecessary_wraps)] fn deserialize_with_new_v4<'de, D>(_: D) -> Result where @@ -36,6 +52,8 @@ impl Uploader { }; if let Err(e) = uploader.load_state().await { + // Non-fatal: `PersistError` is IO/JSON — no DB. + // Starting with an empty upload queue is fine. 
tracing::warn!( "Failed to load uploader state from {}: {}", uploader.state_file_path.display(), @@ -46,7 +64,7 @@ impl Uploader { uploader } - async fn save_state(&self) -> anyhow::Result<()> { + async fn save_state(&self) -> Result<(), PersistError> { let mut queue = self.queue.inspect(); queue.extend(self.current_tasks.read().iter().cloned()); let json = serde_json::to_string(&queue)?; @@ -55,7 +73,7 @@ impl Uploader { Ok(()) } - async fn load_state(&self) -> anyhow::Result<()> { + async fn load_state(&self) -> Result<(), PersistError> { if !self.state_file_path.exists() { tracing::info!( "Uploader state file {} does not exist, starting with empty queue", @@ -110,6 +128,9 @@ impl Uploader { { Ok(c) => c, Err(e) => { + // Non-fatal: outputs remain in the local store. The + // uploader is best-effort — a transient daemon issue + // shouldn't stop other uploads from proceeding. tracing::error!("Failed to query requisites: {e}"); return; } @@ -145,7 +166,7 @@ impl Uploader { local_store: &harmonia_store_remote::ConnectionPool, msg: &Message, closure: &[harmonia_store_path_info::ValidPathInfo], - ) -> anyhow::Result<()> { + ) -> Result<(), UploadError> { // Upload log file (|| async { let file = fs_err::tokio::File::open(msg.log_local_path.as_path()).await?; @@ -153,15 +174,14 @@ impl Uploader { remote_store .upsert_file_stream(&msg.log_remote_path, reader, "text/plain; charset=utf-8") .await?; - Ok::<(), anyhow::Error>(()) + Ok::<(), UploadError>(()) }) .retry( ExponentialBuilder::default() .with_max_delay(std::time::Duration::from_secs(30)) .with_max_times(3), ) - .await - .map_err(|e| anyhow::anyhow!("failed to upload log file: {e}"))?; + .await?; // Copy NARs let missing_paths: hashbrown::HashSet = remote_store @@ -183,15 +203,14 @@ impl Uploader { remote_store .copy_paths(local_store, paths_to_copy.clone(), false) .await?; - Ok::<(), anyhow::Error>(()) + Ok::<(), UploadError>(()) }) .retry( ExponentialBuilder::default() 
.with_max_delay(std::time::Duration::from_secs(60)) .with_max_times(5), ) - .await - .map_err(|e| anyhow::anyhow!("failed to copy paths: {e}"))?; + .await?; tracing::debug!( "Successfully uploaded {} paths to bucket {}", diff --git a/subprojects/hydra-queue-runner/src/utils.rs b/subprojects/hydra-queue-runner/src/utils.rs index a8971acf3..456964b35 100644 --- a/subprojects/hydra-queue-runner/src/utils.rs +++ b/subprojects/hydra-queue-runner/src/utils.rs @@ -1,12 +1,24 @@ use std::{collections::BTreeMap, os::unix::ffi::OsStrExt as _}; -use anyhow::Context as _; use db::models::BuildID; use harmonia_store_derivation::derived_path::OutputName; use harmonia_store_path::{StoreDir, StorePath}; use crate::state::RemoteBuild; +/// Errors from queue-runner utility functions that record build steps +/// in the database. Only `Db` and `IntConversion` can escape — store +/// and cache errors are handled internally where they occur. +#[derive(Debug, thiserror::Error)] +pub enum UtilError { + #[error(transparent)] + Db(#[from] db::Error), + #[error(transparent)] + IntConversion(#[from] std::num::TryFromIntError), + #[error("non UTF-8 log file path")] + NonUtf8LogPath, +} + #[tracing::instrument(skip(db, store_dir, res), err)] pub async fn finish_build_step( db: &db::Database, @@ -16,7 +28,7 @@ pub async fn finish_build_step( res: &RemoteBuild, machine: Option<&str>, output_paths: Option<&BTreeMap>, -) -> anyhow::Result<()> { +) -> Result<(), UtilError> { let mut conn = db.get().await?; let mut tx = conn.begin_transaction().await?; @@ -47,7 +59,7 @@ pub async fn finish_build_step( tx.notify_step_finished( build_id, step_nr, - res.log_file.to_str().context("Non UTF-8 log file path")?, + res.log_file.to_str().ok_or(UtilError::NonUtf8LogPath)?, ) .await?; @@ -72,7 +84,7 @@ pub async fn substitute_output( build_id: BuildID, drv_path: &StorePath, remote_store: Option<&binary_cache::S3BinaryCacheClient>, -) -> anyhow::Result { +) -> Result { let (name, path) = o; let Some(path) = path 
else { return Ok(false); @@ -80,12 +92,15 @@ pub async fn substitute_output( let store_dir = pool.store_dir(); let starttime = i32::try_from(jiff::Timestamp::now().as_second())?; // TODO + // Non-fatal: path simply isn't available for substitution. if let Err(e) = daemon_client_utils::ensure_path(&pool, &path).await { tracing::debug!("Path not found, can't import={e}"); return Ok(false); } + // Best-effort replication to S3. Non-fatal: the substitution + // succeeded locally, so the build can proceed regardless. if let Some(remote_store) = remote_store { - let _: Result<(), anyhow::Error> = async { + let _: Result<(), Box> = async { let closure = daemon_client_utils::query_closure(&pool, std::slice::from_ref(&path)).await?; let missing: hashbrown::HashSet = remote_store @@ -103,7 +118,7 @@ pub async fn substitute_output( .await .inspect_err(|e| { tracing::error!( - "Failed to copy paths to remote store({}): {e}", + "Failed to replicate to remote store({}): {e}", remote_store.cfg.client_config.bucket ); }); @@ -133,7 +148,7 @@ pub async fn make_local_step( build_id: BuildID, drv_path: &StorePath, missing: &BTreeMap>, -) -> anyhow::Result<()> { +) -> Result<(), UtilError> { let time = i32::try_from(jiff::Timestamp::now().as_second())?; let mut db = db.get().await?;