Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ operated.
- [BREAKING] `GetAccount` can now return all storage map entries with a single request ([#2121](https://github.com/0xMiden/node/issues/2121)).
- Persisted attachments of private output notes when applying a block, so they are now returned by `GetNotesById` ([#2172](https://github.com/0xMiden/node/pull/2172)).
- [BREAKING] Replaced `StoreStatus` with `chain_tip` field in `RpcStatus` ([#2187](https://github.com/0xMiden/node/pull/2187)).
- Added gRPC health check endpoint to node's RPC service ([#2188](https://github.com/0xMiden/node/pull/2188)).

### Docker

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 0 additions & 26 deletions bin/network-monitor/.env

This file was deleted.

29 changes: 0 additions & 29 deletions bin/node/.env

This file was deleted.

9 changes: 2 additions & 7 deletions bin/node/src/commands/modes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use anyhow::Context;
use miden_node_block_producer::{RpcSync, Sequencer};
use miden_node_block_producer::Sequencer;
use miden_node_proto::clients::{Builder, NtxBuilderClient, RpcClient, ValidatorClient};
use miden_node_rpc::{Rpc, RpcMode};
use miden_node_store::State;
Expand Down Expand Up @@ -125,21 +125,16 @@ impl FullNodeCommand {
let network_tx_auth = self.runtime.rpc.network_tx_auth()?;
let state = load_state(&runtime).await?;
let _disk_monitor = state.spawn_disk_monitor();
let sync = RpcSync {
state: Arc::clone(&state),
source_rpc: source_rpc.clone(),
};

let rpc = Rpc {
listener: bind_rpc(runtime.rpc_listen).await?,
store: state,
mode: RpcMode::full_node(source_rpc),
mode: RpcMode::full_node(source_rpc, self.sync.readiness_threshold),
ntx_builder: None,
grpc_options: runtime.external_grpc_options,
network_tx_auth,
};
let mut tasks = Tasks::new();
tasks.spawn("RPC sync", sync.run());
tasks.spawn("RPC server", rpc.serve());

tasks.join_next_as_error().await
Expand Down
10 changes: 10 additions & 0 deletions bin/node/src/commands/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,14 @@ pub struct SyncOptions {
value_name = "URL"
)]
pub block_source_url: Url,

// Number of blocks that this RPC server must be within that of the sync source to be considered
// ready.
#[arg(
long = "sync.ready-threshold",
env = "MIDEN_NODE_SYNC_READY_THRESHOLD",
value_name = "NUM",
default_value_t = 10
)]
pub readiness_threshold: u32,
Comment thread
Mirko-von-Leipzig marked this conversation as resolved.
}
12 changes: 0 additions & 12 deletions bin/ntx-builder/.env

This file was deleted.

6 changes: 0 additions & 6 deletions bin/remote-prover/.env

This file was deleted.

9 changes: 0 additions & 9 deletions bin/validator/.env

This file was deleted.

1 change: 1 addition & 0 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ thiserror = { workspace = true }
tokio = { features = ["macros", "net", "rt-multi-thread"], workspace = true }
tokio-stream = { workspace = true }
tonic = { default-features = true, features = ["transport"], workspace = true }
tonic-health = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

Expand Down
2 changes: 1 addition & 1 deletion crates/block-producer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod errors;
pub mod server;
pub use errors::MempoolSubmissionError;
pub use proof_scheduler::DEFAULT_MAX_CONCURRENT_PROOFS;
pub use rpc_sync::RpcSync;
pub use rpc_sync::{RpcReadiness, RpcSync};
pub use server::{
BlockProducerApi,
BlockProducerApiConfig,
Expand Down
40 changes: 40 additions & 0 deletions crates/block-producer/src/rpc_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,51 @@ use miden_node_utils::tasks::Tasks;
use miden_protocol::block::{BlockNumber, SignedBlock};
use miden_protocol::utils::serde::Deserializable;
use tokio_stream::StreamExt;
use tonic_health::ServingStatus;
use tonic_health::server::HealthReporter;
use tracing::{info, warn};

pub(crate) const RECONNECT_DELAY: Duration = Duration::from_secs(5);

// RPC READINESS
// ================================================================================================

/// Tracks readiness of the RPC API service for a full-node.
///
/// Holds the gRPC [`HealthReporter`] and the readiness threshold. Created by [`Rpc::serve`]
/// once the health pair is available and passed directly into [`BlockSync`].
#[derive(Clone)]
pub struct RpcReadiness {
reporter: HealthReporter,
threshold: u32,
}

impl RpcReadiness {
const SERVICE_NAME: &'static str = "rpc.Api";

pub fn new(reporter: HealthReporter, threshold: u32) -> Self {
Self { reporter, threshold }
}

/// Updates the RPC service health status based on the upstream/local tip gap.
pub async fn update(&self, upstream_tip: BlockNumber, local_tip: BlockNumber) {
let status = if upstream_tip.as_u32().saturating_sub(local_tip.as_u32()) <= self.threshold {
ServingStatus::Serving
} else {
ServingStatus::NotServing
};
self.reporter.set_service_status(Self::SERVICE_NAME, status).await;
}
}
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Mirko-von-Leipzig Wondering why the rpc sync code is in block producer crate. Maybe in followup I can look at moving the sync related stuff out? Unsure

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure where else to put it.

I'd like the store crate to be purely about storing stuff as an API - and not knowing about RPC, gRPC etc. So it shouldn't live there.

I think the options are either block-producer, or in its own separate crate? But I think it sort of makes sense if we think of block-producer being the source of blocks. Which is therefore either

  • the sequencer stuff, or
  • the sync stuff

I don't feel strongly about this 🤷 If you have ideas, or maybe a better name for this crate could be an option?


// RPC SYNC
// ================================================================================================

/// Synchronizes local state from an upstream RPC service.
pub struct RpcSync {
pub state: Arc<State>,
pub source_rpc: RpcClient,
pub readiness: RpcReadiness,
}

impl RpcSync {
Expand All @@ -30,6 +64,7 @@ impl RpcSync {
let block_sync = BlockSync {
state: Arc::clone(&self.state),
source_rpc: self.source_rpc.clone(),
readiness: self.readiness,
};
let proof_sync = ProofSync {
state: self.state,
Expand All @@ -49,6 +84,7 @@ impl RpcSync {
struct BlockSync {
state: Arc<State>,
source_rpc: RpcClient,
readiness: RpcReadiness,
}

struct ProofSync {
Expand Down Expand Up @@ -86,9 +122,13 @@ impl BlockSync {

while let Some(result) = stream.next().await {
let event = result?;
let upstream_tip = BlockNumber::from(event.committed_chain_tip);
let block = SignedBlock::read_from_bytes(&event.block)
.context("failed to deserialize block from upstream")?;
self.state.apply_block(block).await?;

let local_tip = self.state.chain_tip(Finality::Committed).await;
self.readiness.update(upstream_tip, local_tip).await;
}

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ miden-tx = { features = ["concurrent"], workspace = true }
miden-tx-batch-prover = { workspace = true }
semver = { version = "1.0" }
thiserror = { workspace = true }
tokio = { features = ["macros", "net", "rt-multi-thread"], workspace = true }
tokio = { features = ["macros", "net", "rt-multi-thread", "time"], workspace = true }
tokio-stream = { features = ["net"], workspace = true }
tonic = { default-features = true, features = ["tls-native-roots", "tls-ring"], workspace = true }
tonic-health = { workspace = true }
tonic-reflection = { workspace = true }
tonic-web = { workspace = true }
tower = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ impl api_server::Api for RpcService {
RpcMode::Sequencer { block_producer, validator } => {
(block_producer.as_ref(), validator.as_ref())
},
RpcMode::FullNode { source_rpc } => {
RpcMode::FullNode { source_rpc, .. } => {
return source_rpc.as_ref().clone().submit_proven_tx(request).await;
},
};
Expand Down Expand Up @@ -915,7 +915,7 @@ impl api_server::Api for RpcService {
RpcMode::Sequencer { block_producer, validator } => {
(block_producer.as_ref(), validator.as_ref())
},
RpcMode::FullNode { source_rpc } => {
RpcMode::FullNode { source_rpc, .. } => {
return source_rpc.as_ref().clone().submit_proven_tx_batch(request).await;
},
};
Expand Down Expand Up @@ -994,7 +994,7 @@ impl api_server::Api for RpcService {
RpcMode::Sequencer { block_producer, .. } => {
Some(block_producer_status_to_proto(block_producer.status().await))
},
RpcMode::FullNode { source_rpc } => source_rpc
RpcMode::FullNode { source_rpc, .. } => source_rpc
.as_ref()
.clone()
.status(Request::new(()))
Expand Down Expand Up @@ -1041,7 +1041,7 @@ impl api_server::Api for RpcService {

ntx_builder.clone().get_network_note_status(request).await?.into_inner()
},
RpcMode::FullNode { source_rpc } => {
RpcMode::FullNode { source_rpc, .. } => {
source_rpc.as_ref().clone().get_network_note_status(request).await?.into_inner()
},
};
Expand Down
Loading
Loading