Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ operated.
- [BREAKING] `GetAccount` can now return all storage map entries with a single request ([#2121](https://github.com/0xMiden/node/issues/2121)).
- Persisted attachments of private output notes when applying a block, so they are now returned by `GetNotesById` ([#2172](https://github.com/0xMiden/node/pull/2172)).
- [BREAKING] Replaced `StoreStatus` with `chain_tip` field in `RpcStatus` ([#2187](https://github.com/0xMiden/node/pull/2187)).
- Added logic to disconnect slow block and proof stream clients ([#2196](https://github.com/0xMiden/node/pull/2196)).

### Docker

Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ COPY --from=planner /app/recipe.json recipe.json
# caches are fragile when concurrent CI builds race or a build is interrupted.
RUN --mount=type=cache,sharing=locked,target=/usr/local/cargo/registry \
--mount=type=cache,sharing=locked,target=/usr/local/cargo/git/db \
--mount=type=cache,sharing=locked,target=/app/target \
--mount=type=cache,id=cargo-target-v2,sharing=locked,target=/app/target \
cargo chef cook --release --recipe-path recipe.json
# Build application
COPY . .
RUN --mount=type=cache,sharing=locked,target=/usr/local/cargo/registry \
--mount=type=cache,sharing=locked,target=/usr/local/cargo/git/db \
--mount=type=cache,sharing=locked,target=/app/target \
--mount=type=cache,id=cargo-target-v2,sharing=locked,target=/app/target \
cargo build --release --locked --bin ${BIN} && \
mkdir -p /app/bin && \
cp /app/target/release/${BIN} /app/bin/${BIN}
Expand Down
3 changes: 3 additions & 0 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,9 @@ fn state_subscription_error_to_status(err: StateSubscriptionError) -> Status {
"failed to load proof for block {block_num}: {}",
source.as_report()
)),
StateSubscriptionError::TooSlow => {
Status::resource_exhausted("subscriber is too slow to keep up with the chain")
},
}
}

Expand Down
204 changes: 106 additions & 98 deletions crates/store/src/state/subscription.rs
Comment thread
sergerad marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use miden_protocol::block::BlockNumber;
use thiserror::Error;
Expand All @@ -10,6 +12,11 @@ use tokio_stream::wrappers::ReceiverStream;
use super::{BlockCache, ProofCache, State};
use crate::errors::DatabaseError;

/// Buffered messages per subscriber before back-pressure begins.
const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32;
/// Safety-net timeout for a single send when the client has stalled.
const SEND_TIMEOUT: Duration = Duration::from_secs(10);

// SUBSCRIPTION EVENTS
// ================================================================================================

Expand Down Expand Up @@ -44,6 +51,8 @@ pub enum StateSubscriptionError {
},
#[error("proof for block {0} not found")]
ProofNotFound(BlockNumber),
#[error("subscriber is too slow to keep up with the chain")]
TooSlow,
}

pub type BlockSubscriptionStream =
Expand All @@ -56,148 +65,147 @@ impl State {
/// Streams committed blocks starting from `from`, replaying historical blocks first and then
/// following live commits.
pub fn block_subscription(self: &Arc<Self>, from: BlockNumber) -> BlockSubscriptionStream {
Box::pin(build_block_stream(
Box::pin(build_stream(
from,
self.block_cache.clone(),
self.subscribe_committed_tip(),
Arc::clone(self),
BlockSource {
cache: self.block_cache.clone(),
state: Arc::clone(self),
},
))
}

/// Streams block proofs starting from `from`, replaying historical proofs first and then
/// following newly proven blocks.
pub fn proof_subscription(self: &Arc<Self>, from: BlockNumber) -> ProofSubscriptionStream {
Box::pin(build_proof_stream(
Box::pin(build_stream(
from,
self.proof_cache.clone(),
self.subscribe_proven_tip(),
Arc::clone(self),
ProofSource {
cache: self.proof_cache.clone(),
state: Arc::clone(self),
},
))
}
}

// STREAM BUILDERS
// SUBSCRIPTION SOURCE
// ================================================================================================

fn build_block_stream(
from: BlockNumber,
trait SubscriptionSource: Send + Sync + 'static {
type Event: Send + 'static;

fn fetch(
&self,
block_num: BlockNumber,
) -> impl Future<Output = Result<Vec<u8>, StateSubscriptionError>> + Send + '_;

fn build_event(&self, block_num: BlockNumber, data: Vec<u8>, tip: BlockNumber) -> Self::Event;
}

struct BlockSource {
cache: BlockCache,
tip_rx: watch::Receiver<BlockNumber>,
state: Arc<State>,
) -> impl Stream<Item = Result<BlockSubscriptionEvent, StateSubscriptionError>> + Send + 'static {
let (tx, rx) = mpsc::channel(32);
tokio::spawn(async move {
if let Err(err) = run_block_stream(from, cache, tip_rx, state, &tx).await {
let _ = tx.send(Err(err)).await;
}

impl SubscriptionSource for BlockSource {
type Event = BlockSubscriptionEvent;

async fn fetch(&self, block_num: BlockNumber) -> Result<Vec<u8>, StateSubscriptionError> {
if let Some(entry) = self.cache.get(block_num) {
return Ok(entry.block_bytes().to_vec());
}
});
ReceiverStream::new(rx)
self.state
.load_block(block_num)
.await
.map_err(|source| StateSubscriptionError::BlockLoad { block_num, source })?
.ok_or(StateSubscriptionError::BlockNotFound(block_num))
}

fn build_event(
&self,
_block_num: BlockNumber,
block: Vec<u8>,
committed_chain_tip: BlockNumber,
) -> BlockSubscriptionEvent {
BlockSubscriptionEvent { block, committed_chain_tip }
}
}

fn build_proof_stream(
from: BlockNumber,
struct ProofSource {
cache: ProofCache,
tip_rx: watch::Receiver<BlockNumber>,
state: Arc<State>,
) -> impl Stream<Item = Result<ProofSubscriptionEvent, StateSubscriptionError>> + Send + 'static {
let (tx, rx) = mpsc::channel(32);
tokio::spawn(async move {
if let Err(err) = run_proof_stream(from, cache, tip_rx, state, &tx).await {
let _ = tx.send(Err(err)).await;
}

impl SubscriptionSource for ProofSource {
type Event = ProofSubscriptionEvent;

async fn fetch(&self, block_num: BlockNumber) -> Result<Vec<u8>, StateSubscriptionError> {
if let Some(entry) = self.cache.get(block_num) {
return Ok(entry.proof_bytes().to_vec());
}
});
ReceiverStream::new(rx)
self.state
.load_proof(block_num)
.await
.map_err(|source| StateSubscriptionError::ProofLoad { block_num, source })?
.ok_or(StateSubscriptionError::ProofNotFound(block_num))
}

fn build_event(
&self,
block_num: BlockNumber,
proof: Vec<u8>,
proven_chain_tip: BlockNumber,
) -> ProofSubscriptionEvent {
ProofSubscriptionEvent { block_num, proof, proven_chain_tip }
}
}

// STREAM TASKS
// STREAM
// ================================================================================================

async fn run_block_stream(
fn build_stream<S: SubscriptionSource>(
from: BlockNumber,
cache: BlockCache,
mut tip_rx: watch::Receiver<BlockNumber>,
state: Arc<State>,
tx: &mpsc::Sender<Result<BlockSubscriptionEvent, StateSubscriptionError>>,
) -> Result<(), StateSubscriptionError> {
let mut next = from;
loop {
let mut tip = *tip_rx.borrow_and_update();
while next <= tip {
let block = fetch_block(next, &cache, &state).await?;
tip = *tip_rx.borrow_and_update();
if tx
.send(Ok(BlockSubscriptionEvent { block, committed_chain_tip: tip }))
.await
.is_err()
{
return Ok(());
}
next = next.child();
}
if tip_rx.changed().await.is_err() {
return Ok(());
tip_rx: watch::Receiver<BlockNumber>,
source: S,
) -> impl Stream<Item = Result<S::Event, StateSubscriptionError>> + Send + 'static {
let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY);
tokio::spawn(async move {
if let Err(err) = run_stream(from, tip_rx, &tx, source).await {
let _ = tx.send(Err(err)).await;
}
}
});
ReceiverStream::new(rx)
}

async fn run_proof_stream(
/// Drives a generic subscription stream, replaying history then following live tip advances.
///
/// Calls [`SubscriptionSource::fetch`] for each block in sequence starting from `from`, builds an
/// event with [`SubscriptionSource::build_event`], and sends it to `tx`. Disconnects the
/// subscriber with [`StateSubscriptionError::TooSlow`] if a single send blocks for longer than [`SEND_TIMEOUT`]
/// which may occur only after the buffer has [`SUBSCRIBER_CHANNEL_CAPACITY`] blocks queued.
async fn run_stream<S: SubscriptionSource>(
from: BlockNumber,
cache: ProofCache,
mut tip_rx: watch::Receiver<BlockNumber>,
state: Arc<State>,
tx: &mpsc::Sender<Result<ProofSubscriptionEvent, StateSubscriptionError>>,
tx: &mpsc::Sender<Result<S::Event, StateSubscriptionError>>,
source: S,
) -> Result<(), StateSubscriptionError> {
let mut next = from;
loop {
let mut tip = *tip_rx.borrow_and_update();
while next <= tip {
let proof = fetch_proof(next, &cache, &state).await?;
let data = source.fetch(next).await?;
tip = *tip_rx.borrow_and_update();
if tx
.send(Ok(ProofSubscriptionEvent {
block_num: next,
proof,
proven_chain_tip: tip,
}))
.await
.is_err()
{
return Ok(());
}
let permit = match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await {
Ok(Ok(permit)) => permit,
Ok(Err(_)) => return Ok(()),
Err(_) => return Err(StateSubscriptionError::TooSlow),
};
permit.send(Ok(source.build_event(next, data, tip)));
next = next.child();
}
if tip_rx.changed().await.is_err() {
return Ok(());
}
}
}

async fn fetch_block(
block_num: BlockNumber,
cache: &BlockCache,
state: &State,
) -> Result<Vec<u8>, StateSubscriptionError> {
if let Some(entry) = cache.get(block_num) {
return Ok(entry.block_bytes().to_vec());
}
state
.load_block(block_num)
.await
.map_err(|source| StateSubscriptionError::BlockLoad { block_num, source })?
.ok_or(StateSubscriptionError::BlockNotFound(block_num))
}

async fn fetch_proof(
block_num: BlockNumber,
cache: &ProofCache,
state: &State,
) -> Result<Vec<u8>, StateSubscriptionError> {
if let Some(entry) = cache.get(block_num) {
return Ok(entry.proof_bytes().to_vec());
}
state
.load_proof(block_num)
.await
.map_err(|source| StateSubscriptionError::ProofLoad { block_num, source })?
.ok_or(StateSubscriptionError::ProofNotFound(block_num))
}
Loading