feat: added replicator crate with basic protocol impl #1007
bmuddha wants to merge 3 commits into bmuddha/scheduler/dual-mode from
📝 Walkthrough
The changes introduce a new "magicblock-replicator" workspace member to the Rust project, adding a complete state replication protocol library. The addition includes a Cargo.toml configuration with workspace-managed dependencies; protocol definitions with a Message enum supporting handshakes, transactions, blocks, and failover signals; a connection layer providing async message framing using 4-byte little-endian length prefixes with bincode serialization; error handling types; and TCP transport utilities for sender/receiver patterns.
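The framing described in the walkthrough can be illustrated with a minimal, std-only sketch. It is not the crate's code: `encode_frame` and `decode_frame` are hypothetical names, and the `payload` byte slice stands in for a bincode-serialized `Message`.

```rust
/// Prepends a 4-byte little-endian length to `payload`.
/// Returns None if the payload length does not fit in a u32.
fn encode_frame(payload: &[u8]) -> Option<Vec<u8>> {
    if payload.len() > u32::MAX as usize {
        return None;
    }
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(payload);
    Some(out)
}

/// Tries to split one frame off the front of `buf`.
/// Returns the payload and the number of bytes consumed,
/// or None if `buf` does not yet hold a complete frame.
fn decode_frame(buf: &[u8]) -> Option<(&[u8], usize)> {
    let header = buf.get(..4)?;
    let len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
    let end = 4usize.checked_add(len)?;
    let payload = buf.get(4..end)?;
    Some((payload, end))
}
```

Returning the consumed byte count lets a caller keep any trailing bytes in its buffer for the next frame, which is how a streaming decoder would typically use it.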
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 19-30: MessageEncoder::encode can produce frames larger than the
Receiver's 64KB limit, causing the remote side to reject them; update the
Encoder (MessageEncoder::encode) to compute the serialized message size (e.g.,
via bincode::serialized_size or by serializing into a temporary buffer), compare
it against the same 64 * 1024 frame limit used by the Receiver, and return an
Err(Error) if it exceeds that limit instead of writing an oversize frame; keep
the existing write path when size is within the limit so the existing
dst.put_u32_le / serialize_into logic is unchanged.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
- Cargo.toml
- magicblock-replicator/Cargo.toml
- magicblock-replicator/src/connection.rs
- magicblock-replicator/src/error.rs
- magicblock-replicator/src/lib.rs
- magicblock-replicator/src/proto.rs
- magicblock-replicator/src/tcp.rs
    impl tokio_util::codec::Encoder<Message> for MessageEncoder {
        type Error = Error;

        fn encode(&mut self, msg: Message, dst: &mut BytesMut) -> Result<()> {
            let start = dst.len();
            dst.put_u32_le(0);
            bincode::serialize_into(dst.writer(), &msg)?;
            let len = (dst.len() - start - 4) as u32;
            dst[start..start + 4].copy_from_slice(&len.to_le_bytes());
            Ok(())
        }
    }
🧹 Nitpick | 🔵 Trivial
Consider aligning encoder with receiver's frame limit.
The Receiver enforces a 64KB max frame length (line 41), but MessageEncoder has no corresponding limit. If a Message serializes to >64KB, the sender will transmit it successfully, but the receiver will reject it with a frame-length error, making debugging non-obvious.
♻️ Suggested improvement
Add a frame size check in the encoder:
     fn encode(&mut self, msg: Message, dst: &mut BytesMut) -> Result<()> {
         let start = dst.len();
         dst.put_u32_le(0);
         bincode::serialize_into(dst.writer(), &msg)?;
         let len = (dst.len() - start - 4) as u32;
    +    if len > 64 * 1024 {
    +        return Err(Error::Io(std::io::Error::new(
    +            std::io::ErrorKind::InvalidData,
    +            "message exceeds max frame length",
    +        )));
    +    }
         dst[start..start + 4].copy_from_slice(&len.to_le_bytes());
         Ok(())
     }
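One detail the suggested check does not address: returning an error after the body has been serialized may leave the placeholder and the oversize bytes in `dst`. A minimal std-only sketch of a check that rolls the buffer back is shown here. `encode_checked`, `FrameError`, and `MAX_FRAME_LEN` are hypothetical names, `Vec<u8>` stands in for `BytesMut`, and the raw `payload` slice stands in for the bincode output.

```rust
/// Assumed to match the receiver's 64KB limit described in the review comment.
const MAX_FRAME_LEN: usize = 64 * 1024;

/// Hypothetical error type for this sketch only.
#[derive(Debug, PartialEq)]
enum FrameError {
    TooLarge { len: usize },
}

/// Appends a length-prefixed frame to `dst`.
/// On error, `dst` is restored to its length before the call.
fn encode_checked(payload: &[u8], dst: &mut Vec<u8>) -> Result<(), FrameError> {
    let start = dst.len();
    dst.extend_from_slice(&[0u8; 4]); // length placeholder
    dst.extend_from_slice(payload); // stands in for bincode::serialize_into
    let len = dst.len() - start - 4;
    if len > MAX_FRAME_LEN {
        dst.truncate(start); // drop the placeholder and the oversize body
        return Err(FrameError::TooLarge { len });
    }
    // len <= MAX_FRAME_LEN here, so the cast to u32 cannot truncate.
    dst[start..start + 4].copy_from_slice(&(len as u32).to_le_bytes());
    Ok(())
}
```

Checking the length as a `usize` before casting also avoids the silent truncation that `as u32` would introduce for bodies larger than 4 GiB. Whether the rollback matters in practice depends on how the caller treats the buffer after a failed `encode`.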
1acdff2 to 6e3a51e
23c811a to be09338
6e3a51e to 936c1c0
be09338 to 31292d0
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 83-145: The signatures are created over raw bytes that overlap
across message types and omit the protocol version; update all signing and
verification to domain-separate and bind version by including an explicit
context string and the protocol version in the signed payload. Concretely,
change each signer call (the constructors that call keypair.sign_message in the
implementations for the first message type (the HandshakeRequest-like impl),
HandshakeResponse::new, and FailoverSignal::new) to sign a concatenation like
b"<MSG_TYPE>|v{PROTOCOL_VERSION}|"+payload (use a distinct MSG_TYPE per struct,
e.g., "HandshakeRequest", "HandshakeResponse", "FailoverSignal"), and update the
corresponding verify methods (the verify implementations for those types) to
reconstruct the same domain-separated bytes (including version) before calling
signature.verify; do the same for error branches in
HandshakeResponse::new/verify so both Ok and Err paths use the same
domain-separated scheme. Ensure VersionedTransaction/Transaction::decode is
unaffected but do not sign raw payloads elsewhere.
In `@magicblock-replicator/src/tcp.rs`:
- Around line 15-23: The connect/split helpers currently hard-code Tokio
TcpStream and so cannot support the WebSocket framing required by the objective;
replace or refactor TcpSender/TcpReceiver creation so the transport is
abstracted and supports WebSocket-frame wrapping (type prefix + bincode
payload). Update the public functions connect and split (and any constructors
like Sender::new / Receiver::new) to work against a Transport trait (or separate
WsStream implementation) that implements send/receive with WebSocket framing, or
provide alternate connect_ws/split_ws that wrap a WebSocket stream into
TcpSender/TcpReceiver equivalents using the required frame format; ensure the
new implementation still returns the same TcpSender/TcpReceiver types or
introduces clearly named WsSender/WsReceiver to avoid breaking callers.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
- Cargo.toml
- magicblock-replicator/Cargo.toml
- magicblock-replicator/src/connection.rs
- magicblock-replicator/src/error.rs
- magicblock-replicator/src/lib.rs
- magicblock-replicator/src/proto.rs
- magicblock-replicator/src/tcp.rs
- magicblock-replicator/src/tests.rs
        pub fn new(start_slot: Slot, keypair: &Keypair) -> Self {
            Self {
                version: PROTOCOL_VERSION,
                start_slot,
                identity: keypair.pubkey(),
                signature: keypair.sign_message(&start_slot.to_le_bytes()),
            }
        }

        /// Verifies signature matches claimed identity.
        pub fn verify(&self) -> bool {
            self.signature
                .verify(self.identity.as_array(), &self.start_slot.to_le_bytes())
        }
    }

    impl HandshakeResponse {
        pub fn new(result: Result<Slot>, keypair: &Keypair) -> Self {
            let result = result.map_err(|e| e.to_string());
            let signature = match &result {
                Ok(slot) => keypair.sign_message(&slot.to_le_bytes()),
                Err(err) => keypair.sign_message(err.as_bytes()),
            };
            Self {
                result,
                identity: keypair.pubkey(),
                signature,
            }
        }

        /// Verifies signature matches server identity.
        pub fn verify(&self) -> bool {
            match &self.result {
                Ok(slot) => self
                    .signature
                    .verify(self.identity.as_array(), &slot.to_le_bytes()),
                Err(err) => self
                    .signature
                    .verify(self.identity.as_array(), err.as_bytes()),
            }
        }
    }

    impl Transaction {
        /// Deserializes the inner transaction.
        pub fn decode(&self) -> bincode::Result<VersionedTransaction> {
            bincode::deserialize(&self.payload)
        }
    }

    impl FailoverSignal {
        pub fn new(slot: Slot, keypair: &Keypair) -> Self {
            Self {
                slot,
                signature: keypair.sign_message(&slot.to_le_bytes()),
            }
        }

        /// Verifies signal against expected identity.
        pub fn verify(&self, identity: Pubkey) -> bool {
            self.signature
                .verify(identity.as_array(), &self.slot.to_le_bytes())
        }
Signatures are replayable across message types and do not bind protocol version.
Lines 88, 103, and 137 sign overlapping raw payloads without domain separation, so a signature can be transplanted between message types that share the same bytes. In addition, verify() at lines 93-96 does not cover version, so version tampering goes undetected.
Suggested hardening
    +const TAG_HANDSHAKE_REQ: u8 = 1;
    +const TAG_HANDSHAKE_RESP: u8 = 2;
    +const TAG_FAILOVER: u8 = 3;
    +
    +fn preimage(tag: u8, version: u32, body: &[u8]) -> Vec<u8> {
    +    let mut out = Vec::with_capacity(1 + 4 + body.len());
    +    out.push(tag);
    +    out.extend_from_slice(&version.to_le_bytes());
    +    out.extend_from_slice(body);
    +    out
    +}
    +
     impl HandshakeRequest {
         pub fn new(start_slot: Slot, keypair: &Keypair) -> Self {
    +        let version = PROTOCOL_VERSION;
    +        let msg = preimage(TAG_HANDSHAKE_REQ, version, &start_slot.to_le_bytes());
             Self {
    -            version: PROTOCOL_VERSION,
    +            version,
                 start_slot,
                 identity: keypair.pubkey(),
    -            signature: keypair.sign_message(&start_slot.to_le_bytes()),
    +            signature: keypair.sign_message(&msg),
             }
         }
         pub fn verify(&self) -> bool {
    -        self.signature
    -            .verify(self.identity.as_array(), &self.start_slot.to_le_bytes())
    +        let msg = preimage(TAG_HANDSHAKE_REQ, self.version, &self.start_slot.to_le_bytes());
    +        self.signature.verify(self.identity.as_array(), &msg)
         }
     }
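The effect of the proposed preimage can be checked without any signing library, because domain separation only requires that the signed bytes differ. The std-only sketch below reuses the tag constants and `preimage` from the suggestion and adds a hypothetical `response_body` helper that marks the Ok and Err branches of a handshake response explicitly. It assumes `Slot` is a `u64` and that the version is a `u32`.

```rust
const TAG_HANDSHAKE_REQ: u8 = 1;
const TAG_HANDSHAKE_RESP: u8 = 2;
const TAG_FAILOVER: u8 = 3;

/// Builds the bytes to sign: message tag, then version, then body.
fn preimage(tag: u8, version: u32, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + 4 + body.len());
    out.push(tag);
    out.extend_from_slice(&version.to_le_bytes());
    out.extend_from_slice(body);
    out
}

/// Hypothetical helper: encodes a handshake result with a leading
/// branch marker, so an 8-byte error string cannot be mistaken for a slot.
fn response_body(result: &Result<u64, String>) -> Vec<u8> {
    let mut body = Vec::new();
    match result {
        Ok(slot) => {
            body.push(0); // Ok branch marker
            body.extend_from_slice(&slot.to_le_bytes());
        }
        Err(err) => {
            body.push(1); // Err branch marker
            body.extend_from_slice(err.as_bytes());
        }
    }
    body
}
```

With this layout, the same slot produces different signed bytes for a handshake request, a handshake response, and a failover signal, and changing the version changes the signed bytes as well. The branch marker goes slightly beyond the review suggestion and is only one way to keep the Ok and Err paths from colliding.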
    /// Connects to a primary at `addr`, returning (sender, receiver).
    pub async fn connect(addr: SocketAddr) -> io::Result<(TcpSender, TcpReceiver)> {
        TcpStream::connect(addr).await.map(split)
    }

    /// Splits a TCP stream into sender and receiver halves.
    pub fn split(stream: TcpStream) -> (TcpSender, TcpReceiver) {
        let (rx, tx) = stream.into_split();
        (Sender::new(tx), Receiver::new(rx))
Transport API is TCP-only, but the objective specifies WebSocket framing.
The linked objective calls for a WebSocket frame wrapper with a type prefix + bincode payload. This module currently hard-codes TcpStream split/connect helpers, so it won’t interoperate with a WS-based replication transport as specified.
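The transport abstraction this comment asks for could look roughly like the sketch below. It is a synchronous, std-only illustration rather than the crate's async Tokio code: the `Transport` trait, the `Loopback` type, and the one-byte type prefix layout are all assumptions made for the example, and the payload bytes stand in for bincode output.

```rust
use std::collections::VecDeque;
use std::io;

/// Hypothetical message-oriented transport. A TCP or WebSocket
/// back end would each provide its own implementation.
trait Transport {
    fn send(&mut self, frame: Vec<u8>) -> io::Result<()>;
    fn recv(&mut self) -> io::Result<Option<Vec<u8>>>;
}

/// In-memory loopback, used here only to exercise the trait.
#[derive(Default)]
struct Loopback {
    queue: VecDeque<Vec<u8>>,
}

impl Transport for Loopback {
    fn send(&mut self, frame: Vec<u8>) -> io::Result<()> {
        self.queue.push_back(frame);
        Ok(())
    }

    fn recv(&mut self) -> io::Result<Option<Vec<u8>>> {
        Ok(self.queue.pop_front())
    }
}

/// Prefixes `payload` with a one-byte message type (assumed layout).
fn wrap(kind: u8, payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(1 + payload.len());
    frame.push(kind);
    frame.extend_from_slice(payload);
    frame
}

/// Splits a frame back into its type prefix and payload.
fn unwrap_frame(frame: &[u8]) -> io::Result<(u8, &[u8])> {
    match frame.split_first() {
        Some((kind, payload)) => Ok((*kind, payload)),
        None => Err(io::Error::new(io::ErrorKind::InvalidData, "empty frame")),
    }
}

/// Sends one typed message over any transport.
fn send_message<T: Transport>(transport: &mut T, kind: u8, payload: &[u8]) -> io::Result<()> {
    transport.send(wrap(kind, payload))
}
```

Because `send_message` depends only on the trait, callers would not need to change if a WebSocket-backed implementation were added next to the TCP one.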

Summary
Adds a new magicblock-replicator crate with a minimal replication
protocol design. This lays the groundwork for the future
replication service implementation.