diff --git a/Cargo.lock b/Cargo.lock index c7d8be9..3ac8719 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "RustyXML" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" - [[package]] name = "anstyle" version = "1.0.13" @@ -20,36 +14,13 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - -[[package]] -name = "async-channel" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-lock" version = "3.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" dependencies = [ - "event-listener 5.4.1", + "event-listener", "event-listener-strategy", "pin-project-lite", ] @@ -73,11 +44,11 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "avml" -version = "0.17.0" +version = "0.18.0" dependencies = [ - "async-channel 2.5.0", + "async-trait", "azure_core", - "azure_storage_blobs", + "azure_storage_blob", "byteorder", "bytes", "clap", @@ -91,7 +62,7 @@ dependencies = [ "rand 0.9.2", "reqwest", "snap", - "thiserror 2.0.18", + "thiserror", "tokio", "tokio-util", "url", @@ -99,93 +70,54 @@ dependencies = [ 
[[package]] name = "azure_core" -version = "0.21.0" +version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b552ad43a45a746461ec3d3a51dfb6466b4759209414b439c165eb6a6b7729e" +checksum = "bd0160068f7a3021b5e749dc552374e82360463e9fb51e1127631a69fdde641f" dependencies = [ + "async-lock", "async-trait", - "base64 0.22.1", + "azure_core_macros", "bytes", - "dyn-clone", "futures", - "getrandom 0.2.17", - "http-types", - "once_cell", - "paste", "pin-project", - "quick-xml", - "rand 0.8.5", "rustc_version", "serde", "serde_json", - "time", - "tracing", - "url", - "uuid", -] - -[[package]] -name = "azure_storage" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f838159f4d29cb400a14d9d757578ba495ae64feb07a7516bf9e4415127126" -dependencies = [ - "RustyXML", - "async-lock", - "async-trait", - "azure_core", - "bytes", - "serde", - "serde_derive", - "time", + "tokio", "tracing", - "url", - "uuid", + "typespec", + "typespec_client_core", ] [[package]] -name = "azure_storage_blobs" -version = "0.21.0" +name = "azure_core_macros" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97e83c3636ae86d9a6a7962b2112e3b19eb3903915c50ce06ff54ff0a2e6a7e4" +checksum = "1bd69a8e70ec6be32ebf7e947cf9a58f6c7255e4cd9c48e640532ef3e37adc6d" dependencies = [ - "RustyXML", - "azure_core", - "azure_storage", - "azure_svc_blobstorage", - "bytes", - "futures", - "serde", - "serde_derive", - "serde_json", - "time", + "proc-macro2", + "quote", + "syn", "tracing", - "url", - "uuid", ] [[package]] -name = "azure_svc_blobstorage" -version = "0.21.0" +name = "azure_storage_blob" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e6c6f20c5611b885ba94c7bae5e02849a267381aecb8aee577e8c35ff4064c6" +checksum = "89cd8900ebab0da5cde56e07b8d9e4b94bfa5ede5ee6f5429027b93b58e1104e" dependencies = [ + "async-trait", "azure_core", "bytes", 
"futures", - "log", - "once_cell", + "percent-encoding", + "pin-project", "serde", "serde_json", "time", ] -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.22.1" @@ -232,6 +164,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures", + "rand_core 0.10.0", +] + [[package]] name = "clap" version = "4.6.0" @@ -319,6 +262,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -380,12 +332,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "event-listener" version = "5.4.1" @@ -403,19 +349,10 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" dependencies = [ - "event-listener 5.4.1", + "event-listener", "pin-project-lite", ] -[[package]] -name = "fastrand" -version = "1.9.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] - [[package]] name = "fastrand" version = "2.3.0" @@ -506,21 +443,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.32" @@ -561,30 +483,6 @@ dependencies = [ "slab", ] -[[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - -[[package]] -name = "getrandom" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" -dependencies = [ - "cfg-if", - "js-sys", - "libc", - "wasi 0.11.1+wasi-snapshot-preview1", - "wasm-bindgen", -] - [[package]] name = "getrandom" version = "0.4.2" @@ -594,6 +492,7 @@ dependencies = [ "cfg-if", "libc", "r-efi", + "rand_core 0.10.0", "wasip2", "wasip3", ] @@ -652,26 +551,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-types" -version = "2.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" -dependencies = [ - "anyhow", - "async-channel 1.9.0", - "base64 0.13.1", - "futures-lite", - "infer", - "pin-project-lite", - "rand 0.7.3", - 
"serde", - "serde_json", - "serde_qs", - "serde_urlencoded", - "url", -] - [[package]] name = "httparse" version = "1.10.1" @@ -699,13 +578,29 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ - "base64 0.22.1", + "base64", "bytes", "futures-channel", "futures-util", @@ -853,12 +748,6 @@ dependencies = [ "unit-prefix", ] -[[package]] -name = "infer" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" - [[package]] name = "insta" version = "1.46.3" @@ -872,15 +761,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", -] - [[package]] name = "ipnet" version = "2.12.0" @@ -956,7 +836,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", - "wasi 0.11.1+wasi-snapshot-preview1", + "wasi", "windows-sys 0.61.2", ] @@ -1049,12 +929,6 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" -[[package]] -name = "paste" -version = "1.0.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum 
= "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" - [[package]] name = "percent-encoding" version = "2.3.2" @@ -1120,15 +994,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" -[[package]] -name = "ppv-lite86" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" -dependencies = [ - "zerocopy", -] - [[package]] name = "prettyplease" version = "0.2.37" @@ -1150,9 +1015,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.31.0" +version = "0.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +checksum = "958f21e8e7ceb5a1aa7fa87fab28e7c75976e0bfe7e23ff069e0a260f894067d" dependencies = [ "memchr", "serde", @@ -1173,30 +1038,6 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc", -] - -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - [[package]] name = "rand" version = "0.9.2" @@ -1207,41 +1048,14 @@ dependencies = [ ] [[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", -] - -[[package]] -name = "rand_core" -version = "0.6.4" +name = "rand" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" dependencies = [ - "getrandom 0.2.17", + "chacha20", + "getrandom", + "rand_core 0.10.0", ] [[package]] @@ -1251,13 +1065,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" [[package]] -name = "rand_hc" -version = "0.2.0" +name = "rand_core" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", -] +checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" [[package]] name = "reqwest" @@ -1265,7 +1076,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" dependencies = [ - "base64 0.22.1", + "base64", "bytes", "futures-core", "futures-util", @@ -1273,13 +1084,17 @@ dependencies = [ "http-body", "http-body-util", "hyper", + "hyper-tls", "hyper-util", "js-sys", "log", + 
"native-tls", "percent-encoding", "pin-project-lite", + "rustls-pki-types", "sync_wrapper", "tokio", + "tokio-native-tls", "tokio-util", "tower", "tower-http", @@ -1314,16 +1129,19 @@ dependencies = [ ] [[package]] -name = "rustversion" -version = "1.0.22" +name = "rustls-pki-types" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +dependencies = [ + "zeroize", +] [[package]] -name = "ryu" -version = "1.0.23" +name = "rustversion" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "schannel" @@ -1406,29 +1224,6 @@ dependencies = [ "zmij", ] -[[package]] -name = "serde_qs" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" -dependencies = [ - "percent-encoding", - "serde", - "thiserror 1.0.69", -] - -[[package]] -name = "serde_urlencoded" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", - "serde", -] - [[package]] name = "shlex" version = "1.3.0" @@ -1512,40 +1307,20 @@ version = "3.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" dependencies = [ - "fastrand 2.3.0", - "getrandom 0.4.2", + "fastrand", + "getrandom", "once_cell", "rustix", "windows-sys 0.61.2", ] -[[package]] -name = "thiserror" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" -dependencies = [ - "thiserror-impl 1.0.69", -] - [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.18", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "thiserror-impl", ] [[package]] @@ -1567,7 +1342,6 @@ checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", - "js-sys", "num-conv", "powerfmt", "serde_core", @@ -1612,9 +1386,31 @@ dependencies = [ "mio", "pin-project-lite", "socket2", + "tokio-macros", "windows-sys 0.61.2", ] +[[package]] +name = "tokio-macros" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -1710,6 +1506,57 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typespec" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b63559a2aab9c7694fa8d2658a828d6b36f1e3904b1860d820c7cc6a2ead61c7" +dependencies = [ + "base64", + "bytes", + "futures", + "quick-xml", + "serde", + "serde_json", 
+ "url", +] + +[[package]] +name = "typespec_client_core" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de81ecf3a175da5a10ed60344caa8b53fe6d8ce28c6c978a7e3e09ca1e1b4131" +dependencies = [ + "async-trait", + "base64", + "dyn-clone", + "futures", + "pin-project", + "rand 0.10.0", + "reqwest", + "serde", + "serde_json", + "time", + "tokio", + "tracing", + "typespec", + "typespec_macros", + "url", + "uuid", +] + +[[package]] +name = "typespec_macros" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07108c5d18e00ec7bb09d2e48df95ebfab6b7179112d1e4216e9968ac2a0a429" +dependencies = [ + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.24" @@ -1738,7 +1585,6 @@ dependencies = [ "idna", "percent-encoding", "serde", - "serde_derive", ] [[package]] @@ -1753,9 +1599,8 @@ version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ - "getrandom 0.4.2", + "getrandom", "js-sys", - "serde_core", "wasm-bindgen", ] @@ -1765,12 +1610,6 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" -[[package]] -name = "waker-fn" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" - [[package]] name = "want" version = "0.3.1" @@ -1780,12 +1619,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -2131,26 +1964,6 @@ dependencies = [ 
"synstructure", ] -[[package]] -name = "zerocopy" -version = "0.8.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a789c6e490b576db9f7e6b6d661bcc9799f7c0ac8352f56ea20193b2681532e5" -dependencies = [ - "zerocopy-derive", -] - -[[package]] -name = "zerocopy-derive" -version = "0.8.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "zerofrom" version = "0.1.6" @@ -2172,6 +1985,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + [[package]] name = "zerotrie" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index bbc9d2a..f456fdc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "avml" -version = "0.17.0" +version = "0.18.0" license = "MIT" description = "A portable volatile memory acquisition tool" authors = ["avml@microsoft.com"] @@ -15,11 +15,12 @@ rust-version = "1.88.0" [features] default = ["put", "blobstore", "native-tls"] put = ["dep:reqwest", "reqwest?/stream", "dep:url", "dep:tokio", "dep:tokio-util"] -blobstore = ["dep:url", "dep:azure_core", "dep:azure_storage_blobs", "dep:tokio", "dep:tokio-util", "dep:async-channel"] +blobstore = ["dep:url", "dep:azure_core", "dep:azure_storage_blob", "dep:tokio", "dep:tokio-util", "dep:async-trait"] status = ["dep:indicatif"] -native-tls = ["dep:native-tls"] +native-tls = ["dep:native-tls", "azure_core?/reqwest_native_tls", "reqwest?/native-tls-vendored"] [dependencies] +async-trait = {version="0.1", optional=true} byteorder = "1.5" bytes = "1.11" clap = {version="4.6", default-features=false, features=["derive", "std", "usage", "error-context", "help"]} @@ -30,9 +31,8 @@ snap = "1.1" thiserror = "2.0" 
libc = "0.2" -async-channel = {version="2.5", optional=true} -azure_core = {version="0.21", optional=true, default-features=false} -azure_storage_blobs = {version="0.21", optional=true, default-features=false} +azure_core = {version="0.33", optional=true, default-features=false, features=["reqwest", "tokio"]} +azure_storage_blob = {version="0.10", optional=true, default-features=false} indicatif = {version="0.18", optional=true, default-features=false} native-tls = {version="0.2", features=["vendored"], optional=true, default-features=false} reqwest = {version="0.13", optional=true, default-features=false} diff --git a/README.md b/README.md index 66123f2..31fc360 100644 --- a/README.md +++ b/README.md @@ -140,10 +140,10 @@ Options: upload via Azure Blob Store upon acquisition --sas-block-size - specify maximum block size in MiB + specify maximum block size in MiB; must be greater than 0 --sas-block-concurrency - specify blob upload concurrency + specify blob upload concurrency; must be greater than 0 [default: 10] diff --git a/eng/build.sh b/eng/build.sh index f3f1fb5..d27ce5c 100755 --- a/eng/build.sh +++ b/eng/build.sh @@ -11,6 +11,7 @@ cd $(dirname ${BASH_SOURCE[0]})/../ ARCH=$(uname -m) cargo +stable test --release --target ${ARCH}-unknown-linux-musl --locked --all-targets --all-features +cargo +stable test --release --target ${ARCH}-unknown-linux-musl --locked --doc --all-features for FEATURE in $(cargo metadata --locked --format-version 1 | jq '.packages | [.[] | select(.name=="avml")][0].features | keys | @tsv' -r); do cargo +stable check --release --target ${ARCH}-unknown-linux-musl --locked --no-default-features --features ${FEATURE} --features native-tls done diff --git a/src/bin/avml-upload.rs b/src/bin/avml-upload.rs index b4835c3..3e5e217 100644 --- a/src/bin/avml-upload.rs +++ b/src/bin/avml-upload.rs @@ -7,9 +7,9 @@ #![deny(clippy::manual_assert)] #![deny(clippy::indexing_slicing)] -use avml::{BlobUploader, DEFAULT_CONCURRENCY, Error, Result, put}; 
+use avml::{BlobUploader, Error, Result, put}; use clap::{Parser, Subcommand}; -use std::path::PathBuf; +use std::{num::NonZeroUsize, path::PathBuf}; use tokio::runtime::Runtime; use url::Url; @@ -37,13 +37,13 @@ enum Commands { /// url to upload via Azure Blob Storage url: Url, - /// specify blob upload concurrency - #[arg(long, default_value_t=DEFAULT_CONCURRENCY)] - sas_block_concurrency: usize, + /// specify blob upload concurrency; must be greater than 0 + #[arg(long)] + sas_block_concurrency: Option, - /// specify maximum block size in MiB + /// specify maximum block size in MiB; must be greater than 0 #[arg(long)] - sas_block_size: Option, + sas_block_size: Option, }, } diff --git a/src/bin/avml.rs b/src/bin/avml.rs index 5274d6e..6217d9e 100644 --- a/src/bin/avml.rs +++ b/src/bin/avml.rs @@ -11,6 +11,8 @@ use avml::Error; use avml::{Result, Snapshot, Source, iomem}; use clap::Parser; +#[cfg(feature = "blobstore")] +use std::num::NonZeroUsize; use std::{num::NonZeroU64, ops::Range, path::PathBuf}; #[cfg(any(feature = "blobstore", feature = "put"))] use tokio::{fs::remove_file, runtime::Runtime}; @@ -52,15 +54,15 @@ struct Config { #[arg(long)] sas_url: Option, - /// specify maximum block size in MiB + /// specify maximum block size in MiB; must be greater than 0 #[cfg(feature = "blobstore")] #[arg(long)] - sas_block_size: Option, + sas_block_size: Option, - /// specify blob upload concurrency + /// specify blob upload concurrency; must be greater than 0 #[cfg(feature = "blobstore")] - #[arg(long, default_value_t=avml::DEFAULT_CONCURRENCY)] - sas_block_concurrency: usize, + #[arg(long)] + sas_block_concurrency: Option, /// name of the file to write to on local system filename: PathBuf, diff --git a/src/upload/blobstore.rs b/src/upload/blobstore.rs index d44db5b..50829ca 100644 --- a/src/upload/blobstore.rs +++ b/src/upload/blobstore.rs @@ -1,17 +1,27 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. 
-use crate::{ONE_MB, upload::status::Status}; -use async_channel::{Receiver, Sender, bounded}; -use azure_core::error::Error as AzureError; -use azure_storage_blobs::prelude::{BlobBlockType, BlobClient, BlockId, BlockList}; -use bytes::Bytes; -use core::cmp; -use futures::future::try_join_all; -use std::path::Path; +use crate::ONE_MB; +use crate::upload::status::Status; +use azure_core::{ + Bytes, + error::Error as AzureError, + http::{Body, NoFormat, RequestContent}, + stream::{DEFAULT_BUFFER_SIZE, SeekableStream}, +}; +use azure_storage_blob::{BlobClient, models::BlobClientUploadOptions}; +use core::{ + cmp, + future::Future, + num::NonZeroUsize, + pin::Pin, + task::{Context, Poll}, +}; +use std::{io::SeekFrom, path::Path, sync::Arc}; use tokio::{ fs::File, - io::{AsyncRead, AsyncReadExt as _}, + io::{AsyncSeekExt as _, ReadBuf}, + sync::{Mutex, OwnedMutexGuard}, }; use url::Url; @@ -20,12 +30,6 @@ pub enum Error { #[error("file is too large")] TooLarge, - #[error("unable to queue block for upload")] - QueueBlock(#[from] async_channel::SendError), - - #[error("uploading blocks failed")] - UploadFromQueue(#[source] tokio::task::JoinError), - #[error("error reading file")] Io(#[from] std::io::Error), @@ -34,30 +38,44 @@ pub enum Error { #[error(transparent)] IntConversion(#[from] core::num::TryFromIntError), + + #[error(transparent)] + InvalidUrl(#[from] url::ParseError), } type Result = core::result::Result; +#[allow(clippy::expect_used)] +const ONE_MB_NZ: NonZeroUsize = NonZeroUsize::new(ONE_MB).expect("ONE_MB must be non-zero"); + /// Maximum number of blocks /// -/// -const BLOB_MAX_BLOCKS: usize = 50_000; +/// +#[allow(clippy::expect_used)] +const BLOB_MAX_BLOCKS: NonZeroUsize = + NonZeroUsize::new(50_000).expect("blob max blocks must be non-zero"); /// Maximum size of any single block /// -/// -const BLOB_MAX_BLOCK_SIZE: usize = ONE_MB.saturating_mul(4000); +/// +#[allow(clippy::expect_used)] +const BLOB_MAX_BLOCK_SIZE: NonZeroUsize = 
ONE_MB_NZ.saturating_mul( + NonZeroUsize::new(4000).expect("blob max block size multiplier must be non-zero"), +); /// Maximum total size of a file /// /// -const BLOB_MAX_FILE_SIZE: usize = BLOB_MAX_BLOCKS.saturating_mul(BLOB_MAX_BLOCK_SIZE); +#[allow(clippy::expect_used)] +const BLOB_MAX_FILE_SIZE: NonZeroUsize = BLOB_MAX_BLOCKS.saturating_mul(BLOB_MAX_BLOCK_SIZE); /// Minimum block size, which is required to trigger the "high-throughput block /// blobs" feature on all storage accounts /// /// -const BLOB_MIN_BLOCK_SIZE: usize = ONE_MB.saturating_mul(5); +#[allow(clippy::expect_used)] +const BLOB_MIN_BLOCK_SIZE: NonZeroUsize = ONE_MB_NZ + .saturating_mul(NonZeroUsize::new(5).expect("blob min block size multiplier must be non-zero")); /// Azure's default max request rate for a storage account is 20,000 per second. /// By keeping to 10 or fewer concurrent upload threads, AVML can be used to @@ -65,7 +83,9 @@ const BLOB_MIN_BLOCK_SIZE: usize = ONE_MB.saturating_mul(5); /// VM scaleset) to a single default storage account. /// /// -const MAX_CONCURRENCY: usize = 10; +#[allow(clippy::expect_used)] +const MAX_CONCURRENCY: NonZeroUsize = + NonZeroUsize::new(10).expect("max concurrency must be non-zero"); /// Azure's default max request rate for a storage account is 20,000 per second. /// By keeping to 10 or fewer concurrent upload threads, AVML can be used to @@ -73,102 +93,211 @@ const MAX_CONCURRENCY: usize = 10; /// VM scaleset) to a single default storage account. /// /// -pub const DEFAULT_CONCURRENCY: usize = 10; - -/// As chunks stay in memory until the upload is complete, as to enable -/// automatic retries in the case of TCP or HTTP errors, chunks sizes for huge -/// files is capped to 100MB each -const REASONABLE_BLOCK_SIZE: usize = ONE_MB.saturating_mul(100); - -/// try to keep under 500MB in flight. If that's not possible due to block -/// size, concurrency will get disabled. 
-const MEMORY_THRESHOLD: usize = 500 * ONE_MB; - -/// When uploading a file without a size, such as when uploading a stream of an -/// unknown size, use a 1TB stream -const DEFAULT_FILE_SIZE: usize = 1024 * 1024 * 1024 * 1024; +#[allow(clippy::expect_used)] +pub const DEFAULT_CONCURRENCY: NonZeroUsize = + NonZeroUsize::new(10).expect("default concurrency must be non-zero"); + +/// Keep at most 500MB of block data in flight across all uploaders. +#[allow(clippy::expect_used)] +const MEMORY_THRESHOLD: NonZeroUsize = ONE_MB_NZ + .saturating_mul(NonZeroUsize::new(500).expect("memory threshold multiplier must be non-zero")); + +fn calc_block_size( + file_size: NonZeroUsize, + block_size: Option, +) -> Result { + let block_size = match block_size { + Some(block_size) => block_size, + None => NonZeroUsize::new(file_size.get().div_ceil(BLOB_MAX_BLOCKS.get())) + .ok_or(Error::TooLarge)?, + }; -pub struct UploadBlock { - id: Bytes, - data: Bytes, + Ok(cmp::min( + cmp::max(block_size, BLOB_MIN_BLOCK_SIZE), + BLOB_MAX_BLOCK_SIZE, + )) } fn calc_concurrency( - file_size: usize, - block_size: Option, - upload_concurrency: usize, -) -> Result<(usize, usize)> { + file_size: NonZeroUsize, + block_size: Option, + upload_concurrency: Option, +) -> Result<(NonZeroUsize, NonZeroUsize)> { if file_size > BLOB_MAX_FILE_SIZE { return Err(Error::TooLarge); } - - let block_size = match block_size { - // if the user specifies a block size of 0 or doesn't specify a block size, - // calculate the block size based on the file size - Some(0) | None => { - match file_size { - // if the file is small enough to fit with 5MB blocks, use that - // to reduce impact for failure retries and increase - // concurrency. - x if (x < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS) => BLOB_MIN_BLOCK_SIZE, - // if the file is large enough that we can fit with 100MB blocks, use that. 
- x if (x < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS) => REASONABLE_BLOCK_SIZE, - // otherwise, just use the smallest block size that will fit - // within MAX BLOCKS to reduce memory pressure - x => (x / BLOB_MAX_BLOCKS) - .checked_add(1) - .ok_or(Error::TooLarge)?, - } - } - // minimum required to hit high-throughput block blob performance thresholds - Some(x) if (x <= BLOB_MIN_BLOCK_SIZE) => BLOB_MIN_BLOCK_SIZE, - // otherwise use the user specified value - Some(x) => x, - }; - - // if the block size is larger than the max block size, use the max block size - let block_size = usize::min(block_size, BLOB_MAX_BLOCK_SIZE); - - let upload_concurrency = match upload_concurrency { - // manually specifying concurrency of 0 will disable concurrency - 0 | 1 => 1, - _ => match (MEMORY_THRESHOLD).checked_div(block_size) { - None | Some(0) => 1, - // cap the number of concurrent threads to reduce concurrency issues - // at the server end. - Some(x) => cmp::min(MAX_CONCURRENCY, x), - }, + let block_size = calc_block_size(file_size, block_size)?; + + let memory_limited_concurrency = match NonZeroUsize::new( + MEMORY_THRESHOLD + .get() + .checked_div(block_size.get()) + .unwrap_or(0), + ) { + Some(concurrency) => concurrency, + None => NonZeroUsize::MIN, }; + let upload_concurrency = cmp::min( + cmp::min( + upload_concurrency.unwrap_or(DEFAULT_CONCURRENCY), + memory_limited_concurrency, + ), + MAX_CONCURRENCY, + ); Ok((block_size, upload_concurrency)) } -/// Concurrently upload a Stream/File to an Azure Blob Store using a SAS URL. +fn upload_parameters( + file_size: NonZeroUsize, + block_size: Option, + upload_concurrency: Option, +) -> Result<(NonZeroUsize, NonZeroUsize)> { + let block_size = block_size.map(|x| x.saturating_mul(ONE_MB_NZ)); + calc_concurrency(file_size, block_size, upload_concurrency) +} + +/// A seekable file-backed stream used for blob uploads. +/// +/// `FileStream` is `Clone` because the Azure core body/retry machinery may need +/// to duplicate the stream. 
All clones share the same underlying file handle
+/// and read cursor state via `Arc<Mutex<ReadState>>`.
+///
+/// Invariants:
+/// - At most one clone is actively reading from the stream at any time.
+/// - All reads go through the shared `read_state` so that the current cursor
+///   position is coordinated between clones.
+#[derive(Clone)]
+struct FileStream {
+    handle: Arc<Mutex<File>>,
+    stream_size: u64,
+    buffer_size: usize,
+    read_state: Arc<Mutex<ReadState>>,
+    status: Status,
+}
+
+impl core::fmt::Debug for FileStream {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        f.debug_struct("FileStream")
+            .field("stream_size", &self.stream_size)
+            .field("buffer_size", &self.buffer_size)
+            .finish_non_exhaustive()
+    }
+}
+
+impl FileStream {
+    async fn new(handle: File, buffer_size: usize) -> Result<Self> {
+        let stream_size = handle.metadata().await?.len();
+        let handle = Arc::new(Mutex::new(handle));
+
+        Ok(Self {
+            handle,
+            stream_size,
+            buffer_size,
+            read_state: Arc::new(Mutex::new(ReadState::default())),
+            status: Status::new(Some(stream_size)),
+        })
+    }
+}
+
+type FileLockFuture = Pin<Box<dyn Future<Output = OwnedMutexGuard<File>> + Send>>;
+
+#[derive(Default)]
+enum ReadState {
+    #[default]
+    Idle,
+    Locking(FileLockFuture),
+    Locked(OwnedMutexGuard<File>),
+}
+
+#[async_trait::async_trait]
+impl SeekableStream for FileStream {
+    async fn reset(&mut self) -> azure_core::Result<()> {
+        *self.read_state.lock().await = ReadState::Idle;
+        let mut handle = self.handle.clone().lock_owned().await;
+        handle.seek(SeekFrom::Start(0)).await?;
+        self.status.reset();
+        Ok(())
+    }
+
+    fn len(&self) -> usize {
+        self.stream_size.try_into().unwrap_or(usize::MAX)
+    }
+
+    fn buffer_size(&self) -> usize {
+        self.buffer_size
+    }
+}
+
+impl futures::io::AsyncRead for FileStream {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        slice: &mut [u8],
+    ) -> Poll<std::io::Result<usize>> {
+        let this = self.get_mut();
+        let Ok(mut state) = this.read_state.try_lock() else {
+            // Another task is currently holding `read_state`; yield and try again later
cx.waker().wake_by_ref(); + return Poll::Pending; + }; + + loop { + match *state { + ReadState::Idle => { + *state = ReadState::Locking(Box::pin(this.handle.clone().lock_owned())); + } + ReadState::Locking(ref mut lock_future) => { + match Future::poll(Pin::as_mut(lock_future), cx) { + Poll::Ready(guard) => *state = ReadState::Locked(guard), + Poll::Pending => return Poll::Pending, + } + } + ReadState::Locked(ref mut guard) => { + let mut read_buf = ReadBuf::new(slice); + + return match tokio::io::AsyncRead::poll_read( + Pin::new(&mut **guard), + cx, + &mut read_buf, + ) { + Poll::Ready(Ok(())) => { + let len = read_buf.filled().len(); + this.status.inc(len); + Poll::Ready(Ok(len)) + } + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => Poll::Pending, + }; + } + } + } + } +} + +/// Upload a file to Azure Blob Store using a SAS URL. /// /// ```rust,no_run /// use avml::BlobUploader; -/// # use url::Url; /// # use avml::Result; -/// # use std::path::Path; +/// # use std::{num::NonZeroUsize, path::Path}; +/// # use url::Url; /// # async fn upload() -> Result<()> { /// let sas_url = Url::parse("https://contoso.com/container_name/blob_name?sas_token_here=1") /// .expect("url parsing failed"); /// let path = Path::new("/tmp/image.lime"); /// let uploader = BlobUploader::new(&sas_url)? 
-///         .block_size(Some(100))
-///         .concurrency(5);
+///         .block_size(NonZeroUsize::new(100))
+///         .concurrency(NonZeroUsize::new(5));
 /// uploader.upload_file(&path).await?;
 /// # Ok(())
 /// # }
 /// ```
 #[derive(Clone)]
 pub struct BlobUploader {
-    client: BlobClient,
-    size: usize,
-    block_size: Option<usize>,
-    concurrency: usize,
-    sender: Sender<UploadBlock>,
-    receiver: Receiver<UploadBlock>,
+    client: Arc<BlobClient>,
+    block_size: Option<NonZeroUsize>,
+    concurrency: Option<NonZeroUsize>,
 }
 
 impl BlobUploader {
@@ -178,57 +307,38 @@ impl BlobUploader {
     /// Returns an error if:
     /// - The URL cannot be parsed as a valid Azure SAS URL
     pub fn new(sas: &Url) -> Result<Self> {
-        let blob_client = BlobClient::from_sas_url(sas)?;
+        let blob_client = BlobClient::from_url(sas.clone(), None, None)?;
 
         Ok(Self::with_blob_client(blob_client))
     }
 
-    /// Create a ``BlobUploader`` with a ``BlobClient`` from ``azure_storage_blobs``.
+    /// Create a ``BlobUploader`` with a ``BlobClient`` from ``azure_storage_blob``.
     ///
-    /// Ref: <https://docs.rs/azure_storage_blobs>
+    /// Ref: <https://docs.rs/azure_storage_blob>
    #[must_use]
     pub fn with_blob_client(client: BlobClient) -> Self {
-        let (sender, receiver) = bounded::<UploadBlock>(1);
-
         Self {
-            client,
-            size: DEFAULT_FILE_SIZE,
+            client: Arc::new(client),
             block_size: None,
-            concurrency: DEFAULT_CONCURRENCY,
-            sender,
-            receiver,
+            concurrency: None,
         }
     }
 
-    /// Specify the size of the file to upload (in bytes)
-    ///
-    /// If the anticipated upload size is not specified, the maximum file
-    /// uploaded will be approximately 5TB.
+    /// Specify a positive block size in multiples of 1MB.
     #[must_use]
-    pub fn size(self, size: usize) -> Self {
-        Self { size, ..self }
-    }
-
-    /// Specify the block size in multiples of 1MB
-    ///
-    /// If the block size is not specified and the size of the content to be
-    /// uploaded is provided, the default block size will be calculated to fit
-    /// within the bounds of the allowed number of blocks and the minimum
-    /// minimum required to hit high-throughput block blob performance
-    /// thresholds.
-    #[must_use]
-    pub fn block_size(self, block_size: Option<usize>) -> Self {
+    pub fn block_size(self, block_size: Option<NonZeroUsize>) -> Self {
         Self { block_size, ..self }
     }
 
+    /// Specify a positive upload concurrency.
     #[must_use]
-    pub fn concurrency(self, concurrency: usize) -> Self {
+    pub fn concurrency(self, concurrency: Option<NonZeroUsize>) -> Self {
         Self { concurrency, ..self }
     }
 
-    /// Upload a file to Azure Blob Store using a fully qualified SAS token
+    /// Upload a file to Azure Blob Store using a fully qualified SAS token.
     ///
     /// # Errors
     /// Returns an error if:
@@ -236,190 +346,177 @@ impl BlobUploader {
     /// - The file size cannot be converted to a usize
     /// - The file is too large for Azure Blob Storage
     /// - There is a failure during the upload process
-    /// - There is a failure finalizing the block list
-    pub async fn upload_file(mut self, filename: &Path) -> Result<()> {
+    pub async fn upload_file(self, filename: &Path) -> Result<()> {
         let file = File::open(filename).await?;
         let file_size = file.metadata().await?.len().try_into()?;
+        let Some(file_size) = NonZeroUsize::new(file_size) else {
+            return Ok(());
+        };
+        let (block_size, uploaders_count) =
+            upload_parameters(file_size, self.block_size, self.concurrency)?;
 
-        self.size = file_size;
-
-        self.upload_stream(file).await
-    }
-
-    async fn finalize(self, block_ids: Vec<Bytes>) -> Result<()> {
-        let blocks = block_ids
-            .into_iter()
-            .map(|x| BlobBlockType::Uncommitted(BlockId::new(x)))
-            .collect::<Vec<_>>();
+        let stream = FileStream::new(file, DEFAULT_BUFFER_SIZE).await?;
+        let stream: Box<dyn SeekableStream> = Box::new(stream);
+        let content: RequestContent<Bytes> = Body::from(stream).into();
 
-        let block_list = BlockList { blocks };
+        let options = BlobClientUploadOptions {
+            parallel: Some(uploaders_count),
+            partition_size: Some(block_size),
+            ..Default::default()
+        };
 
-        self.client.put_block_list(block_list).await?;
+        self.client.upload(content, Some(options)).await?;
 
         Ok(())
     }
+}
 
-    /// upload a stream to Azure Blob Store using a fully qualified SAS token
-    async fn upload_stream<R>(self, handle: R) -> Result<()>
-    where
-        R: AsyncRead + Unpin + Send,
-    {
-        let block_size = self.block_size.map(|x| x.saturating_mul(ONE_MB));
-
-        let (block_size, uploaders_count) =
-            calc_concurrency(self.size, block_size, self.concurrency)?;
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::AsyncReadExt as _;
+    use std::time::{SystemTime, UNIX_EPOCH};
 
-        let uploaders = self.uploaders(uploaders_count);
-        let queue_handle = self.block_reader(handle, block_size);
+    fn non_zero(value: usize) -> Result<NonZeroUsize> {
+        NonZeroUsize::new(value).ok_or(Error::TooLarge)
+    }
 
-        let (block_list, ()) = futures::try_join!(queue_handle, uploaders)?;
+    fn bytes_from_mib(mebibytes: usize) -> Result<NonZeroUsize> {
+        non_zero(mebibytes.checked_mul(ONE_MB).ok_or(Error::TooLarge)?)
+    }
 
-        self.finalize(block_list).await
+    fn bytes_from_gib(gibibytes: usize) -> Result<NonZeroUsize> {
+        bytes_from_mib(gibibytes.checked_mul(1024).ok_or(Error::TooLarge)?)
     }
 
-    async fn uploaders(&self, count: usize) -> Result<()> {
-        let status = Status::new(Some(self.size.try_into()?));
+    #[test]
+    fn small_files_use_minimum_block_size_and_default_concurrency() -> Result<()> {
+        let (block_size, concurrency) = upload_parameters(bytes_from_mib(400)?, None, None)?;
+
+        assert_eq!(block_size, bytes_from_mib(5)?);
+        assert_eq!(concurrency, DEFAULT_CONCURRENCY);
+        Ok(())
+    }
 
-        let uploaders: Vec<_> = (0..usize::max(1, count))
-            .map(|_| {
-                Self::block_uploader(self.client.clone(), self.receiver.clone(), status.clone())
-            })
-            .collect();
+    #[test]
+    fn user_block_size_is_clamped_to_minimum() -> Result<()> {
+        let (block_size, concurrency) =
+            upload_parameters(bytes_from_mib(300)?, Some(NonZeroUsize::MIN), None)?;
 
-        try_join_all(uploaders).await?;
+        assert_eq!(block_size, bytes_from_mib(5)?);
+        assert_eq!(concurrency, DEFAULT_CONCURRENCY);
+        Ok(())
+    }
 
+    #[test]
+    fn requested_concurrency_caps_memory_limited_uploaders() -> Result<()> {
+        let (block_size, concurrency) = upload_parameters(
+            bytes_from_gib(30)?,
+            Some(non_zero(100)?),
+            Some(non_zero(3)?),
+        )?;
+
+        assert_eq!(block_size, bytes_from_mib(100)?);
+        assert_eq!(concurrency, non_zero(3)?);
         Ok(())
     }
 
-    async fn block_reader<R>(&self, mut handle: R, block_size: usize) -> Result<Vec<Bytes>>
-    where
-        R: AsyncRead + Unpin + Send,
-    {
-        let mut block_list = vec![];
+    #[test]
+    fn auto_block_size_grows_when_minimum_would_exceed_max_blocks() -> Result<()> {
+        let file_size = non_zero(
+            bytes_from_mib(5)?
+                .get()
+                .checked_mul(50_000)
+                .ok_or(Error::TooLarge)?
+                .checked_add(1)
+                .ok_or(Error::TooLarge)?,
+        )?;
+        let expected_block_size = non_zero(file_size.get().div_ceil(50_000))?;
+        let (block_size, concurrency) = upload_parameters(file_size, None, None)?;
+
+        assert_eq!(block_size, expected_block_size);
+        assert_eq!(concurrency, DEFAULT_CONCURRENCY);
+        Ok(())
+    }
 
-        for i in 0..usize::MAX {
-            let mut data = Vec::with_capacity(block_size);
+    #[test]
+    fn huge_blocks_still_use_at_least_one_uploader() -> Result<()> {
+        let (block_size, concurrency) =
+            upload_parameters(bytes_from_gib(30)?, Some(non_zero(600)?), None)?;
 
-            let mut take_handle = handle.take(block_size.try_into().unwrap_or(u64::MAX));
-            let read_data = take_handle.read_to_end(&mut data).await?;
-            if read_data == 0 {
-                break;
-            }
-            handle = take_handle.into_inner();
+        assert_eq!(block_size, bytes_from_mib(600)?);
+        assert_eq!(concurrency, NonZeroUsize::MIN);
+        Ok(())
+    }
 
-            if data.is_empty() {
-                break;
-            }
+    #[test]
+    fn files_larger_than_azure_limit_are_rejected() -> Result<()> {
+        let oversized_file = non_zero(
+            bytes_from_mib(50_000usize.checked_mul(4000).ok_or(Error::TooLarge)?)?
+                .get()
+                .checked_add(1)
+                .ok_or(Error::TooLarge)?,
+        )?;
+
+        assert!(matches!(
+            upload_parameters(oversized_file, None, None),
+            Err(Error::TooLarge)
+        ));
+        Ok(())
+    }
 
-            let data = data.into();
+    #[tokio::test]
+    async fn test_file_stream_reset() -> Result<()> {
+        let path = std::env::temp_dir().join(format!(
+            "avml-blob-upload-{}-{}.bin",
+            std::process::id(),
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_nanos()
+        ));
+        let expected = b"seekable stream content";
+        tokio::fs::write(&path, expected).await?;
 
-            let id = Bytes::from(format!("{i:032x}"));
+        let result = async {
+            let file = File::open(&path).await?;
+            let mut stream = FileStream::new(file, 8).await?;
 
-            block_list.push(id.clone());
+            assert_eq!(stream.len(), expected.len());
 
-            self.sender.send(UploadBlock { id, data }).await?;
-        }
-        self.sender.close();
+            let mut prefix = [0_u8; 8];
+            stream.read_exact(&mut prefix).await?;
+            assert_eq!(&prefix, b"seekable");
 
-        Ok(block_list)
-    }
+            stream.reset().await?;
 
-    async fn block_uploader(
-        client: BlobClient,
-        receiver: Receiver<UploadBlock>,
-        status: Status,
-    ) -> Result<()> {
-        // the channel will respond with an Err to indicate the channel is closed
-        while let Ok(upload_chunk) = receiver.recv().await {
-            let chunk_len = upload_chunk.data.len();
-
-            let result = client.put_block(upload_chunk.id, upload_chunk.data).await;
-
-            // as soon as any error is seen (after retrying), bail out and stop other uploaders
-            if result.is_err() {
-                receiver.close();
-                result?;
-            }
+            let mut reread = Vec::new();
+            stream.read_to_end(&mut reread).await?;
+            assert_eq!(reread, expected);
 
-            status.inc(chunk_len);
+            Result::<()>::Ok(())
         }
+        .await;
 
-        Ok(())
+        let _ = tokio::fs::remove_file(&path).await;
+        result
     }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    const ONE_GB: usize = ONE_MB.saturating_mul(1024);
-    const ONE_TB: usize = ONE_GB.saturating_mul(1024);
-    #[test]
-    fn test_calc_concurrency() -> Result<()> {
-        assert_eq!(
(BLOB_MIN_BLOCK_SIZE, 10), - calc_concurrency(ONE_MB * 300, Some(1), DEFAULT_CONCURRENCY)?, - "specified blocksize would overflow block count, so we use the minimum block size" - ); - - assert_eq!( - (BLOB_MIN_BLOCK_SIZE, 10), - calc_concurrency(ONE_GB * 30, Some(ONE_MB), DEFAULT_CONCURRENCY)?, - "30GB file, 1MB blocks" - ); - - assert_eq!( - (ONE_MB * 100, 5), - calc_concurrency(ONE_GB * 30, Some(ONE_MB * 100), DEFAULT_CONCURRENCY)?, - "30GB file, 100MB block size" - ); - - assert_eq!( - (5 * ONE_MB, 10), - calc_concurrency(ONE_MB * 400, None, DEFAULT_CONCURRENCY)?, - "400MB file, no block size" - ); - - assert_eq!( - (5 * ONE_MB, 10), - calc_concurrency(ONE_GB * 16, None, DEFAULT_CONCURRENCY)?, - "16GB file, no block size" - ); - - assert_eq!( - (5 * ONE_MB, 10), - calc_concurrency(ONE_GB * 32, None, DEFAULT_CONCURRENCY)?, - "32GB file, no block size", - ); - - assert_eq!( - (ONE_MB * 100, 5), - calc_concurrency(ONE_TB, None, DEFAULT_CONCURRENCY)?, - "1TB file, no block size" - ); - - assert_eq!( - (100 * ONE_MB, 5), - calc_concurrency(ONE_TB * 4, None, DEFAULT_CONCURRENCY)?, - "4TB file, no block size" - ); - - assert_eq!( - (100 * ONE_MB, 5), - calc_concurrency(ONE_TB * 4, Some(0), DEFAULT_CONCURRENCY)?, - "4TB file, zero block size" - ); - - let (block_size, uploaders_count) = - calc_concurrency(ONE_TB.saturating_mul(32), None, DEFAULT_CONCURRENCY)?; - assert!(block_size > REASONABLE_BLOCK_SIZE && block_size < BLOB_MAX_BLOCK_SIZE); - assert_eq!(uploaders_count, 1); - - assert!( - calc_concurrency((BLOB_MAX_BLOCKS * BLOB_MAX_BLOCK_SIZE) + 1, None, 10).is_err(), - "files beyond max size should fail" - ); + #[tokio::test] + async fn test_upload_file_empty_is_noop() -> Result<()> { + let path = std::env::temp_dir().join(format!( + "avml-empty-blob-upload-{}-{}.bin", + std::process::id(), + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + tokio::fs::write(&path, []).await?; + + let url = 
Url::parse("https://127.0.0.1:9/container/blob?sig=test")?; + BlobUploader::new(&url)?.upload_file(&path).await?; + let _ = tokio::fs::remove_file(&path).await; Ok(()) } } diff --git a/src/upload/status.rs b/src/upload/status.rs index 130b759..194558f 100644 --- a/src/upload/status.rs +++ b/src/upload/status.rs @@ -30,6 +30,13 @@ impl Status { Self { bar, total } } + #[cfg(feature = "blobstore")] + pub fn reset(&self) { + if let Some(ref bar) = self.bar { + bar.reset(); + } + } + pub fn inc(&self, n: usize) { if let Some(ref bar) = self.bar { bar.inc(n.try_into().unwrap_or(u64::MAX)); @@ -51,4 +58,8 @@ impl Status { } #[allow(clippy::unused_self)] pub fn inc(&self, _n: usize) {} + + #[cfg(feature = "blobstore")] + #[allow(clippy::unused_self)] + pub fn reset(&self) {} }