
feat: added replicator crate with basic protocol impl #1007

Draft
bmuddha wants to merge 3 commits into bmuddha/scheduler/dual-mode from bmuddha/replication/protocol

Conversation

Collaborator

@bmuddha bmuddha commented Mar 1, 2026

Summary

Adds a new magicblock-replicator crate with a minimal
replication protocol design. This lays the groundwork
for the future replication service implementation.

Compatibility

  • No breaking changes

Checklist

Summary by CodeRabbit

  • New Features

    • Added a state replication protocol library with length-prefixed streaming messaging, signed handshake/replication messages, transaction/block/superblock semantics, and async sender/receiver primitives.
    • Added TCP transport utilities to connect/split streams into sender/receiver halves.
  • Tests

    • Added comprehensive end-to-end tests validating wire-format, signatures, framing, ordering, and large-payload behavior.
  • Chores

    • Added "magicblock-replicator" workspace member and required dependency updates.

Contributor

coderabbitai bot commented Mar 1, 2026

Walkthrough

The changes introduce a new "magicblock-replicator" workspace member to the Rust project, adding a complete state replication protocol library. The addition includes Cargo.toml configuration with workspace-managed dependencies, protocol definitions with a Message enum supporting handshakes, transactions, blocks, and failover signals, a connection layer providing async message framing using 4-byte little-endian length prefixes with bincode serialization, error handling types, and TCP transport utilities for sender/receiver patterns.
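
The framing described in the walkthrough (a 4-byte little-endian length prefix followed by the serialized message) can be illustrated with a minimal, std-only sketch. The names `encode_frame` and `decode_frame` are hypothetical and are not taken from the PR, and the payload here is an opaque byte slice standing in for the bincode output.

```rust
/// Size of the little-endian length prefix, in bytes.
const PREFIX_LEN: usize = 4;

/// Appends one frame (4-byte LE length + payload) to `dst`.
/// Sketch only: assumes the payload length fits in a u32.
fn encode_frame(payload: &[u8], dst: &mut Vec<u8>) {
    let len = payload.len() as u32;
    dst.extend_from_slice(&len.to_le_bytes());
    dst.extend_from_slice(payload);
}

/// Tries to read one frame from the front of `src`.
/// Returns the payload and the number of bytes consumed, or `None`
/// if `src` does not yet hold a complete frame.
fn decode_frame(src: &[u8]) -> Option<(&[u8], usize)> {
    if src.len() < PREFIX_LEN {
        return None;
    }
    let mut prefix = [0u8; PREFIX_LEN];
    prefix.copy_from_slice(&src[..PREFIX_LEN]);
    let len = u32::from_le_bytes(prefix) as usize;
    let end = PREFIX_LEN.checked_add(len)?;
    if src.len() < end {
        return None;
    }
    Some((&src[PREFIX_LEN..end], end))
}
```

Because the prefix carries the payload length, several frames can be concatenated on one stream and decoded one at a time by advancing past the consumed bytes.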

Assessment against linked issues

Objective (issue) Addressed Explanation
Define message types and wire-format (message types defined) (951)
Bincode serialization/deserialization works (951)
WebSocket frame wrapper implemented (951) The PR implements a 4-byte little-endian length-prefixed codec (LengthDelimitedCodec + MessageEncoder), not a WebSocket frame wrapper.
Unit tests for round-trip encoding/decoding (951)

Out-of-scope changes

Code Change | Explanation
New protocol signing/verification using Keypair/Pubkey/Signature (magicblock-replicator/src/proto.rs) | The linked issue requests wire protocol and codec; introducing cryptographic signature types and verification logic (solana Keypair/Pubkey/Signature) is not specified in the issue.
Addition of multiple Solana-specific workspace dependencies (Cargo.toml workspace.dependencies / magicblock-replicator/Cargo.toml) | The issue focuses on message types, bincode, and frames; adding many solana-* crates and workspace-level bytes dep expands scope beyond the stated acceptance criteria.

Suggested reviewers

  • thlorenz
  • GabrielePicco
  • Dodecahedr0x

Collaborator Author

bmuddha commented Mar 1, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite. Learn more about stacking.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 19-30: MessageEncoder::encode can produce frames larger than the
Receiver's 64KB limit, causing the remote side to reject them; update the
Encoder (MessageEncoder::encode) to compute the serialized message size (e.g.,
via bincode::serialized_size or by serializing into a temporary buffer), compare
it against the same 64 * 1024 frame limit used by the Receiver, and return an
Err(Error) if it exceeds that limit instead of writing an oversize frame; keep
the existing write path when size is within the limit so the existing
dst.put_u32_le / serialize_into logic is unchanged.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1acdff2 and 23c811a.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
  • Cargo.toml
  • magicblock-replicator/Cargo.toml
  • magicblock-replicator/src/connection.rs
  • magicblock-replicator/src/error.rs
  • magicblock-replicator/src/lib.rs
  • magicblock-replicator/src/proto.rs
  • magicblock-replicator/src/tcp.rs

Comment on lines 19 to 30
impl tokio_util::codec::Encoder<Message> for MessageEncoder {
    type Error = Error;

    fn encode(&mut self, msg: Message, dst: &mut BytesMut) -> Result<()> {
        let start = dst.len();
        dst.put_u32_le(0);
        bincode::serialize_into(dst.writer(), &msg)?;
        let len = (dst.len() - start - 4) as u32;
        dst[start..start + 4].copy_from_slice(&len.to_le_bytes());
        Ok(())
    }
}

🧹 Nitpick | 🔵 Trivial

Consider aligning encoder with receiver's frame limit.

The Receiver enforces a 64KB max frame length (line 41), but MessageEncoder has no corresponding limit. If a Message serializes to >64KB, the sender will transmit it successfully, but the receiver will reject it with a frame-length error, making debugging non-obvious.

♻️ Suggested improvement

Add a frame size check in the encoder:

 fn encode(&mut self, msg: Message, dst: &mut BytesMut) -> Result<()> {
     let start = dst.len();
     dst.put_u32_le(0);
     bincode::serialize_into(dst.writer(), &msg)?;
     let len = (dst.len() - start - 4) as u32;
+    if len > 64 * 1024 {
+        return Err(Error::Io(std::io::Error::new(
+            std::io::ErrorKind::InvalidData,
+            "message exceeds max frame length",
+        )));
+    }
     dst[start..start + 4].copy_from_slice(&len.to_le_bytes());
     Ok(())
 }
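
As a std-only sketch of the same idea, the check can also roll back the partially written frame so an oversize message leaves the output buffer untouched. This is an illustration under assumptions, not the crate's encoder: `Vec<u8>` stands in for `BytesMut`, the pre-serialized `payload` stands in for the `bincode::serialize_into` call, and `EncodeError`, `encode_checked`, and `MAX_FRAME_LEN` are hypothetical names.

```rust
/// Mirrors the 64KB receiver limit mentioned in the review (assumed constant name).
const MAX_FRAME_LEN: usize = 64 * 1024;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum EncodeError {
    FrameTooLarge { len: usize },
}

/// Writes a length-prefixed frame into `dst`. If the payload exceeds
/// `MAX_FRAME_LEN`, the bytes written so far are rolled back and an
/// error is returned instead of emitting an oversize frame.
fn encode_checked(payload: &[u8], dst: &mut Vec<u8>) -> Result<(), EncodeError> {
    let start = dst.len();
    // Placeholder for the 4-byte prefix, patched once the length is known.
    dst.extend_from_slice(&[0u8; 4]);
    // Stands in for serializing the message directly into the buffer.
    dst.extend_from_slice(payload);
    let len = dst.len() - start - 4;
    if len > MAX_FRAME_LEN {
        // Drop the partial frame so the caller's buffer stays consistent.
        dst.truncate(start);
        return Err(EncodeError::FrameTooLarge { len });
    }
    dst[start..start + 4].copy_from_slice(&(len as u32).to_le_bytes());
    Ok(())
}
```

The rollback matters when the buffer is shared across calls: without it, a rejected message would leave a zeroed prefix and stray payload bytes ahead of the next frame.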

@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 1acdff2 to 6e3a51e Compare March 1, 2026 17:18
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 23c811a to be09338 Compare March 1, 2026 17:18
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 6e3a51e to 936c1c0 Compare March 2, 2026 10:41
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from be09338 to 31292d0 Compare March 2, 2026 10:41
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 83-145: The signatures are created over raw bytes that overlap
across message types and omit the protocol version; update all signing and
verification to domain-separate and bind version by including an explicit
context string and the protocol version in the signed payload. Concretely,
change each signer call (the constructors that call keypair.sign_message in the
implementations for the first message type (the HandshakeRequest-like impl),
HandshakeResponse::new, and FailoverSignal::new) to sign a concatenation like
b"<MSG_TYPE>|v{PROTOCOL_VERSION}|"+payload (use a distinct MSG_TYPE per struct,
e.g., "HandshakeRequest", "HandshakeResponse", "FailoverSignal"), and update the
corresponding verify methods (the verify implementations for those types) to
reconstruct the same domain-separated bytes (including version) before calling
signature.verify; do the same for error branches in
HandshakeResponse::new/verify so both Ok and Err paths use the same
domain-separated scheme. Ensure VersionedTransaction/Transaction::decode is
unaffected but do not sign raw payloads elsewhere.

In `@magicblock-replicator/src/tcp.rs`:
- Around line 15-23: The connect/split helpers currently hard-code Tokio
TcpStream and so cannot support the WebSocket framing required by the objective;
replace or refactor TcpSender/TcpReceiver creation so the transport is
abstracted and supports WebSocket-frame wrapping (type prefix + bincode
payload). Update the public functions connect and split (and any constructors
like Sender::new / Receiver::new) to work against a Transport trait (or separate
WsStream implementation) that implements send/receive with WebSocket framing, or
provide alternate connect_ws/split_ws that wrap a WebSocket stream into
TcpSender/TcpReceiver equivalents using the required frame format; ensure the
new implementation still returns the same TcpSender/TcpReceiver types or
introduces clearly named WsSender/WsReceiver to avoid breaking callers.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 23c811a and 31292d0.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • Cargo.toml
  • magicblock-replicator/Cargo.toml
  • magicblock-replicator/src/connection.rs
  • magicblock-replicator/src/error.rs
  • magicblock-replicator/src/lib.rs
  • magicblock-replicator/src/proto.rs
  • magicblock-replicator/src/tcp.rs
  • magicblock-replicator/src/tests.rs

Comment on lines +83 to +145
    pub fn new(start_slot: Slot, keypair: &Keypair) -> Self {
        Self {
            version: PROTOCOL_VERSION,
            start_slot,
            identity: keypair.pubkey(),
            signature: keypair.sign_message(&start_slot.to_le_bytes()),
        }
    }

    /// Verifies signature matches claimed identity.
    pub fn verify(&self) -> bool {
        self.signature
            .verify(self.identity.as_array(), &self.start_slot.to_le_bytes())
    }
}

impl HandshakeResponse {
    pub fn new(result: Result<Slot>, keypair: &Keypair) -> Self {
        let result = result.map_err(|e| e.to_string());
        let signature = match &result {
            Ok(slot) => keypair.sign_message(&slot.to_le_bytes()),
            Err(err) => keypair.sign_message(err.as_bytes()),
        };
        Self {
            result,
            identity: keypair.pubkey(),
            signature,
        }
    }

    /// Verifies signature matches server identity.
    pub fn verify(&self) -> bool {
        match &self.result {
            Ok(slot) => self
                .signature
                .verify(self.identity.as_array(), &slot.to_le_bytes()),
            Err(err) => self
                .signature
                .verify(self.identity.as_array(), err.as_bytes()),
        }
    }
}

impl Transaction {
    /// Deserializes the inner transaction.
    pub fn decode(&self) -> bincode::Result<VersionedTransaction> {
        bincode::deserialize(&self.payload)
    }
}

impl FailoverSignal {
    pub fn new(slot: Slot, keypair: &Keypair) -> Self {
        Self {
            slot,
            signature: keypair.sign_message(&slot.to_le_bytes()),
        }
    }

    /// Verifies signal against expected identity.
    pub fn verify(&self, identity: Pubkey) -> bool {
        self.signature
            .verify(identity.as_array(), &self.slot.to_le_bytes())
    }

⚠️ Potential issue | 🔴 Critical

Signatures are replayable across message types and do not bind protocol version.

Line 88, Line 103, and Line 137 sign overlapping raw payload shapes (not domain-separated), so signatures can be transplanted between message types that share the same bytes. Also, the verify() at Lines 93-96 does not bind version, so version tampering is not detected.

Suggested hardening
+const TAG_HANDSHAKE_REQ: u8 = 1;
+const TAG_HANDSHAKE_RESP: u8 = 2;
+const TAG_FAILOVER: u8 = 3;
+
+fn preimage(tag: u8, version: u32, body: &[u8]) -> Vec<u8> {
+    let mut out = Vec::with_capacity(1 + 4 + body.len());
+    out.push(tag);
+    out.extend_from_slice(&version.to_le_bytes());
+    out.extend_from_slice(body);
+    out
+}
+
 impl HandshakeRequest {
     pub fn new(start_slot: Slot, keypair: &Keypair) -> Self {
+        let version = PROTOCOL_VERSION;
+        let msg = preimage(TAG_HANDSHAKE_REQ, version, &start_slot.to_le_bytes());
         Self {
-            version: PROTOCOL_VERSION,
+            version,
             start_slot,
             identity: keypair.pubkey(),
-            signature: keypair.sign_message(&start_slot.to_le_bytes()),
+            signature: keypair.sign_message(&msg),
         }
     }
 
     pub fn verify(&self) -> bool {
-        self.signature
-            .verify(self.identity.as_array(), &self.start_slot.to_le_bytes())
+        let msg = preimage(TAG_HANDSHAKE_REQ, self.version, &self.start_slot.to_le_bytes());
+        self.signature.verify(self.identity.as_array(), &msg)
     }
 }
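
To make the replay concern concrete, the following std-only sketch compares the bytes that would be signed with and without the tagged preimage. It reuses the `preimage` helper and tag constants from the suggestion above; the `raw_*` helpers are hypothetical names that reproduce what the quoted constructors sign, and a `u32` version is an assumption carried over from the suggestion. No actual signing is performed here, only the signed bytes are compared.

```rust
// Tag values follow the suggested hardening; they are illustrative.
const TAG_HANDSHAKE_REQ: u8 = 1;
const TAG_FAILOVER: u8 = 3;

/// Builds a domain-separated preimage: tag, then version, then body.
fn preimage(tag: u8, version: u32, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + 4 + body.len());
    out.push(tag);
    out.extend_from_slice(&version.to_le_bytes());
    out.extend_from_slice(body);
    out
}

/// Bytes signed by the quoted HandshakeRequest::new (slot only).
fn raw_handshake_preimage(start_slot: u64) -> Vec<u8> {
    start_slot.to_le_bytes().to_vec()
}

/// Bytes signed by the quoted FailoverSignal::new (slot only).
fn raw_failover_preimage(slot: u64) -> Vec<u8> {
    slot.to_le_bytes().to_vec()
}

/// Returns true when a signature over one byte string would also
/// verify for the other, i.e. when the signed bytes are identical.
fn signature_transplantable(a: &[u8], b: &[u8]) -> bool {
    a == b
}
```

With the raw scheme, a handshake for slot 42 and a failover signal for slot 42 sign identical bytes; with the tagged scheme they differ, and changing the version also changes the preimage. The Ok and Err branches of HandshakeResponse would likely need distinct tags as well, since an 8-byte error string could otherwise coincide with a slot encoding.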

Comment on lines +15 to +23
/// Connects to a primary at `addr`, returning (sender, receiver).
pub async fn connect(addr: SocketAddr) -> io::Result<(TcpSender, TcpReceiver)> {
    TcpStream::connect(addr).await.map(split)
}

/// Splits a TCP stream into sender and receiver halves.
pub fn split(stream: TcpStream) -> (TcpSender, TcpReceiver) {
    let (rx, tx) = stream.into_split();
    (Sender::new(tx), Receiver::new(rx))

⚠️ Potential issue | 🟠 Major

Transport API is TCP-only, but the objective specifies WebSocket framing.

The linked objective calls for a WebSocket frame wrapper with a type prefix + bincode payload. This module currently hard-codes TcpStream split/connect helpers, so it won’t interoperate with a WS-based replication transport as specified.
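
One way to keep framing independent of the underlying stream is to write it against generic reader and writer traits. The sketch below is synchronous and std-only for the sake of being self-contained; the PR itself is async on Tokio. The frame layout (1-byte type prefix, 4-byte little-endian length, payload) and the names `send_frame`, `recv_frame`, and `MAX_FRAME_LEN` are assumptions for illustration, not the format required by the linked issue.

```rust
use std::io::{self, Read, Write};

/// Upper bound on payload size accepted by the reader (assumed value).
const MAX_FRAME_LEN: usize = 64 * 1024;

/// Sends one frame over any writer: type byte, LE length, payload.
fn send_frame<W: Write>(w: &mut W, msg_type: u8, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "payload exceeds max frame length",
        ));
    }
    w.write_all(&[msg_type])?;
    w.write_all(&(payload.len() as u32).to_le_bytes())?;
    w.write_all(payload)
}

/// Receives one frame from any reader, returning (type, payload).
fn recv_frame<R: Read>(r: &mut R) -> io::Result<(u8, Vec<u8>)> {
    let mut header = [0u8; 5];
    r.read_exact(&mut header)?;
    let len = u32::from_le_bytes([header[1], header[2], header[3], header[4]]) as usize;
    if len > MAX_FRAME_LEN {
        // Reject before allocating, so a bad prefix cannot force a huge buffer.
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "frame exceeds max frame length",
        ));
    }
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok((header[0], payload))
}
```

Because both functions only require `Read` or `Write`, the same code path works over a TCP stream, an in-memory buffer in tests, or an adapter around another transport.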
