From 3f08bd5cbca65e5b71cbf530fc6b247d2be7f53e Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Tue, 17 Mar 2026 19:32:40 +0000 Subject: [PATCH 01/11] update azure blob upload support to use azure_storage_blob --- Cargo.lock | 562 ++++++++++++++++------------------------ Cargo.toml | 6 +- src/upload/blobstore.rs | 37 +-- 3 files changed, 251 insertions(+), 354 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7d8be9a..550b17f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,10 +3,10 @@ version = 4 [[package]] -name = "RustyXML" -version = "0.3.0" +name = "adler2" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" [[package]] name = "anstyle" @@ -22,25 +22,26 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" [[package]] name = "async-channel" -version = "1.9.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" dependencies = [ "concurrent-queue", - "event-listener 2.5.3", + "event-listener-strategy", "futures-core", + "pin-project-lite", ] [[package]] -name = "async-channel" -version = "2.5.0" +name = "async-compression" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1" dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", + "compression-codecs", + "compression-core", "pin-project-lite", + "tokio", ] [[package]] @@ -49,7 +50,7 @@ version = "3.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" dependencies = [ - "event-listener 5.4.1", + "event-listener", "event-listener-strategy", "pin-project-lite", ] @@ -75,9 +76,9 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" name = "avml" version = "0.17.0" dependencies = [ - "async-channel 2.5.0", + "async-channel", "azure_core", - "azure_storage_blobs", + "azure_storage_blob", "byteorder", "bytes", "clap", @@ -91,7 +92,7 @@ dependencies = [ "rand 0.9.2", "reqwest", "snap", - "thiserror 2.0.18", + "thiserror", "tokio", "tokio-util", "url", @@ -99,93 +100,53 @@ dependencies = [ [[package]] name = "azure_core" -version = "0.21.0" +version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b552ad43a45a746461ec3d3a51dfb6466b4759209414b439c165eb6a6b7729e" +checksum = "bd0160068f7a3021b5e749dc552374e82360463e9fb51e1127631a69fdde641f" dependencies = [ + "async-lock", "async-trait", - "base64 0.22.1", + "azure_core_macros", "bytes", - "dyn-clone", "futures", - "getrandom 0.2.17", - "http-types", - "once_cell", - "paste", "pin-project", - "quick-xml", - "rand 0.8.5", "rustc_version", "serde", "serde_json", - "time", "tracing", - "url", - "uuid", + "typespec", + "typespec_client_core", ] [[package]] -name = "azure_storage" -version = "0.21.0" +name = "azure_core_macros" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f838159f4d29cb400a14d9d757578ba495ae64feb07a7516bf9e4415127126" +checksum = "1bd69a8e70ec6be32ebf7e947cf9a58f6c7255e4cd9c48e640532ef3e37adc6d" dependencies = [ - "RustyXML", - "async-lock", - "async-trait", - "azure_core", - "bytes", - "serde", - "serde_derive", - "time", - "tracing", - "url", - "uuid", -] - -[[package]] -name = "azure_storage_blobs" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97e83c3636ae86d9a6a7962b2112e3b19eb3903915c50ce06ff54ff0a2e6a7e4" 
-dependencies = [ - "RustyXML", - "azure_core", - "azure_storage", - "azure_svc_blobstorage", - "bytes", - "futures", - "serde", - "serde_derive", - "serde_json", - "time", + "proc-macro2", + "quote", + "syn", "tracing", - "url", - "uuid", ] [[package]] -name = "azure_svc_blobstorage" -version = "0.21.0" +name = "azure_storage_blob" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e6c6f20c5611b885ba94c7bae5e02849a267381aecb8aee577e8c35ff4064c6" +checksum = "89cd8900ebab0da5cde56e07b8d9e4b94bfa5ede5ee6f5429027b93b58e1104e" dependencies = [ + "async-trait", "azure_core", "bytes", "futures", - "log", - "once_cell", + "percent-encoding", + "pin-project", "serde", "serde_json", "time", ] -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.22.1" @@ -232,6 +193,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures", + "rand_core 0.10.0", +] + [[package]] name = "clap" version = "4.6.0" @@ -270,6 +242,23 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" +[[package]] +name = "compression-codecs" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7" +dependencies = [ + "compression-core", + "flate2", + "memchr", +] + +[[package]] +name = "compression-core" +version = 
"0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -319,6 +308,24 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -380,12 +387,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "event-listener" version = "5.4.1" @@ -403,19 +404,10 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" dependencies = [ - "event-listener 5.4.1", + "event-listener", "pin-project-lite", ] -[[package]] -name = "fastrand" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] - [[package]] name = "fastrand" version = "2.3.0" @@ -428,6 +420,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "flate2" 
+version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "foldhash" version = "0.1.5" @@ -506,21 +508,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.32" @@ -561,30 +548,6 @@ dependencies = [ "slab", ] -[[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - -[[package]] -name = "getrandom" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" -dependencies = [ - "cfg-if", - "js-sys", - "libc", - "wasi 0.11.1+wasi-snapshot-preview1", - "wasm-bindgen", -] - [[package]] name = "getrandom" version = "0.4.2" @@ -594,6 +557,7 @@ dependencies = [ "cfg-if", "libc", "r-efi", + "rand_core 0.10.0", "wasip2", "wasip3", ] @@ -652,26 +616,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-types" -version = "2.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" -dependencies = [ - "anyhow", - "async-channel 1.9.0", - "base64 0.13.1", - "futures-lite", - 
"infer", - "pin-project-lite", - "rand 0.7.3", - "serde", - "serde_json", - "serde_qs", - "serde_urlencoded", - "url", -] - [[package]] name = "httparse" version = "1.10.1" @@ -699,13 +643,29 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ - "base64 0.22.1", + "base64", "bytes", "futures-channel", "futures-util", @@ -853,12 +813,6 @@ dependencies = [ "unit-prefix", ] -[[package]] -name = "infer" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" - [[package]] name = "insta" version = "1.46.3" @@ -872,15 +826,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", -] - [[package]] name = "ipnet" version = "2.12.0" @@ -949,6 +894,16 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.1.1" @@ -956,7 +911,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", - "wasi 0.11.1+wasi-snapshot-preview1", + "wasi", "windows-sys 0.61.2", ] @@ -1049,12 +1004,6 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" -[[package]] -name = "paste" -version = "1.0.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" - [[package]] name = "percent-encoding" version = "2.3.2" @@ -1120,15 +1069,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" -[[package]] -name = "ppv-lite86" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" -dependencies = [ - "zerocopy", -] - [[package]] name = "prettyplease" version = "0.2.37" @@ -1150,9 +1090,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.31.0" +version = "0.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +checksum = "958f21e8e7ceb5a1aa7fa87fab28e7c75976e0bfe7e23ff069e0a260f894067d" dependencies = [ "memchr", "serde", @@ -1173,30 +1113,6 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc", -] - 
-[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - [[package]] name = "rand" version = "0.9.2" @@ -1207,41 +1123,14 @@ dependencies = [ ] [[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", -] - -[[package]] -name = "rand_core" -version = "0.6.4" +name = "rand" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" dependencies = [ - "getrandom 0.2.17", + "chacha20", + "getrandom", + "rand_core 0.10.0", ] [[package]] @@ -1251,13 +1140,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" [[package]] -name = "rand_hc" -version = "0.2.0" +name = "rand_core" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", -] +checksum = 
"0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" [[package]] name = "reqwest" @@ -1265,7 +1151,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" dependencies = [ - "base64 0.22.1", + "base64", "bytes", "futures-core", "futures-util", @@ -1273,13 +1159,17 @@ dependencies = [ "http-body", "http-body-util", "hyper", + "hyper-tls", "hyper-util", "js-sys", "log", + "native-tls", "percent-encoding", "pin-project-lite", + "rustls-pki-types", "sync_wrapper", "tokio", + "tokio-native-tls", "tokio-util", "tower", "tower-http", @@ -1314,16 +1204,19 @@ dependencies = [ ] [[package]] -name = "rustversion" -version = "1.0.22" +name = "rustls-pki-types" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +dependencies = [ + "zeroize", +] [[package]] -name = "ryu" -version = "1.0.23" +name = "rustversion" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "schannel" @@ -1406,35 +1299,18 @@ dependencies = [ "zmij", ] -[[package]] -name = "serde_qs" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" -dependencies = [ - "percent-encoding", - "serde", - "thiserror 1.0.69", -] - -[[package]] -name = "serde_urlencoded" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", - "serde", 
-] - [[package]] name = "shlex" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "simd-adler32" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" + [[package]] name = "similar" version = "2.7.0" @@ -1512,40 +1388,20 @@ version = "3.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" dependencies = [ - "fastrand 2.3.0", - "getrandom 0.4.2", + "fastrand", + "getrandom", "once_cell", "rustix", "windows-sys 0.61.2", ] -[[package]] -name = "thiserror" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" -dependencies = [ - "thiserror-impl 1.0.69", -] - [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.18", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "thiserror-impl", ] [[package]] @@ -1567,7 +1423,6 @@ checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", - "js-sys", "num-conv", "powerfmt", "serde_core", @@ -1615,6 +1470,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" 
+dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -1649,13 +1514,18 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ + "async-compression", "bitflags", "bytes", + "futures-core", "futures-util", "http", "http-body", + "http-body-util", "iri-string", "pin-project-lite", + "tokio", + "tokio-util", "tower", "tower-layer", "tower-service", @@ -1710,6 +1580,56 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typespec" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b63559a2aab9c7694fa8d2658a828d6b36f1e3904b1860d820c7cc6a2ead61c7" +dependencies = [ + "base64", + "bytes", + "futures", + "quick-xml", + "serde", + "serde_json", + "url", +] + +[[package]] +name = "typespec_client_core" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de81ecf3a175da5a10ed60344caa8b53fe6d8ce28c6c978a7e3e09ca1e1b4131" +dependencies = [ + "async-trait", + "base64", + "dyn-clone", + "futures", + "pin-project", + "rand 0.10.0", + "reqwest", + "serde", + "serde_json", + "time", + "tracing", + "typespec", + "typespec_macros", + "url", + "uuid", +] + +[[package]] +name = "typespec_macros" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07108c5d18e00ec7bb09d2e48df95ebfab6b7179112d1e4216e9968ac2a0a429" +dependencies = [ + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.24" @@ -1738,7 +1658,6 @@ dependencies = [ "idna", "percent-encoding", "serde", - "serde_derive", ] [[package]] @@ -1753,9 +1672,8 @@ version = "1.22.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ - "getrandom 0.4.2", + "getrandom", "js-sys", - "serde_core", "wasm-bindgen", ] @@ -1765,12 +1683,6 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" -[[package]] -name = "waker-fn" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" - [[package]] name = "want" version = "0.3.1" @@ -1780,12 +1692,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -2131,26 +2037,6 @@ dependencies = [ "synstructure", ] -[[package]] -name = "zerocopy" -version = "0.8.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a789c6e490b576db9f7e6b6d661bcc9799f7c0ac8352f56ea20193b2681532e5" -dependencies = [ - "zerocopy-derive", -] - -[[package]] -name = "zerocopy-derive" -version = "0.8.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "zerofrom" version = "0.1.6" @@ -2172,6 +2058,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + [[package]] name = "zerotrie" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index bbc9d2a6..0ed2f995 100644 --- a/Cargo.toml 
+++ b/Cargo.toml @@ -15,7 +15,7 @@ rust-version = "1.88.0" [features] default = ["put", "blobstore", "native-tls"] put = ["dep:reqwest", "reqwest?/stream", "dep:url", "dep:tokio", "dep:tokio-util"] -blobstore = ["dep:url", "dep:azure_core", "dep:azure_storage_blobs", "dep:tokio", "dep:tokio-util", "dep:async-channel"] +blobstore = ["dep:url", "dep:azure_core", "dep:azure_storage_blob", "dep:tokio", "dep:tokio-util", "dep:async-channel"] status = ["dep:indicatif"] native-tls = ["dep:native-tls"] @@ -31,8 +31,8 @@ thiserror = "2.0" libc = "0.2" async-channel = {version="2.5", optional=true} -azure_core = {version="0.21", optional=true, default-features=false} -azure_storage_blobs = {version="0.21", optional=true, default-features=false} +azure_core = {version="0.33", optional=true, default-features=false} +azure_storage_blob = {version="0.10", optional=true} indicatif = {version="0.18", optional=true, default-features=false} native-tls = {version="0.2", features=["vendored"], optional=true, default-features=false} reqwest = {version="0.13", optional=true, default-features=false} diff --git a/src/upload/blobstore.rs b/src/upload/blobstore.rs index d44db5b3..c02db2b8 100644 --- a/src/upload/blobstore.rs +++ b/src/upload/blobstore.rs @@ -4,11 +4,11 @@ use crate::{ONE_MB, upload::status::Status}; use async_channel::{Receiver, Sender, bounded}; use azure_core::error::Error as AzureError; -use azure_storage_blobs::prelude::{BlobBlockType, BlobClient, BlockId, BlockList}; +use azure_storage_blob::{BlobClient, BlockBlobClient, models::BlockLookupList}; use bytes::Bytes; use core::cmp; use futures::future::try_join_all; -use std::path::Path; +use std::{path::Path, sync::Arc}; use tokio::{ fs::File, io::{AsyncRead, AsyncReadExt as _}, @@ -163,7 +163,7 @@ fn calc_concurrency( /// ``` #[derive(Clone)] pub struct BlobUploader { - client: BlobClient, + client: Arc, size: usize, block_size: Option, concurrency: usize, @@ -178,19 +178,19 @@ impl BlobUploader { /// Returns an error 
if: /// - The URL cannot be parsed as a valid Azure SAS URL pub fn new(sas: &Url) -> Result { - let blob_client = BlobClient::from_sas_url(sas)?; + let blob_client = BlobClient::from_url(sas.clone(), None, None)?; Ok(Self::with_blob_client(blob_client)) } - /// Create a ``BlobUploader`` with a ``BlobClient`` from ``azure_storage_blobs``. + /// Create a ``BlobUploader`` with a ``BlobClient`` from ``azure_storage_blob``. /// - /// Ref: + /// Ref: #[must_use] pub fn with_blob_client(client: BlobClient) -> Self { let (sender, receiver) = bounded::(1); Self { - client, + client: Arc::new(client.block_blob_client()), size: DEFAULT_FILE_SIZE, block_size: None, concurrency: DEFAULT_CONCURRENCY, @@ -247,14 +247,12 @@ impl BlobUploader { } async fn finalize(self, block_ids: Vec) -> Result<()> { - let blocks = block_ids - .into_iter() - .map(|x| BlobBlockType::Uncommitted(BlockId::new(x))) - .collect::>(); + let block_list = BlockLookupList { + latest: Some(block_ids.into_iter().map(|x| x.to_vec()).collect()), + ..Default::default() + }; - let block_list = BlockList { blocks }; - - self.client.put_block_list(block_list).await?; + self.client.commit_block_list(block_list.try_into()?, None).await?; Ok(()) } @@ -325,7 +323,7 @@ impl BlobUploader { } async fn block_uploader( - client: BlobClient, + client: Arc, receiver: Receiver, status: Status, ) -> Result<()> { @@ -333,7 +331,14 @@ impl BlobUploader { while let Ok(upload_chunk) = receiver.recv().await { let chunk_len = upload_chunk.data.len(); - let result = client.put_block(upload_chunk.id, upload_chunk.data).await; + let content_length = chunk_len.try_into()?; + let result = client.stage_block( + upload_chunk.id.as_ref(), + content_length, + upload_chunk.data.into(), + None, + ) + .await; // as soon as any error is seen (after retrying), bail out and stop other uploaders if result.is_err() { From f026b1f146849371d568980bab7ff647c4279bb0 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Tue, 17 Mar 2026 19:34:36 +0000 
Subject: [PATCH 02/11] fmt --- src/upload/blobstore.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/upload/blobstore.rs b/src/upload/blobstore.rs index c02db2b8..56093086 100644 --- a/src/upload/blobstore.rs +++ b/src/upload/blobstore.rs @@ -252,7 +252,9 @@ impl BlobUploader { ..Default::default() }; - self.client.commit_block_list(block_list.try_into()?, None).await?; + self.client + .commit_block_list(block_list.try_into()?, None) + .await?; Ok(()) } @@ -332,13 +334,14 @@ impl BlobUploader { let chunk_len = upload_chunk.data.len(); let content_length = chunk_len.try_into()?; - let result = client.stage_block( - upload_chunk.id.as_ref(), - content_length, - upload_chunk.data.into(), - None, - ) - .await; + let result = client + .stage_block( + upload_chunk.id.as_ref(), + content_length, + upload_chunk.data.into(), + None, + ) + .await; // as soon as any error is seen (after retrying), bail out and stop other uploaders if result.is_err() { From 1e5781f635d30c317c8b204c4e2ca24b5fb0a763 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Tue, 17 Mar 2026 20:01:08 +0000 Subject: [PATCH 03/11] address lint --- src/upload/blobstore.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/upload/blobstore.rs b/src/upload/blobstore.rs index 56093086..dd3313f9 100644 --- a/src/upload/blobstore.rs +++ b/src/upload/blobstore.rs @@ -4,7 +4,7 @@ use crate::{ONE_MB, upload::status::Status}; use async_channel::{Receiver, Sender, bounded}; use azure_core::error::Error as AzureError; -use azure_storage_blob::{BlobClient, BlockBlobClient, models::BlockLookupList}; +use azure_storage_blob::{BlobClient, models::BlockLookupList}; use bytes::Bytes; use core::cmp; use futures::future::try_join_all; @@ -163,7 +163,7 @@ fn calc_concurrency( /// ``` #[derive(Clone)] pub struct BlobUploader { - client: Arc, + client: Arc, size: usize, block_size: Option, concurrency: usize, @@ -190,7 +190,7 @@ impl BlobUploader { let 
(sender, receiver) = bounded::(1); Self { - client: Arc::new(client.block_blob_client()), + client: Arc::new(client), size: DEFAULT_FILE_SIZE, block_size: None, concurrency: DEFAULT_CONCURRENCY, @@ -252,7 +252,8 @@ impl BlobUploader { ..Default::default() }; - self.client + let client = self.client.block_blob_client(); + client .commit_block_list(block_list.try_into()?, None) .await?; @@ -325,10 +326,12 @@ impl BlobUploader { } async fn block_uploader( - client: Arc, + client: Arc, receiver: Receiver, status: Status, ) -> Result<()> { + let client = client.block_blob_client(); + // the channel will respond with an Err to indicate the channel is closed while let Ok(upload_chunk) = receiver.recv().await { let chunk_len = upload_chunk.data.len(); From 4efbc04aba146a217c21eb718c5aad33dfe707cb Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Tue, 17 Mar 2026 20:13:40 +0000 Subject: [PATCH 04/11] reduce features used from azure_core --- Cargo.lock | 89 +++++++++--------------------------------------------- Cargo.toml | 6 ++-- 2 files changed, 17 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 550b17f7..dfc599fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,6 @@ # It is not intended for manual editing. 
version = 4 -[[package]] -name = "adler2" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" - [[package]] name = "anstyle" version = "1.0.13" @@ -32,18 +26,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-compression" -version = "0.4.41" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1" -dependencies = [ - "compression-codecs", - "compression-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-lock" version = "3.4.2" @@ -113,6 +95,7 @@ dependencies = [ "rustc_version", "serde", "serde_json", + "tokio", "tracing", "typespec", "typespec_client_core", @@ -242,23 +225,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" -[[package]] -name = "compression-codecs" -version = "0.4.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7" -dependencies = [ - "compression-core", - "flate2", - "memchr", -] - -[[package]] -name = "compression-core" -version = "0.4.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" - [[package]] name = "concurrent-queue" version = "2.5.0" @@ -317,15 +283,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc32fast" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" -dependencies = [ - "cfg-if", -] - [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -420,16 +377,6 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" -[[package]] -name = "flate2" -version = "1.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - [[package]] name = "foldhash" version = "0.1.5" @@ -894,16 +841,6 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" -[[package]] -name = "miniz_oxide" -version = "0.8.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" -dependencies = [ - "adler2", - "simd-adler32", -] - [[package]] name = "mio" version = "1.1.1" @@ -1305,12 +1242,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" -[[package]] -name = "simd-adler32" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" - [[package]] name = "similar" version = "2.7.0" @@ -1467,9 +1398,21 @@ dependencies = [ "mio", "pin-project-lite", "socket2", + "tokio-macros", "windows-sys 0.61.2", ] +[[package]] +name = "tokio-macros" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -1514,18 +1457,13 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "async-compression", "bitflags", "bytes", - "futures-core", "futures-util", "http", 
"http-body", - "http-body-util", "iri-string", "pin-project-lite", - "tokio", - "tokio-util", "tower", "tower-layer", "tower-service", @@ -1611,6 +1549,7 @@ dependencies = [ "serde", "serde_json", "time", + "tokio", "tracing", "typespec", "typespec_macros", diff --git a/Cargo.toml b/Cargo.toml index 0ed2f995..b81d75c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ default = ["put", "blobstore", "native-tls"] put = ["dep:reqwest", "reqwest?/stream", "dep:url", "dep:tokio", "dep:tokio-util"] blobstore = ["dep:url", "dep:azure_core", "dep:azure_storage_blob", "dep:tokio", "dep:tokio-util", "dep:async-channel"] status = ["dep:indicatif"] -native-tls = ["dep:native-tls"] +native-tls = ["dep:native-tls", "azure_core?/reqwest_native_tls"] [dependencies] byteorder = "1.5" @@ -31,8 +31,8 @@ thiserror = "2.0" libc = "0.2" async-channel = {version="2.5", optional=true} -azure_core = {version="0.33", optional=true, default-features=false} -azure_storage_blob = {version="0.10", optional=true} +azure_core = {version="0.33", optional=true, default-features=false, features=["reqwest", "tokio"]} +azure_storage_blob = {version="0.10", optional=true, default-features=false} indicatif = {version="0.18", optional=true, default-features=false} native-tls = {version="0.2", features=["vendored"], optional=true, default-features=false} reqwest = {version="0.13", optional=true, default-features=false} From 37892b4827fcf8dad47e10097aac4fbb4f3f3bb9 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Wed, 18 Mar 2026 11:19:15 +0000 Subject: [PATCH 05/11] use the Azure SDK's built-in parallel upload functionality --- Cargo.lock | 14 +- Cargo.toml | 6 +- src/upload/blobstore.rs | 372 +++++++++++++++++++++------------------- src/upload/status.rs | 11 ++ 4 files changed, 209 insertions(+), 194 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dfc599fb..4c68598e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,18 +14,6 @@ version = "1.0.102" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" -[[package]] -name = "async-channel" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-lock" version = "3.4.2" @@ -58,7 +46,7 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" name = "avml" version = "0.17.0" dependencies = [ - "async-channel", + "async-trait", "azure_core", "azure_storage_blob", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index b81d75c1..9ecbaf71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,13 +13,14 @@ edition = "2024" rust-version = "1.88.0" [features] -default = ["put", "blobstore", "native-tls"] +default = ["put", "blobstore", "native-tls", "status"] put = ["dep:reqwest", "reqwest?/stream", "dep:url", "dep:tokio", "dep:tokio-util"] -blobstore = ["dep:url", "dep:azure_core", "dep:azure_storage_blob", "dep:tokio", "dep:tokio-util", "dep:async-channel"] +blobstore = ["dep:url", "dep:azure_core", "dep:azure_storage_blob", "dep:tokio", "dep:tokio-util", "dep:async-trait"] status = ["dep:indicatif"] native-tls = ["dep:native-tls", "azure_core?/reqwest_native_tls"] [dependencies] +async-trait = {version="0.1", optional=true} byteorder = "1.5" bytes = "1.11" clap = {version="4.6", default-features=false, features=["derive", "std", "usage", "error-context", "help"]} @@ -30,7 +31,6 @@ snap = "1.1" thiserror = "2.0" libc = "0.2" -async-channel = {version="2.5", optional=true} azure_core = {version="0.33", optional=true, default-features=false, features=["reqwest", "tokio"]} azure_storage_blob = {version="0.10", optional=true, default-features=false} indicatif = {version="0.18", optional=true, default-features=false} diff --git 
a/src/upload/blobstore.rs b/src/upload/blobstore.rs index dd3313f9..f8890723 100644 --- a/src/upload/blobstore.rs +++ b/src/upload/blobstore.rs @@ -1,17 +1,27 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -use crate::{ONE_MB, upload::status::Status}; -use async_channel::{Receiver, Sender, bounded}; -use azure_core::error::Error as AzureError; -use azure_storage_blob::{BlobClient, models::BlockLookupList}; -use bytes::Bytes; -use core::cmp; -use futures::future::try_join_all; -use std::{path::Path, sync::Arc}; +use crate::ONE_MB; +use crate::upload::status::Status; +use azure_core::{ + Bytes, + error::Error as AzureError, + http::{Body, NoFormat, RequestContent}, + stream::{DEFAULT_BUFFER_SIZE, SeekableStream}, +}; +use azure_storage_blob::{BlobClient, models::BlobClientUploadOptions}; +use core::{ + cmp, + future::Future, + num::NonZeroUsize, + pin::Pin, + task::{Context, Poll}, +}; +use std::{io::SeekFrom, path::Path, sync::Arc}; use tokio::{ fs::File, - io::{AsyncRead, AsyncReadExt as _}, + io::{AsyncSeekExt as _, ReadBuf}, + sync::{Mutex, OwnedMutexGuard}, }; use url::Url; @@ -20,12 +30,6 @@ pub enum Error { #[error("file is too large")] TooLarge, - #[error("unable to queue block for upload")] - QueueBlock(#[from] async_channel::SendError), - - #[error("uploading blocks failed")] - UploadFromQueue(#[source] tokio::task::JoinError), - #[error("error reading file")] Io(#[from] std::io::Error), @@ -40,12 +44,12 @@ type Result = core::result::Result; /// Maximum number of blocks /// -/// +/// const BLOB_MAX_BLOCKS: usize = 50_000; /// Maximum size of any single block /// -/// +/// const BLOB_MAX_BLOCK_SIZE: usize = ONE_MB.saturating_mul(4000); /// Maximum total size of a file @@ -76,23 +80,17 @@ const MAX_CONCURRENCY: usize = 10; pub const DEFAULT_CONCURRENCY: usize = 10; /// As chunks stay in memory until the upload is complete, as to enable -/// automatic retries in the case of TCP or HTTP errors, chunks sizes 
for huge -/// files is capped to 100MB each +/// automatic retries in the case of TCP or HTTP errors, chunk sizes for huge +/// files are capped to 100MB each. const REASONABLE_BLOCK_SIZE: usize = ONE_MB.saturating_mul(100); -/// try to keep under 500MB in flight. If that's not possible due to block -/// size, concurrency will get disabled. +/// Try to keep under 500MB in flight. If that's not possible due to block size, +/// concurrency will get disabled. const MEMORY_THRESHOLD: usize = 500 * ONE_MB; -/// When uploading a file without a size, such as when uploading a stream of an -/// unknown size, use a 1TB stream +/// Default anticipated upload size before the actual file size is known. const DEFAULT_FILE_SIZE: usize = 1024 * 1024 * 1024 * 1024; -pub struct UploadBlock { - id: Bytes, - data: Bytes, -} - fn calc_concurrency( file_size: usize, block_size: Option, @@ -103,39 +101,23 @@ fn calc_concurrency( } let block_size = match block_size { - // if the user specifies a block size of 0 or doesn't specify a block size, - // calculate the block size based on the file size - Some(0) | None => { - match file_size { - // if the file is small enough to fit with 5MB blocks, use that - // to reduce impact for failure retries and increase - // concurrency. - x if (x < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS) => BLOB_MIN_BLOCK_SIZE, - // if the file is large enough that we can fit with 100MB blocks, use that. 
- x if (x < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS) => REASONABLE_BLOCK_SIZE, - // otherwise, just use the smallest block size that will fit - // within MAX BLOCKS to reduce memory pressure - x => (x / BLOB_MAX_BLOCKS) - .checked_add(1) - .ok_or(Error::TooLarge)?, - } - } - // minimum required to hit high-throughput block blob performance thresholds - Some(x) if (x <= BLOB_MIN_BLOCK_SIZE) => BLOB_MIN_BLOCK_SIZE, - // otherwise use the user specified value + Some(0) | None => match file_size { + x if x < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS => BLOB_MIN_BLOCK_SIZE, + x if x < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS => REASONABLE_BLOCK_SIZE, + x => (x / BLOB_MAX_BLOCKS) + .checked_add(1) + .ok_or(Error::TooLarge)?, + }, + Some(x) if x <= BLOB_MIN_BLOCK_SIZE => BLOB_MIN_BLOCK_SIZE, Some(x) => x, }; - // if the block size is larger than the max block size, use the max block size let block_size = usize::min(block_size, BLOB_MAX_BLOCK_SIZE); let upload_concurrency = match upload_concurrency { - // manually specifying concurrency of 0 will disable concurrency 0 | 1 => 1, - _ => match (MEMORY_THRESHOLD).checked_div(block_size) { + _ => match MEMORY_THRESHOLD.checked_div(block_size) { None | Some(0) => 1, - // cap the number of concurrent threads to reduce concurrency issues - // at the server end. Some(x) => cmp::min(MAX_CONCURRENCY, x), }, }; @@ -143,13 +125,121 @@ fn calc_concurrency( Ok((block_size, upload_concurrency)) } -/// Concurrently upload a Stream/File to an Azure Blob Store using a SAS URL. 
+#[derive(Clone)] +struct FileStream { + handle: Arc>, + stream_size: u64, + buffer_size: usize, + read_state: Arc>, + status: Status, +} + +impl core::fmt::Debug for FileStream { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("FileStream") + .field("stream_size", &self.stream_size) + .field("buffer_size", &self.buffer_size) + .finish_non_exhaustive() + } +} + +impl FileStream { + async fn new(handle: File, buffer_size: usize) -> Result { + let stream_size = handle.metadata().await?.len(); + let handle = Arc::new(Mutex::new(handle)); + + Ok(Self { + handle, + stream_size, + buffer_size, + read_state: Arc::new(Mutex::new(ReadState::default())), + status: Status::new(Some(stream_size)), + }) + } +} + +type FileLockFuture = Pin> + Send>>; + +#[derive(Default)] +enum ReadState { + #[default] + Idle, + Locking(FileLockFuture), + Locked(OwnedMutexGuard), +} + +#[async_trait::async_trait] +impl SeekableStream for FileStream { + async fn reset(&mut self) -> azure_core::Result<()> { + *self.read_state.lock().await = ReadState::Idle; + let mut handle = self.handle.clone().lock_owned().await; + handle.seek(SeekFrom::Start(0)).await?; + self.status.reset(); + Ok(()) + } + + fn len(&self) -> usize { + self.stream_size.try_into().unwrap_or(usize::MAX) + } + + fn buffer_size(&self) -> usize { + self.buffer_size + } +} + +impl futures::io::AsyncRead for FileStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + slice: &mut [u8], + ) -> Poll> { + let this = self.get_mut(); + let Ok(mut state) = this.read_state.try_lock() else { + return Poll::Ready(Err(std::io::Error::other( + "read_state mutex should not be locked across poll calls", + ))); + }; + + loop { + match *state { + ReadState::Idle => { + *state = ReadState::Locking(Box::pin(this.handle.clone().lock_owned())); + } + ReadState::Locking(ref mut lock_future) => { + match Future::poll(Pin::as_mut(lock_future), cx) { + Poll::Ready(guard) => *state = 
ReadState::Locked(guard), + Poll::Pending => return Poll::Pending, + } + } + ReadState::Locked(ref mut guard) => { + let mut read_buf = ReadBuf::new(slice); + + return match tokio::io::AsyncRead::poll_read( + Pin::new(&mut **guard), + cx, + &mut read_buf, + ) { + Poll::Ready(Ok(())) => { + let len = read_buf.filled().len(); + this.status.inc(len); + Poll::Ready(Ok(len)) + } + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => Poll::Pending, + }; + } + } + } + } +} + +/// Upload a file to Azure Blob Store using a SAS URL. /// /// ```rust,no_run /// use avml::BlobUploader; -/// # use url::Url; /// # use avml::Result; /// # use std::path::Path; +/// # use url::Url; /// # async fn upload() -> Result<()> { /// let sas_url = Url::parse("https://contoso.com/container_name/blob_name?sas_token_here=1") /// .expect("url parsing failed"); @@ -167,8 +257,6 @@ pub struct BlobUploader { size: usize, block_size: Option, concurrency: usize, - sender: Sender, - receiver: Receiver, } impl BlobUploader { @@ -187,34 +275,21 @@ impl BlobUploader { /// Ref: #[must_use] pub fn with_blob_client(client: BlobClient) -> Self { - let (sender, receiver) = bounded::(1); - Self { client: Arc::new(client), size: DEFAULT_FILE_SIZE, block_size: None, concurrency: DEFAULT_CONCURRENCY, - sender, - receiver, } } /// Specify the size of the file to upload (in bytes) - /// - /// If the anticipated upload size is not specified, the maximum file - /// uploaded will be approximately 5TB. #[must_use] pub fn size(self, size: usize) -> Self { Self { size, ..self } } /// Specify the block size in multiples of 1MB - /// - /// If the block size is not specified and the size of the content to be - /// uploaded is provided, the default block size will be calculated to fit - /// within the bounds of the allowed number of blocks and the minimum - /// minimum required to hit high-throughput block blob performance - /// thresholds. 
#[must_use] pub fn block_size(self, block_size: Option) -> Self { Self { block_size, ..self } @@ -228,7 +303,7 @@ impl BlobUploader { } } - /// Upload a file to Azure Blob Store using a fully qualified SAS token + /// Upload a file to Azure Blob Store using a fully qualified SAS token. /// /// # Errors /// Returns an error if: @@ -236,124 +311,26 @@ impl BlobUploader { /// - The file size cannot be converted to a usize /// - The file is too large for Azure Blob Storage /// - There is a failure during the upload process - /// - There is a failure finalizing the block list pub async fn upload_file(mut self, filename: &Path) -> Result<()> { let file = File::open(filename).await?; let file_size = file.metadata().await?.len().try_into()?; - self.size = file_size; - self.upload_stream(file).await - } - - async fn finalize(self, block_ids: Vec) -> Result<()> { - let block_list = BlockLookupList { - latest: Some(block_ids.into_iter().map(|x| x.to_vec()).collect()), - ..Default::default() - }; - - let client = self.client.block_blob_client(); - client - .commit_block_list(block_list.try_into()?, None) - .await?; - - Ok(()) - } - - /// upload a stream to Azure Blob Store using a fully qualified SAS token - async fn upload_stream(self, handle: R) -> Result<()> - where - R: AsyncRead + Unpin + Send, - { let block_size = self.block_size.map(|x| x.saturating_mul(ONE_MB)); - let (block_size, uploaders_count) = calc_concurrency(self.size, block_size, self.concurrency)?; - let uploaders = self.uploaders(uploaders_count); - let queue_handle = self.block_reader(handle, block_size); - - let (block_list, ()) = futures::try_join!(queue_handle, uploaders)?; - - self.finalize(block_list).await - } + let stream = FileStream::new(file, DEFAULT_BUFFER_SIZE).await?; + let stream: Box = Box::new(stream); + let content: RequestContent = Body::from(stream).into(); - async fn uploaders(&self, count: usize) -> Result<()> { - let status = Status::new(Some(self.size.try_into()?)); - - let uploaders: 
Vec<_> = (0..usize::max(1, count)) - .map(|_| { - Self::block_uploader(self.client.clone(), self.receiver.clone(), status.clone()) - }) - .collect(); - - try_join_all(uploaders).await?; - - Ok(()) - } - - async fn block_reader(&self, mut handle: R, block_size: usize) -> Result> - where - R: AsyncRead + Unpin + Send, - { - let mut block_list = vec![]; - - for i in 0..usize::MAX { - let mut data = Vec::with_capacity(block_size); - - let mut take_handle = handle.take(block_size.try_into().unwrap_or(u64::MAX)); - let read_data = take_handle.read_to_end(&mut data).await?; - if read_data == 0 { - break; - } - handle = take_handle.into_inner(); - - if data.is_empty() { - break; - } - - let data = data.into(); - - let id = Bytes::from(format!("{i:032x}")); - - block_list.push(id.clone()); - - self.sender.send(UploadBlock { id, data }).await?; - } - self.sender.close(); - - Ok(block_list) - } - - async fn block_uploader( - client: Arc, - receiver: Receiver, - status: Status, - ) -> Result<()> { - let client = client.block_blob_client(); - - // the channel will respond with an Err to indicate the channel is closed - while let Ok(upload_chunk) = receiver.recv().await { - let chunk_len = upload_chunk.data.len(); - - let content_length = chunk_len.try_into()?; - let result = client - .stage_block( - upload_chunk.id.as_ref(), - content_length, - upload_chunk.data.into(), - None, - ) - .await; - - // as soon as any error is seen (after retrying), bail out and stop other uploaders - if result.is_err() { - receiver.close(); - result?; - } + let options = BlobClientUploadOptions { + parallel: NonZeroUsize::new(uploaders_count), + partition_size: NonZeroUsize::new(block_size), + ..Default::default() + }; - status.inc(chunk_len); - } + self.client.upload(content, Some(options)).await?; Ok(()) } @@ -362,6 +339,8 @@ impl BlobUploader { #[cfg(test)] mod tests { use super::*; + use futures::AsyncReadExt as _; + use std::time::{SystemTime, UNIX_EPOCH}; const ONE_GB: usize = 
ONE_MB.saturating_mul(1024); const ONE_TB: usize = ONE_GB.saturating_mul(1024); @@ -433,4 +412,41 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn test_file_stream_reset() -> Result<()> { + let path = std::env::temp_dir().join(format!( + "avml-blob-upload-{}-{}.bin", + std::process::id(), + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + let expected = b"seekable stream content"; + tokio::fs::write(&path, expected).await?; + + let result = async { + let file = File::open(&path).await?; + let mut stream = FileStream::new(file, 8).await?; + + assert_eq!(stream.len(), expected.len()); + + let mut prefix = [0_u8; 8]; + stream.read_exact(&mut prefix).await?; + assert_eq!(&prefix, b"seekable"); + + stream.reset().await?; + + let mut reread = Vec::new(); + stream.read_to_end(&mut reread).await?; + assert_eq!(reread, expected); + + Result::<()>::Ok(()) + } + .await; + + let _ = tokio::fs::remove_file(&path).await; + result + } } diff --git a/src/upload/status.rs b/src/upload/status.rs index 130b759a..194558f1 100644 --- a/src/upload/status.rs +++ b/src/upload/status.rs @@ -30,6 +30,13 @@ impl Status { Self { bar, total } } + #[cfg(feature = "blobstore")] + pub fn reset(&self) { + if let Some(ref bar) = self.bar { + bar.reset(); + } + } + pub fn inc(&self, n: usize) { if let Some(ref bar) = self.bar { bar.inc(n.try_into().unwrap_or(u64::MAX)); @@ -51,4 +58,8 @@ impl Status { } #[allow(clippy::unused_self)] pub fn inc(&self, _n: usize) {} + + #[cfg(feature = "blobstore")] + #[allow(clippy::unused_self)] + pub fn reset(&self) {} } From 8af20841705831a6a1f47dd7adacd4bcd155b28e Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Wed, 18 Mar 2026 11:23:51 +0000 Subject: [PATCH 06/11] address copilot PR feedback --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 9ecbaf71..0fe0f3f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ default = ["put", "blobstore", 
"native-tls", "status"] put = ["dep:reqwest", "reqwest?/stream", "dep:url", "dep:tokio", "dep:tokio-util"] blobstore = ["dep:url", "dep:azure_core", "dep:azure_storage_blob", "dep:tokio", "dep:tokio-util", "dep:async-trait"] status = ["dep:indicatif"] -native-tls = ["dep:native-tls", "azure_core?/reqwest_native_tls"] +native-tls = ["dep:native-tls", "azure_core?/reqwest_native_tls", "reqwest?/native-tls-vendored"] [dependencies] async-trait = {version="0.1", optional=true} From 8b54936e1758009c9fa6fca0807e29481c6baa43 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Wed, 18 Mar 2026 12:00:13 +0000 Subject: [PATCH 07/11] address feedback --- src/upload/blobstore.rs | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/src/upload/blobstore.rs b/src/upload/blobstore.rs index f8890723..8b9d8317 100644 --- a/src/upload/blobstore.rs +++ b/src/upload/blobstore.rs @@ -88,7 +88,23 @@ const REASONABLE_BLOCK_SIZE: usize = ONE_MB.saturating_mul(100); /// concurrency will get disabled. const MEMORY_THRESHOLD: usize = 500 * ONE_MB; -/// Default anticipated upload size before the actual file size is known. +/// Heuristic "very large file" size used when the actual file size is not yet +/// known (for example, before probing a file or when size discovery fails). +/// This value feeds into block-size and concurrency calculations so that, in +/// the worst case, we behave as if we are uploading a large image while still +/// staying well below `BLOB_MAX_FILE_SIZE`. +/// +/// 1 TB is chosen as a conservative upper-bound estimate: +/// - It is large enough to exercise the "large upload" code paths, ensuring +/// that concurrency and block sizing are not overly optimistic when size +/// information is missing. +/// - It is small enough compared to Azure's maximum blob size that the +/// resulting configuration will not violate service limits. 
+/// +/// Once the real file size is known, it is validated against +/// `BLOB_MAX_FILE_SIZE` and used for the final concurrency/block-size +/// decisions; this constant only affects the initial tuning in the absence of +/// reliable size information. const DEFAULT_FILE_SIZE: usize = 1024 * 1024 * 1024 * 1024; fn calc_concurrency( @@ -125,6 +141,16 @@ fn calc_concurrency( Ok((block_size, upload_concurrency)) } +/// A seekable file-backed stream used for blob uploads. +/// +/// `FileStream` is `Clone` because the Azure core body/retry machinery may need +/// to duplicate the stream. All clones share the same underlying file handle +/// and read cursor state via `Arc>`. +/// +/// Invariants: +/// - At most one clone is actively reading from the stream at any time. +/// - All reads go through the shared `read_state` so that the current cursor +/// position is coordinated between clones. #[derive(Clone)] struct FileStream { handle: Arc>, @@ -195,9 +221,9 @@ impl futures::io::AsyncRead for FileStream { ) -> Poll> { let this = self.get_mut(); let Ok(mut state) = this.read_state.try_lock() else { - return Poll::Ready(Err(std::io::Error::other( - "read_state mutex should not be locked across poll calls", - ))); + // Another task is currently holding `read_state`; yield and try again later + cx.waker().wake_by_ref(); + return Poll::Pending; }; loop { From 8c9bda59ae873cedd1c7472015055f2342e48758 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Wed, 18 Mar 2026 12:02:18 +0000 Subject: [PATCH 08/11] remove status as a default feature --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 0fe0f3f8..21ceaf8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ edition = "2024" rust-version = "1.88.0" [features] -default = ["put", "blobstore", "native-tls", "status"] +default = ["put", "blobstore", "native-tls"] put = ["dep:reqwest", "reqwest?/stream", "dep:url", "dep:tokio", "dep:tokio-util"] blobstore = 
["dep:url", "dep:azure_core", "dep:azure_storage_blob", "dep:tokio", "dep:tokio-util", "dep:async-trait"] status = ["dep:indicatif"] From 8f299718ce8bce7a105861f858fb0b5d97168a4e Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Wed, 18 Mar 2026 12:17:18 +0000 Subject: [PATCH 09/11] don't expose setting the size for BlobUploader, as this isn't used --- src/upload/blobstore.rs | 32 ++------------------------------ 1 file changed, 2 insertions(+), 30 deletions(-) diff --git a/src/upload/blobstore.rs b/src/upload/blobstore.rs index 8b9d8317..37ddd990 100644 --- a/src/upload/blobstore.rs +++ b/src/upload/blobstore.rs @@ -88,25 +88,6 @@ const REASONABLE_BLOCK_SIZE: usize = ONE_MB.saturating_mul(100); /// concurrency will get disabled. const MEMORY_THRESHOLD: usize = 500 * ONE_MB; -/// Heuristic "very large file" size used when the actual file size is not yet -/// known (for example, before probing a file or when size discovery fails). -/// This value feeds into block-size and concurrency calculations so that, in -/// the worst case, we behave as if we are uploading a large image while still -/// staying well below `BLOB_MAX_FILE_SIZE`. -/// -/// 1 TB is chosen as a conservative upper-bound estimate: -/// - It is large enough to exercise the "large upload" code paths, ensuring -/// that concurrency and block sizing are not overly optimistic when size -/// information is missing. -/// - It is small enough compared to Azure's maximum blob size that the -/// resulting configuration will not violate service limits. -/// -/// Once the real file size is known, it is validated against -/// `BLOB_MAX_FILE_SIZE` and used for the final concurrency/block-size -/// decisions; this constant only affects the initial tuning in the absence of -/// reliable size information. 
-const DEFAULT_FILE_SIZE: usize = 1024 * 1024 * 1024 * 1024; - fn calc_concurrency( file_size: usize, block_size: Option, @@ -280,7 +261,6 @@ impl futures::io::AsyncRead for FileStream { #[derive(Clone)] pub struct BlobUploader { client: Arc, - size: usize, block_size: Option, concurrency: usize, } @@ -303,18 +283,11 @@ impl BlobUploader { pub fn with_blob_client(client: BlobClient) -> Self { Self { client: Arc::new(client), - size: DEFAULT_FILE_SIZE, block_size: None, concurrency: DEFAULT_CONCURRENCY, } } - /// Specify the size of the file to upload (in bytes) - #[must_use] - pub fn size(self, size: usize) -> Self { - Self { size, ..self } - } - /// Specify the block size in multiples of 1MB #[must_use] pub fn block_size(self, block_size: Option) -> Self { @@ -337,14 +310,13 @@ impl BlobUploader { /// - The file size cannot be converted to a usize /// - The file is too large for Azure Blob Storage /// - There is a failure during the upload process - pub async fn upload_file(mut self, filename: &Path) -> Result<()> { + pub async fn upload_file(self, filename: &Path) -> Result<()> { let file = File::open(filename).await?; let file_size = file.metadata().await?.len().try_into()?; - self.size = file_size; let block_size = self.block_size.map(|x| x.saturating_mul(ONE_MB)); let (block_size, uploaders_count) = - calc_concurrency(self.size, block_size, self.concurrency)?; + calc_concurrency(file_size, block_size, self.concurrency)?; let stream = FileStream::new(file, DEFAULT_BUFFER_SIZE).await?; let stream: Box = Box::new(stream); From b1bacec008f6c6881d9d08f936ee6d678c2811af Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Wed, 18 Mar 2026 13:18:31 +0000 Subject: [PATCH 10/11] bump rev --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c68598e..3ac8719e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,7 +44,7 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" 
[[package]] name = "avml" -version = "0.17.0" +version = "0.18.0" dependencies = [ "async-trait", "azure_core", diff --git a/Cargo.toml b/Cargo.toml index 21ceaf8b..f456fdca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "avml" -version = "0.17.0" +version = "0.18.0" license = "MIT" description = "A portable volatile memory acquisition tool" authors = ["avml@microsoft.com"] From d2615abf5a1289f9b6f9fcaa73b1ddf118073548 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Thu, 19 Mar 2026 14:29:30 +0000 Subject: [PATCH 11/11] use NonZeroUsize for sizes --- README.md | 4 +- eng/build.sh | 1 + src/bin/avml-upload.rs | 14 +- src/bin/avml.rs | 12 +- src/upload/blobstore.rs | 308 +++++++++++++++++++++++++--------------- 5 files changed, 207 insertions(+), 132 deletions(-) diff --git a/README.md b/README.md index 66123f24..31fc360f 100644 --- a/README.md +++ b/README.md @@ -140,10 +140,10 @@ Options: upload via Azure Blob Store upon acquisition --sas-block-size - specify maximum block size in MiB + specify maximum block size in MiB; must be greater than 0 --sas-block-concurrency - specify blob upload concurrency + specify blob upload concurrency; must be greater than 0 [default: 10] diff --git a/eng/build.sh b/eng/build.sh index f3f1fb50..d27ce5ce 100755 --- a/eng/build.sh +++ b/eng/build.sh @@ -11,6 +11,7 @@ cd $(dirname ${BASH_SOURCE[0]})/../ ARCH=$(uname -m) cargo +stable test --release --target ${ARCH}-unknown-linux-musl --locked --all-targets --all-features +cargo +stable test --release --target ${ARCH}-unknown-linux-musl --locked --doc --all-features for FEATURE in $(cargo metadata --locked --format-version 1 | jq '.packages | [.[] | select(.name=="avml")][0].features | keys | @tsv' -r); do cargo +stable check --release --target ${ARCH}-unknown-linux-musl --locked --no-default-features --features ${FEATURE} --features native-tls done diff --git a/src/bin/avml-upload.rs b/src/bin/avml-upload.rs index b4835c3f..3e5e217c 100644 --- 
a/src/bin/avml-upload.rs +++ b/src/bin/avml-upload.rs @@ -7,9 +7,9 @@ #![deny(clippy::manual_assert)] #![deny(clippy::indexing_slicing)] -use avml::{BlobUploader, DEFAULT_CONCURRENCY, Error, Result, put}; +use avml::{BlobUploader, Error, Result, put}; use clap::{Parser, Subcommand}; -use std::path::PathBuf; +use std::{num::NonZeroUsize, path::PathBuf}; use tokio::runtime::Runtime; use url::Url; @@ -37,13 +37,13 @@ enum Commands { /// url to upload via Azure Blob Storage url: Url, - /// specify blob upload concurrency - #[arg(long, default_value_t=DEFAULT_CONCURRENCY)] - sas_block_concurrency: usize, + /// specify blob upload concurrency; must be greater than 0 + #[arg(long)] + sas_block_concurrency: Option, - /// specify maximum block size in MiB + /// specify maximum block size in MiB; must be greater than 0 #[arg(long)] - sas_block_size: Option, + sas_block_size: Option, }, } diff --git a/src/bin/avml.rs b/src/bin/avml.rs index 5274d6ea..6217d9e8 100644 --- a/src/bin/avml.rs +++ b/src/bin/avml.rs @@ -11,6 +11,8 @@ use avml::Error; use avml::{Result, Snapshot, Source, iomem}; use clap::Parser; +#[cfg(feature = "blobstore")] +use std::num::NonZeroUsize; use std::{num::NonZeroU64, ops::Range, path::PathBuf}; #[cfg(any(feature = "blobstore", feature = "put"))] use tokio::{fs::remove_file, runtime::Runtime}; @@ -52,15 +54,15 @@ struct Config { #[arg(long)] sas_url: Option, - /// specify maximum block size in MiB + /// specify maximum block size in MiB; must be greater than 0 #[cfg(feature = "blobstore")] #[arg(long)] - sas_block_size: Option, + sas_block_size: Option, - /// specify blob upload concurrency + /// specify blob upload concurrency; must be greater than 0 #[cfg(feature = "blobstore")] - #[arg(long, default_value_t=avml::DEFAULT_CONCURRENCY)] - sas_block_concurrency: usize, + #[arg(long)] + sas_block_concurrency: Option, /// name of the file to write to on local system filename: PathBuf, diff --git a/src/upload/blobstore.rs b/src/upload/blobstore.rs index 
37ddd990..50829ca9 100644 --- a/src/upload/blobstore.rs +++ b/src/upload/blobstore.rs @@ -38,30 +38,44 @@ pub enum Error { #[error(transparent)] IntConversion(#[from] core::num::TryFromIntError), + + #[error(transparent)] + InvalidUrl(#[from] url::ParseError), } type Result = core::result::Result; +#[allow(clippy::expect_used)] +const ONE_MB_NZ: NonZeroUsize = NonZeroUsize::new(ONE_MB).expect("ONE_MB must be non-zero"); + /// Maximum number of blocks /// /// -const BLOB_MAX_BLOCKS: usize = 50_000; +#[allow(clippy::expect_used)] +const BLOB_MAX_BLOCKS: NonZeroUsize = + NonZeroUsize::new(50_000).expect("blob max blocks must be non-zero"); /// Maximum size of any single block /// /// -const BLOB_MAX_BLOCK_SIZE: usize = ONE_MB.saturating_mul(4000); +#[allow(clippy::expect_used)] +const BLOB_MAX_BLOCK_SIZE: NonZeroUsize = ONE_MB_NZ.saturating_mul( + NonZeroUsize::new(4000).expect("blob max block size multiplier must be non-zero"), +); /// Maximum total size of a file /// /// -const BLOB_MAX_FILE_SIZE: usize = BLOB_MAX_BLOCKS.saturating_mul(BLOB_MAX_BLOCK_SIZE); +#[allow(clippy::expect_used)] +const BLOB_MAX_FILE_SIZE: NonZeroUsize = BLOB_MAX_BLOCKS.saturating_mul(BLOB_MAX_BLOCK_SIZE); /// Minimum block size, which is required to trigger the "high-throughput block /// blobs" feature on all storage accounts /// /// -const BLOB_MIN_BLOCK_SIZE: usize = ONE_MB.saturating_mul(5); +#[allow(clippy::expect_used)] +const BLOB_MIN_BLOCK_SIZE: NonZeroUsize = ONE_MB_NZ + .saturating_mul(NonZeroUsize::new(5).expect("blob min block size multiplier must be non-zero")); /// Azure's default max request rate for a storage account is 20,000 per second. /// By keeping to 10 or fewer concurrent upload threads, AVML can be used to @@ -69,7 +83,9 @@ const BLOB_MIN_BLOCK_SIZE: usize = ONE_MB.saturating_mul(5); /// VM scaleset) to a single default storage account. 
/// /// -const MAX_CONCURRENCY: usize = 10; +#[allow(clippy::expect_used)] +const MAX_CONCURRENCY: NonZeroUsize = + NonZeroUsize::new(10).expect("max concurrency must be non-zero"); /// Azure's default max request rate for a storage account is 20,000 per second. /// By keeping to 10 or fewer concurrent upload threads, AVML can be used to @@ -77,51 +93,70 @@ const MAX_CONCURRENCY: usize = 10; /// VM scaleset) to a single default storage account. /// /// -pub const DEFAULT_CONCURRENCY: usize = 10; - -/// As chunks stay in memory until the upload is complete, as to enable -/// automatic retries in the case of TCP or HTTP errors, chunk sizes for huge -/// files are capped to 100MB each. -const REASONABLE_BLOCK_SIZE: usize = ONE_MB.saturating_mul(100); +#[allow(clippy::expect_used)] +pub const DEFAULT_CONCURRENCY: NonZeroUsize = + NonZeroUsize::new(10).expect("default concurrency must be non-zero"); + +/// Keep at most 500MB of block data in flight across all uploaders. +#[allow(clippy::expect_used)] +const MEMORY_THRESHOLD: NonZeroUsize = ONE_MB_NZ + .saturating_mul(NonZeroUsize::new(500).expect("memory threshold multiplier must be non-zero")); + +fn calc_block_size( + file_size: NonZeroUsize, + block_size: Option<NonZeroUsize>, +) -> Result<NonZeroUsize> { + let block_size = match block_size { + Some(block_size) => block_size, + None => NonZeroUsize::new(file_size.get().div_ceil(BLOB_MAX_BLOCKS.get())) + .ok_or(Error::TooLarge)?, + }; -/// Try to keep under 500MB in flight. If that's not possible due to block size, -/// concurrency will get disabled. 
-const MEMORY_THRESHOLD: usize = 500 * ONE_MB; + Ok(cmp::min( + cmp::max(block_size, BLOB_MIN_BLOCK_SIZE), + BLOB_MAX_BLOCK_SIZE, + )) +} fn calc_concurrency( - file_size: usize, - block_size: Option<usize>, - upload_concurrency: usize, -) -> Result<(usize, usize)> { + file_size: NonZeroUsize, + block_size: Option<NonZeroUsize>, + upload_concurrency: Option<NonZeroUsize>, +) -> Result<(NonZeroUsize, NonZeroUsize)> { if file_size > BLOB_MAX_FILE_SIZE { return Err(Error::TooLarge); } - - let block_size = match block_size { - Some(0) | None => match file_size { - x if x < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS => BLOB_MIN_BLOCK_SIZE, - x if x < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS => REASONABLE_BLOCK_SIZE, - x => (x / BLOB_MAX_BLOCKS) - .checked_add(1) - .ok_or(Error::TooLarge)?, - }, - Some(x) if x <= BLOB_MIN_BLOCK_SIZE => BLOB_MIN_BLOCK_SIZE, - Some(x) => x, - }; - - let block_size = usize::min(block_size, BLOB_MAX_BLOCK_SIZE); - - let upload_concurrency = match upload_concurrency { - 0 | 1 => 1, - _ => match MEMORY_THRESHOLD.checked_div(block_size) { - None | Some(0) => 1, - Some(x) => cmp::min(MAX_CONCURRENCY, x), - }, + let block_size = calc_block_size(file_size, block_size)?; + + let memory_limited_concurrency = match NonZeroUsize::new( + MEMORY_THRESHOLD + .get() + .checked_div(block_size.get()) + .unwrap_or(0), + ) { + Some(concurrency) => concurrency, + None => NonZeroUsize::MIN, }; + let upload_concurrency = cmp::min( + cmp::min( + upload_concurrency.unwrap_or(DEFAULT_CONCURRENCY), + memory_limited_concurrency, + ), + MAX_CONCURRENCY, + ); Ok((block_size, upload_concurrency)) } +fn upload_parameters( + file_size: NonZeroUsize, + block_size: Option<NonZeroUsize>, + upload_concurrency: Option<NonZeroUsize>, +) -> Result<(NonZeroUsize, NonZeroUsize)> { + let block_size = block_size.map(|x| x.saturating_mul(ONE_MB_NZ)); + calc_concurrency(file_size, block_size, upload_concurrency) +} + /// A seekable file-backed stream used for blob uploads. 
/// /// `FileStream` is `Clone` because the Azure core body/retry machinery may need @@ -245,15 +280,15 @@ impl futures::io::AsyncRead for FileStream { /// ```rust,no_run /// use avml::BlobUploader; /// # use avml::Result; -/// # use std::path::Path; +/// # use std::{num::NonZeroUsize, path::Path}; /// # use url::Url; /// # async fn upload() -> Result<()> { /// let sas_url = Url::parse("https://contoso.com/container_name/blob_name?sas_token_here=1") /// .expect("url parsing failed"); /// let path = Path::new("/tmp/image.lime"); /// let uploader = BlobUploader::new(&sas_url)? -/// .block_size(Some(100)) -/// .concurrency(5); +/// .block_size(NonZeroUsize::new(100)) +/// .concurrency(NonZeroUsize::new(5)); /// uploader.upload_file(&path).await?; /// # Ok(()) /// # } @@ -261,8 +296,8 @@ impl futures::io::AsyncRead for FileStream { #[derive(Clone)] pub struct BlobUploader { client: Arc<BlobClient>, - block_size: Option<usize>, - concurrency: usize, + block_size: Option<NonZeroUsize>, + concurrency: Option<NonZeroUsize>, } impl BlobUploader { @@ -284,18 +319,19 @@ impl BlobUploader { Self { client: Arc::new(client), block_size: None, - concurrency: DEFAULT_CONCURRENCY, + concurrency: None, } } - /// Specify the block size in multiples of 1MB + /// Specify a positive block size in multiples of 1MB. #[must_use] - pub fn block_size(self, block_size: Option<usize>) -> Self { + pub fn block_size(self, block_size: Option<NonZeroUsize>) -> Self { Self { block_size, ..self } } + /// Specify a positive upload concurrency. 
#[must_use] - pub fn concurrency(self, concurrency: usize) -> Self { + pub fn concurrency(self, concurrency: Option<NonZeroUsize>) -> Self { Self { concurrency, ..self @@ -313,18 +349,19 @@ impl BlobUploader { pub async fn upload_file(self, filename: &Path) -> Result<()> { let file = File::open(filename).await?; let file_size = file.metadata().await?.len().try_into()?; - - let block_size = self.block_size.map(|x| x.saturating_mul(ONE_MB)); + let Some(file_size) = NonZeroUsize::new(file_size) else { + return Ok(()); + }; let (block_size, uploaders_count) = - calc_concurrency(file_size, block_size, self.concurrency)?; + upload_parameters(file_size, self.block_size, self.concurrency)?; let stream = FileStream::new(file, DEFAULT_BUFFER_SIZE).await?; let stream: Box<dyn SeekableStream> = Box::new(stream); let content: RequestContent<Bytes> = Body::from(stream).into(); let options = BlobClientUploadOptions { - parallel: NonZeroUsize::new(uploaders_count), - partition_size: NonZeroUsize::new(block_size), + parallel: Some(uploaders_count), + partition_size: Some(block_size), ..Default::default() }; @@ -340,74 +377,91 @@ mod tests { use futures::AsyncReadExt as _; use std::time::{SystemTime, UNIX_EPOCH}; - const ONE_GB: usize = ONE_MB.saturating_mul(1024); - const ONE_TB: usize = ONE_GB.saturating_mul(1024); + fn non_zero(value: usize) -> Result<NonZeroUsize> { + NonZeroUsize::new(value).ok_or(Error::TooLarge) + } + + fn bytes_from_mib(mebibytes: usize) -> Result<NonZeroUsize> { + non_zero(mebibytes.checked_mul(ONE_MB).ok_or(Error::TooLarge)?) + } + + fn bytes_from_gib(gibibytes: usize) -> Result<NonZeroUsize> { + bytes_from_mib(gibibytes.checked_mul(1024).ok_or(Error::TooLarge)?) 
+ } #[test] - fn test_calc_concurrency() -> Result<()> { - assert_eq!( - (BLOB_MIN_BLOCK_SIZE, 10), - calc_concurrency(ONE_MB * 300, Some(1), DEFAULT_CONCURRENCY)?, - "specified blocksize would overflow block count, so we use the minimum block size" - ); - - assert_eq!( - (BLOB_MIN_BLOCK_SIZE, 10), - calc_concurrency(ONE_GB * 30, Some(ONE_MB), DEFAULT_CONCURRENCY)?, - "30GB file, 1MB blocks" - ); - - assert_eq!( - (ONE_MB * 100, 5), - calc_concurrency(ONE_GB * 30, Some(ONE_MB * 100), DEFAULT_CONCURRENCY)?, - "30GB file, 100MB block size" - ); - - assert_eq!( - (5 * ONE_MB, 10), - calc_concurrency(ONE_MB * 400, None, DEFAULT_CONCURRENCY)?, - "400MB file, no block size" - ); - - assert_eq!( - (5 * ONE_MB, 10), - calc_concurrency(ONE_GB * 16, None, DEFAULT_CONCURRENCY)?, - "16GB file, no block size" - ); - - assert_eq!( - (5 * ONE_MB, 10), - calc_concurrency(ONE_GB * 32, None, DEFAULT_CONCURRENCY)?, - "32GB file, no block size", - ); - - assert_eq!( - (ONE_MB * 100, 5), - calc_concurrency(ONE_TB, None, DEFAULT_CONCURRENCY)?, - "1TB file, no block size" - ); - - assert_eq!( - (100 * ONE_MB, 5), - calc_concurrency(ONE_TB * 4, None, DEFAULT_CONCURRENCY)?, - "4TB file, no block size" - ); - - assert_eq!( - (100 * ONE_MB, 5), - calc_concurrency(ONE_TB * 4, Some(0), DEFAULT_CONCURRENCY)?, - "4TB file, zero block size" - ); + fn small_files_use_minimum_block_size_and_default_concurrency() -> Result<()> { + let (block_size, concurrency) = upload_parameters(bytes_from_mib(400)?, None, None)?; - let (block_size, uploaders_count) = - calc_concurrency(ONE_TB.saturating_mul(32), None, DEFAULT_CONCURRENCY)?; - assert!(block_size > REASONABLE_BLOCK_SIZE && block_size < BLOB_MAX_BLOCK_SIZE); - assert_eq!(uploaders_count, 1); - - assert!( - calc_concurrency((BLOB_MAX_BLOCKS * BLOB_MAX_BLOCK_SIZE) + 1, None, 10).is_err(), - "files beyond max size should fail" - ); + assert_eq!(block_size, bytes_from_mib(5)?); + assert_eq!(concurrency, DEFAULT_CONCURRENCY); + Ok(()) + } + + #[test] + fn 
user_block_size_is_clamped_to_minimum() -> Result<()> { + let (block_size, concurrency) = + upload_parameters(bytes_from_mib(300)?, Some(NonZeroUsize::MIN), None)?; + + assert_eq!(block_size, bytes_from_mib(5)?); + assert_eq!(concurrency, DEFAULT_CONCURRENCY); + Ok(()) + } + + #[test] + fn requested_concurrency_caps_memory_limited_uploaders() -> Result<()> { + let (block_size, concurrency) = upload_parameters( + bytes_from_gib(30)?, + Some(non_zero(100)?), + Some(non_zero(3)?), + )?; + + assert_eq!(block_size, bytes_from_mib(100)?); + assert_eq!(concurrency, non_zero(3)?); + Ok(()) + } + + #[test] + fn auto_block_size_grows_when_minimum_would_exceed_max_blocks() -> Result<()> { + let file_size = non_zero( + bytes_from_mib(5)? + .get() + .checked_mul(50_000) + .ok_or(Error::TooLarge)? + .checked_add(1) + .ok_or(Error::TooLarge)?, + )?; + let expected_block_size = non_zero(file_size.get().div_ceil(50_000))?; + let (block_size, concurrency) = upload_parameters(file_size, None, None)?; + + assert_eq!(block_size, expected_block_size); + assert_eq!(concurrency, DEFAULT_CONCURRENCY); + Ok(()) + } + + #[test] + fn huge_blocks_still_use_at_least_one_uploader() -> Result<()> { + let (block_size, concurrency) = + upload_parameters(bytes_from_gib(30)?, Some(non_zero(600)?), None)?; + + assert_eq!(block_size, bytes_from_mib(600)?); + assert_eq!(concurrency, NonZeroUsize::MIN); + Ok(()) + } + + #[test] + fn files_larger_than_azure_limit_are_rejected() -> Result<()> { + let oversized_file = non_zero( + bytes_from_mib(50_000usize.checked_mul(4000).ok_or(Error::TooLarge)?)? 
+ .get() + .checked_add(1) + .ok_or(Error::TooLarge)?, + )?; + + assert!(matches!( + upload_parameters(oversized_file, None, None), + Err(Error::TooLarge) + )); Ok(()) } @@ -447,4 +501,22 @@ mod tests { let _ = tokio::fs::remove_file(&path).await; result } + + #[tokio::test] + async fn test_upload_file_empty_is_noop() -> Result<()> { + let path = std::env::temp_dir().join(format!( + "avml-empty-blob-upload-{}-{}.bin", + std::process::id(), + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + tokio::fs::write(&path, []).await?; + + let url = Url::parse("https://127.0.0.1:9/container/blob?sig=test")?; + BlobUploader::new(&url)?.upload_file(&path).await?; + let _ = tokio::fs::remove_file(&path).await; + Ok(()) + } }