@@ -11,7 +11,10 @@

from pydantic import model_validator

from lean_spec.subspecs.chain.config import SECONDS_PER_SLOT
from lean_spec.subspecs.chain.config import (
INTERVALS_PER_SLOT,
MILLISECONDS_PER_INTERVAL,
)
from lean_spec.subspecs.containers.attestation import (
Attestation,
AttestationData,
@@ -239,8 +242,13 @@ def make_fixture(self) -> Self:
# Time advancement may trigger slot boundaries.
# At slot boundaries, pending attestations may become active.
# Always act as aggregator to ensure gossip signatures are aggregated
#
# TickStep.time is a Unix timestamp in seconds.
# Convert to intervals since genesis for the store.
delta_ms = (Uint64(step.time) - store.config.genesis_time) * Uint64(1000)
target_interval = delta_ms // MILLISECONDS_PER_INTERVAL
store, _ = store.on_tick(
Uint64(step.time), has_proposal=False, is_aggregator=True
target_interval, has_proposal=False, is_aggregator=True
)

case BlockStep():
@@ -268,9 +276,10 @@ def make_fixture(self) -> Self:
# Store rejects blocks from the future.
# This tick includes a block (has proposal).
# Always act as aggregator to ensure gossip signatures are aggregated
slot_duration_seconds = block.slot * SECONDS_PER_SLOT
block_time = store.config.genesis_time + slot_duration_seconds
store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True)
target_interval = block.slot * INTERVALS_PER_SLOT
store, _ = store.on_tick(
target_interval, has_proposal=True, is_aggregator=True
)

# Process the block through Store.
# This validates, applies state transition, and updates head.
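
For context, a minimal sketch of the two conversions exercised in this fixture: Unix seconds to intervals since genesis, and slot number to its first interval. The constant values below are assumptions chosen for illustration, not the spec's configured values.

# Illustrative sketch; assumed constants, not the real chain config.
MILLISECONDS_PER_INTERVAL = 1000
INTERVALS_PER_SLOT = 4

def time_to_interval(unix_time_s: int, genesis_time_s: int) -> int:
    """Convert a Unix timestamp in seconds to intervals since genesis."""
    delta_ms = (unix_time_s - genesis_time_s) * 1000
    return delta_ms // MILLISECONDS_PER_INTERVAL

def slot_to_interval(slot: int) -> int:
    """Convert a slot number to its first interval since genesis."""
    return slot * INTERVALS_PER_SLOT

assert time_to_interval(unix_time_s=1_700_000_010, genesis_time_s=1_700_000_000) == 10
assert slot_to_interval(slot=3) == 12
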
96 changes: 64 additions & 32 deletions src/lean_spec/subspecs/chain/service.py
@@ -27,7 +27,12 @@
import logging
from dataclasses import dataclass, field

from lean_spec.subspecs.chain.config import INTERVALS_PER_SLOT
from lean_spec.subspecs.containers.attestation.attestation import (
SignedAggregatedAttestation,
)
from lean_spec.subspecs.sync import SyncService
from lean_spec.types import Uint64

from .clock import Interval, SlotClock

@@ -112,49 +117,79 @@ async def run(self) -> None:
if total_interval <= last_handled_total_interval:
continue

# Get current wall-clock time as Unix timestamp (may have changed after sleep).
#
# The store expects an absolute timestamp, not intervals.
# It internally converts to intervals.
current_time = self.clock.current_time()

# Tick the store forward to current time.
# Tick the store forward to current interval.
#
# The store advances time interval by interval, performing
# appropriate actions at each interval.
#
# This minimal service does not produce blocks.
# Block production requires validator keys.
new_store, new_aggregated_attestations = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
new_aggregated_attestations = await self._tick_to(total_interval)

# Update sync service's store reference.
#
# SyncService owns the authoritative store. After ticking,
# we update its reference so gossip block processing sees
# the updated time.
self.sync_service.store = new_store

# Publish any new aggregated attestations produced this tick
# Publish any new aggregated attestations produced this tick.
if new_aggregated_attestations:
for agg in new_aggregated_attestations:
await self.sync_service.publish_aggregated_attestation(agg)

logger.info(
"Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d",
"Tick: slot=%d interval=%d head=%s finalized=slot%d",
self.clock.current_slot(),
self.clock.total_intervals(),
current_time,
new_store.head.hex(),
new_store.latest_finalized.slot,
total_interval,
self.sync_service.store.head.hex(),
self.sync_service.store.latest_finalized.slot,
)

# Mark this interval as handled.
last_handled_total_interval = total_interval

async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAttestation]:
"""
Advance store to target interval with skip and yield.

When the node falls behind by more than one slot, stale intervals
are skipped. Processing every missed interval synchronously would
block the event loop, starving gossip and causing the node to fall
further behind.

Between each remaining interval tick, yields to the event loop so
gossip messages can be processed.

Updates ``self.sync_service.store`` in place after each tick so
concurrent gossip handlers see current time.

Returns aggregated attestations produced during the ticks.
"""
store = self.sync_service.store
all_new_aggregates: list[SignedAggregatedAttestation] = []

# Skip stale intervals when falling behind.
#
# Jump to one full slot's worth of intervals before the target.
# The final slot's worth of intervals still runs normally so that
# aggregation, safe target, and attestation acceptance happen.
gap = target_interval - store.time
if gap > INTERVALS_PER_SLOT:
skip_to = Uint64(target_interval - INTERVALS_PER_SLOT)
store = store.model_copy(update={"time": skip_to})
self.sync_service.store = store

# Tick remaining intervals one at a time.
while store.time < target_interval:
store, new_aggregates = store.tick_interval(
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
all_new_aggregates.extend(new_aggregates)
self.sync_service.store = store

# Yield to the event loop so gossip handlers can run.
# Re-read store afterward: a gossip handler may have added
# blocks or attestations during the yield.
await asyncio.sleep(0)
store = self.sync_service.store

return all_new_aggregates
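
As a rough illustration of the skip rule above, a standalone sketch of how the resume point is chosen (INTERVALS_PER_SLOT = 4 is an assumed value, and this helper is not part of the service itself):

INTERVALS_PER_SLOT = 4  # assumed for the example

def plan_catch_up(store_time: int, target_interval: int) -> int:
    """Return the interval to resume ticking from."""
    gap = target_interval - store_time
    if gap > INTERVALS_PER_SLOT:
        # Far behind: skip stale intervals, keep one full slot of ticks.
        return target_interval - INTERVALS_PER_SLOT
    # Close enough: tick every remaining interval normally.
    return store_time

assert plan_catch_up(store_time=80, target_interval=100) == 96  # 20 behind: skip to 96
assert plan_catch_up(store_time=80, target_interval=82) == 80   # 2 behind: no skip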

async def _initial_tick(self) -> Interval | None:
"""
Perform initial tick to catch up store time to current wall clock.
@@ -168,18 +203,15 @@ async def _initial_tick(self) -> Interval | None:

# Only tick if we're past genesis.
if current_time >= self.clock.genesis_time:
new_store, _ = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
self.sync_service.store = new_store
target_interval = self.clock.total_intervals()

# Use _tick_to for skip + yield during catch-up.
# Discard aggregated attestations from catch-up.
# During initial sync we may be many slots behind.
# Publishing stale aggregations would spam the network.
await self._tick_to(target_interval)

return self.clock.total_intervals()
return target_interval

return None

125 changes: 91 additions & 34 deletions src/lean_spec/subspecs/forkchoice/store.py
@@ -13,8 +13,6 @@
ATTESTATION_COMMITTEE_COUNT,
INTERVALS_PER_SLOT,
JUSTIFICATION_LOOKBACK_SLOTS,
MILLISECONDS_PER_INTERVAL,
SECONDS_PER_SLOT,
)
from lean_spec.subspecs.containers import (
Attestation,
@@ -905,41 +903,106 @@ def accept_new_attestations(self) -> "Store":

def update_safe_target(self) -> "Store":
"""
Update the safe target for attestations.
Compute the deepest block that has 2/3+ supermajority attestation weight.

Computes target that has sufficient (2/3+ majority) attestation support.
The safe target represents a block with enough attestation weight to be
considered "safe" for validators to attest to.
The safe target is the furthest-from-genesis block where enough validators
agree. Validators use it to decide which block is safe to attest to.
Only blocks meeting the supermajority threshold qualify.

Algorithm
---------
1. Get validator count from head state
2. Calculate 2/3 majority threshold (ceiling division)
3. Run fork choice with minimum score requirement
4. Return new Store with updated safe_target
This runs at interval 3 of the slot cycle:

- Interval 0: Block proposal
- Interval 1: Validators cast attestation votes
- Interval 2: Aggregators create proofs, broadcast via gossip
- Interval 3: Safe target update (HERE)
- Interval 4: New attestations migrate to "known" pool

Because interval 4 has not yet run, attestations live in two pools:

- "new": freshly received from gossipsub aggregation this slot
- "known": from block attestations and previously accepted gossip

Both pools must be merged to get the full attestation picture.
Using only one pool undercounts support. See inline comments for
concrete scenarios where this matters.

Note: the Ream reference implementation uses only the "new" pool.
Our merge approach is more conservative. It ensures the safe target
reflects every attestation the node knows about.

Returns:
New Store with updated safe_target.
"""
# Get validator count from head state
# Look up the post-state of the current head block.
#
# The validator registry in this state tells us how many active
# validators exist. We need that count to compute the threshold.
head_state = self.states[self.head]
num_validators = len(head_state.validators)

# Calculate 2/3 majority threshold (ceiling division)
# Compute the 2/3 supermajority threshold.
#
# A block needs at least this many attestation votes to be "safe".
# The ceiling division (negation trick) ensures we round UP.
# For example, 100 validators => threshold is 67, not 66.
min_target_score = -(-num_validators * 2 // 3)

# Extract attestations from new aggregated payloads
attestations = self.extract_attestations_from_aggregated_payloads(
self.latest_new_aggregated_payloads
# Merge both attestation pools into a single unified view.
#
# Why merge? At interval 3, the migration step (interval 4) has not
# run yet. Attestations can enter the "known" pool through paths that
# bypass gossipsub entirely:
#
# 1. Proposer's own attestation: the block proposer bundles their
# attestation directly in the block body. When the block is
# processed, this attestation lands in "known" immediately.
# It never appears in "new" because it was never gossiped.
#
# 2. Self-attestation: a node's own gossip attestation does not
# loop back through gossipsub to itself. The node records it
# locally in "known" without going through the "new" pipeline.
#
# Without this merge, those attestations would be invisible to the
# safe target calculation, causing it to undercount support.
#
# The technique: start with a shallow copy of "known", then overlay
# every entry from "new" on top. When both pools contain proofs for
# the same signature key, concatenate the proof lists.
all_payloads: dict[SignatureKey, list[AggregatedSignatureProof]] = dict(
self.latest_known_aggregated_payloads
)
for sig_key, proofs in self.latest_new_aggregated_payloads.items():
if sig_key in all_payloads:
# Both pools have proofs for this key. Combine them.
all_payloads[sig_key] = [*all_payloads[sig_key], *proofs]
else:
# Only "new" has proofs for this key. Add them directly.
all_payloads[sig_key] = proofs

# Find head with minimum attestation threshold.
# Convert the merged aggregated payloads into per-validator votes.
#
# Each proof encodes which validators participated.
# This step unpacks those bitfields into a flat mapping of validator -> vote.
attestations = self.extract_attestations_from_aggregated_payloads(all_payloads)
Comment on lines +971 to +986 (Collaborator Author):
@unnawut @kamilsa, please let me know what you think about this fix, and whether it is OK or not.


# Run LMD GHOST with the supermajority threshold.
#
# The walk starts from the latest justified checkpoint and descends
# through the block tree. At each fork, only children with at least
# `min_target_score` attestation weight are considered. The result
# is the deepest block that clears the 2/3 bar.
#
# If no child meets the threshold at some fork, the walk stops
# early. The safe target is then shallower than the actual head.
safe_target = self._compute_lmd_ghost_head(
start_root=self.latest_justified.root,
attestations=attestations,
min_score=min_target_score,
)

# Return a new Store with only the safe target updated.
#
# The head and attestation pools remain unchanged.
return self.model_copy(update={"safe_target": safe_target})
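
A small self-contained sketch of the two steps above, the ceiling 2/3 threshold and the pool merge, using plain dicts and strings in place of the real SignatureKey and proof types (illustrative only):

# Ceiling division via the negation trick: 100 validators need 67 votes, not 66.
num_validators = 100
min_target_score = -(-num_validators * 2 // 3)
assert min_target_score == 67

# Merge "known" and "new"; concatenate proof lists when a key is in both pools.
known = {"keyA": ["proofA1"]}
new = {"keyA": ["proofA2"], "keyB": ["proofB1"]}
merged = dict(known)
for sig_key, proofs in new.items():
    merged[sig_key] = [*merged.get(sig_key, []), *proofs]
assert merged == {"keyA": ["proofA1", "proofA2"], "keyB": ["proofB1"]}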

def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregatedAttestation]]:
@@ -1076,34 +1139,31 @@ def tick_interval(
return store, new_aggregates

def on_tick(
self, time: Uint64, has_proposal: bool, is_aggregator: bool = False
self, target_interval: Uint64, has_proposal: bool, is_aggregator: bool = False
Comment (Collaborator Author):
@MegaRedHand, let me know if this solves the problem you had in mind.

) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Advance forkchoice store time to given timestamp.
Advance forkchoice store time to given interval count.

Ticks store forward interval by interval, performing appropriate
actions for each interval type. This method handles time progression
incrementally to ensure all interval-specific actions are performed.

Args:
time: Target time as Unix timestamp in seconds.
target_interval: Target time as intervals since genesis.
has_proposal: Whether node has proposal for current slot.
is_aggregator: Whether the node is an aggregator.

Returns:
Tuple of (new store with time advanced,
list of all produced signed aggregated attestations).
"""
# Calculate target time in intervals
time_delta_ms = (time - self.config.genesis_time) * Uint64(1000)
tick_interval_time = time_delta_ms // MILLISECONDS_PER_INTERVAL

# Tick forward one interval at a time
store = self
all_new_aggregates: list[SignedAggregatedAttestation] = []
while store.time < tick_interval_time:

# Tick forward one interval at a time
while store.time < target_interval:
# Check if proposal should be signaled for next interval
should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time
should_signal_proposal = has_proposal and (store.time + Uint64(1)) == target_interval

# Advance by one interval with appropriate signaling
store, new_aggregates = store.tick_interval(should_signal_proposal, is_aggregator)
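
To make the signaling rule concrete, a standalone sketch of when the proposal flag is passed down to tick_interval (illustrative logic only, not the Store implementation):

def signal_schedule(start: int, target_interval: int, has_proposal: bool) -> list[bool]:
    """Return the should_signal_proposal value used at each interval tick."""
    signals = []
    time = start
    while time < target_interval:
        # The flag fires only on the tick that lands exactly on the target.
        signals.append(has_proposal and (time + 1) == target_interval)
        time += 1
    return signals

# Ticking from interval 5 to 8 with a proposal: only the final tick signals it.
assert signal_schedule(start=5, target_interval=8, has_proposal=True) == [False, False, True]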
@@ -1132,12 +1192,9 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]:
Returns:
Tuple of (new Store with updated time, head root for building).
"""
# Calculate time corresponding to this slot
slot_duration_seconds = slot * SECONDS_PER_SLOT
slot_time = self.config.genesis_time + slot_duration_seconds

# Advance time to current slot (ticking intervals)
store, _ = self.on_tick(slot_time, True)
# Advance time to this slot's first interval
target_interval = Uint64(slot * INTERVALS_PER_SLOT)
store, _ = self.on_tick(target_interval, True)

# Process any pending attestations before proposal
store = store.accept_new_attestations()
10 changes: 7 additions & 3 deletions src/lean_spec/subspecs/networking/client/event_source.py
@@ -699,11 +699,15 @@ async def _forward_gossipsub_events(self) -> None:
break
if isinstance(event, GossipsubMessageEvent):
# Decode the message and emit appropriate event.
await self._handle_gossipsub_message(event)
#
# Catch per-message exceptions to prevent one bad message
# from killing the entire forwarding loop.
try:
await self._handle_gossipsub_message(event)
except Exception as e:
logger.warning("Error handling gossipsub message: %s", e)
except asyncio.CancelledError:
pass
except Exception as e:
logger.warning("Error forwarding gossipsub events: %s", e)

async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None:
"""