diff --git a/.sqlx/query-1766e5a8871d12054550bceb42466fa2348409ba9c7a20cf18a7613bdfb0ae89.json b/.sqlx/query-1766e5a8871d12054550bceb42466fa2348409ba9c7a20cf18a7613bdfb0ae89.json new file mode 100644 index 0000000..80dac1c --- /dev/null +++ b/.sqlx/query-1766e5a8871d12054550bceb42466fa2348409ba9c7a20cf18a7613bdfb0ae89.json @@ -0,0 +1,43 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT * FROM platform_runners WHERE TRUE AND ($1::int8[] IS NULL OR array_position($1, id) IS NOT NULL) AND ($2::timestamptz[] IS NULL OR array_position($2, created_at) IS NOT NULL) AND ($3::int8[] IS NULL OR array_position($3, platform_id) IS NOT NULL) AND ($4::int8[] IS NULL OR array_position($4, file_id) IS NOT NULL)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "platform_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "file_id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8Array", + "TimestamptzArray", + "Int8Array", + "Int8Array" + ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "1766e5a8871d12054550bceb42466fa2348409ba9c7a20cf18a7613bdfb0ae89" +} diff --git a/.sqlx/query-6575005fcd566adcd6f630ce06b15cfe718e153af822a2b6dfee60554202face.json b/.sqlx/query-3040ca82a2a0bc2b828df2e89ed28855b02d886a95365c8b1a4a5ccfba1ca398.json similarity index 62% rename from .sqlx/query-6575005fcd566adcd6f630ce06b15cfe718e153af822a2b6dfee60554202face.json rename to .sqlx/query-3040ca82a2a0bc2b828df2e89ed28855b02d886a95365c8b1a4a5ccfba1ca398.json index e2f51df..418c399 100644 --- a/.sqlx/query-6575005fcd566adcd6f630ce06b15cfe718e153af822a2b6dfee60554202face.json +++ b/.sqlx/query-3040ca82a2a0bc2b828df2e89ed28855b02d886a95365c8b1a4a5ccfba1ca398.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT * FROM project_versions WHERE TRUE AND ($1::int8[] IS NULL OR array_position($1, id) IS NOT NULL) AND ($2::timestamptz[] IS NULL OR array_position($2, created_at) IS NOT NULL) AND ($3::timestamptz[] IS NULL OR array_position($3, disabled_at) IS NOT NULL) AND ($4::int8[] IS NULL OR array_position($4, project_id) IS NOT NULL) AND ($5::int8[] IS NULL OR array_position($5, platform_id) IS NOT NULL) AND ($6::int8[] IS NULL OR array_position($6, file_id) IS NOT NULL)", + "query": "SELECT * FROM project_runners WHERE TRUE AND ($1::int8[] IS NULL OR array_position($1, id) IS NOT NULL) AND ($2::timestamptz[] IS NULL OR array_position($2, created_at) IS NOT NULL) AND ($3::timestamptz[] IS NULL OR array_position($3, disabled_at) IS NOT NULL) AND ($4::int8[] IS NULL OR array_position($4, project_id) IS NOT NULL) AND ($5::int8[] IS NULL OR array_position($5, platform_id) IS NOT NULL) AND ($6::int8[] IS NULL OR array_position($6, file_id) IS NOT NULL)", "describe": { "columns": [ { @@ -53,5 +53,5 @@ false ] }, - "hash": "6575005fcd566adcd6f630ce06b15cfe718e153af822a2b6dfee60554202face" + "hash": "3040ca82a2a0bc2b828df2e89ed28855b02d886a95365c8b1a4a5ccfba1ca398" } diff --git a/.sqlx/query-5f568cb0d5f215c590f1ac4881a22bcbfa228d5f655c6f9cc783119e706a9206.json b/.sqlx/query-5f568cb0d5f215c590f1ac4881a22bcbfa228d5f655c6f9cc783119e706a9206.json new file mode 100644 index 0000000..c18ca48 --- /dev/null +++ b/.sqlx/query-5f568cb0d5f215c590f1ac4881a22bcbfa228d5f655c6f9cc783119e706a9206.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO platform_runners (platform_id, file_id) VALUES ($1, $2) RETURNING id \"id: _\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id: _", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "5f568cb0d5f215c590f1ac4881a22bcbfa228d5f655c6f9cc783119e706a9206" +} diff --git a/.sqlx/query-6bdc70e2c27f350648875584a1f1ca9164eb74493a155561b47263274d2b98f9.json b/.sqlx/query-6bdc70e2c27f350648875584a1f1ca9164eb74493a155561b47263274d2b98f9.json new file mode 100644 index 0000000..aa8b3bb --- /dev/null +++ b/.sqlx/query-6bdc70e2c27f350648875584a1f1ca9164eb74493a155561b47263274d2b98f9.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT * FROM platform_runners WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "platform_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "file_id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "6bdc70e2c27f350648875584a1f1ca9164eb74493a155561b47263274d2b98f9" +} diff --git a/.sqlx/query-59e05c1bca439b7fb68f8e262510ebb042fb163aa3b1de13e1749687f70e4be8.json b/.sqlx/query-781c42f7107328996a9eb4a88d8f9bacbea9998f02d2486c9365efc3a37a1cbb.json similarity index 60% rename from .sqlx/query-59e05c1bca439b7fb68f8e262510ebb042fb163aa3b1de13e1749687f70e4be8.json rename to .sqlx/query-781c42f7107328996a9eb4a88d8f9bacbea9998f02d2486c9365efc3a37a1cbb.json index a76923b..63f4037 100644 --- a/.sqlx/query-59e05c1bca439b7fb68f8e262510ebb042fb163aa3b1de13e1749687f70e4be8.json +++ b/.sqlx/query-781c42f7107328996a9eb4a88d8f9bacbea9998f02d2486c9365efc3a37a1cbb.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO project_versions (project_id, platform_id, file_id) VALUES ($1, $2, $3) RETURNING id \"id: _\"", + "query": "INSERT INTO project_runners (project_id, platform_id, file_id) VALUES ($1, $2, $3) RETURNING id \"id: _\"", "describe": { "columns": [ { @@ -20,5 +20,5 @@ false ] }, - "hash": "59e05c1bca439b7fb68f8e262510ebb042fb163aa3b1de13e1749687f70e4be8" + "hash": "781c42f7107328996a9eb4a88d8f9bacbea9998f02d2486c9365efc3a37a1cbb" } diff --git a/.sqlx/query-40fa22a7baa364bba68d97c54eb1e8c4fadebc4a10f66ad0682469cacab60ff5.json b/.sqlx/query-a406c9c198acece62a210855dcc73ba6e084336fa9831babc36cd551e71288fd.json similarity index 85% rename from .sqlx/query-40fa22a7baa364bba68d97c54eb1e8c4fadebc4a10f66ad0682469cacab60ff5.json rename to .sqlx/query-a406c9c198acece62a210855dcc73ba6e084336fa9831babc36cd551e71288fd.json index a7d616c..e7b8c42 100644 --- a/.sqlx/query-40fa22a7baa364bba68d97c54eb1e8c4fadebc4a10f66ad0682469cacab60ff5.json +++ b/.sqlx/query-a406c9c198acece62a210855dcc73ba6e084336fa9831babc36cd551e71288fd.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT * FROM project_versions WHERE id = ANY($1)", + "query": "SELECT * FROM project_runners WHERE id = ANY($1)", "describe": { "columns": [ { @@ -48,5 +48,5 @@ false ] }, - "hash": "40fa22a7baa364bba68d97c54eb1e8c4fadebc4a10f66ad0682469cacab60ff5" + "hash": "a406c9c198acece62a210855dcc73ba6e084336fa9831babc36cd551e71288fd" } diff --git a/.sqlx/query-b2922b902d1e423d8928526ef7f150b5b5c7755c25c8e0dd2a759692d98aac1d.json b/.sqlx/query-b2922b902d1e423d8928526ef7f150b5b5c7755c25c8e0dd2a759692d98aac1d.json new file mode 100644 index 0000000..1d0f6a6 --- /dev/null +++ b/.sqlx/query-b2922b902d1e423d8928526ef7f150b5b5c7755c25c8e0dd2a759692d98aac1d.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT * FROM platform_runners WHERE id = ANY($1)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "platform_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "file_id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "b2922b902d1e423d8928526ef7f150b5b5c7755c25c8e0dd2a759692d98aac1d" +} diff --git a/.sqlx/query-60d06b4ddeb3f621f6da75d1ad4f37cb37aa552d2cf4e9f3ec8365afd2555e2a.json b/.sqlx/query-d04ebea2c51ce254c8e49fad15683d85842fe0e1fbef0c0fc8b63a3964442538.json similarity index 85% rename from .sqlx/query-60d06b4ddeb3f621f6da75d1ad4f37cb37aa552d2cf4e9f3ec8365afd2555e2a.json rename to .sqlx/query-d04ebea2c51ce254c8e49fad15683d85842fe0e1fbef0c0fc8b63a3964442538.json index 1b32efe..6a397dc 100644 --- a/.sqlx/query-60d06b4ddeb3f621f6da75d1ad4f37cb37aa552d2cf4e9f3ec8365afd2555e2a.json +++ b/.sqlx/query-d04ebea2c51ce254c8e49fad15683d85842fe0e1fbef0c0fc8b63a3964442538.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT * FROM project_versions WHERE id = $1", + "query": "SELECT * FROM project_runners WHERE id = $1", "describe": { "columns": [ { @@ -48,5 +48,5 @@ false ] }, - "hash": "60d06b4ddeb3f621f6da75d1ad4f37cb37aa552d2cf4e9f3ec8365afd2555e2a" + "hash": "d04ebea2c51ce254c8e49fad15683d85842fe0e1fbef0c0fc8b63a3964442538" } diff --git a/Cargo.lock b/Cargo.lock index 29989c7..bdea941 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -384,14 +384,12 @@ dependencies = [ "clusterizer-api", "clusterizer-client", "clusterizer-common", - "clusterizer-util", "dirs", "reqwest", "tempfile", "tokio", "tracing", "tracing-subscriber", - "zip", ] [[package]] @@ -399,9 +397,13 @@ name = "clusterizer-client" version = "0.1.0" dependencies = [ "clusterizer-api", + "clusterizer-common", + "clusterizer-util", "reqwest", + "tempfile", "thiserror 2.0.18", "tokio", + "tracing", "zip", ] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 5b0543b..2491b3d 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -8,11 +8,9 @@ clap = { version = "4.6.0", features = ["derive", "string"] } clusterizer-api = { version = "0.1.0", path = "../api" } clusterizer-client = { version = "0.1.0", path = "../client" } clusterizer-common = { version = "0.1.0", path = "../common" } -clusterizer-util = { version = "0.1.0", path = "../util" } dirs = "6.0.0" reqwest = { version = "0.13.2" } tempfile = "3.27.0" tokio = { version = "1.50.0", features = ["full"] } tracing = "0.1.44" tracing-subscriber = "0.3.23" -zip = "8.5.0" diff --git a/cli/src/args.rs b/cli/src/args.rs index 50d54d9..003d036 100644 --- a/cli/src/args.rs +++ b/cli/src/args.rs @@ -42,16 +42,6 @@ pub struct RunArgs { pub queue: usize, } -impl RunArgs { - pub fn binaries_dir(&self) -> PathBuf { - self.cache_dir.join("bin") - } - - pub fn temp_dir(&self) -> PathBuf { - self.cache_dir.join("tmp") - } -} - fn cache_dir() -> Resettable { dirs::cache_dir() .map(|path| path.join("clusterizer").into_os_string().into()) diff --git a/cli/src/client.rs b/cli/src/client.rs index a31e727..7166d44 100644 --- a/cli/src/client.rs +++ b/cli/src/client.rs @@ -1,45 +1,24 @@ use std::{ - collections::{HashMap, VecDeque}, - env, - ffi::OsString, - fs, - io::{Cursor, ErrorKind}, - iter::{self, Empty}, - path::PathBuf, + collections::VecDeque, process::{Output, Stdio}, sync::Arc, time::Duration, }; use clusterizer_api::{client::ApiClient, result::ApiError}; -use clusterizer_client::result::ClientResult; +use clusterizer_client::{TaskInfo, result::ClientResult, supported_platforms::SupportedPlatforms}; use clusterizer_common::{ - errors::SubmitResultError, - records::{ - File, FileFilter, Platform, PlatformFilter, Project, ProjectFilter, ProjectVersion, - ProjectVersionFilter, Task, - }, - requests::{FetchTasksRequest, SubmitResultRequest}, - types::Id, + errors::SubmitResultError, records::Task, requests::SubmitResultRequest, types::Id, }; -use clusterizer_util::Hex; -use tokio::{io::AsyncWriteExt, process::Command, task::JoinSet, time}; -use tracing::{debug, info, warn}; -use zip::ZipArchive; +use tokio::{io::AsyncWriteExt, task::JoinSet, time}; +use tracing::{debug, info}; use crate::args::RunArgs; struct ClusterizerClient { client: ApiClient, args: RunArgs, - platform_ids: Vec>, -} - -struct TaskInfo { - task: Task, - project: Project, - project_version: ProjectVersion, - file: File, + supported_platforms: SupportedPlatforms, } enum Return { @@ -93,110 +72,43 @@ impl ClusterizerClient { } async fn fetch_tasks(self: Arc) -> ClientResult { - let tasks = loop { - let project_versions_by_project_id: HashMap<_, _> = self - .client - .get(&ProjectVersionFilter::default().disabled_at(vec![None])) - .await? - .into_iter() - .filter(|project_version| self.platform_ids.contains(&project_version.platform_id)) - .map(|project_version| (project_version.project_id, project_version)) - .collect(); - - let projects_by_project_id: HashMap<_, _> = self - .client - .get(&ProjectFilter::default().disabled_at(vec![None])) - .await? - .into_iter() - .filter(|project| project_versions_by_project_id.contains_key(&project.id)) - .map(|project| (project.id, project)) - .collect(); - - let files_by_file_id: HashMap<_, _> = self - .client - .get(&FileFilter::default()) - .await? - .into_iter() - .map(|file| (file.id, file)) - .collect(); - - let get_task_info = |task: &Task| { - let project = projects_by_project_id.get(&task.project_id)?; - let project_version = project_versions_by_project_id.get(&task.project_id)?; - let file = files_by_file_id.get(&project_version.file_id)?; - - Some(TaskInfo { - task: task.clone(), - file: file.clone(), - project: project.clone(), - project_version: project_version.clone(), - }) - }; - - let tasks: Vec<_> = self - .client - .fetch_tasks(&FetchTasksRequest { - project_ids: projects_by_project_id.keys().copied().collect(), - limit: self.args.threads, - }) - .await? - .into_iter() - .filter_map(|task| { - let info = get_task_info(&task); - - if info.is_none() { - warn!("Unwanted task received from server."); - } - - info - }) - .collect(); + loop { + let tasks = clusterizer_client::fetch_tasks( + &self.args.cache_dir, + &self.client, + &self.supported_platforms, + self.args.threads, + ) + .await?; if !tasks.is_empty() { - break tasks; + info!("Fetched {} tasks.", tasks.len()); + break Ok(Return::FetchTasks(tasks)); } info!("No tasks found. Sleeping before attempting again."); time::sleep(Duration::from_secs(15)).await; - }; - - for TaskInfo { file, .. } in &tasks { - download_archive(file, &self.args).await?; } - - Ok(Return::FetchTasks(tasks)) } async fn execute_task( self: Arc, TaskInfo { task, - project_version, - project, - file, + file_path, + platform_id, }: TaskInfo, ) -> ClientResult { let slot_dir = tempfile::tempdir()?; - info!("Task id: {}, stdin: {}", task.id, task.stdin); - info!( - "Project id: {}, Project name: {}", - task.project_id, project.name - ); - debug!("Platform id: {}", project_version.platform_id); + info!("Task id: {}", task.id); + debug!("Project id: {}", task.project_id); + debug!("Platform id: {}", platform_id); debug!("Slot dir: {}", slot_dir.path().display()); - let program = self - .args - .binaries_dir() - .join(format!("{}", Hex(&file.hash))) - .join(format!("main{}", env::consts::EXE_SUFFIX)) - .canonicalize()?; - - let args: Empty = iter::empty(); - - let mut child = Command::new(program) - .args(args) + let mut child = self + .supported_platforms + .get_command(&file_path, platform_id) .current_dir(&slot_dir) .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -235,81 +147,13 @@ impl ClusterizerClient { } pub async fn run(client: ApiClient, args: RunArgs) -> ClientResult<()> { - fs::create_dir_all(args.binaries_dir())?; - fs::create_dir_all(args.temp_dir())?; - - let mut platform_ids = Vec::new(); - let mut platform_names = Vec::new(); - - for platform in client.get(&PlatformFilter::default()).await? { - let file = client.get(&platform.file_id).await?; - - debug!( - "Platform id: {}, tester archive url: {}", - platform.id, file.url - ); - - let platform_tester_dir = download_archive(&file, &args).await?; - - let slot_dir = tempfile::tempdir()?; - - debug!("Slot dir: {}", slot_dir.path().display()); - - let program = match platform_tester_dir - .join(format!("main{}", env::consts::EXE_SUFFIX)) - .canonicalize() - { - Err(err) if err.kind() == ErrorKind::NotFound => continue, - result => result, - }?; - - let args: Empty = iter::empty(); - - let status = Command::new(program) - .args(args) - .current_dir(&slot_dir) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .status() - .await?; - - if status.success() { - platform_ids.push(platform.id); - platform_names.push(platform.name); - } - } - - info!("Supported platforms: {}", platform_names.join(", ")); + let supported_platforms = SupportedPlatforms::detect(&args.cache_dir, &client).await?; Arc::new(ClusterizerClient { client, args, - platform_ids, + supported_platforms, }) .run() .await } - -async fn download_archive(file: &File, args: &RunArgs) -> ClientResult { - let dir = args.binaries_dir().join(format!("{}", Hex(&file.hash))); - - if dir.exists() { - debug!("Archive {} was cached.", dir.display()); - } else { - debug!("Archive {} is not cached.", dir.display()); - - let bytes = reqwest::get(&file.url) - .await? - .error_for_status()? - .bytes() - .await?; - - let extract_dir = tempfile::tempdir_in(args.temp_dir())?; - - ZipArchive::new(Cursor::new(bytes))?.extract(&extract_dir)?; - fs::rename(&extract_dir, &dir)?; - } - - Ok(dir) -} diff --git a/cli/src/main.rs b/cli/src/main.rs index e0b5599..acd97f4 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,3 +1,5 @@ +use std::fs; + use args::{ClusterizerArgs, Commands}; use clap::Parser; use clusterizer_api::client::ApiClient; @@ -31,7 +33,12 @@ async fn run() -> ClientResult<()> { println!("{}", response.api_key); } - Commands::Run(args) => client::run(client, args).await?, + Commands::Run(mut args) => { + fs::create_dir_all(&args.cache_dir)?; + args.cache_dir = args.cache_dir.canonicalize()?; + + client::run(client, args).await? + } } Ok(()) diff --git a/client/Cargo.toml b/client/Cargo.toml index f4c3912..accc06d 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -5,7 +5,11 @@ edition = "2024" [dependencies] clusterizer-api = { version = "0.1.0", path = "../api" } +clusterizer-common = { version = "0.1.0", path = "../common" } +clusterizer-util = { version = "0.1.0", path = "../util" } reqwest = { version = "0.13.2" } +tempfile = "3.27.0" thiserror = "2.0.18" tokio = { version = "1.50.0", features = ["full"] } +tracing = "0.1.44" zip = "8.5.0" diff --git a/client/src/lib.rs b/client/src/lib.rs index d4cfe2f..5ed4452 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1 +1,147 @@ pub mod result; +pub mod supported_platforms; + +use std::{ + collections::HashMap, + fs, + io::Cursor, + path::{Path, PathBuf}, +}; + +use clusterizer_api::client::ApiClient; +use clusterizer_common::{ + records::{File, FileFilter, Platform, ProjectRunnerFilter, Task}, + requests::FetchTasksRequest, + types::Id, +}; +use clusterizer_util::Hex; +use tokio::task::{self, JoinHandle}; +use tracing::{debug, info, warn}; +use zip::ZipArchive; + +use crate::{result::ClientResult, supported_platforms::SupportedPlatforms}; + +#[derive(Debug)] +pub struct TaskInfo { + pub task: Task, + pub file_path: PathBuf, + pub platform_id: Id, +} + +pub async fn fetch_tasks( + cache_dir: &Path, + client: &ApiClient, + supported_platforms: &SupportedPlatforms, + limit: usize, +) -> ClientResult> { + let project_runners: HashMap<_, _> = client + .get( + &ProjectRunnerFilter::default() + .disabled_at(vec![None]) + .platform_id(supported_platforms.platform_ids().collect::>()), + ) + .await? + .into_iter() + .map(|project_runner| (project_runner.project_id, project_runner)) + .collect(); + + let tasks: Vec<_> = client + .fetch_tasks(&FetchTasksRequest { + project_ids: project_runners.keys().copied().collect(), + limit, + }) + .await? + .into_iter() + .filter_map(|task| { + let Some(project_runner) = project_runners.get(&task.project_id) else { + warn!( + "Unwanted task ({}) for project ({})", + task.id, task.project_id + ); + + return None; + }; + + Some((task, project_runner)) + }) + .collect(); + + let file_ids: Vec<_> = tasks + .iter() + .map(|(_, project_runner)| project_runner.file_id) + .collect(); + + let files = download_archives(cache_dir, client, file_ids).await?; + + Ok(tasks + .into_iter() + .filter_map(|(task, project_runner)| { + let Some(file_path) = files.get(&project_runner.file_id) else { + warn!( + "Missing file ({}) for project runner ({})", + project_runner.file_id, project_runner.id + ); + + return None; + }; + + Some(TaskInfo { + task, + file_path: file_path.clone(), + platform_id: project_runner.platform_id, + }) + }) + .collect()) +} + +pub async fn download_archives( + cache_dir: &Path, + client: &ApiClient, + file_ids: Vec>, +) -> ClientResult, PathBuf>> { + let binaries_dir = cache_dir.join("bin"); + let temp_dir = cache_dir.join("tmp"); + + fs::create_dir_all(&binaries_dir)?; + fs::create_dir_all(&temp_dir)?; + + let tasks: Vec<_> = client + .get(&FileFilter::default().id(file_ids)) + .await? + .into_iter() + .map(|file| -> JoinHandle> { + let dir = binaries_dir.join(format!("{}", Hex(&file.hash))); + let temp_dir = temp_dir.clone(); + + task::spawn(async move { + if !dir.exists() { + info!("Downloading archive {}", file.url); + + let bytes = reqwest::get(&file.url) + .await? + .error_for_status()? + .bytes() + .await?; + + let extract_dir = tempfile::tempdir_in(temp_dir)?; + + ZipArchive::new(Cursor::new(bytes))?.extract(&extract_dir)?; + fs::rename(&extract_dir, &dir)?; + } else { + debug!("Archive {} was cached", file.url); + } + + Ok((file.id, dir)) + }) + }) + .collect(); + + let mut files = HashMap::new(); + + for task in tasks { + let (file_id, dir) = task.await??; + files.insert(file_id, dir); + } + + Ok(files) +} diff --git a/client/src/supported_platforms.rs b/client/src/supported_platforms.rs new file mode 100644 index 0000000..c4cf141 --- /dev/null +++ b/client/src/supported_platforms.rs @@ -0,0 +1,174 @@ +use std::{ + collections::HashMap, + env, + path::{Path, PathBuf}, + process::Stdio, +}; + +use clusterizer_api::client::ApiClient; +use clusterizer_common::{ + records::{Platform, PlatformFilter, PlatformRunnerFilter}, + types::Id, +}; +use tokio::{ + process::Command, + task::{self, JoinHandle}, +}; +use tracing::{info, warn}; + +use crate::result::ClientResult; + +#[derive(Debug, Clone)] +enum PlatformStrategy { + Native, + Wrapper { + file_path: PathBuf, + platform_id: Id, + }, +} + +#[derive(Debug, Clone)] +pub struct SupportedPlatforms { + platform_strategies: HashMap, PlatformStrategy>, +} + +impl SupportedPlatforms { + fn new() -> Self { + Self { + platform_strategies: HashMap::new(), + } + } + + fn insert(&mut self, platform_id: Id, strategy: PlatformStrategy) { + self.platform_strategies.insert(platform_id, strategy); + } + + pub fn platform_ids(&self) -> impl Iterator> { + self.platform_strategies.keys().copied() + } + + pub fn get_command(&self, file_path: &Path, platform_id: Id) -> Command { + self.get_command_with_strategy(file_path, &self.platform_strategies[&platform_id]) + } + + fn get_command_with_strategy(&self, file_path: &Path, strategy: &PlatformStrategy) -> Command { + match strategy { + PlatformStrategy::Native => { + Command::new(file_path.join(format!("main{}", env::consts::EXE_SUFFIX))) + } + PlatformStrategy::Wrapper { + file_path: wrapper_file_path, + platform_id: wrapper_platform_id, + } => { + let mut command = self.get_command(wrapper_file_path, *wrapper_platform_id); + command.arg(file_path); + command + } + } + } + + pub async fn detect(cache_dir: &Path, client: &ApiClient) -> ClientResult { + let mut strategy_queue = vec![PlatformStrategy::Native]; + let mut supported_platforms = Self::new(); + let mut platforms = client.get(&PlatformFilter::default()).await?; + + let files = crate::download_archives( + cache_dir, + client, + platforms.iter().map(|platform| platform.file_id).collect(), + ) + .await?; + + while !strategy_queue.is_empty() { + let tasks: Vec<_> = platforms + .drain(..) + .map(|platform| -> JoinHandle> { + let strategy_queue = strategy_queue.clone(); + let supported_platforms = supported_platforms.clone(); + let files = files.clone(); + + task::spawn(async move { + for strategy in strategy_queue { + let Some(file_path) = files.get(&platform.file_id) else { + warn!( + "Missing file ({}) for platform ({})", + platform.file_id, platform.id + ); + + continue; + }; + + let slot_dir = tempfile::tempdir()?; + + if supported_platforms + .get_command_with_strategy(file_path, &strategy) + .current_dir(&slot_dir) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .await + .is_ok_and(|status| status.success()) + { + return Ok((platform, Some(strategy))); + } + } + + Ok((platform, None)) + }) + }) + .collect(); + + let mut new_platforms = Vec::new(); + + for task in tasks { + let (platform, strategy) = task.await??; + + if let Some(strategy) = strategy { + info!("Supported platform: {}", platform.name); + supported_platforms.insert(platform.id, strategy); + new_platforms.push(platform.id); + } else { + platforms.push(platform); + } + } + + let new_platform_runners = client + .get(&PlatformRunnerFilter::default().platform_id(new_platforms)) + .await?; + + let mut files = crate::download_archives( + cache_dir, + client, + new_platform_runners + .iter() + .map(|platform_runner| platform_runner.file_id) + .collect(), + ) + .await?; + + strategy_queue.splice( + .., + new_platform_runners + .into_iter() + .filter_map(|platform_runner| { + let Some(file_path) = files.remove(&platform_runner.file_id) else { + warn!( + "Missing file ({}) for platform runner ({})", + platform_runner.file_id, platform_runner.id + ); + + return None; + }; + + Some(PlatformStrategy::Wrapper { + file_path, + platform_id: platform_runner.platform_id, + }) + }), + ); + } + + Ok(supported_platforms) + } +} diff --git a/common/src/records/mod.rs b/common/src/records/mod.rs index 32db6c2..3b7c06f 100644 --- a/common/src/records/mod.rs +++ b/common/src/records/mod.rs @@ -1,8 +1,9 @@ pub mod assignment; pub mod file; pub mod platform; +pub mod platform_runner; pub mod project; -pub mod project_version; +pub mod project_runner; pub mod result; pub mod task; pub mod user; @@ -10,8 +11,9 @@ pub mod user; pub use assignment::{Assignment, AssignmentBuilder, AssignmentFilter}; pub use file::{File, FileBuilder, FileFilter}; pub use platform::{Platform, PlatformBuilder, PlatformFilter}; +pub use platform_runner::{PlatformRunner, PlatformRunnerBuilder, PlatformRunnerFilter}; pub use project::{Project, ProjectBuilder, ProjectFilter}; -pub use project_version::{ProjectVersion, ProjectVersionBuilder, ProjectVersionFilter}; +pub use project_runner::{ProjectRunner, ProjectRunnerBuilder, ProjectRunnerFilter}; pub use result::{Result, ResultBuilder, ResultFilter}; pub use task::{Task, TaskBuilder, TaskFilter}; pub use user::{User, UserBuilder, UserFilter}; diff --git a/common/src/records/platform_runner.rs b/common/src/records/platform_runner.rs new file mode 100644 index 0000000..243a31c --- /dev/null +++ b/common/src/records/platform_runner.rs @@ -0,0 +1,38 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use crate::{ + records::{File, Platform, record_impl}, + types::Id, +}; + +record_impl! { + PATH = "platform_runners"; + + PlatformRunner { + id: Id, + created_at: DateTime, + platform_id: Id, + file_id: Id, + } + + PlatformRunnerFilter { + "$1::int8[] IS NULL OR array_position($1, id) IS NOT NULL" + id: Vec>, + "$2::timestamptz[] IS NULL OR array_position($2, created_at) IS NOT NULL" + created_at: Vec>, + "$3::int8[] IS NULL OR array_position($3, platform_id) IS NOT NULL" + platform_id: Vec>, + "$4::int8[] IS NULL OR array_position($4, file_id) IS NOT NULL" + file_id: Vec>, + } + + PlatformRunnerBuilder { + "platform_id" "$1" + platform_id: Id, + "file_id" "$2" + file_id: Id, + } + + UpdatePlatformRunner {} +} diff --git a/common/src/records/project_version.rs b/common/src/records/project_runner.rs similarity index 85% rename from common/src/records/project_version.rs rename to common/src/records/project_runner.rs index c407620..7b4909d 100644 --- a/common/src/records/project_version.rs +++ b/common/src/records/project_runner.rs @@ -7,10 +7,10 @@ use crate::{ }; record_impl! { - PATH = "project_versions"; + PATH = "project_runners"; - ProjectVersion { - id: Id, + ProjectRunner { + id: Id, created_at: DateTime, disabled_at: Option>, project_id: Id, @@ -18,9 +18,9 @@ record_impl! { file_id: Id, } - ProjectVersionFilter { + ProjectRunnerFilter { "$1::int8[] IS NULL OR array_position($1, id) IS NOT NULL" - id: Vec>, + id: Vec>, "$2::timestamptz[] IS NULL OR array_position($2, created_at) IS NOT NULL" created_at: Vec>, "$3::timestamptz[] IS NULL OR array_position($3, disabled_at) IS NOT NULL" @@ -33,7 +33,7 @@ record_impl! { file_id: Vec>, } - ProjectVersionBuilder { + ProjectRunnerBuilder { "project_id" "$1" project_id: Id, "platform_id" "$2" @@ -42,5 +42,5 @@ record_impl! { file_id: Id, } - UpdateProjectVersion {} + UpdateProjectRunner {} } diff --git a/server/migrations/20250426220809_init.sql b/server/migrations/20250426220809_init.sql index 8da7fb6..fa267ef 100644 --- a/server/migrations/20250426220809_init.sql +++ b/server/migrations/20250426220809_init.sql @@ -30,7 +30,7 @@ CREATE TABLE platforms ( file_id int8 NOT NULL REFERENCES files(id) ON DELETE RESTRICT ON UPDATE RESTRICT ); -CREATE TABLE project_versions ( +CREATE TABLE project_runners ( id int8 GENERATED ALWAYS AS IDENTITY NOT NULL PRIMARY KEY, created_at timestamptz NOT NULL DEFAULT now(), disabled_at timestamptz, @@ -39,6 +39,13 @@ CREATE TABLE project_versions ( file_id int8 NOT NULL REFERENCES files(id) ON DELETE RESTRICT ON UPDATE RESTRICT ); +CREATE TABLE platform_runners ( + id int8 GENERATED ALWAYS AS IDENTITY NOT NULL PRIMARY KEY, + created_at timestamptz NOT NULL DEFAULT now(), + platform_id int8 NOT NULL REFERENCES platforms(id) ON DELETE RESTRICT ON UPDATE RESTRICT, + file_id int8 NOT NULL REFERENCES files(id) ON DELETE RESTRICT ON UPDATE RESTRICT +); + CREATE TABLE tasks ( id int8 GENERATED ALWAYS AS IDENTITY NOT NULL PRIMARY KEY, created_at timestamptz NOT NULL DEFAULT now(), diff --git a/server/src/main.rs b/server/src/main.rs index 0e51f0d..3614ddb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -10,7 +10,8 @@ use axum::{ }; use clusterizer_common::{ records::{ - Assignment, File, Platform, Project, ProjectVersion, Record, Result, Select, Task, User, + Assignment, File, Platform, PlatformRunner, Project, ProjectRunner, Record, Result, Select, + Task, User, }, types::Id, }; @@ -46,7 +47,8 @@ async fn serve_task(state: AppState, address: String) { .merge(record_router::()) .merge(record_router::()) .merge(record_router::()) - .merge(record_router::()) + .merge(record_router::()) + .merge(record_router::()) .merge(record_router::()) .merge(record_router::()) .merge(record_router::())