Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ use crate::{
publish_attestation, publish_block,
},
req_resp::{
BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request,
STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
},
swarm_adapter::SwarmHandle,
};
Expand Down Expand Up @@ -154,6 +155,10 @@ pub fn build_swarm(
StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1),
request_response::ProtocolSupport::Full,
),
(
StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1),
request_response::ProtocolSupport::Full,
),
],
Default::default(),
);
Expand Down
26 changes: 18 additions & 8 deletions crates/net/p2p/src/req_resp/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tracing::{debug, trace, warn};
use super::{
encoding::{MAX_PAYLOAD_SIZE, decode_payload, write_payload},
messages::{
BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, ResponseCode, ResponsePayload,
STATUS_PROTOCOL_V1, Status,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response,
ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status,
},
};

Expand All @@ -21,6 +21,7 @@ fn protocol_label(protocol: &str) -> &'static str {
match protocol {
STATUS_PROTOCOL_V1 => "status",
BLOCKS_BY_ROOT_PROTOCOL_V1 => "blocks_by_root",
BLOCKS_BY_RANGE_PROTOCOL_V1 => "blocks_by_range",
_ => "unknown",
}
}
Expand Down Expand Up @@ -59,6 +60,12 @@ impl libp2p::request_response::Codec for Codec {
})?;
Ok(Request::BlocksByRoot(request))
}
BLOCKS_BY_RANGE_PROTOCOL_V1 => {
let request = SszDecode::from_ssz_bytes(&payload).map_err(|err| {
io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}"))
})?;
Ok(Request::BlocksByRange(request))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unknown protocol: {}", protocol.as_ref()),
Expand All @@ -77,7 +84,9 @@ impl libp2p::request_response::Codec for Codec {
let label = protocol_label(protocol.as_ref());
match protocol.as_ref() {
STATUS_PROTOCOL_V1 => decode_status_response(io, label).await,
BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io, label).await,
BLOCKS_BY_ROOT_PROTOCOL_V1 | BLOCKS_BY_RANGE_PROTOCOL_V1 => {
decode_blocks_response(io, label).await
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unknown protocol: {}", protocol.as_ref()),
Expand All @@ -99,6 +108,7 @@ impl libp2p::request_response::Codec for Codec {
let encoded = match req {
Request::Status(status) => status.to_ssz(),
Request::BlocksByRoot(request) => request.to_ssz(),
Request::BlocksByRange(request) => request.to_ssz(),
};

let compressed_size = write_payload(io, &encoded).await?;
Expand Down Expand Up @@ -132,7 +142,7 @@ impl libp2p::request_response::Codec for Codec {
);
Ok(())
}
ResponsePayload::BlocksByRoot(blocks) => {
ResponsePayload::Blocks(blocks) => {
// Write each block as a separate chunk.
// Encode first, then check size before writing the SUCCESS
// code byte. This avoids corrupting the stream if a block
Expand All @@ -143,7 +153,7 @@ impl libp2p::request_response::Codec for Codec {
if encoded.len() > MAX_PAYLOAD_SIZE - 1024 {
warn!(
size = encoded.len(),
"Skipping oversized block in BlocksByRoot response"
"Skipping oversized block in block response"
);
continue;
}
Expand Down Expand Up @@ -230,7 +240,7 @@ where
Ok(Response::success(ResponsePayload::Status(status)))
}

/// Decodes a BlocksByRoot protocol response from a multi-chunk response stream.
/// Decodes a block protocol response from a multi-chunk response stream.
///
/// Reads chunks until EOF, collecting successfully decoded blocks. Each chunk has
/// its own response code - chunks with error codes are logged and skipped rather
Expand All @@ -253,7 +263,7 @@ where
///
/// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this
/// function to return `Err` - they are logged and skipped.
async fn decode_blocks_by_root_response<T>(io: &mut T, protocol_label: &str) -> io::Result<Response>
async fn decode_blocks_response<T>(io: &mut T, protocol_label: &str) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
Expand Down Expand Up @@ -291,5 +301,5 @@ where
blocks.push(block);
}

Ok(Response::success(ResponsePayload::BlocksByRoot(blocks)))
Ok(Response::success(ResponsePayload::Blocks(blocks)))
}
175 changes: 167 additions & 8 deletions crates/net/p2p/src/req_resp/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use ethlambda_storage::Store;
use libp2p::{PeerId, request_response};
Expand All @@ -12,7 +12,9 @@ use ethlambda_types::primitives::HashTreeRoot as _;
use ethlambda_types::{block::SignedBlock, primitives::H256};

use super::{
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, Status,
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS,
Request, Response, ResponsePayload, Status,
messages::{ResponseCode, error_message},
};
use crate::{
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
Expand Down Expand Up @@ -42,6 +44,13 @@ pub async fn handle_req_resp_message(
);
handle_blocks_by_root_request(server, request, channel, peer).await;
}
Request::BlocksByRange(request) => {
info!(
kind = "blocks_by_range_request",
peer_count, "P2P message received"
);
handle_blocks_by_range_request(server, request, channel, peer).await;
}
}
}
request_response::Message::Response {
Expand All @@ -55,11 +64,8 @@ pub async fn handle_req_resp_message(
info!(kind = "status_response", peer_count, "P2P message received");
handle_status_response(status, peer).await;
}
ResponsePayload::BlocksByRoot(blocks) => {
info!(
kind = "blocks_by_root_response",
peer_count, "P2P message received"
);
ResponsePayload::Blocks(blocks) => {
info!(kind = "blocks_response", peer_count, "P2P message received");
handle_blocks_by_root_response(server, blocks, peer, request_id, ctx)
.await;
}
Expand Down Expand Up @@ -136,10 +142,99 @@ async fn handle_blocks_by_root_request(
let found = blocks.len();
info!(%peer, num_roots, found, "Responding to BlocksByRoot request");

let response = Response::success(ResponsePayload::BlocksByRoot(blocks));
let response = Response::success(ResponsePayload::Blocks(blocks));
server.swarm_handle.send_response(channel, response);
}

async fn handle_blocks_by_range_request(
server: &mut P2PServer,
request: BlocksByRangeRequest,
channel: request_response::ResponseChannel<Response>,
peer: PeerId,
) {
info!(
%peer,
start_slot = request.start_slot,
count = request.count,
step = request.step,
"Received BlocksByRange request"
);

if request.step == 0 || request.count == 0 || request.count > MAX_REQUEST_BLOCKS {
let response = Response::error(
ResponseCode::INVALID_REQUEST,
error_message("invalid BlocksByRange request"),
);
server.swarm_handle.send_response(channel, response);
return;
}

let blocks = canonical_blocks_by_range(
&server.store,
request.start_slot,
request.count,
request.step,
);

info!(
%peer,
start_slot = request.start_slot,
count = request.count,
step = request.step,
found = blocks.len(),
"Responding to BlocksByRange request"
);

let response = Response::success(ResponsePayload::Blocks(blocks));
server.swarm_handle.send_response(channel, response);
}

fn canonical_blocks_by_range(
store: &Store,
start_slot: u64,
count: u64,
step: u64,
) -> Vec<SignedBlock> {
if count == 0 {
return Vec::new();
}

let Some(end_slot) = count
.checked_sub(1)
.and_then(|value| value.checked_mul(step))
.and_then(|last_offset| start_slot.checked_add(last_offset))
else {
return Vec::new();
};

let mut roots_by_slot = HashMap::new();
let mut current_root = store.head();

while !current_root.is_zero() {
Comment thread
MegaRedHand marked this conversation as resolved.
let Some(header) = store.get_block_header(&current_root) else {
break;
};

if header.slot < start_slot {
break;
}

if header.slot <= end_slot && (header.slot - start_slot).is_multiple_of(step) {
roots_by_slot.insert(header.slot, current_root);
}

current_root = header.parent_root;
}

(0..count)
.filter_map(|index| {
let slot = start_slot.checked_add(index.checked_mul(step)?)?;
let root = roots_by_slot.get(&slot)?;
store.get_signed_block(root)
})
.collect()
}
Comment thread
MegaRedHand marked this conversation as resolved.

async fn handle_blocks_by_root_response(
server: &mut P2PServer,
blocks: Vec<SignedBlock>,
Expand Down Expand Up @@ -313,3 +408,67 @@ async fn handle_fetch_failure(

send_after(backoff, ctx.clone(), p2p_protocol::RetryBlockFetch { root });
}

#[cfg(test)]
mod tests {
use super::*;
use ethlambda_storage::{ForkCheckpoints, backend::InMemoryBackend};
use ethlambda_types::{
attestation::XmssSignature,
block::{Block, BlockBody, BlockSignatures},
signature::SIGNATURE_SIZE,
state::State,
};
use libssz_types::SszList;
use std::sync::Arc;

fn signed_block(slot: u64, parent_root: H256) -> SignedBlock {
SignedBlock {
message: Block {
slot,
proposer_index: 0,
parent_root,
state_root: H256::ZERO,
body: BlockBody::default(),
},
signature: BlockSignatures {
attestation_signatures: SszList::new(),
proposer_signature: XmssSignature::try_from(vec![0u8; SIGNATURE_SIZE]).unwrap(),
},
}
}

#[test]
fn blocks_by_range_returns_canonical_blocks_in_requested_order() {
let backend = Arc::new(InMemoryBackend::new());
let mut store = Store::from_anchor_state(backend, State::from_genesis(0, vec![]));

let block_1 = signed_block(1, store.head());
let root_1 = block_1.message.hash_tree_root();
store.insert_signed_block(root_1, block_1);

let block_2 = signed_block(2, root_1);
let root_2 = block_2.message.hash_tree_root();
store.insert_signed_block(root_2, block_2);

let side_block_3 = signed_block(3, root_1);
let side_root_3 = side_block_3.message.hash_tree_root();
store.insert_signed_block(side_root_3, side_block_3);

let block_4 = signed_block(4, root_2);
let root_4 = block_4.message.hash_tree_root();
store.insert_signed_block(root_4, block_4);
store.update_checkpoints(ForkCheckpoints::head_only(root_4));

let blocks = canonical_blocks_by_range(&store, 1, 4, 1);
let slots: Vec<_> = blocks.iter().map(|block| block.message.slot).collect();
let roots: Vec<_> = blocks
.iter()
.map(|block| block.message.hash_tree_root())
.collect();

assert_eq!(slots, vec![1, 2, 4]);
assert_eq!(roots, vec![root_1, root_2, root_4]);
assert!(!roots.contains(&side_root_3));
}
}
14 changes: 11 additions & 3 deletions crates/net/p2p/src/req_resp/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use libssz_types::SszList;

pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy";
pub const BLOCKS_BY_ROOT_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_root/1/ssz_snappy";
pub const BLOCKS_BY_RANGE_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_range/1/ssz_snappy";
pub const MAX_REQUEST_BLOCKS: u64 = 1024; // Maximum number of blocks in a single request (1024).

#[derive(Debug, Clone)]
pub enum Request {
Status(Status),
BlocksByRoot(BlocksByRootRequest),
BlocksByRange(BlocksByRangeRequest),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -88,7 +91,7 @@ impl std::fmt::Debug for ResponseCode {
#[allow(clippy::large_enum_variant)]
pub enum ResponsePayload {
Status(Status),
BlocksByRoot(Vec<SignedBlock>),
Blocks(Vec<SignedBlock>),
}

#[derive(Debug, Clone, SszEncode, SszDecode)]
Expand All @@ -106,8 +109,6 @@ pub type ErrorMessage = SszList<u8, 256>;
/// Helper to create an ErrorMessage from a string.
/// Debug builds panic if message exceeds 256 bytes (programming error).
/// Release builds truncate to 256 bytes.
#[expect(dead_code)]
// TODO: map errors to req/resp error messages
pub fn error_message(msg: impl AsRef<str>) -> ErrorMessage {
let bytes = msg.as_ref().as_bytes();
debug_assert!(
Expand All @@ -130,3 +131,10 @@ pub fn error_message(msg: impl AsRef<str>) -> ErrorMessage {
pub struct BlocksByRootRequest {
pub roots: RequestedBlockRoots,
}

#[derive(Debug, Clone, SszEncode, SszDecode)]
pub struct BlocksByRangeRequest {
pub start_slot: u64,
pub count: u64,
pub step: u64,
}
3 changes: 2 additions & 1 deletion crates/net/p2p/src/req_resp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use codec::Codec;
pub use encoding::{MAX_COMPRESSED_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE};
pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message};
pub use messages::{
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, RequestedBlockRoots, Response,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest,
BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, RequestedBlockRoots, Response,
ResponsePayload, STATUS_PROTOCOL_V1, Status,
};