diff --git a/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.pipeline.parallel.test.ts b/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.pipeline.parallel.test.ts index 1a597ca2dbea..76e228065a4d 100644 --- a/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.pipeline.parallel.test.ts +++ b/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.pipeline.parallel.test.ts @@ -30,10 +30,10 @@ import { EpochsTestContext } from './epochs_test.js'; jest.setTimeout(1000 * 60 * 20); const NODE_COUNT = 4; -const EXPECTED_BLOCKS_PER_CHECKPOINT = 3; +const EXPECTED_BLOCKS_PER_CHECKPOINT = 8; // Send enough transactions to trigger multiple blocks within a checkpoint assuming 2 txs per block. -const TX_COUNT = 10; +const TX_COUNT = 24; /** * E2E tests for proposer pipelining with Multiple Blocks Per Slot (MBPS). @@ -72,16 +72,18 @@ describe('e2e_epochs/epochs_mbps_pipeline', () => { initialValidators: validators, enableProposerPipelining: true, // <- yehaw mockGossipSubNetwork: true, + mockGossipSubNetworkLatency: 500, // 200 ms delay in message prop - adverse network conditions disableAnvilTestWatcher: true, startProverNode: true, - perBlockAllocationMultiplier: 1, + perBlockAllocationMultiplier: 8, aztecEpochDuration: 4, enforceTimeTable: true, - ethereumSlotDuration: 4, - aztecSlotDuration: 36, + ethereumSlotDuration: 12, + aztecSlotDuration: 72, blockDurationMs: 8000, - l1PublishingTime: 2, - attestationPropagationTime: 0.5, + // maxDABlockGas: 786432, // Set max DA block gas to be the same as the checkpoint + // l1PublishingTime: 2, + // attestationPropagationTime: 1, aztecTargetCommitteeSize: 3, ...setupOpts, pxeOpts: { syncChainTip }, diff --git a/yarn-project/end-to-end/src/fixtures/setup.ts b/yarn-project/end-to-end/src/fixtures/setup.ts index 059c7c295899..ca18d9982c85 100644 --- a/yarn-project/end-to-end/src/fixtures/setup.ts +++ b/yarn-project/end-to-end/src/fixtures/setup.ts @@ -177,6 +177,8 @@ export type SetupOptions = { proverNodeConfig?: Partial; /** Whether to use a mock gossip sub network for p2p clients. */ mockGossipSubNetwork?: boolean; + /** Whether to add simulated latency to the mock gossipsub network (in ms) */ + mockGossipSubNetworkLatency?: number; /** Whether to disable the anvil test watcher (can still be manually started) */ disableAnvilTestWatcher?: boolean; /** Whether to enable anvil automine during deployment of L1 contracts (consider defaulting this to true). */ @@ -464,7 +466,7 @@ export async function setup( let p2pClientDeps: P2PClientDeps | undefined = undefined; if (opts.mockGossipSubNetwork) { - mockGossipSubNetwork = new MockGossipSubNetwork(); + mockGossipSubNetwork = new MockGossipSubNetwork(opts.mockGossipSubNetworkLatency); p2pClientDeps = { p2pServiceFactory: getMockPubSubP2PServiceFactory(mockGossipSubNetwork) }; } diff --git a/yarn-project/ethereum/src/contracts/rollup.test.ts b/yarn-project/ethereum/src/contracts/rollup.test.ts index 637570e7479a..bcc2f57775a6 100644 --- a/yarn-project/ethereum/src/contracts/rollup.test.ts +++ b/yarn-project/ethereum/src/contracts/rollup.test.ts @@ -117,6 +117,27 @@ describe('Rollup', () => { }); }); + describe('makeArchiveOverride', () => { + it('creates state override that correctly sets archive for a checkpoint number', async () => { + const checkpointNumber = CheckpointNumber(5); + const expectedArchive = Fr.random(); + + // Create the override + const stateOverride = rollup.makeArchiveOverride(checkpointNumber, expectedArchive); + + // Test the override using simulateContract to read archiveAt(checkpointNumber) + const { result: overriddenArchive } = await publicClient.simulateContract({ + address: rollupAddress, + abi: RollupAbi as Abi, + functionName: 'archiveAt', + args: [BigInt(checkpointNumber)], + stateOverride, + }); + + expect(Fr.fromString(overriddenArchive as string).equals(expectedArchive)).toBe(true); + }); + }); + describe('getSlashingProposer', () => { it('returns a slashing proposer', async () => { const slashingProposer = await rollup.getSlashingProposer(); diff --git a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.test.ts b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.test.ts index f69b53930c38..e64d43e9470a 100644 --- a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.test.ts +++ b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.test.ts @@ -74,6 +74,72 @@ describe('CheckpointAttestationValidator', () => { expect(result).toEqual({ result: 'ignore' }); }); + it('accepts attestation for current slot within pipelining grace period', async () => { + // Proposal is for slot 98 (current wallclock slot), but targetSlot is 99 (pipelining) + const header = CheckpointHeader.random({ slotNumber: SlotNumber(98) }); + const mockAttestation = makeCheckpointAttestation({ + header, + attesterSigner: attester, + proposerSigner: proposer, + }); + + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(99), + nextSlot: SlotNumber(100), + }); + epochCache.getSlotNow.mockReturnValue(SlotNumber(98)); + epochCache.isProposerPipeliningEnabled.mockReturnValue(true); + epochCache.getL1Constants.mockReturnValue({ + ethereumSlotDuration: 12, + } as any); + + // Within grace period: 1000ms elapsed < 6000ms + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: EpochNumber(1), + slot: SlotNumber(98), + ts: 1000n, + nowMs: 1001000n, // 1000ms elapsed + }); + epochCache.isInCommittee.mockResolvedValue(true); + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer.address); + + const result = await validator.validate(mockAttestation); + expect(result).toEqual({ result: 'accept' }); + }); + + it('rejects attestation for current slot outside pipelining grace period', async () => { + // Proposal is for slot 97 (one behind current wallclock slot 98), targetSlot is 99 (pipelining) + const header = CheckpointHeader.random({ slotNumber: SlotNumber(97) }); + const mockAttestation = makeCheckpointAttestation({ + header, + attesterSigner: attester, + proposerSigner: proposer, + }); + + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(99), + nextSlot: SlotNumber(100), + }); + epochCache.getTargetSlot.mockReturnValue(SlotNumber(99)); + epochCache.getSlotNow.mockReturnValue(SlotNumber(98)); + epochCache.isProposerPipeliningEnabled.mockReturnValue(true); + epochCache.getL1Constants.mockReturnValue({ + ethereumSlotDuration: 12, + } as any); + + // Outside grace period AND outside clock tolerance: 7000ms elapsed + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: EpochNumber(1), + slot: SlotNumber(99), + ts: 1000n, + nowMs: 1007000n, // 7000ms elapsed + }); + epochCache.isInCommittee.mockResolvedValue(true); + + const result = await validator.validate(mockAttestation); + expect(result).toEqual({ result: 'reject', severity: PeerErrorSeverity.HighToleranceError }); + }); + it('returns high tolerance error if attester is not in committee', async () => { const header = CheckpointHeader.random({ slotNumber: SlotNumber(100) }); const mockAttestation = makeCheckpointAttestation({ diff --git a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts index 0f6fbcab8b94..940dce4e2d60 100644 --- a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts +++ b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts @@ -8,7 +8,7 @@ import { type ValidationResult, } from '@aztec/stdlib/p2p'; -import { isWithinClockTolerance } from '../clock_tolerance.js'; +import { isWithinClockTolerance, isWithinPipeliningGracePeriod } from '../clock_tolerance.js'; export class CheckpointAttestationValidator implements P2PValidator { protected epochCache: EpochCacheInterface; @@ -23,19 +23,23 @@ export class CheckpointAttestationValidator implements P2PValidator { describe('MAXIMUM_GOSSIP_CLOCK_DISPARITY_MS', () => { @@ -204,4 +208,98 @@ describe('clock_tolerance', () => { expect(isWithinClockTolerance(messageSlot, currentSlot, epochCache)).toBe(false); }); }); + + describe('isWithinPipeliningGracePeriod', () => { + let epochCache: ReturnType>; + + beforeEach(() => { + epochCache = mock(); + epochCache.getSlotNow.mockReturnValue(SlotNumber(100)); + epochCache.isProposerPipeliningEnabled.mockReturnValue(true); + }); + + it('returns true when pipelining enabled, message is for current slot, and within grace period', () => { + // Grace period = DEFAULT_P2P_PROPAGATION_TIME * 1000 = 2000ms + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: 1 as any, + slot: SlotNumber(100), + ts: 1000n, + nowMs: 1001000n, // 1000ms elapsed, within 2000ms grace period + }); + + expect(isWithinPipeliningGracePeriod(SlotNumber(100), epochCache)).toBe(true); + }); + + it('returns true at exactly 0ms elapsed', () => { + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: 1 as any, + slot: SlotNumber(100), + ts: 1000n, + nowMs: 1000000n, // 0ms elapsed + }); + + expect(isWithinPipeliningGracePeriod(SlotNumber(100), epochCache)).toBe(true); + }); + + it('returns false when elapsed time exceeds grace period', () => { + // 3000ms elapsed > 2000ms grace period + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: 1 as any, + slot: SlotNumber(100), + ts: 1000n, + nowMs: 1003000n, // 3000ms elapsed + }); + + expect(isWithinPipeliningGracePeriod(SlotNumber(100), epochCache)).toBe(false); + }); + + it('returns false at exactly the grace period boundary', () => { + // 2000ms elapsed = DEFAULT_P2P_PROPAGATION_TIME * 1000 (not strictly less than) + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: 1 as any, + slot: SlotNumber(100), + ts: 1000n, + nowMs: 1002000n, // 2000ms elapsed + }); + + expect(isWithinPipeliningGracePeriod(SlotNumber(100), epochCache)).toBe(false); + }); + + it('returns false when pipelining is disabled', () => { + epochCache.isProposerPipeliningEnabled.mockReturnValue(false); + + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: 1 as any, + slot: SlotNumber(100), + ts: 1000n, + nowMs: 1001000n, // 1000ms elapsed, within grace period + }); + + expect(isWithinPipeliningGracePeriod(SlotNumber(100), epochCache)).toBe(false); + }); + + it('returns false when message is not for current slot', () => { + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: 1 as any, + slot: SlotNumber(100), + ts: 1000n, + nowMs: 1001000n, + }); + + // Message for slot 99, current slot is 100 + expect(isWithinPipeliningGracePeriod(SlotNumber(99), epochCache)).toBe(false); + }); + + it('returns false when message is for a future slot', () => { + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: 1 as any, + slot: SlotNumber(100), + ts: 1000n, + nowMs: 1001000n, + }); + + // Message for slot 101, current slot is 100 + expect(isWithinPipeliningGracePeriod(SlotNumber(101), epochCache)).toBe(false); + }); + }); }); diff --git a/yarn-project/p2p/src/msg_validators/clock_tolerance.ts b/yarn-project/p2p/src/msg_validators/clock_tolerance.ts index dc00e9e6ce2b..515fc4906762 100644 --- a/yarn-project/p2p/src/msg_validators/clock_tolerance.ts +++ b/yarn-project/p2p/src/msg_validators/clock_tolerance.ts @@ -1,5 +1,6 @@ import type { EpochCacheInterface } from '@aztec/epoch-cache'; import { SlotNumber } from '@aztec/foundation/branded-types'; +import { DEFAULT_P2P_PROPAGATION_TIME } from '@aztec/stdlib/timetable'; /** * Maximum clock disparity tolerance for P2P message validation (in milliseconds). @@ -50,3 +51,33 @@ export function isWithinClockTolerance( return elapsedMs < MAXIMUM_GOSSIP_CLOCK_DISPARITY_MS; } + +/** + * Checks if a message should be accepted under the pipelining grace period. + * + * When pipelining is enabled, `targetSlot = slotNow + 1`. A proposal built in slot N-1 + * for slot N arrives when validators are in slot N, so their `targetSlot = N+1`. + * This function accepts proposals for the current wallclock slot if we're within the + * first `DEFAULT_P2P_PROPAGATION_TIME` seconds of the slot (the pipelining grace period). + * + * @param messageSlot - The slot number from the received message + * @param epochCache - EpochCache to get timing and pipelining state + * @returns true if pipelining is enabled, the message is for the current slot, and we're within the grace period + */ +export function isWithinPipeliningGracePeriod(messageSlot: SlotNumber, epochCache: EpochCacheInterface): boolean { + if (!epochCache.isProposerPipeliningEnabled()) { + return false; + } + + const currentSlot = epochCache.getSlotNow(); + if (messageSlot !== currentSlot) { + return false; + } + + const { ts: slotStartTs, nowMs } = epochCache.getEpochAndSlotNow(); + const slotStartMs = slotStartTs * 1000n; + const elapsedMs = Number(nowMs - slotStartMs); + const gracePeriodMs = DEFAULT_P2P_PROPAGATION_TIME * 1000; + + return elapsedMs < gracePeriodMs; +} diff --git a/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.test.ts b/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.test.ts index 4210645babbe..9b22116bacb4 100644 --- a/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.test.ts +++ b/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.test.ts @@ -53,6 +53,8 @@ describe('ProposalValidator', () => { nextSlot, }); epochCache.getTargetSlot.mockReturnValue(currentSlot); + epochCache.getSlotNow.mockReturnValue(currentSlot); + epochCache.isProposerPipeliningEnabled.mockReturnValue(false); }); describe.each([ @@ -169,6 +171,63 @@ describe('ProposalValidator', () => { const result = await validator.validate(proposal); expect(result).toEqual({ result: 'accept' }); }); + + it('accepts proposal for current slot within pipelining grace period', async () => { + // Simulate pipelining: targetSlot = 101, but proposal is for slot 100 (current wallclock slot) + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(101), + nextSlot: SlotNumber(102), + }); + epochCache.getSlotNow.mockReturnValue(currentSlot); // slot 100 + epochCache.isProposerPipeliningEnabled.mockReturnValue(true); + epochCache.getL1Constants.mockReturnValue({ + ethereumSlotDuration: 12, + } as any); + + // Within grace period: 1000ms elapsed < 6000ms (ethereumSlotDuration/2 = 12000/2) + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: EpochNumber(1), + slot: currentSlot, + ts: 1000n, + nowMs: 1001000n, // 1000ms elapsed + }); + + const signer = Secp256k1Signer.random(); + const proposal = await factory(currentSlot, signer); + + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(signer.address); + const result = await validator.validate(proposal); + expect(result).toEqual({ result: 'accept' }); + }); + + it('rejects proposal for current slot outside pipelining grace period', async () => { + // Simulate pipelining: targetSlot = 101, but proposal is for slot 100 (current wallclock slot) + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(101), + nextSlot: SlotNumber(102), + }); + epochCache.getTargetSlot.mockReturnValue(SlotNumber(101)); + epochCache.getSlotNow.mockReturnValue(currentSlot); // slot 100 + epochCache.isProposerPipeliningEnabled.mockReturnValue(true); + epochCache.getL1Constants.mockReturnValue({ + ethereumSlotDuration: 12, + } as any); + + // Outside grace period: 7000ms elapsed > 6000ms (ethereumSlotDuration/2 = 12000/2) + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: EpochNumber(1), + slot: currentSlot, + ts: 1000n, + nowMs: 1007000n, // 7000ms elapsed + }); + + const signer = Secp256k1Signer.random(); + const proposal = await factory(currentSlot, signer); + + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(signer.address); + const result = await validator.validate(proposal); + expect(result).toEqual({ result: 'reject', severity: PeerErrorSeverity.HighToleranceError }); + }); }); describe('validateTxs', () => { diff --git a/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.ts b/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.ts index 0f2c5d47c5bf..563a7ab00aa4 100644 --- a/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.ts +++ b/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.ts @@ -8,7 +8,7 @@ import { type ValidationResult, } from '@aztec/stdlib/p2p'; -import { isWithinClockTolerance } from '../clock_tolerance.js'; +import { isWithinClockTolerance, isWithinPipeliningGracePeriod } from '../clock_tolerance.js'; /** Validates header-level and tx-level fields of block and checkpoint proposals. */ export class ProposalValidator { @@ -31,18 +31,22 @@ export class ProposalValidator { /** Validates header-level fields: slot, signature, and proposer. */ public async validate(proposal: BlockProposal | CheckpointProposalCore): Promise { try { - // Slot check: use target slots since proposals target pipeline slots (slot + 1 when pipelining) + // Slot check: use target slots since proposals target pipeline slots (slot + 1 when pipelining). const { targetSlot, nextSlot } = this.epochCache.getTargetAndNextSlot(); const slotNumber = proposal.slotNumber; if (slotNumber !== targetSlot && slotNumber !== nextSlot) { - // Check if message is for previous slot and within clock tolerance - if (!isWithinClockTolerance(slotNumber, targetSlot, this.epochCache)) { + // When pipelining, accept proposals for the current slot (built in the previous slot) + // if we're within the first ethereumSlotDuration/2 seconds of the slot. + if (isWithinPipeliningGracePeriod(slotNumber, this.epochCache)) { + // Fall through to remaining validation (signature, proposer, etc.) + } else if (!isWithinClockTolerance(slotNumber, targetSlot, this.epochCache)) { this.logger.warn(`Penalizing peer for invalid slot number ${slotNumber}`, { targetSlot, nextSlot }); return { result: 'reject', severity: PeerErrorSeverity.HighToleranceError }; + } else { + this.logger.verbose(`Ignoring proposal for previous slot ${slotNumber} within clock tolerance`); + return { result: 'ignore' }; } - this.logger.verbose(`Ignoring proposal for previous slot ${slotNumber} within clock tolerance`); - return { result: 'ignore' }; } // Signature validity diff --git a/yarn-project/p2p/src/test-helpers/mock-pubsub.ts b/yarn-project/p2p/src/test-helpers/mock-pubsub.ts index cf48654e0aff..d3ed324a6ccf 100644 --- a/yarn-project/p2p/src/test-helpers/mock-pubsub.ts +++ b/yarn-project/p2p/src/test-helpers/mock-pubsub.ts @@ -1,5 +1,6 @@ import type { EpochCacheInterface } from '@aztec/epoch-cache'; import { type Logger, createLogger } from '@aztec/foundation/log'; +import { sleep } from '@aztec/foundation/sleep'; import type { AztecAsyncKVStore } from '@aztec/kv-store'; import type { L2BlockSource } from '@aztec/stdlib/block'; import type { ContractDataSource } from '@aztec/stdlib/contract'; @@ -138,6 +139,11 @@ class MockReqResp implements ReqRespInterface { const responses: InstanceType[] = []; const peers = this.network.getReqRespPeers().filter(p => !p.peerId.equals(this.peerId)); const targetPeers = pinnedPeer ? peers.filter(p => p.peerId.equals(pinnedPeer)) : peers; + const delayMs = this.network.getPropagationDelayMs(); + + if (delayMs > 0) { + await sleep(delayMs); + } for (const request of requests) { const requestBuffer = request.toBuffer(); @@ -174,7 +180,12 @@ class MockReqResp implements ReqRespInterface { return { status: ReqRespStatus.SUCCESS, data: Buffer.from([]) }; } try { + const delayMs = this.network.getPropagationDelayMs(); + if (delayMs > 0) { + await sleep(delayMs); + } const data = await handler(this.peerId, payload); + return { status: ReqRespStatus.SUCCESS, data }; } catch { return { status: ReqRespStatus.FAILURE }; @@ -242,10 +253,10 @@ class MockGossipSubService extends TypedEventEmitter implements score: (_peerId: PeerIdStr) => 0, }; - publish(topic: TopicStr, data: Uint8Array, _opts?: PublishOpts): Promise { + async publish(topic: TopicStr, data: Uint8Array, _opts?: PublishOpts): Promise { this.logger.debug(`Publishing message on topic ${topic}`, { topic, sender: this.peerId.toString() }); - this.network.publishToPeers(topic, data, this.peerId); - return Promise.resolve({ recipients: this.network.getPeers().filter(peer => !this.peerId.equals(peer)) }); + await this.network.publishToPeers(topic, data, this.peerId); + return { recipients: this.network.getPeers().filter(peer => !this.peerId.equals(peer)) }; } receive(msg: GossipsubMessage) { @@ -281,7 +292,8 @@ class MockGossipSubService extends TypedEventEmitter implements /** * Mock gossip sub network used for testing. - * All instances of MockGossipSubService connected to the same network will instantly receive the same messages. + * All instances of MockGossipSubService connected to the same network receive the same messages, + * optionally delayed by a configurable propagation time. */ export class MockGossipSubNetwork { private peers: MockGossipSubService[] = []; @@ -290,6 +302,15 @@ export class MockGossipSubNetwork { private logger = createLogger('p2p:test:mock-gossipsub-network'); + constructor( + /** Artificial propagation delay in milliseconds applied to each message delivery. */ + private propagationDelayMs: number = 0, + ) {} + + public getPropagationDelayMs(): number { + return this.propagationDelayMs; + } + public getPeers(): PeerId[] { return this.peers.map(peer => peer.peerId); } @@ -306,7 +327,7 @@ export class MockGossipSubNetwork { return this.reqRespPeers; } - public publishToPeers(topic: TopicStr, data: Uint8Array, sender: PeerId): void { + public async publishToPeers(topic: TopicStr, data: Uint8Array, sender: PeerId): Promise { const msgId = (this.nextMsgId++).toString(); this.logger.debug(`Network is distributing message on topic ${topic}`, { topic, @@ -315,6 +336,10 @@ export class MockGossipSubNetwork { msgId, }); + if (this.propagationDelayMs > 0) { + await sleep(this.propagationDelayMs); + } + const gossipSubMsg: GossipsubMessage = { msgId, msg: { type: 'unsigned', topic, data }, propagationSource: sender }; for (const peer of this.peers) { if (peer.subscribedTopics.has(topic)) { diff --git a/yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts b/yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts index 732a4e046c1d..69701581c7b8 100644 --- a/yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts +++ b/yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts @@ -123,6 +123,13 @@ export type InvalidateCheckpointRequest = { lastArchive: Fr; }; +/** Options for overriding L1 state during propose simulation (used when pipelining). */ +type ProposeSimulationOverrides = { + forcePendingCheckpointNumber?: CheckpointNumber; + forcePendingArchive?: { checkpointNumber: CheckpointNumber; archive: Fr }; + forcePendingFeeHeader?: { checkpointNumber: CheckpointNumber; feeHeader: FeeHeader }; +}; + interface RequestWithExpiry { action: Action; request: L1TxRequest; @@ -880,10 +887,7 @@ export class SequencerPublisher { checkpoint: Checkpoint, attestationsAndSigners: CommitteeAttestationsAndSigners, attestationsAndSignersSignature: Signature, - options: { - forcePendingCheckpointNumber?: CheckpointNumber; - forcePendingFeeHeader?: { checkpointNumber: CheckpointNumber; feeHeader: FeeHeader }; - }, + options: ProposeSimulationOverrides, ): Promise { // Anchor the simulation timestamp to the checkpoint's own slot start time // rather than the current L1 block timestamp, which may overshoot into the next slot if the build ran late. @@ -1209,11 +1213,7 @@ export class SequencerPublisher { checkpoint: Checkpoint, attestationsAndSigners: CommitteeAttestationsAndSigners, attestationsAndSignersSignature: Signature, - opts: { - txTimeoutAt?: Date; - forcePendingCheckpointNumber?: CheckpointNumber; - forcePendingFeeHeader?: { checkpointNumber: CheckpointNumber; feeHeader: FeeHeader }; - } = {}, + opts: ProposeSimulationOverrides & { txTimeoutAt?: Date } = {}, ): Promise { const checkpointHeader = checkpoint.header; @@ -1397,11 +1397,7 @@ export class SequencerPublisher { this.l1TxUtils.restart(); } - private async prepareProposeTx( - encodedData: L1ProcessArgs, - timestamp: bigint, - options: { forcePendingCheckpointNumber?: CheckpointNumber }, - ) { + private async prepareProposeTx(encodedData: L1ProcessArgs, timestamp: bigint, options: ProposeSimulationOverrides) { const kzg = Blob.getViemKzgInstance(); const blobInput = getPrefixedEthBlobCommitments(encodedData.blobs); this.log.debug('Validating blob input', { blobInput }); @@ -1498,10 +1494,7 @@ export class SequencerPublisher { `0x${string}`, ], timestamp: bigint, - options: { - forcePendingCheckpointNumber?: CheckpointNumber; - forcePendingFeeHeader?: { checkpointNumber: CheckpointNumber; feeHeader: FeeHeader }; - }, + options: ProposeSimulationOverrides, ) { const rollupData = encodeFunctionData({ abi: RollupAbi, @@ -1526,12 +1519,23 @@ export class SequencerPublisher { : [] ).flatMap(override => override.stateDiff ?? []); + // override the archive for a specific checkpoint number if requested (used when pipelining) + const forcePendingArchiveStateDiff = ( + options.forcePendingArchive !== undefined + ? this.rollupContract.makeArchiveOverride( + options.forcePendingArchive.checkpointNumber, + options.forcePendingArchive.archive, + ) + : [] + ).flatMap(override => override.stateDiff ?? []); + const stateOverrides: StateOverride = [ { address: this.rollupContract.address, // @note we override checkBlob to false since blobs are not part simulate() stateDiff: [ { slot: toPaddedHex(RollupContract.checkBlobStorageSlot, true), value: toPaddedHex(0n, true) }, + ...forcePendingArchiveStateDiff, ...forcePendingCheckpointNumberStateDiff, ...forcePendingFeeHeaderStateDiff, ], @@ -1601,11 +1605,7 @@ export class SequencerPublisher { private async addProposeTx( checkpoint: Checkpoint, encodedData: L1ProcessArgs, - opts: { - txTimeoutAt?: Date; - forcePendingCheckpointNumber?: CheckpointNumber; - forcePendingFeeHeader?: { checkpointNumber: CheckpointNumber; feeHeader: FeeHeader }; - } = {}, + opts: ProposeSimulationOverrides & { txTimeoutAt?: Date } = {}, timestamp: bigint, preCheck?: () => Promise, ): Promise { diff --git a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts index 8a23b8bbfb26..4c0f10bb7522 100644 --- a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts @@ -301,7 +301,7 @@ describe('CheckpointProposalJob', () => { validatorClient.collectAttestations.mockResolvedValue(getAttestations(block)); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(1); @@ -315,7 +315,7 @@ describe('CheckpointProposalJob', () => { job.updateConfig({ minTxsPerBlock: 2 }); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeUndefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(0); @@ -332,7 +332,7 @@ describe('CheckpointProposalJob', () => { job.updateConfig({ buildCheckpointIfEmpty: true, minTxsPerBlock: 1 }); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(1); @@ -353,7 +353,7 @@ describe('CheckpointProposalJob', () => { validatorClient.collectAttestations.mockResolvedValue(getAttestations(block)); - await job.execute(); + await job.executeAndAwait(); expect(validatorClient.collectAttestations).toHaveBeenCalledTimes(1); expect(validatorClient.collectAttestations).toHaveBeenCalledWith( @@ -388,7 +388,7 @@ describe('CheckpointProposalJob', () => { checkpointBuilder.seedBlocks([block], [txs]); validatorClient.collectAttestations.mockResolvedValue(getAttestations(block)); - await job.execute(); + await job.executeAndAwait(); // Verify startCheckpoint was called with the out hashes from previous checkpoints expect(checkpointsBuilder.startCheckpointCalls).toHaveLength(1); @@ -429,7 +429,7 @@ describe('CheckpointProposalJob', () => { checkpointBuilder.seedBlocks([block], [txs]); validatorClient.collectAttestations.mockResolvedValue(getAttestations(block)); - await job.execute(); + await job.executeAndAwait(); // Verify only the checkpoint before the current one is included expect(checkpointsBuilder.startCheckpointCalls).toHaveLength(1); @@ -615,7 +615,7 @@ describe('CheckpointProposalJob', () => { // Install spy on waitUntilTimeInSlot to verify it's called with expected deadlines const waitSpy = jest.spyOn(job, 'waitUntilTimeInSlot'); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(2); @@ -645,7 +645,7 @@ describe('CheckpointProposalJob', () => { const waitSpy = jest.spyOn(job, 'waitUntilTimeInSlot'); job.updateConfig({ minTxsPerBlock: 0 }); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(1); @@ -674,7 +674,7 @@ describe('CheckpointProposalJob', () => { const waitSpy = jest.spyOn(job, 'waitUntilTimeInSlot'); job.updateConfig({ minTxsPerBlock: 5, buildCheckpointIfEmpty: true }); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(1); @@ -702,7 +702,7 @@ describe('CheckpointProposalJob', () => { const waitSpy = jest.spyOn(job, 'waitUntilTimeInSlot'); job.updateConfig({ minTxsPerBlock: 5, buildCheckpointIfEmpty: false }); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeUndefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(0); @@ -735,7 +735,7 @@ describe('CheckpointProposalJob', () => { // Install spy on waitUntilTimeInSlot const waitSpy = jest.spyOn(job, 'waitUntilTimeInSlot'); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); // Only one block built due to time constraints @@ -764,7 +764,7 @@ describe('CheckpointProposalJob', () => { const waitSpy = jest.spyOn(job, 'waitUntilTimeInSlot'); - await job.execute(); + await job.executeAndAwait(); // With 3 blocks where the 3rd is the last, waitUntilTimeInSlot should be called twice // (after block 1 and block 2, but not after block 3 since it's the last) @@ -793,7 +793,7 @@ describe('CheckpointProposalJob', () => { const waitSpy = jest.spyOn(job, 'waitUntilTimeInSlot'); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(1); @@ -861,7 +861,7 @@ describe('CheckpointProposalJob', () => { p2p.getPendingTxCount.mockResolvedValue(txs.length); p2p.iterateEligiblePendingTxs.mockImplementation(() => mockTxIterator(Promise.resolve(txs))); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); // Should return undefined when no time available expect(checkpoint).toBeUndefined(); @@ -879,7 +879,7 @@ describe('CheckpointProposalJob', () => { validatorClient.collectAttestations.mockResolvedValue(getAttestations(block)); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); expect(checkpointBuilder.buildBlockCalls).toHaveLength(1); @@ -902,7 +902,7 @@ describe('CheckpointProposalJob', () => { validatorClient.collectAttestations.mockResolvedValue(getAttestations(block)); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); // Should still complete if first block succeeds expect(checkpoint).toBeDefined(); @@ -920,7 +920,7 @@ describe('CheckpointProposalJob', () => { checkpointBuilder.errorOnBuild = new Error('Block build failed'); // The job catches the error internally and returns undefined - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeUndefined(); }); @@ -931,9 +931,10 @@ describe('CheckpointProposalJob', () => { // Mock collectAttestations to fail with timeout validatorClient.collectAttestations.mockRejectedValue(new AttestationTimeoutError(0, 3, SlotNumber.ZERO)); - const checkpoint = await job.execute(); + // Checkpoint is returned after broadcast — attestation failure happens in the background + const checkpoint = await job.executeAndAwait(); - expect(checkpoint).toBeUndefined(); + expect(checkpoint).toBeDefined(); expect(validatorClient.collectAttestations).toHaveBeenCalled(); }); @@ -965,7 +966,7 @@ describe('CheckpointProposalJob', () => { const { txs, block } = await setupTxsAndBlock(p2p, globalVariables, 1, chainId); checkpointBuilder.seedBlocks([block], [txs]); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); // Should complete even with empty committee expect(checkpoint).toBeDefined(); @@ -980,7 +981,7 @@ describe('CheckpointProposalJob', () => { const attestations = getAttestations(block); validatorClient.collectAttestations.mockResolvedValue(attestations); - const checkpoint = await job.execute(); + const checkpoint = await job.executeAndAwait(); expect(checkpoint).toBeDefined(); expect(validatorClient.collectAttestations).toHaveBeenCalled(); @@ -992,9 +993,9 @@ describe('CheckpointProposalJob', () => { validatorClient.collectAttestations.mockRejectedValue(new TimeoutError('Attestation collection timed out')); - await job.execute(); + await job.executeAndAwait(); - // Should handle timeout gracefully + // Should handle timeout gracefully (in background pipeline) expect(validatorClient.collectAttestations).toHaveBeenCalled(); }); }); @@ -1029,7 +1030,7 @@ describe('CheckpointProposalJob', () => { throw new DutyAlreadySignedError(SlotNumber(1), DutyType.BLOCK_PROPOSAL, 0, 'node-2'); }); - const result = await job.execute(); + const result = await job.executeAndAwait(); // Should return undefined and stop building expect(result).toBeUndefined(); @@ -1070,7 +1071,7 @@ describe('CheckpointProposalJob', () => { throw new SlashingProtectionError(SlotNumber(1), DutyType.BLOCK_PROPOSAL, 0, 'hash1', 'hash2', 'node-1'); }); - const result = await job.execute(); + const result = await job.executeAndAwait(); // Should return undefined and stop building expect(result).toBeUndefined(); @@ -1091,6 +1092,13 @@ class TestCheckpointProposalJob extends CheckpointProposalJob { return Promise.resolve(); } + /** Wraps execute + awaitPendingSubmission so tests see the full pipeline complete. */ + public async executeAndAwait(): Promise { + const result = await this.execute(); + await this.awaitPendingSubmission(); + return result; + } + /** Update config for testing - allows tests to modify config after job creation */ public updateConfig(partialConfig: Partial): void { this.config = { ...this.config, ...partialConfig }; diff --git a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts index 0b660e9bcba3..367608f081ad 100644 --- a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts @@ -889,6 +889,7 @@ describe('CheckpointProposalJob Timing Tests', () => { job.setTimetable(timetable); await job.execute(); + await job.awaitPendingSubmission(); // Verify collectAttestations was called expect(validatorClient.collectAttestations).toHaveBeenCalled(); @@ -927,6 +928,7 @@ describe('CheckpointProposalJob Timing Tests', () => { job.setTimetable(timetable); await job.execute(); + await job.awaitPendingSubmission(); // Attestation collection should start after the last block is built and checkpoint is assembled // Last block deadline at 17s (sub-slot 2), plus assembly time @@ -956,6 +958,7 @@ describe('CheckpointProposalJob Timing Tests', () => { job.setTimetable(timetable); await job.execute(); + await job.awaitPendingSubmission(); // Deadline should still be absolute (slotStart + 60s), not relative to start time // Uses PUBLISHING_CHECKPOINT state: slotDuration - l1PublishingTime = 72 - 12 = 60 @@ -966,4 +969,135 @@ describe('CheckpointProposalJob Timing Tests', () => { expect(actualDeadlineSeconds).toBeCloseTo(expectedDeadlineSeconds, 0); }); }); + + describe('Pipelining Attestation Timing', () => { + const targetSlot = SlotNumber(2); // Target slot is one ahead of build slot + + /** Create a pipelining-aware job where targetSlot = slotNumber + 1 */ + function createPipeliningJob(): TimingTestCheckpointProposalJob { + const pipeliningTimetable = new SequencerTimetable( + { + ethereumSlotDuration: ETHEREUM_SLOT_DURATION, + aztecSlotDuration: AZTEC_SLOT_DURATION, + l1PublishingTime: L1_PUBLISHING_TIME, + p2pPropagationTime: P2P_PROPAGATION_TIME, + blockDurationMs: BLOCK_DURATION * 1000, + enforce: true, + pipelining: true, + }, + undefined, + createLogger('test:timetable:pipelining'), + ); + + const setStateFn = jest.fn(); + const eventEmitter = new EventEmitter() as TypedEventEmitter; + + const job = new TimingTestCheckpointProposalJob( + dateProvider, + getSecondsIntoSlot, + slotNumber, + targetSlot, + epoch, + checkpointNumber, + BlockNumber.ZERO, + proposer, + publisher, + attestorAddress, + undefined, // invalidateCheckpoint + validatorClient, + globalVariableBuilder, + p2p, + worldState, + l1ToL2MessageSource, + l2BlockSource, + checkpointsBuilder as unknown as FullNodeCheckpointsBuilder, + blockSink, + l1Constants, + config, + pipeliningTimetable, + slasherClient, + epochCache, + dateProvider, + metrics, + eventEmitter, + setStateFn, + getTelemetryClient().getTracer('timing-test-pipelining'), + { actor: 'timing-test-pipelining' }, + ); + + return job; + } + + beforeEach(() => { + epochCache.isProposerPipeliningEnabled.mockReturnValue(true); + }); + + it('sets attestation deadline to buildSlotStart + slotDuration + gracePeriod when pipelining', async () => { + const { blocks, txs } = await createTestBlocksAndTxs(2); + mockP2pWithTxs(txs); + checkpointBuilder.seedBlocks( + blocks, + blocks.map((_, i) => [txs[i]]), + ); + checkpointBuilder.setExecutionDurations([5, 5]); + + let collectAttestationsDeadline: Date | undefined; + validatorClient.collectAttestations.mockImplementation((_proposal, _required, deadline) => { + collectAttestationsDeadline = deadline; + return Promise.resolve(getAttestations(blocks[1])); + }); + + setTimeInSlot(1); + + const job = createPipeliningJob(); + await job.execute(); + await job.awaitPendingSubmission(); + + expect(validatorClient.collectAttestations).toHaveBeenCalled(); + expect(collectAttestationsDeadline).toBeDefined(); + + // Attestation deadline = buildSlotStart + aztecSlotDuration + gracePeriod + // gracePeriod = blockDuration + p2pPropagation (re-execution + attestation return) + const buildSlotStart = getSlotStartTime(slotNumber); + const gracePeriod = BLOCK_DURATION + P2P_PROPAGATION_TIME; + const expectedDeadlineSeconds = buildSlotStart + AZTEC_SLOT_DURATION + gracePeriod; + const actualDeadlineSeconds = collectAttestationsDeadline!.getTime() / 1000; + + expect(actualDeadlineSeconds).toBeCloseTo(expectedDeadlineSeconds, 0); + }); + + it('non-pipelining attestation deadline is unchanged', async () => { + epochCache.isProposerPipeliningEnabled.mockReturnValue(false); + + const { blocks, txs } = await createTestBlocksAndTxs(2); + mockP2pWithTxs(txs); + checkpointBuilder.seedBlocks( + blocks, + blocks.map((_, i) => [txs[i]]), + ); + checkpointBuilder.setExecutionDurations([5, 5]); + + let collectAttestationsDeadline: Date | undefined; + validatorClient.collectAttestations.mockImplementation((_proposal, _required, deadline) => { + collectAttestationsDeadline = deadline; + return Promise.resolve(getAttestations(blocks[1])); + }); + + setTimeInSlot(1); + + const job = createJob(); + job.setTimetable(timetable); + await job.execute(); + await job.awaitPendingSubmission(); + + expect(collectAttestationsDeadline).toBeDefined(); + + // Non-pipelining: deadline = buildSlotStart + slotDuration - l1PublishingTime + const slotStart = getSlotStartTime(slotNumber); + const expectedDeadlineSeconds = slotStart + AZTEC_SLOT_DURATION - L1_PUBLISHING_TIME; + const actualDeadlineSeconds = collectAttestationsDeadline!.getTime() / 1000; + + expect(actualDeadlineSeconds).toBeCloseTo(expectedDeadlineSeconds, 0); + }); + }); }); diff --git a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts index b9815640a054..6800c24b1899 100644 --- a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts +++ b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts @@ -68,7 +68,14 @@ import { SequencerState } from './utils.js'; /** How much time to sleep while waiting for min transactions to accumulate for a block */ const TXS_POLLING_MS = 500; -/** Result from proposeCheckpoint when a checkpoint was successfully built and attested. */ +/** Result from proposeCheckpoint when a checkpoint was successfully built and broadcast. */ +type CheckpointProposalBroadcast = { + checkpoint: Checkpoint; + proposal: CheckpointProposal; + blockProposedAt: number; +}; + +/** Result after attestation collection and signing, ready for L1 submission. */ type CheckpointProposalResult = { checkpoint: Checkpoint; attestations: CommitteeAttestationsAndSigners; @@ -136,8 +143,9 @@ export class CheckpointProposalJob implements Traceable { /** * Executes the checkpoint proposal job. - * Builds blocks, collects attestations, enqueues requests, and schedules L1 submission as a - * background task so the work loop can return to IDLE immediately. + * Builds blocks, assembles checkpoint, and broadcasts the proposal (blocking). + * Attestation collection, signing, and L1 submission are backgrounded so the + * work loop can return to IDLE immediately for consecutive slot proposals. * Returns the built checkpoint if successful, undefined otherwise. */ @trackSpan('CheckpointProposalJob.execute') @@ -157,71 +165,99 @@ export class CheckpointProposalJob implements Traceable { this.log, ).enqueueVotes(); - // Build and propose the checkpoint. Builds blocks, broadcasts, collects attestations, and signs. - // Does NOT enqueue to L1 yet — that happens after the pipeline sleep. - const proposalResult = await this.proposeCheckpoint(); - const checkpoint = proposalResult?.checkpoint; + // Build blocks, assemble checkpoint, and broadcast proposal (BLOCKING). + // Returns after broadcast — attestation collection is deferred. + const broadcast = await this.proposeCheckpoint(); - // Wait until the voting promises have resolved, so all requests are enqueued (not sent) - await Promise.all(votesPromises); - - if (checkpoint) { - this.metrics.recordCheckpointProposalSuccess(); + if (!broadcast) { + await Promise.all(votesPromises); + // Still submit votes even without a checkpoint + if (!this.config.fishermanMode) { + this.pendingL1Submission = this.publisher.sendRequestsAt(new Date(this.dateProvider.now())).then(() => {}); + } + return undefined; } + const { checkpoint } = broadcast; + this.metrics.recordCheckpointProposalSuccess(); + // Do not post anything to L1 if we are fishermen, but do perform L1 fee analysis if (this.config.fishermanMode) { await this.handleCheckpointEndAsFisherman(checkpoint); - return; + return checkpoint; } - // Enqueue the checkpoint for L1 submission - if (proposalResult) { + // Background the attestation → signing → L1 pipeline so the work loop is unblocked + this.pendingL1Submission = this.waitForAttestationsAndEnqueueSubmissionAsync(broadcast, votesPromises); + + // Return the built checkpoint immediately — the work loop is now unblocked + return checkpoint; + } + + /** + * Background pipeline: collects attestations, signs them, enqueues the checkpoint, and submits to L1. + * Runs as a fire-and-forget task stored in `pendingL1Submission` so the work loop is unblocked. + */ + private async waitForAttestationsAndEnqueueSubmissionAsync( + broadcast: CheckpointProposalBroadcast, + votesPromises: Promise[], + ): Promise { + const { checkpoint, proposal, blockProposedAt } = broadcast; + try { + await Promise.all(votesPromises); + + this.setStateFn(SequencerState.COLLECTING_ATTESTATIONS, this.targetSlot); + const attestations = await this.waitForAttestations(proposal); + + this.metrics.recordCheckpointAttestationDelay(this.dateProvider.now() - blockProposedAt); + + // Proposer must sign over the attestations before pushing them to L1 + const signer = this.proposer ?? this.publisher.getSenderAddress(); + let attestationsSignature: Signature; try { - await this.enqueueCheckpointForSubmission(proposalResult); + attestationsSignature = await this.validatorClient.signAttestationsAndSigners( + attestations, + signer, + this.targetSlot, + this.checkpointNumber, + ); } catch (err) { - this.log.error(`Failed to enqueue checkpoint for L1 submission at slot ${this.targetSlot}`, err); - // Continue to sendRequestsAt so votes are still sent + if (this.handleHASigningError(err, 'Attestations signature')) { + return; + } + throw err; } - } - // Compute the earliest time to submit: pipeline slot start when pipelining, now otherwise. - const submitAfter = this.epochCache.isProposerPipeliningEnabled() - ? new Date(Number(getTimestampForSlot(this.targetSlot, this.l1Constants)) * 1000) - : new Date(this.dateProvider.now()); - - // Schedule L1 submission in the background so the work loop returns immediately. - // The publisher will sleep until submitAfter, then send the bundled requests. - // The promise is stored so it can be awaited during shutdown. - this.pendingL1Submission = this.publisher - .sendRequestsAt(submitAfter) - .then(async l1Response => { - const proposedAction = l1Response?.successfulActions.find(a => a === 'propose'); - if (proposedAction) { - this.eventEmitter.emit('checkpoint-published', { checkpoint: this.checkpointNumber, slot: this.targetSlot }); - const coinbase = checkpoint?.header.coinbase; - await this.metrics.incFilledSlot(this.publisher.getSenderAddress().toString(), coinbase); - } else if (checkpoint) { - this.eventEmitter.emit('checkpoint-publish-failed', { ...l1Response, slot: this.targetSlot }); - - if (this.epochCache.isProposerPipeliningEnabled()) { - this.metrics.recordPipelineDiscard(); - } + // Enqueue the checkpoint for L1 submission + await this.enqueueCheckpointForSubmission({ checkpoint, attestations, attestationsSignature }); + + // Compute the earliest time to submit: pipeline slot start when pipelining, now otherwise. + const submitAfter = this.epochCache.isProposerPipeliningEnabled() + ? new Date(Number(getTimestampForSlot(this.targetSlot, this.l1Constants)) * 1000) + : new Date(this.dateProvider.now()); + + const l1Response = await this.publisher.sendRequestsAt(submitAfter); + const proposedAction = l1Response?.successfulActions.find(a => a === 'propose'); + if (proposedAction) { + this.eventEmitter.emit('checkpoint-published', { checkpoint: this.checkpointNumber, slot: this.targetSlot }); + const coinbase = checkpoint.header.coinbase; + await this.metrics.incFilledSlot(this.publisher.getSenderAddress().toString(), coinbase); + } else { + this.eventEmitter.emit('checkpoint-publish-failed', { ...l1Response, slot: this.targetSlot }); + if (this.epochCache.isProposerPipeliningEnabled()) { + this.metrics.recordPipelineDiscard(); } - }) - .catch(err => { - this.log.error(`Background L1 submission failed for slot ${this.targetSlot}`, err); - if (checkpoint) { - this.eventEmitter.emit('checkpoint-publish-failed', { slot: this.targetSlot }); - - if (this.epochCache.isProposerPipeliningEnabled()) { - this.metrics.recordPipelineDiscard(); - } - } - }); - - // Return the built checkpoint immediately — the work loop is now unblocked - return checkpoint; + } + } catch (err) { + if (err instanceof SequencerInterruptedError) { + return; + } + this.log.error(`Background attestation/L1 pipeline failed for slot ${this.targetSlot}`, err); + this.eventEmitter.emit('checkpoint-publish-failed', { slot: this.targetSlot }); + if (this.epochCache.isProposerPipeliningEnabled()) { + this.metrics.recordPipelineDiscard(); + } + } } /** Enqueues the checkpoint for L1 submission. Called after pipeline sleep in execute(). */ @@ -247,10 +283,16 @@ export class CheckpointProposalJob implements Traceable { } } + const isPipelining = this.epochCache.isProposerPipeliningEnabled(); + const parentCheckpointNumber = CheckpointNumber(this.checkpointNumber - 1); + await this.publisher.enqueueProposeCheckpoint(checkpoint, attestations, attestationsSignature, { txTimeoutAt, forcePendingCheckpointNumber: this.invalidateCheckpoint?.forcePendingCheckpointNumber, forcePendingFeeHeader: this.computedForcePendingFeeHeader, + forcePendingArchive: isPipelining + ? { checkpointNumber: parentCheckpointNumber, archive: checkpoint.header.lastArchiveRoot } + : undefined, }); } @@ -261,7 +303,7 @@ export class CheckpointProposalJob implements Traceable { [Attributes.SLOT_NUMBER]: this.targetSlot, }; }) - private async proposeCheckpoint(): Promise { + private async proposeCheckpoint(): Promise { try { // Get operator configured coinbase and fee recipient for this attestor const coinbase = this.validatorClient.getCoinbaseForAttestor(this.attestorAddress); @@ -421,7 +463,7 @@ export class CheckpointProposalJob implements Traceable { Number(checkpoint.header.totalManaUsed.toBigInt()), ); - // Do not collect attestations nor publish to L1 in fisherman mode + // In fisherman mode, return the checkpoint without broadcasting or collecting attestations if (this.config.fishermanMode) { this.log.info( `Built checkpoint for slot ${this.targetSlot} with ${blocksInCheckpoint.length} blocks. ` + @@ -433,11 +475,8 @@ export class CheckpointProposalJob implements Traceable { }, ); this.metrics.recordCheckpointSuccess(); - return { - checkpoint, - attestations: CommitteeAttestationsAndSigners.empty(), - attestationsSignature: Signature.empty(), - }; + // Return a broadcast result with a dummy proposal — fisherman mode skips attestation collection + return { checkpoint, proposal: undefined!, blockProposedAt: this.dateProvider.now() }; } // Include the block pending broadcast in the checkpoint proposal if any @@ -460,33 +499,8 @@ export class CheckpointProposalJob implements Traceable { const blockProposedAt = this.dateProvider.now(); await this.p2pClient.broadcastCheckpointProposal(proposal); - this.setStateFn(SequencerState.COLLECTING_ATTESTATIONS, this.targetSlot); - const attestations = await this.waitForAttestations(proposal); - const blockAttestedAt = this.dateProvider.now(); - - this.metrics.recordCheckpointAttestationDelay(blockAttestedAt - blockProposedAt); - - // Proposer must sign over the attestations before pushing them to L1 - const signer = this.proposer ?? this.publisher.getSenderAddress(); - let attestationsSignature: Signature; - try { - attestationsSignature = await this.validatorClient.signAttestationsAndSigners( - attestations, - signer, - this.targetSlot, - this.checkpointNumber, - ); - } catch (err) { - // We shouldn't really get here since we yield to another HA node - // as soon as we see these errors when creating block or checkpoint proposals. - if (this.handleHASigningError(err, 'Attestations signature')) { - return undefined; - } - throw err; - } - - // Return the result for the caller to enqueue after the pipeline sleep - return { checkpoint, attestations, attestationsSignature }; + // Return immediately after broadcast — attestation collection happens in the background + return { checkpoint, proposal, blockProposedAt }; } catch (err) { if (err && (err instanceof DutyAlreadySignedError || err instanceof SlashingProtectionError)) { // swallow this error. It's already been logged by a function deeper in the stack diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts index ec930edf9fb2..57ae645d462d 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts @@ -373,6 +373,7 @@ describe('sequencer', () => { it('builds a block out of a single tx', async () => { await setupSingleTxBlock(); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); expectPublisherProposeL2Block(); }); @@ -417,6 +418,7 @@ describe('sequencer', () => { }); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); // Now we should build and publish the checkpoint expect(checkpointBuilder.buildBlockCalls.length).toBeGreaterThan(0); expectPublisherProposeL2Block(); @@ -439,6 +441,7 @@ describe('sequencer', () => { block = await makeBlock(txs); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); expect(checkpointBuilder.buildBlockCalls.length).toBeGreaterThan(0); expectPublisherProposeL2Block(); @@ -455,6 +458,7 @@ describe('sequencer', () => { // Archiver reports synced to slot 0, which satisfies syncedL2Slot + 1 >= slot (slot=1) l2BlockSource.getSyncedL2SlotNumber.mockResolvedValue(SlotNumber(0)); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); expect(publisher.enqueueProposeCheckpoint).toHaveBeenCalled(); }); @@ -475,10 +479,9 @@ describe('sequencer', () => { publisher.enqueueProposeCheckpoint.mockRejectedValueOnce(new Error('Failed to enqueue propose checkpoint')); + // The error is caught in the background attestation/L1 pipeline and does not surface as an unhandled rejection await sequencer.work(); - - // We still call sendRequestsAt in case there are votes enqueued - expect(publisher.sendRequestsAt).toHaveBeenCalled(); + await sequencer.awaitLastProposalSubmission(); }); it('should proceed with block proposal when there is no proposer yet', async () => { @@ -497,6 +500,7 @@ describe('sequencer', () => { block = await makeBlock(txs); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); // Verify that the sequencer attempted to create and broadcast a block proposal expect(publisher.enqueueProposeCheckpoint).toHaveBeenCalled(); @@ -579,6 +583,7 @@ describe('sequencer', () => { block = await makeBlock([tx]); TestUtils.mockPendingTxs(p2p, [tx]); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); const attestationsAndSigners = new CommitteeAttestationsAndSigners(getSignatures()); expect(publishers[i].enqueueProposeCheckpoint).toHaveBeenCalledTimes(1); @@ -929,6 +934,7 @@ describe('sequencer', () => { await setupSingleTxBlock(); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); // Verify checkpoint was built and proposed expect(checkpointBuilder.buildBlockCalls.length).toBeGreaterThan(0); @@ -942,6 +948,7 @@ describe('sequencer', () => { await setupSingleTxBlock(); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); // Verify checkpoint was built and proposed expect(checkpointBuilder.buildBlockCalls).toHaveLength(1); @@ -958,6 +965,7 @@ describe('sequencer', () => { TestUtils.mockPendingTxs(p2p, txs); await sequencer.work(); + await sequencer.awaitLastProposalSubmission(); expect(checkpointBuilder.buildBlockCalls.length).toBeGreaterThan(1); expect(validatorClient.createCheckpointProposal).toHaveBeenCalled(); @@ -1213,6 +1221,10 @@ class TestSequencer extends Sequencer { return super.work(); } + public async awaitLastProposalSubmission() { + await this.lastCheckpointProposalJob?.awaitPendingSubmission(); + } + public checkCanProposeForTest(slot: SlotNumber) { return this.checkCanPropose(slot); } diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index a7c59bf1e04e..0ed3f4b50602 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -73,7 +73,7 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter { expect(withPipelining.maxNumberOfBlocks).toBeGreaterThan(withoutPipelining.maxNumberOfBlocks); }); - it('uses entire slot minus init and re-execution for block building', () => { + it('reserves time for assembly and one-way broadcast at end of slot', () => { const tt = new SequencerTimetable({ ethereumSlotDuration: ETHEREUM_SLOT_DURATION, aztecSlotDuration: AZTEC_SLOT_DURATION, @@ -468,8 +468,9 @@ describe('sequencer-timetable', () => { }); const blockDuration = BLOCK_DURATION_MS / 1000; - // Reserves one blockDuration for validator re-execution, but no finalization time - const availableTime = AZTEC_SLOT_DURATION - tt.initializationOffset - blockDuration; + // Reserves assembleTime + p2pPropagation (one-way broadcast) at end + const timeReservedAtEnd = tt.checkpointAssembleTime + tt.p2pPropagationTime; + const availableTime = AZTEC_SLOT_DURATION - tt.initializationOffset - timeReservedAtEnd; expect(tt.maxNumberOfBlocks).toBe(Math.floor(availableTime / blockDuration)); }); @@ -501,8 +502,63 @@ describe('sequencer-timetable', () => { }); // With pipelining and test config (ethereumSlotDuration < 8): - // init=0.5, reExec=8, available = 36 - 0.5 - 8 = 27.5, floor(27.5/8) = 3 - expect(tt.maxNumberOfBlocks).toBe(3); + // init=0.5, reservedAtEnd = 0.5 + 0 = 0.5, available = 36 - 0.5 - 0.5 = 35, floor(35/8) = 4 + expect(tt.maxNumberOfBlocks).toBe(4); + }); + + it('sets pipeliningAttestationGracePeriod to blockDuration + p2pPropagationTime', () => { + const tt = new SequencerTimetable({ + ethereumSlotDuration: ETHEREUM_SLOT_DURATION, + aztecSlotDuration: AZTEC_SLOT_DURATION, + l1PublishingTime: L1_PUBLISHING_TIME, + blockDurationMs: BLOCK_DURATION_MS, + enforce: ENFORCE_TIMETABLE, + pipelining: true, + }); + + expect(tt.pipeliningAttestationGracePeriod).toBe(tt.blockDuration! + tt.p2pPropagationTime); + }); + + it('returns aztecSlotDuration + gracePeriod for PUBLISHING_CHECKPOINT when pipelining', () => { + const tt = new SequencerTimetable({ + ethereumSlotDuration: ETHEREUM_SLOT_DURATION, + aztecSlotDuration: AZTEC_SLOT_DURATION, + l1PublishingTime: L1_PUBLISHING_TIME, + blockDurationMs: BLOCK_DURATION_MS, + enforce: ENFORCE_TIMETABLE, + pipelining: true, + }); + + expect(tt.getMaxAllowedTime(SequencerState.PUBLISHING_CHECKPOINT)).toBe( + AZTEC_SLOT_DURATION + tt.pipeliningAttestationGracePeriod, + ); + expect(tt.getMaxAllowedTime(SequencerState.COLLECTING_ATTESTATIONS)).toBe( + AZTEC_SLOT_DURATION + tt.pipeliningAttestationGracePeriod, + ); + }); + + it('ensures enough time from last block deadline to grace period end for assembly + round-trip + re-execution', () => { + const P2P_PROPAGATION_TIME = 2; + const BLOCK_DURATION = BLOCK_DURATION_MS / 1000; + + const tt = new SequencerTimetable({ + ethereumSlotDuration: ETHEREUM_SLOT_DURATION, + aztecSlotDuration: AZTEC_SLOT_DURATION, + l1PublishingTime: L1_PUBLISHING_TIME, + p2pPropagationTime: P2P_PROPAGATION_TIME, + blockDurationMs: BLOCK_DURATION_MS, + enforce: ENFORCE_TIMETABLE, + pipelining: true, + }); + + // Time from last block deadline to end of grace period into next slot + const lastBlockDeadline = tt.initializationOffset + tt.maxNumberOfBlocks * BLOCK_DURATION; + const remainingInBuildSlot = AZTEC_SLOT_DURATION - lastBlockDeadline; + const totalTimeAvailable = remainingInBuildSlot + tt.pipeliningAttestationGracePeriod; + + // Must be enough for: assembly + round-trip p2p + re-execution + const requiredTime = tt.checkpointAssembleTime + 2 * P2P_PROPAGATION_TIME + BLOCK_DURATION; + expect(totalTimeAvailable).toBeGreaterThanOrEqual(requiredTime); }); it('produces more blocks with production config where finalization time is large', () => { diff --git a/yarn-project/sequencer-client/src/sequencer/timetable.ts b/yarn-project/sequencer-client/src/sequencer/timetable.ts index 98373bf284da..0f5d8fe8cdae 100644 --- a/yarn-project/sequencer-client/src/sequencer/timetable.ts +++ b/yarn-project/sequencer-client/src/sequencer/timetable.ts @@ -73,6 +73,12 @@ export class SequencerTimetable { /** Whether pipelining is enabled (checkpoint finalization deferred to next slot). */ public readonly pipelining: boolean; + /** + * How far into the target slot attestation collection can extend when pipelining. + * Covers validator re-execution (one block duration) plus one-way attestation return. + */ + public readonly pipeliningAttestationGracePeriod: number; + constructor( opts: { ethereumSlotDuration: number; @@ -117,15 +123,19 @@ export class SequencerTimetable { this.p2pPropagationTime * 2 + // Round-trip propagation this.l1PublishingTime; // L1 publishing + // Grace period for attestation collection into the target slot when pipelining + this.pipeliningAttestationGracePeriod = (this.blockDuration ?? 0) + this.p2pPropagationTime; + // Calculate maximum number of blocks that fit in this slot if (!this.blockDuration) { this.maxNumberOfBlocks = 1; // Single block per slot } else { - // When pipelining, finalization is deferred to the next slot, but we still need - // a sub-slot for validator re-execution so they can produce attestations. - let timeReservedAtEnd = this.blockDuration; // Validatior re-execution only - if (!this.pipelining) { - timeReservedAtEnd += this.checkpointFinalizationTime; + let timeReservedAtEnd: number; + if (this.pipelining) { + // Proposal must reach validators within build slot: assembly + one-way broadcast + timeReservedAtEnd = this.checkpointAssembleTime + this.p2pPropagationTime; + } else { + timeReservedAtEnd = this.blockDuration + this.checkpointFinalizationTime; } const timeAvailableForBlocks = this.aztecSlotDuration - this.initializationOffset - timeReservedAtEnd; @@ -155,6 +165,7 @@ export class SequencerTimetable { initializeDeadline: this.initializeDeadline, enforce: this.enforce, pipelining: this.pipelining, + pipeliningAttestationGracePeriod: this.pipeliningAttestationGracePeriod, minWorkToDo, blockDuration: this.blockDuration, maxNumberOfBlocks: this.maxNumberOfBlocks, @@ -191,8 +202,16 @@ export class SequencerTimetable { return this.initializeDeadline + this.checkpointInitializationTime; case SequencerState.ASSEMBLING_CHECKPOINT: case SequencerState.COLLECTING_ATTESTATIONS: + if (this.pipelining) { + // Assembly + attestation collection extend into target slot + return this.aztecSlotDuration + this.pipeliningAttestationGracePeriod; + } return this.aztecSlotDuration - this.l1PublishingTime - 2 * this.p2pPropagationTime; case SequencerState.PUBLISHING_CHECKPOINT: + if (this.pipelining) { + // L1 submission happens in target slot, after attestations + return this.aztecSlotDuration + this.pipeliningAttestationGracePeriod; + } return this.aztecSlotDuration - this.l1PublishingTime; default: { const _exhaustiveCheck: never = state; diff --git a/yarn-project/stdlib/src/timetable/index.ts b/yarn-project/stdlib/src/timetable/index.ts index eb76fb63f72a..053b616e103d 100644 --- a/yarn-project/stdlib/src/timetable/index.ts +++ b/yarn-project/stdlib/src/timetable/index.ts @@ -57,11 +57,14 @@ export function calculateMaxBlocksPerSlot( // Calculate checkpoint finalization time (assembly + round-trip propagation + L1 publishing) const checkpointFinalizationTime = assembleTime + p2pTime * 2 + l1Time; - // When pipelining, finalization is deferred to the next slot, but we still reserve - // a sub-slot for validator re-execution so they can produce attestations. - let timeReservedAtEnd = blockDurationSec; - if (!opts.pipelining) { - timeReservedAtEnd += checkpointFinalizationTime; + // When pipelining, finalization is deferred to the next slot, so we only reserve + // time for assembly + one-way broadcast. Without pipelining, we also need a full + // block duration for validator re-execution plus full checkpoint finalization. + let timeReservedAtEnd: number; + if (opts.pipelining) { + timeReservedAtEnd = assembleTime + p2pTime; + } else { + timeReservedAtEnd = blockDurationSec + checkpointFinalizationTime; } // Time available for building blocks diff --git a/yarn-project/txe/src/state_machine/mock_epoch_cache.ts b/yarn-project/txe/src/state_machine/mock_epoch_cache.ts index cfca8a29605b..616f9744b218 100644 --- a/yarn-project/txe/src/state_machine/mock_epoch_cache.ts +++ b/yarn-project/txe/src/state_machine/mock_epoch_cache.ts @@ -33,10 +33,6 @@ export class MockEpochCache implements EpochCacheInterface { return EpochNumber.ZERO; } - pipeliningOffset(): number { - return 0; - } - getEpochAndSlotNow(): EpochAndSlot & { nowMs: bigint } { return { epoch: EpochNumber.ZERO, @@ -63,6 +59,10 @@ export class MockEpochCache implements EpochCacheInterface { return false; } + pipeliningOffset(): number { + return 0; + } + getProposerIndexEncoding(_epoch: EpochNumber, _slot: SlotNumber, _seed: bigint): `0x${string}` { return '0x00'; }