From b6fefaaeea812c87b9b1d883f1a3d872a09adb20 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 4 Jun 2026 14:07:56 +1200 Subject: [PATCH 1/9] Add slow sync strike out logic --- crates/block-producer/src/lib.rs | 2 +- crates/rpc/src/server/api.rs | 3 + .../samples/02-with-account-files/bridge.mac | Bin 37762 -> 37762 bytes crates/store/src/state/subscription.rs | 113 +++++++++++++----- crates/utils/src/lib.rs | 5 + 5 files changed, 89 insertions(+), 34 deletions(-) diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs index 35691a6e6..c9f460b50 100644 --- a/crates/block-producer/src/lib.rs +++ b/crates/block-producer/src/lib.rs @@ -64,7 +64,7 @@ const SERVER_MEMPOOL_EXPIRATION_SLACK: u32 = 2; const CACHED_MEMPOOL_STATS_UPDATE_INTERVAL: Duration = Duration::from_secs(5); /// How often a block is created. -pub const DEFAULT_BLOCK_INTERVAL: Duration = Duration::from_secs(3); +pub use miden_node_utils::DEFAULT_BLOCK_INTERVAL; /// How often a batch is created. pub const DEFAULT_BATCH_INTERVAL: Duration = Duration::from_secs(1); diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index 45dc2a79c..115fb9344 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -1209,6 +1209,9 @@ fn state_subscription_error_to_status(err: StateSubscriptionError) -> Status { "failed to load proof for block {block_num}: {}", source.as_report() )), + StateSubscriptionError::TooSlow => { + Status::resource_exhausted("subscriber is too slow to keep up with the chain") + }, } } diff --git a/crates/store/src/genesis/config/samples/02-with-account-files/bridge.mac b/crates/store/src/genesis/config/samples/02-with-account-files/bridge.mac index 6437e37b2a98f85a7cf5d6faa21d534ec0f49f01..8412cfe5cb431e2e9b554f126ab3e959d5d57864 100644 GIT binary patch delta 3925 zcmai%_dnE+1Av{86Fy32)*E+Zg>1*Cv*#gu#o3(59u;@?%(%13A$xPqiV!YYA(;&s zSvi>*rSJDY`27C7o}Zs5x{5NoiZY{)={R?|k+zA&Ke^Vnqt)5Fr8^A7RIw?sHolTmnrI+Y7v<7wvm^xl3%DN7W;CUVM^=HzCZl z9}NGV^vOG@sPVVcE+X|3N@}@K98kaHN6x>Zr6n;wMz3X${E3g&`9nZC6E#H6 zi9nE0MZHEG_?~zxM zx;F3wM0;&1=waqryj;S4$z#4jcl%Q%;ttSoz`6$06@m)lmQuuO|5emc$NwykV{&7Qw*{OK`Y3=>Wi>9dri8%uqq;*qz%!;xsLRyPGyMDoZa_X+fA?E;G)n;1rQAd<=9pzLAkzm43mcV$gciP>0^SI$#(8%9pbF^uNS|i%S zN~;L(V5~=k+}IX4kX_QG<2#2x1SNrM;~Ng@Y{oZ6^xTq6op*OhIEDg}-*!DuVft`n zA{Zq)Qu^JI^@e>?s+^FkXfSC$Rb)!m67rp2#SqrvERfLm;QTalUqpPLyiNr7^p)!K$Y}EhILqw%eowdlce6*?TzQ`_@nF~Ai>2DA+lUDC z#-yF!Wx4Yccm479SX74ix!lN`a1f+(sn4>1lgZNOGuGU81j9|5S1h;W#|tK-5?96vl*gFKp?RM!A`=M9i{xF%O03ML;ow-a>po z0T<>0hac{4RSj|lTo@~zQu;Yjaoi>kjXCTjf}25KwHhA=Xt{)bi00sx5A!Hc%FQgi zcTMi?{z1RWS*+5Ld`Zvh!W!<(0|4N}8UZ;ReG3!kJihbU#UR}|FoWp2)oO=*(Px|r z#Nre^0f0_u<8>OSC2PA?e08H!&Y?S)oMhlPaL)DH*a^b=}0eIlTzCfyIDiOq1 zs8FwEl~w94a~@~mSg)ru%FkhXb^`)QaCvE7Ybh)8f+l3BX;q`w_qM{XzqAc&zL+k= z>NVWL|0O!f&LEAGtOTl0`Ryy850e;pR-RMU>s9$FcJDNR`Mbjr;s+j^;PaJZMiOc~ z7a7h~gxC5g&xVN(4s6i+DTh<^0+2o*#4EK=jyVhpyv)+rvV|_Dlw#yhHLS;ODXL2# z5hV^~gq{QL+d45}Yme2VHG?ZXvldh5L9QS*=GGDwX%WrZqS3JUaI=l8&q3Q3gIse% zY%2`wm64MH+*NlY_GECKUMoNg?d;pccgNfdtr~VnlJH%V*vg%qE*IW06TY4g56zWu zov0P0ZOk7T@^r-Y5y^^iqL1!}vShA1rc{{ZYv%ur z_vNa;fx}7{j&9d+NdyG-GL6buWW#Fje6H$?`R9|}qN9f5PN7|?FR|Y@;+w)v+?jU? zTII{hfl>c0MWfKb(t>VV>GjXk0phr858>RJ;5mCK>XqL*95APR>SQXN!khp(oETeO zh9(#LR<&QnOPP0D1rRZD1YNPPZkvMGbt(%^s{S@Veazf5=&yx<`chvwjnqo<$Lt;Q zw#4^EGH(s7zL7QawFd{0%C&{{zKyv^eq$xkJc({dnm&GQ-|_S~^~%x)eE(OX`ZtTm zc<#xDOn-u+4*&)-V}gE1!J17|Qs!5>)7+wF20bb0Icey>dk+F*Bp4wrDIQIQyPLkC68aLxVI+SeMe?=s$?nmY=M#Swymj zf;C!i`=8JXi?}pq~%}Hk-7UIalHu%hWr_fPp+xVIkd_CjbF{{HXEX8ND4XV zhHTT>?~QDIxZA3BW6qnxE-ECpM8=A_j)?&m77f6&qFg3H3I*e=^rNTg$YIR3cyQn` zBt09t=I$m2g8hkJe9yekP$#feqqWd9tsocd3j)V2L<30eBPc2| zc@_-;rgoK}NR?IBL_ z%1kE0iy%d&WRu_uV)b$MTVh4{o1x9ELiHsVX{{`BaG+#fxEIbk20*)L^OZZK9N4hK z-mo^=U&r!Eah0AJyDQT$`{)ldXrgM|F$|-W>6L3oPjhx@)>AbNQS={)KL%BG#hRRmS3?XwU^itzJn ze(IY)5Q4>x9eO4$<$3SsBf7;fuWJ%iGNQrI3F`_^r~GIQCW?Wjs@{@*{4*rhl3KX4 zU4gL0^9L>GEcj_o=QU;QD0(X0{cHWbWjiCD$A%TN5X`HW2odyYai6w=9&e?hzeP}H z=+E$U>Yd3alVnb-%4<_3~ZrGV+qhEC}cz!eT|zp5xtmpa@E# zmxK7|a#p!v4b&9a=zuLiQ^jm|VB_9nMB{!MgR}{1lQAfttB+S4*wkPNCuvqq2O?T(ADG=8k&t=5Ko>wOzAA zLFwQvg?YqgBcEnUj{l_lUQlG-Q_xc!M*@Hjyy7;8Q>*`4{>b1v_=(>|#gIFw)YOjO zS6VN!DJIQi_mx~mRiobKP}*Ehki(~>w2NTNU-z;Ih&VQ*lEqNT*$zc1^(4>9WozK} z;1ii=s4`$A-H%H6v@#9y$~pG)^c5(crCr;Dl*ifkt}M^wAB1MF1Cy)fPQwdDNt;5@ z9Ii6~K$59d?FI*Es+Lm^=0-VA+Ni=T#F&{^ClU@5uu~R}L#08{)_yYiAypmMC%)sG zhtgkQ{?_2iPv(ZEs~(<$hErN1qJAPz^T%2qNLHvMvfFmm60_gYQ>hf$>3Zb+I+`20dB>x` zHyjz}R5OzWtsTYh^V#=mP<&ZaL0wRbA^Fo{vOPj=`0IG%`^{^cH*3bV+su&@9d z3ljGmuao!^R7wtAQ&9EnP~k(_B!$o4@~gsL`p36@TwIQ z)s3LrK)Hu!5H%>x&3Zm_l^f81OmE18-|($^X~Zi~yVT+xuPwy7_wp-52HfDA?&_q| z;VD^ky<+e0i1tpu$nOg;>?smSqz9z91F4sO7QTA`EA=Ty(irkvR&r`mz2 z@NyE_HpYbNhFjk{{U0|oV`ssr(6ymOyXcx(ITmZvK9N_nUTz2L%6F2-%>27%AI-?G zY~=2rS>RK|rnO}J%E_|EF!|>alA-*hz;E|oXCHRP%C^uyj^rpX+8q5h)9E#)F^hrXhGggI(B zrX^UNTsLOTB#ot=zjEQf)(WSH)BhjA|8k=M delta 3925 zcmai%_dnE+1II~LU1s^nd^?<#aoHoC5E&Uo&d$ipJi_7Zv)6sB!dcmbGcw91dqqfA zX54Xg)){?2|H1e7=i~M3^Yx4(6rkZK*Pi ze25D$?xKU8?%Y%$z zY`SXedO!^=YjaOi4{jhnR_=jg0xV##RSmG7%%3?k!$z*$!qR(^Vz-A#iPEDBINR|j z^_{RTb&!R+-2K#BBsP7yW(Itkl`X?_&F{c(V+ic$$g$iZ+NmMDs(A2={SPj=|p4BS)I*- z*jm5H;tS}Z-QKfYuW8^8p;MN4F$jC)HD%C1BWZnQjmfqIU_aqiOaeZI4(gOS2_Y{z*uCn_8@Ek-$>4;hW0?`Mdk&MS%%4;P($_g#uZLvg2; z03E8ld5Q`D7TeDfqB(i9AeaB!JJZ1J^L_{-%*t1%X28Y9N0=14RUgT-v;d2+-ZXg@ zvr4<*utsllZZLJ7ht0|>+%`M~OdZ`{Dt##5BzIe^da3N!;oMI0P@LDytJuGX6>f83 zxvP=+E5RRarn@)0kOvtN%HJDbq}A}Q_m!g9?*D12{nwNJ?k4euyhoq)$}O%{gu#!B zyKLE1Uhk>x!AV@~YbKIQ zY;61NdTri-CvN{(+b%+ik{=m|m%4rw>etTHhg?Gu$|G%AVLzbl65Vo09b|aJZ%k zK)XGl*o@&W)5~Do*8Sg=FLfJS57qhL`Kj9eZC}X7Y+F_F?=bVP0ARpCPU7zRGW9o# z{+XXz2Q%vQ*R?MdMVLBmOl7{f8Q?MOPB7pR*d=C~E+v8RuR@E0p4zzGI>A}kp~4{~P-e`q`=xNt7sao#gezmeENpk_>q+sRCvNcy~E+>Q28 z$LS<&w|)$=vvXRj9T65*Z$d8Hi}1~0PMrXNz8Z}mHM#c$>x0mGhddZEA9fX3`c%TG zcJ9BOVBS)MQE&zT8ZDc|XKa&w!3AGZ4zL`mjP~m5QbGP)^@cE3cTk9KD-DV3$SrZcd1J?u{P`vYgE8bQ znc-t+wET5Z5zPF5BcKLpS|s-JxFlVr?{B-H{2l&OceI=DuQ{saEz{ zX*J+w!m<*|e>|`(A+I1?z>Zr%&~6x=V|Y);{<9O=G=+OP3Y__9fJZZL{hzUsK?&LAHL zBIeYs0OfHddv`fEFlQH(^#OL4qL?v0LHYqoHlg5v4cW%Sw*=?T>K51yy6b&UGFv-h zE<~i;h#5j&H1NfHiqBkVf@aNCPsUi?^Za;xUT$7nHfb;eJ&(on)D zJ>Y%zjXj<;(2S_CQtE@E&PL$^@B>KStYFPJpUs`au?V@R{uu|y;+N?hmtqWfMgGy7 zqP}g(=6)hb(i~3VH9!Y&Q|e#*Sjzqr&}mj)8uHAtwi)b`%X)Is$~@9ts3+r49NIOiQZi)>Pd-9FS=;If0msBNvL#K;60^4;e8MI}p8 znaCaG^d&ShiDceFHIi`N?DFr2;w80xzus}#y9%b1ucPRTr7_Ksx3fZsTi(o%&{i`; zj&q1NT|q8rA&KYS1E4!u;;Vgx{Iu{m>4935r4=S10)X`!Dy4e_drrA|s(gtmnaHkF z&}2wi@fGY*X^%F-N*I{3xi^2yMjk&SV)wDxg4UlIH`%{xmP`gV8KkWNms^HyUmDkAL?Zw{ytmG$>Tol8|8gt5a>*=lrfj_+VFA*=e& zwK_9=zmtxL&anIt)4q&WNtcKIRgZ33Cs=mvqBc^1K4JDj@eN~TTv9+g#4 z0!m#?M7DP32_U~wPUkwAo3p2cO8<4bCE|4vWg^~uaDsb~#uS~D_0C34+7j9yE8#ik z115+(j#+hUvbZnPtA9GUk*4L{mhz)y`SlmOke&l0pBMfg7V+=CB0iuz@zBL`-)j(r zcRlGG61uT{2>ucm`;+-tC7nrr?LA%ajXGLJApGU?QA-d5fwzbzh zu|4tH4tmPPLcIXH!^Z4apjQ_1pfK zT7o!3hl54-1%Y)twgRIN0{|~&`|g-ExE4qV@!3OVaCevjg@%0@{suA(sm6yeenMjc zVgS?_bON!S^VL?Tlr>JU;gd4Kx}cCE6M=O2{qyh0L&?}pid%E^YOhZUFQ%kw#>Je) zH&nE$G8dLCKmt!kj=?X@d>oY)!rCpBJrnQ|k-=?{$ZtpG13fw(^aUbqW>Pee7`8QG z@uS&KY5a53ac08wh8H(7Bff*i|297LS{kQR!my(O)H-bi=Iwb|zg|3l6~j>+cMh=) z$KovA4(<~8$(Q^iWZR1>LQrwWU!za%4Zd&l_=Yw8+ii)H65E$ zXiq|Wb-SQg8(+{G5f@NLc$Q@LZGVjcNGrtpr}R!zx35-x{-_#lKMObBye7k(j!TYm zG!R0Gzg zVEC6_4yZ^{r-IX(b$)&QXLh02&dg5IfAMj5?In{cz_1|vh9t4op3kGcOCw5FYfiG2 z#3{svQKB$U+f95z$kt<*f%e5&QYlS*#YGn7L>S%Wo-4ClQo|75mTQ)P(#|HAf5&38JUqE&xCaS;Oil826z{_?G@UwMjZ2q|4@aq$t;RR@l?Hbz0w z`fk{Wx$xMG_gE>6Sv-JB!yC&A_s=#et-q1gr{nkp#_nBk!VXKTnAqRfmTV)YTB}Y$ z8c+kD7X5|bvI2b(YklG$Uj3%^J>*$3%$<4SG*h~!F#It%tDNSrbek6wY#%OhQY^7yaQG@ zFDv29NLHc0ACRgsCRU4~w0SG-<%O~?%Ej(-=WRE`JCn(1+Fyi3!42*8{7(-aJ6s~V z@@GGNr>9-3zQH)jCvzise)Ez}YuT;sxX@6Kk3$j;l8~Z2JAI zt-D021Vm^Y%23o+^X z?o_yom7auIP`H z^qYI|)ZD=cmqnj;xxjlR*sECHXV_Bydt!D=;Nh!qQHk5mLU^MOjvpQ*A>1>C3NtD# zh5L>SskkCKG)_GjC*FH#x87)|VGVlpYCtshk4CBZQ8YW9mcp+D*5MA_C}KN$1qPol zEqCO>%_oLY!A8NVrkZk-32L5_sqV^4@3Qn>^|7MAC|GvgQ%PYUzfu+Sb1J(n, + tip_rx: watch::Receiver, state: Arc, tx: &mpsc::Sender>, ) -> Result<(), StateSubscriptionError> { - let mut next = from; - loop { - let mut tip = *tip_rx.borrow_and_update(); - while next <= tip { - let block = fetch_block(next, &cache, &state).await?; - tip = *tip_rx.borrow_and_update(); - if tx - .send(Ok(BlockSubscriptionEvent { block, committed_chain_tip: tip })) - .await - .is_err() - { - return Ok(()); - } - next = next.child(); - } - if tip_rx.changed().await.is_err() { - return Ok(()); - } - } + run_stream( + from, + tip_rx, + tx, + |block_num| { + let cache = cache.clone(); + let state = Arc::clone(&state); + async move { fetch_block(block_num, &cache, &state).await } + }, + |_, block, committed_chain_tip| BlockSubscriptionEvent { + block, + committed_chain_tip, + }, + ) + .await } async fn run_proof_stream( from: BlockNumber, cache: ProofCache, - mut tip_rx: watch::Receiver, + tip_rx: watch::Receiver, state: Arc, tx: &mpsc::Sender>, ) -> Result<(), StateSubscriptionError> { + run_stream( + from, + tip_rx, + tx, + |block_num| { + let cache = cache.clone(); + let state = Arc::clone(&state); + async move { fetch_proof(block_num, &cache, &state).await } + }, + |block_num, proof, proven_chain_tip| ProofSubscriptionEvent { + block_num, + proof, + proven_chain_tip, + }, + ) + .await +} + +/// Drives a generic subscription stream, replaying history then following live tip advances. +/// +/// Calls `fetch` for each block in sequence starting from `from`, builds an event with +/// `build_event(block_num, data, tip)`, and sends it to `tx`. Disconnects the subscriber +/// with [`StateSubscriptionError::TooSlow`] if sending blocks for [`MAX_SLOW_STRIKES`] +/// consecutive [`SEND_TIMEOUT`] intervals. +async fn run_stream( + from: BlockNumber, + mut tip_rx: watch::Receiver, + tx: &mpsc::Sender>, + fetch: F, + build_event: impl Fn(BlockNumber, Vec, BlockNumber) -> E, +) -> Result<(), StateSubscriptionError> +where + F: Fn(BlockNumber) -> Fut, + Fut: Future, StateSubscriptionError>>, +{ let mut next = from; + let mut slow_strikes = 0u32; loop { let mut tip = *tip_rx.borrow_and_update(); while next <= tip { - let proof = fetch_proof(next, &cache, &state).await?; + let data = fetch(next).await?; tip = *tip_rx.borrow_and_update(); - if tx - .send(Ok(ProofSubscriptionEvent { - block_num: next, - proof, - proven_chain_tip: tip, - })) - .await - .is_err() - { - return Ok(()); - } + let permit = loop { + match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { + Ok(Ok(permit)) => { + slow_strikes = 0; + break permit; + } + Ok(Err(_)) => return Ok(()), + Err(_) => { + slow_strikes += 1; + if slow_strikes >= MAX_SLOW_STRIKES { + return Err(StateSubscriptionError::TooSlow); + } + } + } + }; + permit.send(Ok(build_event(next, data, tip))); next = next.child(); } if tip_rx.changed().await.is_err() { diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 96e81b49a..4b96787aa 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -1,4 +1,9 @@ pub mod block_cache; + +/// The default block production interval. +/// +/// Used as a timing reference across crates (e.g. subscription send timeouts). +pub const DEFAULT_BLOCK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(3); pub mod clap; pub mod cors; pub mod crypto; From a042c4f2f347d7c3f8b3629ab5cb6a3d0a4356c0 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 4 Jun 2026 14:13:35 +1200 Subject: [PATCH 2/9] Add const for capacity --- crates/store/src/state/subscription.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index d94d2e9fa..f0114e6f5 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -13,6 +13,8 @@ use tokio_stream::wrappers::ReceiverStream; use super::{BlockCache, ProofCache, State}; use crate::errors::DatabaseError; +/// Buffered messages per subscriber before back-pressure begins and slow-strike timeouts apply. +const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32; /// How long to wait for a subscriber to accept a message before counting a strike. const SEND_TIMEOUT: Duration = DEFAULT_BLOCK_INTERVAL; /// Number of consecutive send timeouts before a subscriber is considered too slow and disconnected. @@ -95,7 +97,7 @@ fn build_block_stream( tip_rx: watch::Receiver, state: Arc, ) -> impl Stream> + Send + 'static { - let (tx, rx) = mpsc::channel(32); + let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); tokio::spawn(async move { if let Err(err) = run_block_stream(from, cache, tip_rx, state, &tx).await { let _ = tx.send(Err(err)).await; @@ -110,7 +112,7 @@ fn build_proof_stream( tip_rx: watch::Receiver, state: Arc, ) -> impl Stream> + Send + 'static { - let (tx, rx) = mpsc::channel(32); + let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); tokio::spawn(async move { if let Err(err) = run_proof_stream(from, cache, tip_rx, state, &tx).await { let _ = tx.send(Err(err)).await; From f002548204daa9667ea94ba15be7ea6ea28cdda8 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 4 Jun 2026 14:18:29 +1200 Subject: [PATCH 3/9] Lint --- crates/store/src/state/subscription.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index f0114e6f5..afd3258db 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -140,10 +140,7 @@ async fn run_block_stream( let state = Arc::clone(&state); async move { fetch_block(block_num, &cache, &state).await } }, - |_, block, committed_chain_tip| BlockSubscriptionEvent { - block, - committed_chain_tip, - }, + |_, block, committed_chain_tip| BlockSubscriptionEvent { block, committed_chain_tip }, ) .await } @@ -202,14 +199,14 @@ where Ok(Ok(permit)) => { slow_strikes = 0; break permit; - } + }, Ok(Err(_)) => return Ok(()), Err(_) => { slow_strikes += 1; if slow_strikes >= MAX_SLOW_STRIKES { return Err(StateSubscriptionError::TooSlow); } - } + }, } }; permit.send(Ok(build_event(next, data, tip))); From 15ab59c4f64ae0de272d840344b04d91d1179480 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 4 Jun 2026 17:06:32 +1200 Subject: [PATCH 4/9] Add SubscriptionSource trait --- crates/store/src/state/subscription.rs | 200 +++++++++++-------------- 1 file changed, 85 insertions(+), 115 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index afd3258db..cf9e0599b 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -68,131 +68,131 @@ impl State { /// Streams committed blocks starting from `from`, replaying historical blocks first and then /// following live commits. pub fn block_subscription(self: &Arc, from: BlockNumber) -> BlockSubscriptionStream { - Box::pin(build_block_stream( + Box::pin(build_stream( from, - self.block_cache.clone(), self.subscribe_committed_tip(), - Arc::clone(self), + BlockSource { cache: self.block_cache.clone(), state: Arc::clone(self) }, )) } /// Streams block proofs starting from `from`, replaying historical proofs first and then /// following newly proven blocks. pub fn proof_subscription(self: &Arc, from: BlockNumber) -> ProofSubscriptionStream { - Box::pin(build_proof_stream( + Box::pin(build_stream( from, - self.proof_cache.clone(), self.subscribe_proven_tip(), - Arc::clone(self), + ProofSource { cache: self.proof_cache.clone(), state: Arc::clone(self) }, )) } } -// STREAM BUILDERS +// SUBSCRIPTION SOURCE // ================================================================================================ -fn build_block_stream( - from: BlockNumber, +trait SubscriptionSource: Send + Sync + 'static { + type Event: Send + 'static; + + fn fetch( + &self, + block_num: BlockNumber, + ) -> impl Future, StateSubscriptionError>> + Send + '_; + + fn build_event(&self, block_num: BlockNumber, data: Vec, tip: BlockNumber) -> Self::Event; +} + +struct BlockSource { cache: BlockCache, - tip_rx: watch::Receiver, state: Arc, -) -> impl Stream> + Send + 'static { - let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); - tokio::spawn(async move { - if let Err(err) = run_block_stream(from, cache, tip_rx, state, &tx).await { - let _ = tx.send(Err(err)).await; +} + +impl SubscriptionSource for BlockSource { + type Event = BlockSubscriptionEvent; + + async fn fetch(&self, block_num: BlockNumber) -> Result, StateSubscriptionError> { + if let Some(entry) = self.cache.get(block_num) { + return Ok(entry.block_bytes().to_vec()); } - }); - ReceiverStream::new(rx) + self.state + .load_block(block_num) + .await + .map_err(|source| StateSubscriptionError::BlockLoad { block_num, source })? + .ok_or(StateSubscriptionError::BlockNotFound(block_num)) + } + + fn build_event( + &self, + _block_num: BlockNumber, + block: Vec, + committed_chain_tip: BlockNumber, + ) -> BlockSubscriptionEvent { + BlockSubscriptionEvent { block, committed_chain_tip } + } } -fn build_proof_stream( - from: BlockNumber, +struct ProofSource { cache: ProofCache, - tip_rx: watch::Receiver, state: Arc, -) -> impl Stream> + Send + 'static { - let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); - tokio::spawn(async move { - if let Err(err) = run_proof_stream(from, cache, tip_rx, state, &tx).await { - let _ = tx.send(Err(err)).await; - } - }); - ReceiverStream::new(rx) } -// STREAM TASKS -// ================================================================================================ +impl SubscriptionSource for ProofSource { + type Event = ProofSubscriptionEvent; -async fn run_block_stream( - from: BlockNumber, - cache: BlockCache, - tip_rx: watch::Receiver, - state: Arc, - tx: &mpsc::Sender>, -) -> Result<(), StateSubscriptionError> { - run_stream( - from, - tip_rx, - tx, - |block_num| { - let cache = cache.clone(); - let state = Arc::clone(&state); - async move { fetch_block(block_num, &cache, &state).await } - }, - |_, block, committed_chain_tip| BlockSubscriptionEvent { block, committed_chain_tip }, - ) - .await + async fn fetch(&self, block_num: BlockNumber) -> Result, StateSubscriptionError> { + if let Some(entry) = self.cache.get(block_num) { + return Ok(entry.proof_bytes().to_vec()); + } + self.state + .load_proof(block_num) + .await + .map_err(|source| StateSubscriptionError::ProofLoad { block_num, source })? + .ok_or(StateSubscriptionError::ProofNotFound(block_num)) + } + + fn build_event( + &self, + block_num: BlockNumber, + proof: Vec, + proven_chain_tip: BlockNumber, + ) -> ProofSubscriptionEvent { + ProofSubscriptionEvent { block_num, proof, proven_chain_tip } + } } -async fn run_proof_stream( +// STREAM +// ================================================================================================ + +fn build_stream( from: BlockNumber, - cache: ProofCache, tip_rx: watch::Receiver, - state: Arc, - tx: &mpsc::Sender>, -) -> Result<(), StateSubscriptionError> { - run_stream( - from, - tip_rx, - tx, - |block_num| { - let cache = cache.clone(); - let state = Arc::clone(&state); - async move { fetch_proof(block_num, &cache, &state).await } - }, - |block_num, proof, proven_chain_tip| ProofSubscriptionEvent { - block_num, - proof, - proven_chain_tip, - }, - ) - .await + source: S, +) -> impl Stream> + Send + 'static { + let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); + tokio::spawn(async move { + if let Err(err) = run_stream(from, tip_rx, &tx, source).await { + let _ = tx.send(Err(err)).await; + } + }); + ReceiverStream::new(rx) } /// Drives a generic subscription stream, replaying history then following live tip advances. /// -/// Calls `fetch` for each block in sequence starting from `from`, builds an event with -/// `build_event(block_num, data, tip)`, and sends it to `tx`. Disconnects the subscriber -/// with [`StateSubscriptionError::TooSlow`] if sending blocks for [`MAX_SLOW_STRIKES`] -/// consecutive [`SEND_TIMEOUT`] intervals. -async fn run_stream( +/// Calls [`SubscriptionSource::fetch`] for each block in sequence starting from `from`, builds an +/// event with [`SubscriptionSource::build_event`], and sends it to `tx`. Disconnects the +/// subscriber with [`StateSubscriptionError::TooSlow`] if sending blocks for +/// [`MAX_SLOW_STRIKES`] consecutive [`SEND_TIMEOUT`] intervals. +async fn run_stream( from: BlockNumber, mut tip_rx: watch::Receiver, - tx: &mpsc::Sender>, - fetch: F, - build_event: impl Fn(BlockNumber, Vec, BlockNumber) -> E, -) -> Result<(), StateSubscriptionError> -where - F: Fn(BlockNumber) -> Fut, - Fut: Future, StateSubscriptionError>>, -{ + tx: &mpsc::Sender>, + source: S, +) -> Result<(), StateSubscriptionError> { let mut next = from; let mut slow_strikes = 0u32; loop { let mut tip = *tip_rx.borrow_and_update(); while next <= tip { - let data = fetch(next).await?; + let data = source.fetch(next).await?; tip = *tip_rx.borrow_and_update(); let permit = loop { match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { @@ -209,7 +209,7 @@ where }, } }; - permit.send(Ok(build_event(next, data, tip))); + permit.send(Ok(source.build_event(next, data, tip))); next = next.child(); } if tip_rx.changed().await.is_err() { @@ -217,33 +217,3 @@ where } } } - -async fn fetch_block( - block_num: BlockNumber, - cache: &BlockCache, - state: &State, -) -> Result, StateSubscriptionError> { - if let Some(entry) = cache.get(block_num) { - return Ok(entry.block_bytes().to_vec()); - } - state - .load_block(block_num) - .await - .map_err(|source| StateSubscriptionError::BlockLoad { block_num, source })? - .ok_or(StateSubscriptionError::BlockNotFound(block_num)) -} - -async fn fetch_proof( - block_num: BlockNumber, - cache: &ProofCache, - state: &State, -) -> Result, StateSubscriptionError> { - if let Some(entry) = cache.get(block_num) { - return Ok(entry.proof_bytes().to_vec()); - } - state - .load_proof(block_num) - .await - .map_err(|source| StateSubscriptionError::ProofLoad { block_num, source })? - .ok_or(StateSubscriptionError::ProofNotFound(block_num)) -} From f5b443877ca57b25e999c7a716409f769469f51b Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 5 Jun 2026 12:29:54 +1200 Subject: [PATCH 5/9] Use block limit logic --- crates/block-producer/src/lib.rs | 2 +- crates/store/src/state/subscription.rs | 51 +++++++++++++------------- crates/utils/src/lib.rs | 4 -- 3 files changed, 27 insertions(+), 30 deletions(-) diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs index c9f460b50..35691a6e6 100644 --- a/crates/block-producer/src/lib.rs +++ b/crates/block-producer/src/lib.rs @@ -64,7 +64,7 @@ const SERVER_MEMPOOL_EXPIRATION_SLACK: u32 = 2; const CACHED_MEMPOOL_STATS_UPDATE_INTERVAL: Duration = Duration::from_secs(5); /// How often a block is created. -pub use miden_node_utils::DEFAULT_BLOCK_INTERVAL; +pub const DEFAULT_BLOCK_INTERVAL: Duration = Duration::from_secs(3); /// How often a batch is created. pub const DEFAULT_BATCH_INTERVAL: Duration = Duration::from_secs(1); diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index cf9e0599b..cb078dfc4 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -3,7 +3,6 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use miden_node_utils::DEFAULT_BLOCK_INTERVAL; use miden_protocol::block::BlockNumber; use thiserror::Error; use tokio::sync::{mpsc, watch}; @@ -13,12 +12,12 @@ use tokio_stream::wrappers::ReceiverStream; use super::{BlockCache, ProofCache, State}; use crate::errors::DatabaseError; -/// Buffered messages per subscriber before back-pressure begins and slow-strike timeouts apply. +/// Buffered messages per subscriber before back-pressure begins. const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32; -/// How long to wait for a subscriber to accept a message before counting a strike. -const SEND_TIMEOUT: Duration = DEFAULT_BLOCK_INTERVAL; -/// Number of consecutive send timeouts before a subscriber is considered too slow and disconnected. -const MAX_SLOW_STRIKES: u32 = 5; +/// Number of blocks beyond the smallest gap observed so far before a subscriber is disconnected. +const MAX_SLOW_GAP: u32 = 100; +/// Safety-net timeout for a single send when the client has stalled. +const SEND_TIMEOUT: Duration = Duration::from_secs(10); // SUBSCRIPTION EVENTS // ================================================================================================ @@ -71,7 +70,10 @@ impl State { Box::pin(build_stream( from, self.subscribe_committed_tip(), - BlockSource { cache: self.block_cache.clone(), state: Arc::clone(self) }, + BlockSource { + cache: self.block_cache.clone(), + state: Arc::clone(self), + }, )) } @@ -81,7 +83,10 @@ impl State { Box::pin(build_stream( from, self.subscribe_proven_tip(), - ProofSource { cache: self.proof_cache.clone(), state: Arc::clone(self) }, + ProofSource { + cache: self.proof_cache.clone(), + state: Arc::clone(self), + }, )) } } @@ -179,8 +184,9 @@ fn build_stream( /// /// Calls [`SubscriptionSource::fetch`] for each block in sequence starting from `from`, builds an /// event with [`SubscriptionSource::build_event`], and sends it to `tx`. Disconnects the -/// subscriber with [`StateSubscriptionError::TooSlow`] if sending blocks for -/// [`MAX_SLOW_STRIKES`] consecutive [`SEND_TIMEOUT`] intervals. +/// subscriber with [`StateSubscriptionError::TooSlow`] if the gap between the tip and the next +/// block to send exceeds the minimum gap ever observed plus [`MAX_SLOW_GAP`], or if a single send +/// blocks for longer than [`SEND_TIMEOUT`] (safety net for a stalled client). async fn run_stream( from: BlockNumber, mut tip_rx: watch::Receiver, @@ -188,26 +194,21 @@ async fn run_stream( source: S, ) -> Result<(), StateSubscriptionError> { let mut next = from; - let mut slow_strikes = 0u32; + let mut min_gap = u32::MAX; loop { let mut tip = *tip_rx.borrow_and_update(); while next <= tip { + let gap = tip.as_u32() - next.as_u32(); + min_gap = min_gap.min(gap); + if gap > min_gap + MAX_SLOW_GAP { + return Err(StateSubscriptionError::TooSlow); + } let data = source.fetch(next).await?; tip = *tip_rx.borrow_and_update(); - let permit = loop { - match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { - Ok(Ok(permit)) => { - slow_strikes = 0; - break permit; - }, - Ok(Err(_)) => return Ok(()), - Err(_) => { - slow_strikes += 1; - if slow_strikes >= MAX_SLOW_STRIKES { - return Err(StateSubscriptionError::TooSlow); - } - }, - } + let permit = match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { + Ok(Ok(permit)) => permit, + Ok(Err(_)) => return Ok(()), + Err(_) => return Err(StateSubscriptionError::TooSlow), }; permit.send(Ok(source.build_event(next, data, tip))); next = next.child(); diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 4b96787aa..840de3887 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -1,9 +1,5 @@ pub mod block_cache; -/// The default block production interval. -/// -/// Used as a timing reference across crates (e.g. subscription send timeouts). -pub const DEFAULT_BLOCK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(3); pub mod clap; pub mod cors; pub mod crypto; From 173419382cd45ca5406beafa7977492445a25fa5 Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 5 Jun 2026 12:34:59 +1200 Subject: [PATCH 6/9] RM whitespace --- crates/utils/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 840de3887..96e81b49a 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -1,5 +1,4 @@ pub mod block_cache; - pub mod clap; pub mod cors; pub mod crypto; From 409dd2d9a96a84bca5722cf6598e5d1cf8c45834 Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 5 Jun 2026 12:42:26 +1200 Subject: [PATCH 7/9] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b721b86b1..45018e2bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ - Persisted attachments of private output notes when applying a block, so they are now returned by `GetNotesById` ([#2172](https://github.com/0xMiden/node/pull/2172)). - [BREAKING] `miden-ntx-builder` now requires a remote transaction prover to be configured ([#2179](https://github.com/0xMiden/node/pull/2179)). - [BREAKING] Replaced `StoreStatus` with `chain_tip` field in `RpcStatus` ([#2187](https://github.com/0xMiden/node/pull/2187)). +- Added logic to disconnect slow block and proof stream clients ([#2196](https://github.com/0xMiden/node/pull/2196)). ## v0.14.11 (TBD) From ef7f63c3336d1ab5581a381b385440ad5d168125 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 8 Jun 2026 10:39:30 +1200 Subject: [PATCH 8/9] RM gap logic --- crates/store/src/state/subscription.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index cb078dfc4..700b605da 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -14,8 +14,6 @@ use crate::errors::DatabaseError; /// Buffered messages per subscriber before back-pressure begins. const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32; -/// Number of blocks beyond the smallest gap observed so far before a subscriber is disconnected. -const MAX_SLOW_GAP: u32 = 100; /// Safety-net timeout for a single send when the client has stalled. const SEND_TIMEOUT: Duration = Duration::from_secs(10); @@ -184,9 +182,8 @@ fn build_stream( /// /// Calls [`SubscriptionSource::fetch`] for each block in sequence starting from `from`, builds an /// event with [`SubscriptionSource::build_event`], and sends it to `tx`. Disconnects the -/// subscriber with [`StateSubscriptionError::TooSlow`] if the gap between the tip and the next -/// block to send exceeds the minimum gap ever observed plus [`MAX_SLOW_GAP`], or if a single send -/// blocks for longer than [`SEND_TIMEOUT`] (safety net for a stalled client). +/// subscriber with [`StateSubscriptionError::TooSlow`] if a single send blocks for longer than [`SEND_TIMEOUT`] +/// which may occur only after the buffer has [`SUBSCRIBER_CHANNEL_CAPACITY`] blocks queued. async fn run_stream( from: BlockNumber, mut tip_rx: watch::Receiver, @@ -194,15 +191,9 @@ async fn run_stream( source: S, ) -> Result<(), StateSubscriptionError> { let mut next = from; - let mut min_gap = u32::MAX; loop { let mut tip = *tip_rx.borrow_and_update(); while next <= tip { - let gap = tip.as_u32() - next.as_u32(); - min_gap = min_gap.min(gap); - if gap > min_gap + MAX_SLOW_GAP { - return Err(StateSubscriptionError::TooSlow); - } let data = source.fetch(next).await?; tip = *tip_rx.borrow_and_update(); let permit = match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { From 7385ce58cdd2d23ed45b56e1c96baf72f2272f54 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 8 Jun 2026 14:10:50 +1200 Subject: [PATCH 9/9] Update docker cache line --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5059c4bc5..cc14b5d4e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,13 +34,13 @@ COPY --from=planner /app/recipe.json recipe.json # caches are fragile when concurrent CI builds race or a build is interrupted. RUN --mount=type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=type=cache,sharing=locked,target=/usr/local/cargo/git/db \ - --mount=type=cache,sharing=locked,target=/app/target \ + --mount=type=cache,id=cargo-target-v2,sharing=locked,target=/app/target \ cargo chef cook --release --recipe-path recipe.json # Build application COPY . . RUN --mount=type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=type=cache,sharing=locked,target=/usr/local/cargo/git/db \ - --mount=type=cache,sharing=locked,target=/app/target \ + --mount=type=cache,id=cargo-target-v2,sharing=locked,target=/app/target \ cargo build --release --locked --bin ${BIN} && \ mkdir -p /app/bin && \ cp /app/target/release/${BIN} /app/bin/${BIN}