diff --git a/.docker.env.sample b/.docker.env.sample index 743cc0613..59577253f 100644 --- a/.docker.env.sample +++ b/.docker.env.sample @@ -10,3 +10,8 @@ DOCSRS_TOOLCHAIN=nightly # for the registry watcher, automatically queued reqbuidls. DOCSRS_MAX_QUEUED_REBUILDS: 10 +# optional overrides for local ElasticMQ testing +# DOCSRS_SQS_QUEUE_URL=http://elasticmq:9324/queue/docsrs-events +# DOCSRS_SQS_REGION=elasticmq +# DOCSRS_SQS_ENDPOINT_URL=http://elasticmq:9324 +# DOCSRS_SQS_ACTIVE=false diff --git a/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json b/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json deleted file mode 100644 index 2109f69cf..000000000 --- a/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM builds_logs bl\n USING builds b\n JOIN releases r ON b.rid = r.id\n WHERE bl.build_id = b.id AND r.crate_id = $1 AND r.version = $2;", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214" -} diff --git a/.sqlx/query-5f5fa0e89b4e13c690b1648a18e8420f7da0f0445c6e43d5d64617226c24fba1.json b/.sqlx/query-5f5fa0e89b4e13c690b1648a18e8420f7da0f0445c6e43d5d64617226c24fba1.json new file mode 100644 index 000000000..5d451984e --- /dev/null +++ b/.sqlx/query-5f5fa0e89b4e13c690b1648a18e8420f7da0f0445c6e43d5d64617226c24fba1.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM releases", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4", + "origin": { + "Table": { + "table": "releases", + "name": "id" + } + } + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "5f5fa0e89b4e13c690b1648a18e8420f7da0f0445c6e43d5d64617226c24fba1" +} diff --git 
a/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json b/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json new file mode 100644 index 000000000..89cbc239c --- /dev/null +++ b/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM builds_logs bl\n USING builds b\n WHERE bl.build_id = b.id AND b.rid = $1;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [] + }, + "hash": "66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42" +} diff --git a/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json b/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json new file mode 100644 index 000000000..effb9f9ac --- /dev/null +++ b/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM releases WHERE crate_id = $1 AND version = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4", + "origin": { + "Table": { + "table": "releases", + "name": "id" + } + } + } + ], + "parameters": { + "Left": [ + "Int4", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b" +} diff --git a/.sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json b/.sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json similarity index 65% rename from .sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json rename to .sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json index 380bd9ea6..1f16abd55 100644 --- a/.sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json +++ 
b/.sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", + "query": "DELETE FROM releases WHERE id = $1 RETURNING is_library", "describe": { "columns": [ { @@ -17,13 +17,12 @@ ], "parameters": { "Left": [ - "Int4", - "Text" + "Int4" ] }, "nullable": [ true ] }, - "hash": "014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948" + "hash": "fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663" } diff --git a/Cargo.lock b/Cargo.lock index 0810f40d8..db727b068 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -599,6 +599,31 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sqs" +version = "1.102.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0246bf049cfc003ce44599dff955b9353758de3afa68a053da9b2c7de20a07d8" +dependencies = [ + "arc-swap", + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.2", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sts" version = "1.107.0" @@ -1235,8 +1260,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aa79e62e7697b8e29b513a68abacf485adcd1fe8284a4316c5ae868e6633327" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -2108,6 +2135,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "docs_rs_crates_io" +version = "0.1.0" +dependencies = [ + "chrono", + "serde", + "serde_json", +] + [[package]] name = "docs_rs_database" version = "0.0.0" @@ -2447,12 +2483,16 @@ name = "docs_rs_watcher" version = "0.6.0" dependencies = [ "anyhow", + "aws-config", + "aws-sdk-sqs", + "chrono", "clap", 
"crates-index", "crates-index-diff", "docs_rs_build_queue", "docs_rs_config", "docs_rs_context", + "docs_rs_crates_io", "docs_rs_database", "docs_rs_env_vars", "docs_rs_fastly", @@ -2469,10 +2509,12 @@ dependencies = [ "opentelemetry", "pretty_assertions", "rayon", + "serde_json", "sqlx", "test-case", "tokio", "tracing", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5ac02140f..dd6c3544e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,11 @@ edition = "2024" anyhow = { version = "1.0.42", features = ["backtrace"] } askama = "0.16.0" async-stream = "0.3.5" +# The default `rustls` feature pulls in the legacy hyper 0.14 + rustls 0.21 +# stack via `aws-smithy-runtime/tls-rustls`, which includes the vulnerable +# `rustls-webpki` v0.101.x. Using only `default-https-client` avoids this by +# using the modern rustls 0.23 + hyper 1.x stack instead. +aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] } axum-extra = { version = "0.12.0", features = ["middleware", "routing", "typed-header"] } base64 = "0.22" bon = { version = "3.8.1", features = ["experimental-overwritable"] } diff --git a/crates/bin/cratesfyi/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json b/crates/bin/cratesfyi/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json deleted file mode 100644 index 2109f69cf..000000000 --- a/crates/bin/cratesfyi/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM builds_logs bl\n USING builds b\n JOIN releases r ON b.rid = r.id\n WHERE bl.build_id = b.id AND r.crate_id = $1 AND r.version = $2;", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214" -} diff --git 
a/crates/bin/cratesfyi/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json b/crates/bin/cratesfyi/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json new file mode 100644 index 000000000..89cbc239c --- /dev/null +++ b/crates/bin/cratesfyi/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM builds_logs bl\n USING builds b\n WHERE bl.build_id = b.id AND b.rid = $1;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [] + }, + "hash": "66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42" +} diff --git a/crates/bin/cratesfyi/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json b/crates/bin/cratesfyi/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json new file mode 100644 index 000000000..effb9f9ac --- /dev/null +++ b/crates/bin/cratesfyi/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM releases WHERE crate_id = $1 AND version = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4", + "origin": { + "Table": { + "table": "releases", + "name": "id" + } + } + } + ], + "parameters": { + "Left": [ + "Int4", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b" +} diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json b/crates/bin/cratesfyi/.sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json similarity index 65% rename from crates/bin/docs_rs_watcher/.sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json rename to 
crates/bin/cratesfyi/.sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json index 380bd9ea6..1f16abd55 100644 --- a/crates/bin/docs_rs_watcher/.sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json +++ b/crates/bin/cratesfyi/.sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", + "query": "DELETE FROM releases WHERE id = $1 RETURNING is_library", "describe": { "columns": [ { @@ -17,13 +17,12 @@ ], "parameters": { "Left": [ - "Int4", - "Text" + "Int4" ] }, "nullable": [ true ] }, - "hash": "014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948" + "hash": "fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663" } diff --git a/crates/bin/cratesfyi/src/daemon.rs b/crates/bin/cratesfyi/src/daemon.rs index 2ea37bd6b..f8b7f5808 100644 --- a/crates/bin/cratesfyi/src/daemon.rs +++ b/crates/bin/cratesfyi/src/daemon.rs @@ -4,7 +4,7 @@ use docs_rs_config::AppConfig as _; use docs_rs_context::Context; use docs_rs_watcher::{ start_background_queue_rebuild, start_background_repository_stats_updater, - start_background_service_metric_collector, watch_registry, + start_background_service_metric_collector, }; use docs_rs_web::run_web_server; use std::sync::Arc; @@ -21,7 +21,7 @@ fn start_registry_watcher( // space this out to prevent it from clashing against the queue-builder thread on launch tokio::time::sleep(Duration::from_secs(30)).await; - watch_registry(&config, &context).await + docs_rs_watcher::watch(&config, &context).await; }); Ok(()) diff --git a/crates/bin/cratesfyi/src/main.rs b/crates/bin/cratesfyi/src/main.rs index ccdb6c767..e6f65026f 100644 --- a/crates/bin/cratesfyi/src/main.rs +++ b/crates/bin/cratesfyi/src/main.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "256"] + use anyhow::Result; use clap::Parser; use 
cratesfyi::daemon::start_daemon; diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json b/crates/bin/docs_rs_watcher/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json deleted file mode 100644 index 2109f69cf..000000000 --- a/crates/bin/docs_rs_watcher/.sqlx/query-2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM builds_logs bl\n USING builds b\n JOIN releases r ON b.rid = r.id\n WHERE bl.build_id = b.id AND r.crate_id = $1 AND r.version = $2;", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "2dc065cc08f262c937c54f9cc8629e35750da2bea995fb0c433893addb253214" -} diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-5f5fa0e89b4e13c690b1648a18e8420f7da0f0445c6e43d5d64617226c24fba1.json b/crates/bin/docs_rs_watcher/.sqlx/query-5f5fa0e89b4e13c690b1648a18e8420f7da0f0445c6e43d5d64617226c24fba1.json new file mode 100644 index 000000000..5d451984e --- /dev/null +++ b/crates/bin/docs_rs_watcher/.sqlx/query-5f5fa0e89b4e13c690b1648a18e8420f7da0f0445c6e43d5d64617226c24fba1.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM releases", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4", + "origin": { + "Table": { + "table": "releases", + "name": "id" + } + } + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "5f5fa0e89b4e13c690b1648a18e8420f7da0f0445c6e43d5d64617226c24fba1" +} diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json b/crates/bin/docs_rs_watcher/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json new file mode 100644 index 000000000..89cbc239c --- /dev/null +++ 
b/crates/bin/docs_rs_watcher/.sqlx/query-66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM builds_logs bl\n USING builds b\n WHERE bl.build_id = b.id AND b.rid = $1;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [] + }, + "hash": "66b0ba6978880b79ce7a179bbe986e6c7eed78a2d4f01f316772949f5d688f42" +} diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json b/crates/bin/docs_rs_watcher/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json new file mode 100644 index 000000000..effb9f9ac --- /dev/null +++ b/crates/bin/docs_rs_watcher/.sqlx/query-7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM releases WHERE crate_id = $1 AND version = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4", + "origin": { + "Table": { + "table": "releases", + "name": "id" + } + } + } + ], + "parameters": { + "Left": [ + "Int4", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "7b7dd5795cddcb66b140b57157983bd73f73ecc1cf9b4fc24c457d5f26fd582b" +} diff --git a/crates/bin/cratesfyi/.sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json b/crates/bin/docs_rs_watcher/.sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json similarity index 65% rename from crates/bin/cratesfyi/.sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json rename to crates/bin/docs_rs_watcher/.sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json index 380bd9ea6..1f16abd55 100644 --- a/crates/bin/cratesfyi/.sqlx/query-014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948.json +++ 
b/crates/bin/docs_rs_watcher/.sqlx/query-fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", + "query": "DELETE FROM releases WHERE id = $1 RETURNING is_library", "describe": { "columns": [ { @@ -17,13 +17,12 @@ ], "parameters": { "Left": [ - "Int4", - "Text" + "Int4" ] }, "nullable": [ true ] }, - "hash": "014a054d852f0937191e1a54f742d4b4c454361689fb3841cc12fd7dd1094948" + "hash": "fab139cabc0987a1f2ad706060a3f4254924db75fc7f76a6c78d17a3fc06d663" } diff --git a/crates/bin/docs_rs_watcher/Cargo.toml b/crates/bin/docs_rs_watcher/Cargo.toml index 9744a6df6..286295618 100644 --- a/crates/bin/docs_rs_watcher/Cargo.toml +++ b/crates/bin/docs_rs_watcher/Cargo.toml @@ -8,6 +8,9 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } +aws-config = { workspace = true } +aws-sdk-sqs = { version = "1.99.0", default-features = false, features = ["default-https-client", "rt-tokio"] } +chrono = { workspace = true } clap = { workspace = true } # NOTE: on the new infra, switch back from `git-https-reqwest` to `git-https` (curl) once the curl version is new enough crates-index = { version = "3.0.0", default-features = false, features = ["git", "git-https-reqwest", "git-performance", "parallel"] } @@ -16,6 +19,7 @@ crates-index-diff = { version = "31.0.0", default-features = false, features = [ docs_rs_build_queue = { path = "../../lib/docs_rs_build_queue" } docs_rs_config = { path = "../../lib/docs_rs_config" } docs_rs_context = { path = "../../lib/docs_rs_context" } +docs_rs_crates_io = { path = "../../lib/docs_rs_crates_io" } docs_rs_database = { path = "../../lib/docs_rs_database" } docs_rs_env_vars = { path = "../../lib/docs_rs_env_vars" } docs_rs_fastly = { path = "../../lib/docs_rs_fastly" } @@ -29,9 +33,11 @@ futures-util = { workspace = true } itertools = { workspace = true } opentelemetry = { workspace = 
true } rayon = "1.6.1" +serde_json = { workspace = true } sqlx = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] docs_rs_config = { path = "../../lib/docs_rs_config", features = ["testing"] } diff --git a/crates/bin/docs_rs_watcher/src/config.rs b/crates/bin/docs_rs_watcher/src/config.rs index 7b5f17976..e8bdc4f82 100644 --- a/crates/bin/docs_rs_watcher/src/config.rs +++ b/crates/bin/docs_rs_watcher/src/config.rs @@ -2,18 +2,26 @@ use anyhow::Result; use docs_rs_config::AppConfig; use docs_rs_env_vars::{env, maybe_env, require_env}; use std::{path::PathBuf, time::Duration}; +use url::Url; #[derive(Debug)] pub struct Config { + /// registry watching config. Also used for database synchronization pub registry_index_path: PathBuf, pub registry_url: Option, - /// How long to wait between registry checks pub delay_between_registry_fetches: Duration, - // Time between 'git gc --auto' calls in seconds pub registry_gc_interval: u64, + /// SQS watching config. 
+ pub sqs_queue_url: Option, + pub sqs_region: Option, + pub sqs_endpoint_url: Option, + /// temporary, to switch between the sources for the index (git index vs SQS) + pub sqs_active: bool, + pub aws_sdk_max_retries: u32, + // automatic rebuild configuration pub max_queued_rebuilds: Option, @@ -29,6 +37,13 @@ impl AppConfig for Config { Ok(Self { registry_index_path: env("REGISTRY_INDEX_PATH", prefix.join("crates.io-index"))?, registry_url: maybe_env("REGISTRY_URL")?, + + sqs_queue_url: maybe_env("DOCSRS_SQS_QUEUE_URL")?, + sqs_region: maybe_env("DOCSRS_SQS_REGION")?, + sqs_endpoint_url: maybe_env("DOCSRS_SQS_ENDPOINT_URL")?, + sqs_active: env("DOCSRS_SQS_ACTIVE", false)?, + aws_sdk_max_retries: env("DOCSRS_AWS_SDK_MAX_RETRIES", 6u32)?, + delay_between_registry_fetches: Duration::from_secs(env::( "DOCSRS_DELAY_BETWEEN_REGISTRY_FETCHES", 60, @@ -42,4 +57,11 @@ impl AppConfig for Config { repository: docs_rs_repository_stats::Config::from_environment()?, }) } + + #[cfg(test)] + fn test_config() -> Result { + let mut config = Self::from_environment()?; + config.sqs_active = false; + Ok(config) + } } diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index 29fbe3796..4b97be297 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -67,7 +67,13 @@ pub async fn delete_version( return Ok(()); }; - let is_library = delete_version_from_database(conn, config, name, crate_id, version).await?; + let Some(is_library) = + delete_version_from_database(conn, config, name, crate_id, version).await? 
+ else { + // release doesn't exist + return Ok(()); + }; + let paths = if is_library { LIBRARY_STORAGE_PATHS_TO_DELETE } else { @@ -133,7 +139,18 @@ async fn delete_version_from_database( name: &KrateName, crate_id: CrateId, version: &Version, -) -> Result { +) -> Result> { + let Some(release_id) = sqlx::query_scalar!( + "SELECT id FROM releases WHERE crate_id = $1 AND version = $2", + crate_id as _, + version as _ + ) + .fetch_optional(&mut *conn) + .await? + else { + return Ok(None); + }; + let mut transaction = conn.begin().await?; let delete_lock_timeout = format!("{}ms", config.delete_lock_timeout.as_millis()); @@ -157,23 +174,23 @@ async fn delete_version_from_database( sqlx::query!( "DELETE FROM builds_logs bl USING builds b - JOIN releases r ON b.rid = r.id - WHERE bl.build_id = b.id AND r.crate_id = $1 AND r.version = $2;", - crate_id as _, - version as _ + WHERE bl.build_id = b.id AND b.rid = $1;", + release_id as _, ) .execute(&mut *transaction) .await?; for &(table, column) in METADATA { - sqlx::query(sqlx::AssertSqlSafe( - format!("DELETE FROM {table} WHERE {column} IN (SELECT id FROM releases WHERE crate_id = $1 AND version = $2)"))) - .bind(crate_id).bind(version).execute(&mut *transaction).await?; + sqlx::query(sqlx::AssertSqlSafe(format!( + "DELETE FROM {table} WHERE {column} = $1" + ))) + .bind(release_id) + .execute(&mut *transaction) + .await?; } let is_library: bool = sqlx::query_scalar!( - "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", - crate_id.0, - version as _, + "DELETE FROM releases WHERE id = $1 RETURNING is_library", + release_id as _, ) .fetch_one(&mut *transaction) .await? 
@@ -190,7 +207,7 @@ async fn delete_version_from_database( update_latest_version_id(&mut transaction, crate_id).await?; transaction.commit().await?; - Ok(is_library) + Ok(Some(is_library)) } /// Returns whether any release in this crate was a library @@ -449,6 +466,13 @@ mod tests { ); } + // running delete-crate again doesn't error. + assert!( + delete_crate(&mut conn, storage, env.config(), &FOO) + .await + .is_ok() + ); + Ok(()) } @@ -613,6 +637,13 @@ mod tests { vec!["Peter Rabbit".to_string()] ); + // running delete-version again doesn't fail. + assert!( + delete_version(&mut conn, storage, env.config(), &KRATE, &V1) + .await + .is_ok() + ); + // FIXME: remove for now until test frontend is async // let web = env.frontend(); // assert_success("/a/2.0.0/a/", web)?; @@ -691,6 +722,32 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_already_deleted_version_doesnt_error() -> Result<()> { + let env = TestEnvironment::new().await?; + let mut conn = env.async_conn().await?; + + env.fake_release() + .await + .name(&KRATE) + .version(V1) + .create() + .await?; + env.fake_release() + .await + .name(&KRATE) + .version(V2) + .create() + .await?; + + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + + assert!(crate_exists(&mut conn, &KRATE).await?); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_delete_version_waits_for_locked_queue_rows() -> Result<()> { let env = TestEnvironment::new().await?; diff --git a/crates/bin/docs_rs_watcher/src/index_watcher.rs b/crates/bin/docs_rs_watcher/src/index_watcher.rs index 68b964d79..ffa8ffa2f 100644 --- a/crates/bin/docs_rs_watcher/src/index_watcher.rs +++ b/crates/bin/docs_rs_watcher/src/index_watcher.rs @@ -19,7 +19,6 @@ use tracing::{debug, error, info, warn}; pub(crate) struct CrateVersion { pub name: KrateName, pub version: Version, - pub yanked: bool, } 
#[cfg(test)] @@ -28,19 +27,28 @@ impl Default for CrateVersion { Self { name: docs_rs_types::testing::KRATE, version: docs_rs_types::testing::V1, - yanked: false, } } } -impl TryFrom for CrateVersion { +impl TryFrom<&crates_index_diff::CrateVersion> for CrateVersion { type Error = anyhow::Error; - fn try_from(value: crates_index_diff::CrateVersion) -> Result { + fn try_from(value: &crates_index_diff::CrateVersion) -> Result { + Ok(Self { + name: value.name.parse()?, + version: value.version.parse()?, + }) + } +} + +impl TryFrom<&docs_rs_crates_io::events::CrateVersion> for CrateVersion { + type Error = anyhow::Error; + + fn try_from(value: &docs_rs_crates_io::events::CrateVersion) -> Result { Ok(Self { name: value.name.parse()?, version: value.version.parse()?, - yanked: value.yanked, }) } } @@ -51,7 +59,6 @@ impl From for crates_index_diff::CrateVersion { Self { name: value.name.to_string().into(), version: value.version.to_string().into(), - yanked: value.yanked, ..Default::default() } } @@ -133,6 +140,16 @@ async fn process_changes(context: &Context, changes: &Vec, config: &Conf let mut crates_added = 0; for change in changes { + debug!(?change, "received change from git index"); + + if config.sqs_active { + // just to be safe. + // Generally we don't even start the git-index-watcher when + // SQS is active. + // Will be removed with the git index watcher code when SQS is stable. + continue; + } + match process_change(context, change, config).await { Ok(added) => { if added { @@ -148,22 +165,28 @@ async fn process_changes(context: &Context, changes: &Vec, config: &Conf } /// Process a crate change, returning whether the change was a crate addition or not. 
-async fn process_change(context: &Context, change: &Change, config: &Config) -> Result { +pub(crate) async fn process_change( + context: &Context, + change: &Change, + config: &Config, +) -> Result { let crate_version: CrateVersion = change .versions() .first() .expect("always exists") - .clone() .try_into()?; match change { Change::Added(_release) => process_version_added(context, &crate_version).await?, Change::AddedAndYanked(_release) => { process_version_added(context, &crate_version).await?; - process_version_yank_status(context, &crate_version).await?; + process_version_yank_status(context, &crate_version, true).await?; } - Change::Unyanked(_release) | Change::Yanked(_release) => { - process_version_yank_status(context, &crate_version).await? + Change::Unyanked(_release) => { + process_version_yank_status(context, &crate_version, false).await? + } + Change::Yanked(_release) => { + process_version_yank_status(context, &crate_version, true).await? } Change::CrateDeleted { name, .. } => { let name: KrateName = name.parse()?; @@ -177,15 +200,19 @@ async fn process_change(context: &Context, change: &Change, config: &Config) -> } /// Processes crate changes, whether they got yanked or unyanked. 
-async fn process_version_yank_status(context: &Context, release: &CrateVersion) -> Result<()> { +pub(crate) async fn process_version_yank_status( + context: &Context, + release: &CrateVersion, + yanked: bool, +) -> Result<()> { // FIXME: delay yanks of crates that have not yet finished building // https://github.com/rust-lang/docs.rs/issues/1934 - set_yanked(context, &release.name, &release.version, release.yanked).await?; + set_yanked(context, &release.name, &release.version, yanked).await?; queue_crate_invalidation(&release.name, context.cdn.as_deref()).await; Ok(()) } -async fn process_version_added(context: &Context, release: &CrateVersion) -> Result<()> { +pub(crate) async fn process_version_added(context: &Context, release: &CrateVersion) -> Result<()> { let mut conn = context.pool()?.get_async().await?; let priority = get_crate_priority(&mut conn, &release.name).await?; context @@ -216,7 +243,7 @@ async fn process_version_added(context: &Context, release: &CrateVersion) -> Res Ok(()) } -async fn process_version_deleted( +pub(crate) async fn process_version_deleted( context: &Context, config: &Config, release: &CrateVersion, @@ -250,7 +277,7 @@ async fn process_version_deleted( Ok(()) } -async fn process_crate_deleted( +pub(crate) async fn process_crate_deleted( context: &Context, config: &Config, krate: &KrateName, @@ -342,7 +369,6 @@ mod tests { let krate = CrateVersion { name: KRATE, version: V1, - ..Default::default() }; process_version_added(&env, &krate).await?; @@ -353,7 +379,6 @@ mod tests { let krate = CrateVersion { name: "krate".parse()?, version: V2.to_string().parse()?, - ..Default::default() }; process_version_added(&env, &krate).await?; @@ -386,9 +411,8 @@ mod tests { let krate = CrateVersion { name: KRATE, version: V1, - yanked: true, }; - process_version_yank_status(&env, &krate).await?; + process_version_yank_status(&env, &krate, true).await?; // And verify it's actually marked as yanked let row = sqlx::query!( @@ -405,9 +429,8 @@ mod tests 
{ let krate = CrateVersion { name: KRATE, version: V1, - yanked: false, }; - process_version_yank_status(&env, &krate).await?; + process_version_yank_status(&env, &krate, false).await?; let row = sqlx::query!( "SELECT yanked @@ -470,7 +493,6 @@ mod tests { let krate = CrateVersion { name: KRATE, version: V2, - ..Default::default() }; process_version_deleted(&env, env.config(), &krate).await?; @@ -500,22 +522,18 @@ mod tests { let krate1 = CrateVersion { name: KRATE, version: V1, - ..Default::default() }; let krate2 = CrateVersion { name: "krate2".parse()?, version: V1, - ..Default::default() }; let krate_already_present = CrateVersion { name: "krate_already_present".parse()?, version: V1, - ..Default::default() }; let non_existing_version = CrateVersion { name: "krate_already_present".parse()?, version: V2, - ..Default::default() }; let added = process_changes( &env, diff --git a/crates/bin/docs_rs_watcher/src/lib.rs b/crates/bin/docs_rs_watcher/src/lib.rs index 833a6c688..4b51356ef 100644 --- a/crates/bin/docs_rs_watcher/src/lib.rs +++ b/crates/bin/docs_rs_watcher/src/lib.rs @@ -3,8 +3,10 @@ pub mod consistency; mod db; mod index; pub mod index_watcher; +mod metrics; mod rebuilds; mod service_metrics; +mod subscriber; #[cfg(test)] mod testing; @@ -13,14 +15,65 @@ pub use db::{delete_crate, delete_version}; pub use index::Index; pub use rebuilds::queue_rebuilds; -use crate::{index_watcher::get_new_crates, service_metrics::OtelServiceMetrics}; -use anyhow::Result; +use crate::{ + index_watcher::get_new_crates, metrics::WatcherMetrics, service_metrics::OtelServiceMetrics, +}; +use anyhow::{Error, Result}; use docs_rs_context::Context; use docs_rs_utils::start_async_cron; use std::{sync::Arc, time::Duration}; use tokio::time::{self, Instant}; use tracing::{debug, error, info, trace}; +/// main index-watcher / subscriber loop. +/// mostly wraps either the git index watcher loop, or the sqs subscriber loop. 
+/// Only here so unexpected errors lead to a sentry report & restart instead of
+/// the daemon / watcher just stopping.
+///
+/// This function never returns: whenever the active watcher stops, the loop starts it
+/// again (after a 10s pause if it failed).
+pub async fn watch(config: &Config, context: &Context) {
+    let metrics = WatcherMetrics::new(context.meter_provider());
+
+    loop {
+        if config.sqs_active {
+            if let Err(err) = crate::subscriber::run_sqs_subscriber(config, context, &metrics).await
+            {
+                error!(?err, "unexpected error watching SQS, will retry");
+                time::sleep(Duration::from_secs(10)).await;
+            }
+        } else {
+            // intermediate mode:
+            // - still fetch from git for events
+            // - listen to SQS, and log the events so we can test SQS connection, and compare events
+            //
+            // We don't retry on unexpected SQS errors yet.
+            //
+            // `select!` instead of `join!`: `join!` only returns once *both* futures have
+            // finished, so a registry error would go unreported (and unretried) for as long
+            // as the SQS subscriber keeps running.
+            let registry_result: Result<(), Error> = tokio::select! {
+                result = crate::watch_registry(config, context) => result,
+                never = async {
+                    // unexpected SQS errors are caught here, and we don't retry.
+                    if let Err(err) =
+                        crate::subscriber::run_sqs_subscriber(config, context, &metrics).await
+                    {
+                        error!(?err, "error setting up SQS test subscriber");
+                    }
+                    // park this branch so only the registry watcher can end the round.
+                    std::future::pending::<Result<(), Error>>().await
+                } => never,
+            };
+
+            if let Err(err) = registry_result {
+                // unexpected index watcher errors lead to a report & retry.
+                error!(?err, "unexpected error watching registry, will retry");
+                time::sleep(Duration::from_secs(10)).await;
+            }
+        }
+    }
+}
+
 /// Run the registry watcher /// NOTE: this should only be run once, otherwise crates would be added /// to the queue multiple times. diff --git a/crates/bin/docs_rs_watcher/src/main.rs b/crates/bin/docs_rs_watcher/src/main.rs index ebc4f728f..e5a276a9b 100644 --- a/crates/bin/docs_rs_watcher/src/main.rs +++ b/crates/bin/docs_rs_watcher/src/main.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "256"] + use anyhow::{Context as _, Result}; use clap::{Parser, Subcommand}; use docs_rs_config::AppConfig as _; @@ -82,7 +84,7 @@ impl CommandLine { // which should only run once, and all the time. 
docs_rs_watcher::start_background_service_metric_collector(&ctx).await?; - docs_rs_watcher::watch_registry(&config, &ctx).await?; + docs_rs_watcher::watch(&config, &ctx).await; } Self::Queue { subcommand } => subcommand.handle_args(config, ctx).await?, Self::Database { subcommand } => subcommand.handle_args(config, ctx).await?, diff --git a/crates/bin/docs_rs_watcher/src/metrics.rs b/crates/bin/docs_rs_watcher/src/metrics.rs new file mode 100644 index 000000000..fbdf7763d --- /dev/null +++ b/crates/bin/docs_rs_watcher/src/metrics.rs @@ -0,0 +1,61 @@ +use docs_rs_crates_io::events::IndexChangeV1; +use docs_rs_opentelemetry::AnyMeterProvider; +use opentelemetry::{ + KeyValue, + metrics::{Counter, Histogram}, +}; + +#[derive(Debug)] +pub(crate) struct WatcherMetrics { + pub(crate) sqs_messages_received_total: Counter<u64>, + pub(crate) sqs_poll_errors_total: Counter<u64>, + pub(crate) sqs_retries_total: Counter<u64>, + pub(crate) changes_applied_total: Counter<u64>, + pub(crate) sqs_message_processing_time: Histogram<f64>, + pub(crate) sqs_event_lag: Histogram<f64>, +} + +impl WatcherMetrics { + pub(crate) fn new(meter_provider: &AnyMeterProvider) -> Self { + let meter = meter_provider.meter("watcher"); + const PREFIX: &str = "docsrs.watcher"; + Self { + sqs_messages_received_total: meter + .u64_counter(format!("{PREFIX}.sqs_messages_received_total")) + .with_unit("1") + .build(), + sqs_poll_errors_total: meter + .u64_counter(format!("{PREFIX}.sqs_poll_errors_total")) + .with_unit("1") + .build(), + sqs_retries_total: meter + .u64_counter(format!("{PREFIX}.sqs_retries_total")) + .with_unit("1") + .build(), + changes_applied_total: meter + .u64_counter(format!("{PREFIX}.changes_applied_total")) + .with_unit("1") + .build(), + sqs_message_processing_time: meter + .f64_histogram(format!("{PREFIX}.sqs_message_processing_time")) + .with_boundaries(vec![ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, + 45.0, 55.0, 60.0, 65.0, 90.0, 120.0, + ]) + .with_unit("s") + .build(),
+ sqs_event_lag: meter + .f64_histogram(format!("{PREFIX}.sqs_event_lag")) + .with_boundaries(vec![ + 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 300.0, 900.0, 3600.0, + ]) + .with_unit("s") + .build(), + } + } + + pub(crate) fn record_change_applied(&self, change: &IndexChangeV1) { + self.changes_applied_total + .add(1, &[KeyValue::new("type", change.kind())]); + } +} diff --git a/crates/bin/docs_rs_watcher/src/subscriber.rs b/crates/bin/docs_rs_watcher/src/subscriber.rs new file mode 100644 index 000000000..e201267b8 --- /dev/null +++ b/crates/bin/docs_rs_watcher/src/subscriber.rs @@ -0,0 +1,499 @@ +use crate::{ + Config, + index_watcher::{ + process_crate_deleted, process_version_added, process_version_deleted, + process_version_yank_status, + }, + metrics::WatcherMetrics, +}; +use anyhow::{Context as _, Result}; +use aws_config::{BehaviorVersion, Region, retry::RetryConfig}; +use aws_sdk_sqs::Client; +use chrono::Utc; +use docs_rs_context::Context; +use docs_rs_crates_io::events::{IndexChangeEventV1, IndexChangeV1}; +use docs_rs_types::KrateName; +use docs_rs_utils::retry_async; +use std::time::{Duration, Instant}; +use tokio::time; +use tracing::{debug, error, instrument, warn}; + +/// wait-time (long polling): +/// +/// How long should the request be kept open when there are no messages. +/// SQS only accepts values in the range 0..=20 seconds. +const WAIT_TIME: Duration = Duration::from_secs(20); + +/// when one long-polling request is finished, how long to sleep before starting the next? +const SLEEP_BETWEEN_REQUESTS: Duration = Duration::from_secs(1); + +/// when we have an error handling a message, how long should SQS wait until +/// it redelivers this message. +/// +/// With FIFO queues, other messages will wait behind. +const RETRY_DELAY: Duration = Duration::from_secs(30); + +/// How regularly to recheck the priorities of queued crates. +/// Right now only runs `deprioritize_workspaces`. 
+const DELAY_BETWEEN_PRIORITY_RECHECK: Duration = Duration::from_secs(60); + +/// visibility timeout: +/// SQS visibility timeout is the period after a consumer receives a message during +/// which that message is hidden from other consumers, and if it is not deleted before +/// the timeout expires, it becomes visible again for redelivery. +/// +/// Should be longer than the longest time our server takes to handle a message. +const VISIBILITY_TIMEOUT: Duration = Duration::from_secs(60); + +/// Result type for `handle_message_body`, so we can unit-test it without needing +/// fake SQS. +#[derive(Debug, Clone, PartialEq, Eq)] +enum MessageOutcome { + Ack, + RetryLater(Duration), + Ignore, +} + +pub(crate) async fn run_sqs_subscriber( + config: &Config, + context: &Context, + metrics: &WatcherMetrics, +) -> Result<()> { + let (Some(region), Some(queue_url)) = (&config.sqs_region, &config.sqs_queue_url) else { + warn!("missing sqs region or url, disabling crates.io SQS subscriber"); + return Ok(()); + }; + let mut last_priority_recheck = Instant::now(); + let queue = context.build_queue()?; + + debug!("creating SQS client..."); + let shared_config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let mut client_config = aws_sdk_sqs::config::Builder::from(&shared_config) + .retry_config(RetryConfig::standard().with_max_attempts(config.aws_sdk_max_retries)) + .region(Region::new(region.to_string())); + if let Some(endpoint_url) = &config.sqs_endpoint_url { + client_config = client_config.endpoint_url(endpoint_url.to_string()); + } + let client = Client::from_conf(client_config.build()); + + let queue_url = queue_url.to_string(); + + loop { + if queue.is_locked().await? 
{ + debug!("Queue is locked, skipping checking new crates"); + time::sleep(WAIT_TIME).await; + continue; + } + + debug!("receiving messages..."); + let messages = match client + .receive_message() + .queue_url(&queue_url) + .max_number_of_messages(10) + .wait_time_seconds(WAIT_TIME.as_secs() as i32) + .visibility_timeout(VISIBILITY_TIMEOUT.as_secs() as i32) + .send() + .await + { + Ok(response) => response.messages().to_vec(), + Err(err) => { + metrics.sqs_poll_errors_total.add(1, &[]); + error!( + ?err, + queue_url, "error receiving messages from sqs, retrying" + ); + time::sleep(WAIT_TIME).await; + continue; + } + }; + metrics + .sqs_messages_received_total + .add(messages.len() as u64, &[]); + + for message in messages { + match handle_message_body(context, config, metrics, message.body.as_deref()).await { + MessageOutcome::Ack => { + if let Some(receipt_handle) = message.receipt_handle.as_deref() + && let Err(err) = client + .delete_message() + .queue_url(&queue_url) + .receipt_handle(receipt_handle) + .send() + .await + { + error!(?err, receipt_handle, "error deleting message from queue"); + } + } + MessageOutcome::RetryLater(delay) => { + error!( + ?message, + ?delay, + body = message.body.as_deref().unwrap_or_default(), + "error handling message. Retrying." 
+ ); + if let Some(receipt_handle) = message.receipt_handle.as_deref() + && let Err(err) = client + .change_message_visibility() + .queue_url(&queue_url) + .receipt_handle(receipt_handle) + .visibility_timeout(delay.as_secs() as i32) + .send() + .await + { + warn!( + ?err, + receipt_handle, "error setting visibility_timeout for retry" + ); + } + } + MessageOutcome::Ignore => {} + } + } + + if last_priority_recheck.elapsed() >= DELAY_BETWEEN_PRIORITY_RECHECK { + if let Err(err) = queue.deprioritize_workspaces().await { + error!(?err, "error deprioritizing workspaces"); + } + + last_priority_recheck = Instant::now(); + } + + time::sleep(SLEEP_BETWEEN_REQUESTS).await; + } +} + +async fn handle_message_body( + context: &Context, + config: &Config, + metrics: &WatcherMetrics, + body: Option<&str>, +) -> MessageOutcome { + let Some(body) = body else { + return MessageOutcome::Ignore; + }; + let start = Instant::now(); + + match retry_async( + || async move { process_sqs_event(context, config, metrics, body).await }, + 3, + ) + .await + { + Ok(_) => { + metrics + .sqs_message_processing_time + .record(start.elapsed().as_secs_f64(), &[]); + MessageOutcome::Ack + } + Err(err) => { + metrics + .sqs_message_processing_time + .record(start.elapsed().as_secs_f64(), &[]); + metrics.sqs_retries_total.add(1, &[]); + error!( + ?err, + ?RETRY_DELAY, + body, + "error handling message. Retrying." 
+ ); + MessageOutcome::RetryLater(RETRY_DELAY) + } + } +} + +#[instrument(skip_all)] +async fn process_sqs_event( + context: &Context, + config: &Config, + metrics: &WatcherMetrics, + body: &str, +) -> Result<()> { + let event: IndexChangeEventV1 = + serde_json::from_str(body).context("error parsing event from json")?; + + debug!(?event, "received event from sqs"); + metrics + .sqs_event_lag + .record((Utc::now() - event.occurred_at).as_seconds_f64(), &[]); + + if config.sqs_active { + process_change(context, &event.change, config) + .await + .context("error processing change")?; + metrics.record_change_applied(&event.change); + } + + Ok(()) +} + +/// Process a crate change +#[instrument(skip(context, config))] +pub(crate) async fn process_change( + context: &Context, + change: &IndexChangeV1, + config: &Config, +) -> Result<()> { + match change { + IndexChangeV1::Added(crate_version) => { + process_version_added(context, &crate_version.try_into()?).await? + } + IndexChangeV1::Yanked(crate_version) => { + process_version_yank_status(context, &crate_version.try_into()?, true).await? + } + IndexChangeV1::Unyanked(crate_version) => { + process_version_yank_status(context, &crate_version.try_into()?, false).await? + } + IndexChangeV1::CrateDeleted { name, .. } => { + let name: KrateName = name.parse()?; + process_crate_deleted(context, config, &name).await? + } + IndexChangeV1::VersionDeleted(crate_version) => { + process_version_deleted(context, config, &crate_version.try_into()?).await? 
+ } + }; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::testing::TestEnvironment; + use docs_rs_config::AppConfig as _; + use docs_rs_crates_io::events::CrateVersion; + use docs_rs_types::{ + Version, + testing::{KRATE, V1, V2}, + }; + use pretty_assertions::assert_eq; + + fn added_event_json(name: &KrateName, version: &Version) -> String { + serde_json::to_string(&serde_json::json!({ + "id":"evt_123", + "occurred_at":"2026-06-01T12:00:00Z", + "type":"added", + "payload":{ + "name": name.to_string(), + "vers": version.to_string(), + } + })) + .unwrap() + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_change_added_queues_crate() -> Result<()> { + let env = TestEnvironment::new().await?; + + process_change( + &env, + &IndexChangeV1::Added(CrateVersion { + name: KRATE.to_string(), + version: V1.to_string(), + }), + env.config(), + ) + .await?; + + let queue = env.build_queue()?.queued_crates().await?; + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].name, KRATE); + assert_eq!(queue[0].version, V1); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_change_yanked_updates_release() -> Result<()> { + let env = TestEnvironment::new().await?; + let mut conn = env.async_conn().await?; + + let id = env + .fake_release() + .await + .name(KRATE) + .version(V1) + .create() + .await?; + + process_change( + &env, + &IndexChangeV1::Yanked(CrateVersion { + name: KRATE.to_string(), + version: V1.to_string(), + }), + env.config(), + ) + .await?; + + let yanked = sqlx::query_scalar!( + "SELECT yanked + FROM releases + WHERE id = $1", + id.0 + ) + .fetch_one(&mut *conn) + .await?; + assert_eq!(yanked, Some(true)); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_change_version_deleted_removes_release() -> Result<()> { + let env = TestEnvironment::new().await?; + let mut conn = env.async_conn().await?; + + let rid_1 = env + .fake_release() + .await + .name(KRATE) + 
.version(V1) + .create() + .await?; + env.fake_release() + .await + .name(KRATE) + .version(V2) + .create() + .await?; + + process_change( + &env, + &IndexChangeV1::VersionDeleted(CrateVersion { + name: KRATE.to_string(), + version: V2.to_string(), + }), + env.config(), + ) + .await?; + + assert_eq!( + sqlx::query_scalar!("SELECT id FROM releases") + .fetch_all(&mut *conn) + .await?, + vec![rid_1.0] + ); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_sqs_event_dispatches_added_event() -> Result<()> { + let mut config = Config::test_config()?; + config.sqs_active = true; + let env = TestEnvironment::builder().config(config).build().await?; + let metrics = WatcherMetrics::new(&env.context().meter_provider); + + process_sqs_event(&env, env.config(), &metrics, &added_event_json(&KRATE, &V1)).await?; + + let queue = env.build_queue()?.queued_crates().await?; + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].name, KRATE); + assert_eq!(queue[0].version, V1); + let collected = env.collected_metrics(); + let applied_metric = + collected.get_metric("watcher", "docsrs.watcher.changes_applied_total")?; + let applied = applied_metric.get_u64_counter(); + let change_type = applied + .attributes() + .find(|kv| kv.key.as_str() == "type") + .unwrap() + .value + .to_string(); + assert_eq!(change_type, "added"); + assert_eq!(applied.value(), 1); + let lag_metric = collected.get_metric("watcher", "docsrs.watcher.sqs_event_lag")?; + assert_eq!(lag_metric.get_f64_histogram().count(), 1); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_sqs_event_respects_sqs_active() -> Result<()> { + let mut config = Config::test_config()?; + config.sqs_active = false; + let env = TestEnvironment::builder().config(config).build().await?; + let metrics = WatcherMetrics::new(&env.context().meter_provider); + + process_sqs_event(&env, env.config(), &metrics, &added_event_json(&KRATE, &V1)).await?; + + 
assert!(env.build_queue()?.queued_crates().await?.is_empty()); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_process_sqs_event_rejects_invalid_json() -> Result<()> { + let env = TestEnvironment::new().await?; + let metrics = WatcherMetrics::new(&env.context().meter_provider); + + let err = process_sqs_event(&env, env.config(), &metrics, "{not json").await; + + assert!(err.is_err()); + let err = format!("{:?}", err.unwrap_err()); + assert!( + err.contains("error parsing event from json"), + "unexpected error: {err}" + ); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_message_body_acknowledges_success() -> Result<()> { + let config = Config::test_config()?; + let env = TestEnvironment::builder().config(config).build().await?; + let metrics = WatcherMetrics::new(&env.context().meter_provider); + + assert_eq!( + handle_message_body( + &env, + env.config(), + &metrics, + Some(&added_event_json(&KRATE, &V1)), + ) + .await, + MessageOutcome::Ack + ); + let collected = env.collected_metrics(); + let processing_metric = + collected.get_metric("watcher", "docsrs.watcher.sqs_message_processing_time")?; + assert_eq!(processing_metric.get_f64_histogram().count(), 1); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_message_body_retries_failed_processing() -> Result<()> { + let env = TestEnvironment::new().await?; + let metrics = WatcherMetrics::new(&env.context().meter_provider); + + assert_eq!( + handle_message_body(&env, env.config(), &metrics, Some("{bad json")).await, + MessageOutcome::RetryLater(RETRY_DELAY) + ); + let collected = env.collected_metrics(); + assert_eq!( + collected + .get_metric("watcher", "docsrs.watcher.sqs_retries_total")? 
+ .get_u64_counter() + .value(), + 1 + ); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_message_body_ignores_missing_body() -> Result<()> { + let env = TestEnvironment::new().await?; + let metrics = WatcherMetrics::new(&env.context().meter_provider); + + assert_eq!( + handle_message_body(&env, env.config(), &metrics, None).await, + MessageOutcome::Ignore + ); + assert!(env.build_queue()?.queued_crates().await?.is_empty()); + + Ok(()) + } +} diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml new file mode 100644 index 000000000..497536d66 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "docs_rs_crates_io" +version = "0.1.0" +description = "types & logic for the direct integration between docs.rs & crates.io" + +authors.workspace = true +license.workspace = true +repository.workspace = true +edition.workspace = true + +[dependencies] +chrono = { version = "0.4", features = ["serde"] } +serde = { version = "1", features = ["derive"] } + +[dev-dependencies] +serde_json = "1.0" + +[lints] +workspace = true diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs new file mode 100644 index 000000000..6e1b722ff --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -0,0 +1,249 @@ +use chrono::{DateTime, Utc}; +use std::fmt; + +/// A change that can happen to a crate on our index. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +#[serde(tag = "type", content = "payload", rename_all = "snake_case")] +pub enum IndexChangeV1 { + /// A crate version was added. + Added(CrateVersion), + /// A crate version was unyanked. + Unyanked(CrateVersion), + /// A crate version was yanked. + Yanked(CrateVersion), + /// The name of the crate whose file was deleted, which implies all versions were deleted as well. + CrateDeleted { name: String }, + /// A crate version was deleted. 
+ VersionDeleted(CrateVersion), +} + +impl IndexChangeV1 { + /// Return the added crate, if this is this kind of change. + pub fn added(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Added(v) => Some(v), + _ => None, + } + } + + /// Return the yanked crate, if this is this kind of change. + pub fn yanked(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Yanked(v) => Some(v), + _ => None, + } + } + + /// Return the unyanked crate, if this is this kind of change. + pub fn unyanked(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Unyanked(v) => Some(v), + _ => None, + } + } + + /// Return the deleted crate, if this is this kind of change. + pub fn crate_deleted(&self) -> Option<&str> { + match self { + IndexChangeV1::CrateDeleted { name } => Some(name.as_str()), + _ => None, + } + } + + /// Return the deleted version crate, if this is this kind of change. + pub fn version_deleted(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::VersionDeleted(v) => Some(v), + _ => None, + } + } + + pub fn name(&self) -> &str { + match self { + IndexChangeV1::Added(crate_version) => &crate_version.name, + IndexChangeV1::Unyanked(crate_version) => &crate_version.name, + IndexChangeV1::Yanked(crate_version) => &crate_version.name, + IndexChangeV1::CrateDeleted { name } => name, + IndexChangeV1::VersionDeleted(crate_version) => &crate_version.name, + } + } + + pub fn kind(&self) -> &'static str { + match *self { + IndexChangeV1::Added(_) => "added", + IndexChangeV1::Yanked(_) => "yanked", + IndexChangeV1::CrateDeleted { .. 
} => "crate deleted", + IndexChangeV1::VersionDeleted(_) => "version deleted", + IndexChangeV1::Unyanked(_) => "unyanked", + } + } +} + +impl fmt::Display for IndexChangeV1 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.kind()) + } +} + +/// A conventional event envelope for our events between crates.io & docs.rs +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct Event<T> { + /// Unique event identifier for deduplication and tracing. + pub id: String, + /// Timestamp when the event occurred + pub occurred_at: DateTime<Utc>, + /// The typed payload. + #[serde(flatten)] + pub change: T, +} + +/// The first version of the public event wire format. +pub type IndexChangeEventV1 = Event<IndexChangeV1>; + +/// Pack all information we know about a change made to a version of a crate. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct CrateVersion { + /// The crate name, e.g. `clap`. + pub name: String, + /// The semantic version of the crate.
+ #[serde(rename = "vers")] + pub version: String, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn crate_version() -> CrateVersion { + CrateVersion { + name: "clap".into(), + version: "4.5.0".into(), + } + } + + fn event(change: IndexChangeV1) -> IndexChangeEventV1 { + IndexChangeEventV1 { + id: "evt_123".into(), + occurred_at: DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc), + change, + } + } + + #[test] + fn crate_version_serializes_with_vers_field() { + let event = crate_version(); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "name": "clap", + "vers": "4.5.0", + }) + ); + } + + #[test] + fn change_serializes_with_expected_variant_shapes() { + let crate_version = crate_version(); + + let cases = [ + ( + IndexChangeV1::Added(crate_version.clone()), + json!({ + "type": "added", + "payload": { + "name": "clap", + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Unyanked(crate_version.clone()), + json!({ + "type": "unyanked", + "payload": { + "name": "clap", + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Yanked(crate_version.clone()), + json!({ + "type": "yanked", + "payload": { + "name": "clap", + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }, + json!({ + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }), + ), + ( + IndexChangeV1::VersionDeleted(crate_version), + json!({ + "type": "version_deleted", + "payload": { + "name": "clap", + "vers": "4.5.0", + } + }), + ), + ]; + + for (event, expected) in cases { + assert_eq!(serde_json::to_value(&event).unwrap(), expected); + } + } + + #[test] + fn event_serializes_with_minimum_metadata() { + let event = event(IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + 
"name": "old-crate" + } + }) + ); + } + + #[test] + fn event_deserializes_rfc3339_occurred_at() { + let event: IndexChangeEventV1 = serde_json::from_value(json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + })) + .unwrap(); + + assert_eq!( + event.occurred_at, + DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc) + ); + } +} diff --git a/crates/lib/docs_rs_crates_io/src/lib.rs b/crates/lib/docs_rs_crates_io/src/lib.rs new file mode 100644 index 000000000..a9970c28f --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/lib.rs @@ -0,0 +1 @@ +pub mod events; diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index 153de2489..6b0fb85ea 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -16,11 +16,7 @@ testing = [ anyhow = { workspace = true } async-compression = { version = "0.4.32", features = ["bzip2", "deflate", "gzip", "tokio", "zstd"] } async-stream = { workspace = true } -# The default `rustls` feature pulls in the legacy hyper 0.14 + rustls 0.21 -# stack via `aws-smithy-runtime/tls-rustls`, which includes the vulnerable -# `rustls-webpki` v0.101.x. Using only `default-https-client` avoids this by -# using the modern rustls 0.23 + hyper 1.x stack instead. 
-aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] } +aws-config = { workspace = true } aws-sdk-s3 = { version = "1.3.0", default-features = false, features = ["default-https-client", "rt-tokio"] } aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] } base64 = { workspace = true } diff --git a/docker-compose.yml b/docker-compose.yml index b94f73248..690d66246 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,7 @@ # - repo-stats updater # - cdn invalidator # - release-rebuild-enqueuer +# * `elasticmq` -> local SQS-compatible queue for watcher testing # # optional profile: `metrics`: # * `opentelemetry` -> a debug opentelemetry receiver @@ -120,6 +121,7 @@ x-registry-watcher: &registry-watcher depends_on: - db - s3 + - elasticmq volumes: - "./ignored/docker-registry-watcher/prefix:/opt/docsrs/prefix" - crates-io-index:/opt/docsrs/crates.io-index @@ -132,6 +134,10 @@ x-registry-watcher: &registry-watcher REGISTRY_INDEX_PATH: /opt/docsrs/crates.io-index # configure the rebuild-queuer, DOCSRS_MAX_QUEUED_REBUILDS: ${DOCSRS_MAX_QUEUED_REBUILDS:-10} + DOCSRS_SQS_QUEUE_URL: ${DOCSRS_SQS_QUEUE_URL:-http://elasticmq:9324/queue/docsrs-events} + DOCSRS_SQS_REGION: ${DOCSRS_SQS_REGION:-elasticmq} + DOCSRS_SQS_ENDPOINT_URL: ${DOCSRS_SQS_ENDPOINT_URL:-http://elasticmq:9324} + DOCSRS_SQS_ACTIVE: ${DOCSRS_SQS_ACTIVE:-false} env_file: - .docker.env @@ -170,6 +176,18 @@ services: # watcher-CLI should not be run as background daemon, just manually - manual + elasticmq: + image: softwaremill/elasticmq + ports: + - "127.0.0.1:9324:9324" + volumes: + - "./dockerfiles/elasticmq.conf:/opt/elasticmq.conf:ro" + command: ["-Dconfig.file=/opt/elasticmq.conf"] + profiles: + - watcher + - full + - manual + builder-a: <<: *builder volumes: diff --git a/dockerfiles/elasticmq.conf b/dockerfiles/elasticmq.conf new file mode 100644 index 000000000..fb77fac15 --- /dev/null +++ b/dockerfiles/elasticmq.conf
@@ -0,0 +1,18 @@ +include classpath("application.conf") + +node-address { + protocol = http + host = "*" + port = 9324 + context-path = "" +} + +rest-sqs { + enabled = true + bind-port = 9324 + bind-hostname = "0.0.0.0" +} + +queues { + docsrs-events { } +} diff --git a/justfiles/utils.just b/justfiles/utils.just index 866f6a677..412fb1df6 100644 --- a/justfiles/utils.just +++ b/justfiles/utils.just @@ -1,9 +1,15 @@ _ensure_db_and_s3_are_running: _touch-docker-env - # dependencies in the docker-cli file are ignored - # here. Instead we explicitly start any dependent services first. - docker compose up -d db s3 --wait + # dependencies in the docker-cli file are ignored + # here. Instead we explicitly start any dependent services first. + docker compose up -d db s3 --wait _touch-docker-env: - touch .docker.env + touch .docker.env +send-sqs-payload: + aws sqs send-message \ + --endpoint-url $DOCSRS_SQS_ENDPOINT_URL \ + --region elasticmq \ + --queue-url $DOCSRS_SQS_QUEUE_URL \ + --message-body '{"id":"evt_1","occurred_at":"2026-07-02T12:00:00Z","type":"added","payload":{"name":"demo-crate","vers":"1.2.3"}}'