From 17fcc4894558cb320f0b684f7d754b1e73727701 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 9 Feb 2026 15:01:30 +0100 Subject: [PATCH 01/13] feat(sequencer): catchup from base --- block/internal/syncing/syncer.go | 44 +- pkg/sequencers/single/sequencer.go | 143 ++++- pkg/sequencers/single/sequencer_test.go | 683 ++++++++++++++++++++++++ 3 files changed, 862 insertions(+), 8 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 6df6b2c22e..4923fe4992 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -755,9 +755,49 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve // Update DA height if needed // This height is only updated when a height is processed from DA as P2P - // events do not contain DA height information + // events do not contain DA height information. + // + // When a sequencer restarts after extended downtime, it produces "catch-up" + // blocks containing forced inclusion transactions from missed DA epochs and + // submits them to DA at the current (much higher) DA height. This creates a + // gap between the state's DAHeight (tracking forced inclusion epoch progress) + // and event.DaHeight (the DA submission height). + // + // If we jump state.DAHeight directly to event.DaHeight, subsequent calls to + // VerifyForcedInclusionTxs would check the wrong epoch (the submission epoch + // instead of the next forced-inclusion epoch), causing valid catch-up blocks + // to be incorrectly flagged as malicious. + // + // To handle this, when the gap exceeds one DA epoch, we advance DAHeight by + // exactly one epoch per block. This lets the forced inclusion verifier check + // the correct epoch for each catch-up block. Once the sequencer finishes + // catching up and the gap closes, DAHeight converges to event.DaHeight. 
if event.DaHeight > newState.DAHeight { - newState.DAHeight = event.DaHeight + epochSize := s.genesis.DAEpochForcedInclusion + gap := event.DaHeight - newState.DAHeight + + if epochSize > 0 && gap > epochSize { + // Large gap detected — likely catch-up blocks from a restarted sequencer. + // Advance DAHeight by one epoch to keep forced inclusion verification + // aligned with the epoch the sequencer is replaying. + _, epochEnd, _ := types.CalculateEpochBoundaries( + newState.DAHeight, s.genesis.DAStartHeight, epochSize, + ) + nextEpochStart := epochEnd + 1 + if nextEpochStart > event.DaHeight { + // Shouldn't happen, but clamp to event.DaHeight as a safety net. + nextEpochStart = event.DaHeight + } + s.logger.Debug(). + Uint64("current_da_height", newState.DAHeight). + Uint64("event_da_height", event.DaHeight). + Uint64("advancing_to", nextEpochStart). + Uint64("gap", gap). + Msg("large DA height gap detected (sequencer catch-up), advancing DA height by one epoch") + newState.DAHeight = nextEpochStart + } else { + newState.DAHeight = event.DaHeight + } } batch, err := s.store.NewBatch(ctx) diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 228bde2791..f11a238837 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -51,6 +51,16 @@ type Sequencer struct { // Cached forced inclusion transactions from the current epoch cachedForcedInclusionTxs [][]byte + + // Catch-up state: when the sequencer restarts after being down for more than + // one DA epoch, it must replay missed epochs (producing blocks with only forced + // inclusion transactions, no mempool) before resuming normal sequencing. + // This ensures the sequencer produces the same blocks that nodes running in + // base sequencing mode would have produced during the downtime. + catchingUp bool + // currentDAEndTime is the DA epoch end timestamp from the last fetched epoch. 
+ // Used as the block timestamp during catch-up to match based sequencing behavior. + currentDAEndTime time.Time } // NewSequencer creates a new Single Sequencer @@ -168,6 +178,13 @@ func (c *Sequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Submit // GetNextBatch implements sequencing.Sequencer. // It gets the next batch of transactions and fetch for forced included transactions. +// +// During catch-up mode (after sequencer downtime spanning one or more DA epochs), +// only forced inclusion transactions are returned — no mempool transactions. This +// ensures the sequencer produces blocks identical to what nodes running in base +// sequencing mode would have produced during the downtime. Once the sequencer has +// processed all missed DA epochs and reaches the DA head, it exits catch-up mode +// and resumes normal operation with both forced inclusion and mempool transactions. func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { if !c.isValid(req.Id) { return nil, ErrInvalidId @@ -208,10 +225,22 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB forcedTxs = c.cachedForcedInclusionTxs[c.checkpoint.TxIndex:] } - // Get mempool transactions from queue - mempoolBatch, err := c.queue.Next(ctx) - if err != nil { - return nil, err + // Get mempool transactions from queue, but ONLY if we're not catching up. + // During catch-up, the sequencer must produce blocks identical to what base + // sequencing would produce (forced inclusion txs only, no mempool). + var mempoolBatch *coresequencer.Batch + if !c.catchingUp { + var err error + mempoolBatch, err = c.queue.Next(ctx) + if err != nil { + return nil, err + } + } else { + mempoolBatch = &coresequencer.Batch{} + c.logger.Debug(). + Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Int("forced_txs", len(forcedTxs)). 
+ Msg("catch-up mode: skipping mempool transactions") } // Build combined tx list for filtering @@ -318,6 +347,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB Uint64("consumed_count", forcedTxConsumedCount). Uint64("checkpoint_tx_index", c.checkpoint.TxIndex). Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Bool("catching_up", c.catchingUp). Msg("updated checkpoint after processing forced inclusion transactions") } @@ -326,11 +356,19 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB batchTxs = append(batchTxs, validForcedTxs...) batchTxs = append(batchTxs, validMempoolTxs...) + // During catch-up, use the DA epoch end timestamp to match based sequencing behavior. + // This ensures blocks produced during catch-up have timestamps consistent with + // what base sequencing nodes would have produced. + timestamp := time.Now() + if c.catchingUp && !c.currentDAEndTime.IsZero() { + timestamp = c.currentDAEndTime + } + return &coresequencer.GetNextBatchResponse{ Batch: &coresequencer.Batch{ Transactions: batchTxs, }, - Timestamp: time.Now(), + Timestamp: timestamp, BatchData: req.LastBatchData, }, nil } @@ -374,13 +412,27 @@ func (c *Sequencer) GetDAHeight() uint64 { return c.daHeight.Load() } -// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint +// IsCatchingUp returns whether the sequencer is in catch-up mode. +// During catch-up, the sequencer replays missed DA epochs producing blocks +// with only forced inclusion transactions (no mempool), matching the blocks +// that base sequencing nodes would have produced during sequencer downtime. +func (c *Sequencer) IsCatchingUp() bool { + return c.catchingUp +} + +// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint. 
+// It also updates the catch-up state based on the DA epoch timestamp: +// - If the fetched epoch's timestamp is significantly in the past (more than +// one epoch's wall-clock duration), the sequencer enters catch-up mode. +// - If the DA height is from the future (not yet produced), the sequencer +// exits catch-up mode as it has reached the DA head. func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) { currentDAHeight := c.checkpoint.DAHeight c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). + Bool("catching_up", c.catchingUp). Msg("fetching forced inclusion transactions from DA") forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) @@ -389,16 +441,36 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.logger.Debug(). Uint64("da_height", currentDAHeight). Msg("DA height from future, waiting for DA to produce block") + + // We've reached the DA head — exit catch-up mode + if c.catchingUp { + c.logger.Info(). + Uint64("da_height", currentDAHeight). + Msg("catch-up complete: reached DA head, resuming normal sequencing") + c.catchingUp = false + } + return 0, nil } else if errors.Is(err, block.ErrForceInclusionNotConfigured) { // Forced inclusion not configured, continue without forced txs c.cachedForcedInclusionTxs = [][]byte{} + c.catchingUp = false return 0, nil } return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err) } + // Store the DA epoch end time for timestamp usage during catch-up + if !forcedTxsEvent.Timestamp.IsZero() { + c.currentDAEndTime = forcedTxsEvent.Timestamp.UTC() + } + + // Determine catch-up state based on epoch timestamp. + // If the epoch we just fetched ended more than one epoch's wall-clock duration ago, + // we are behind the DA head and must catch up by replaying missed epochs. 
+ c.updateCatchUpState(forcedTxsEvent) + // Validate and filter transactions validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs)) skippedTxs := 0 @@ -420,6 +492,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Int("skipped_tx_count", skippedTxs). Uint64("da_height_start", forcedTxsEvent.StartDaHeight). Uint64("da_height_end", forcedTxsEvent.EndDaHeight). + Bool("catching_up", c.catchingUp). Msg("fetched forced inclusion transactions from DA") // Cache the transactions @@ -427,3 +500,61 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint return forcedTxsEvent.EndDaHeight, nil } + +// updateCatchUpState determines whether the sequencer is catching up to the DA head. +// +// The sequencer is considered to be catching up when the DA epoch it just fetched +// has a timestamp that is significantly in the past — specifically, more than one +// full epoch's wall-clock duration ago. This means other nodes likely switched to +// base sequencing during the sequencer's downtime, and the sequencer must replay +// those missed epochs before resuming normal block production. +// +// When the epoch timestamp is recent (within one epoch duration), the sequencer +// has reached the DA head and can resume normal operation. +func (c *Sequencer) updateCatchUpState(event *block.ForcedInclusionEvent) { + if event == nil || event.Timestamp.IsZero() { + // No timestamp available (e.g., empty epoch) — don't change catch-up state. + // If we were already catching up, we remain in that state until we see a + // recent timestamp or hit HeightFromFuture. + return + } + + if c.genesis.DAEpochForcedInclusion == 0 { + // No epoch-based forced inclusion configured — catch-up is irrelevant. + c.catchingUp = false + return + } + + // Calculate how long one DA epoch takes in wall-clock time. 
+ epochWallDuration := time.Duration(c.genesis.DAEpochForcedInclusion) * c.cfg.DA.BlockTime.Duration + + // Use a minimum threshold to avoid false positives from minor delays. + catchUpThreshold := epochWallDuration + if catchUpThreshold < 30*time.Second { + catchUpThreshold = 30 * time.Second + } + + timeSinceEpoch := time.Since(event.Timestamp) + wasCatchingUp := c.catchingUp + + if timeSinceEpoch > catchUpThreshold { + c.catchingUp = true + if !wasCatchingUp { + c.logger.Warn(). + Dur("time_since_epoch", timeSinceEpoch). + Dur("threshold", catchUpThreshold). + Uint64("epoch_start", event.StartDaHeight). + Uint64("epoch_end", event.EndDaHeight). + Msg("entering catch-up mode: DA epoch is behind head, replaying missed epochs with forced inclusion txs only") + } + } else { + c.catchingUp = false + if wasCatchingUp { + c.logger.Info(). + Dur("time_since_epoch", timeSinceEpoch). + Uint64("epoch_start", event.StartDaHeight). + Uint64("epoch_end", event.EndDaHeight). + Msg("exiting catch-up mode: reached DA head, resuming normal sequencing") + } + } +} diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index 04d7f88721..d5780542df 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -1224,6 +1224,689 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) { // preserves any transactions that weren't even processed yet due to maxBytes limits. // // This test uses maxBytes to limit how many txs are fetched, triggering the unprocessed txs scenario. 
+func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA epoch at height 100 with a timestamp far in the past (simulating sequencer downtime) + oldTimestamp := time.Now().Add(-10 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, + Data: [][]byte{[]byte("forced-tx-1")}, + }).Once() + + // Next DA epoch at height 101 also in the past (still catching up) + // Use .Maybe() since this test only calls GetNextBatch once (processing epoch 100), + // so epoch 101 may not be retrieved. 
+ oldTimestamp2 := time.Now().Add(-9 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, + Data: [][]byte{[]byte("forced-tx-2")}, + }).Maybe() + + // DA epoch at height 102 is from the future (DA head reached) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit a mempool transaction + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx-1")}}, + }) + require.NoError(t, err) + + assert.False(t, seq.IsCatchingUp(), "should not be catching up initially") + + // First GetNextBatch — epoch 100 is far in the past, should enter catch-up + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp.Batch) + + assert.True(t, seq.IsCatchingUp(), "should be catching up after fetching old epoch") + + // During catch-up, batch should contain only forced inclusion tx, no mempool tx + assert.Equal(t, 1, len(resp.Batch.Transactions), "should have only forced inclusion tx during catch-up") + assert.Equal(t, []byte("forced-tx-1"), resp.Batch.Transactions[0]) +} + +func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + 
db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch at height 100: old timestamp (catching up) + oldTimestamp := time.Now().Add(-5 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, + Data: [][]byte{[]byte("forced-1"), []byte("forced-2")}, + }).Once() + + // Epoch at height 101: also old (still catching up) + oldTimestamp2 := time.Now().Add(-4 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, + Data: [][]byte{[]byte("forced-3")}, + }).Once() + + // Epoch at height 102: from the future (head) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit several mempool transactions + for i := 0; i < 5; i++ { + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + 
require.NoError(t, err) + } + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First batch (epoch 100): only forced txs + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + + for _, tx := range resp1.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") + } + assert.Equal(t, 2, len(resp1.Batch.Transactions), "should have 2 forced txs from epoch 100") + + // Second batch (epoch 101): only forced txs + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + + for _, tx := range resp2.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") + } + assert.Equal(t, 1, len(resp2.Batch.Transactions), "should have 1 forced tx from epoch 101") +} + +func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch at height 100: timestamp 5 minutes ago + epochTimestamp := time.Now().Add(-5 * time.Minute).UTC() + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epochTimestamp}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + // Next epoch from future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), 
forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + assert.True(t, seq.IsCatchingUp(), "should be in catch-up mode") + + // During catch-up, the timestamp should be the DA epoch end time, not time.Now() + assert.Equal(t, epochTimestamp, resp.Timestamp, + "catch-up batch timestamp should match DA epoch timestamp") +} + +func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch 100: old (catch-up) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("forced-old")}, + }).Once() + + // Epoch 101: recent timestamp (current epoch at DA head) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), 
forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("forced-current")}, + }).Once() + + // Epoch 102: from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First batch: catch-up (old epoch 100) + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp(), "should be catching up during old epoch") + assert.Equal(t, 1, len(resp1.Batch.Transactions), "catch-up: only forced tx") + assert.Equal(t, []byte("forced-old"), resp1.Batch.Transactions[0]) + + // Second batch: recent epoch 101 — should exit catch-up + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should have exited catch-up after reaching recent epoch") + + // Should include both forced tx and mempool tx now + hasForcedTx := false + hasMempoolTx := false + for _, tx := range resp2.Batch.Transactions { + if bytes.Equal(tx, []byte("forced-current")) { + hasForcedTx = true + } + if bytes.Equal(tx, []byte("mempool-tx")) { + hasMempoolTx = true + } + } + assert.True(t, hasForcedTx, "should contain forced tx from 
current epoch") + assert.True(t, hasMempoolTx, "should contain mempool tx after exiting catch-up") +} + +func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch 100: old, triggers catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + // Epoch 101: from the future — DA head reached + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First call: fetches epoch 100 (old), enters catch-up + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + assert.Equal(t, 1, len(resp1.Batch.Transactions)) + + // Second call: epoch 101 is from the future, should exit catch-up + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, 
err) + assert.False(t, seq.IsCatchingUp(), "should exit catch-up when DA returns HeightFromFuture") + // No forced txs available, batch is empty + assert.Equal(t, 0, len(resp2.Batch.Transactions)) +} + +func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch at height 100: RECENT timestamp (sequencer was NOT down for long) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + // Next epoch from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit a mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := 
seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should NOT be catching up when epoch is recent") + + // Should have both forced and mempool txs (normal operation) + assert.Equal(t, 2, len(resp.Batch.Transactions), "should have forced + mempool tx in normal mode") +} + +func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { + // Simulates a sequencer that missed 3 DA epochs and must replay them all + // before resuming normal operation. + ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // 3 old epochs (100, 101, 102) — all with timestamps far in the past + for h := uint64(100); h <= 102; h++ { + ts := time.Now().Add(-time.Duration(103-h) * time.Minute) // older epochs further in the past + txData := []byte("forced-from-epoch-" + string(rune('0'+h-100))) + mockDA.MockClient.On("Retrieve", mock.Anything, h, forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: ts}, + Data: [][]byte{txData}, + }).Once() + } + + // Epoch 103: recent (DA head) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(103), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("forced-current")}, + }).Once() + + // Epoch 104: from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(104), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: 
datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool txs + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-1"), []byte("mempool-2")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Process the 3 old epochs — all should be catch-up (no mempool) + for i := 0; i < 3; i++ { + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp(), "should be catching up during epoch %d", 100+i) + assert.Equal(t, 1, len(resp.Batch.Transactions), + "epoch %d: should have exactly 1 forced tx", 100+i) + + for _, tx := range resp.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-1"), tx, "no mempool during catch-up epoch %d", 100+i) + assert.NotEqual(t, []byte("mempool-2"), tx, "no mempool during catch-up epoch %d", 100+i) + } + } + + // DA height should have advanced through the 3 old epochs + assert.Equal(t, uint64(103), seq.GetDAHeight(), "DA height should be at 103 after replaying 3 epochs") + + // Next batch: epoch 103 is recent — should exit catch-up and include mempool + resp4, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should have exited catch-up at recent epoch") + + hasForcedCurrent := false + hasMempoolTx := false + for _, tx := range resp4.Batch.Transactions { + if bytes.Equal(tx, []byte("forced-current")) { + hasForcedCurrent = true + } + if bytes.Equal(tx, []byte("mempool-1")) || bytes.Equal(tx, []byte("mempool-2")) 
{ + hasMempoolTx = true + } + } + assert.True(t, hasForcedCurrent, "should include forced tx from current epoch") + assert.True(t, hasMempoolTx, "should include mempool txs after exiting catch-up") +} + +func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { + // When forced inclusion is not configured, catch-up should never activate. + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + // No forced inclusion namespace configured + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(false).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.IsCatchingUp(), "should never catch up when forced inclusion not configured") + assert.Equal(t, 1, len(resp.Batch.Transactions)) + assert.Equal(t, []byte("mempool-tx"), resp.Batch.Transactions[0]) +} + +func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { + // Verify that the checkpoint (DA epoch tracking) advances correctly during catch-up. 
+ ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // Epoch 100: old + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("tx-a"), []byte("tx-b")}, + }).Once() + + // Epoch 101: old + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-4 * time.Minute)}, + Data: [][]byte{[]byte("tx-c")}, + }).Once() + + // Epoch 102: from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Initial checkpoint + assert.Equal(t, uint64(100), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Process epoch 100 + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.Equal(t, 2, 
len(resp1.Batch.Transactions)) + + // Checkpoint should advance to epoch 101 + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + assert.Equal(t, uint64(101), seq.GetDAHeight()) + + // Process epoch 101 + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.Equal(t, 1, len(resp2.Batch.Transactions)) + + // Checkpoint should advance to epoch 102 + assert.Equal(t, uint64(102), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + assert.Equal(t, uint64(102), seq.GetDAHeight()) +} + func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T) { db := ds.NewMapDatastore() logger := zerolog.New(zerolog.NewTestWriter(t)) From c9639846aad3769b5e8982c9d5dbee77eb5d2a8e Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 9 Feb 2026 17:53:26 +0100 Subject: [PATCH 02/13] fetch DA height --- apps/evm/server/force_inclusion_test.go | 4 + block/internal/da/client.go | 17 +++ block/internal/da/interface.go | 3 + block/internal/da/tracing.go | 14 +++ block/internal/da/tracing_test.go | 9 +- pkg/sequencers/single/sequencer.go | 122 ++++++++++++--------- pkg/sequencers/single/sequencer_test.go | 138 ++++++++++++------------ test/mocks/da.go | 59 +++++++++- test/testda/dummy.go | 5 + 9 files changed, 248 insertions(+), 123 deletions(-) diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index a1ad3059ef..21e06bc5cb 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -73,6 +73,10 @@ func (m *mockDA) HasForcedInclusionNamespace() bool { return true } +func (m *mockDA) GetLatestDAHeight(_ context.Context) (uint64, error) { + return 0, nil +} + func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) { testHeight := uint64(100) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index d2e1d626e1..41617ed49e 100644 --- 
a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -299,6 +299,23 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } } +// GetLatestDAHeight returns the latest height available on the DA layer by +// querying the network head. +func (c *client) GetLatestDAHeight(ctx context.Context) (uint64, error) { + headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + + header, err := c.headerAPI.NetworkHead(headCtx) + if err != nil { + return 0, fmt.Errorf("failed to get DA network head: %w", err) + } + if header == nil { + return 0, fmt.Errorf("DA network head returned nil header") + } + + return header.Height, nil +} + // RetrieveForcedInclusion retrieves blobs from the forced inclusion namespace at the specified height. func (c *client) RetrieveForcedInclusion(ctx context.Context, height uint64) datypes.ResultRetrieve { if !c.hasForcedNamespace { diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index 69c2d18f7e..1e9f6cedee 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -17,6 +17,9 @@ type Client interface { // Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs. Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + // GetLatestDAHeight returns the latest height available on the DA layer. + GetLatestDAHeight(ctx context.Context) (uint64, error) + + // Namespace accessors. 
GetHeaderNamespace() []byte GetDataNamespace() []byte diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index 45fae2e863..4d946a8b74 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -123,6 +123,20 @@ func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs [] return res, nil } +func (t *tracedClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + ctx, span := t.tracer.Start(ctx, "DA.GetLatestDAHeight") + defer span.End() + + height, err := t.inner.GetLatestDAHeight(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return 0, err + } + span.SetAttributes(attribute.Int64("da.latest_height", int64(height))) + return height, nil +} + func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() } func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() } func (t *tracedClient) GetForcedInclusionNamespace() []byte { diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index ea01c9e425..de32532a31 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -54,10 +54,11 @@ func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs } return nil, nil } -func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } -func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } -func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } -func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } +func (m *mockFullClient) GetLatestDAHeight(_ context.Context) (uint64, error) { return 0, nil } +func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } +func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } +func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } +func (m 
*mockFullClient) HasForcedInclusionNamespace() bool { return true } // setup a tracer provider + span recorder func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index f11a238837..4874e77fc1 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -21,6 +21,7 @@ import ( "github.com/evstack/ev-node/pkg/genesis" seqcommon "github.com/evstack/ev-node/pkg/sequencers/common" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" ) // ErrInvalidId is returned when the chain id is invalid @@ -57,6 +58,11 @@ type Sequencer struct { // inclusion transactions, no mempool) before resuming normal sequencing. // This ensures the sequencer produces the same blocks that nodes running in // base sequencing mode would have produced during the downtime. + // + // catchingUp is true when the sequencer is replaying missed DA epochs. + // It is set when we detect (via GetLatestDAHeight) that the DA layer is more + // than one epoch ahead of our checkpoint, and cleared when we hit + // ErrHeightFromFuture (meaning we've reached the DA head). catchingUp bool // currentDAEndTime is the DA epoch end timestamp from the last fetched epoch. // Used as the block timestamp during catch-up to match based sequencing behavior. @@ -421,14 +427,20 @@ func (c *Sequencer) IsCatchingUp() bool { } // fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint. -// It also updates the catch-up state based on the DA epoch timestamp: -// - If the fetched epoch's timestamp is significantly in the past (more than -// one epoch's wall-clock duration), the sequencer enters catch-up mode. +// It also updates the catch-up state based on DA heights: +// - Before the first fetch, it queries GetLatestDAHeight to determine if the +// sequencer has missed more than one DA epoch. 
If so, catch-up mode is +// entered and only forced-inclusion blocks (no mempool) are produced. // - If the DA height is from the future (not yet produced), the sequencer // exits catch-up mode as it has reached the DA head. func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) { currentDAHeight := c.checkpoint.DAHeight + // Determine catch-up state before the (potentially expensive) epoch fetch. + // This is done once per sequencer lifecycle — subsequent catch-up exits are + // handled by ErrHeightFromFuture below. + c.updateCatchUpState(ctx) + c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). @@ -466,11 +478,6 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.currentDAEndTime = forcedTxsEvent.Timestamp.UTC() } - // Determine catch-up state based on epoch timestamp. - // If the epoch we just fetched ended more than one epoch's wall-clock duration ago, - // we are behind the DA head and must catch up by replaying missed epochs. - c.updateCatchUpState(forcedTxsEvent) - // Validate and filter transactions validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs)) skippedTxs := 0 @@ -501,60 +508,75 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint return forcedTxsEvent.EndDaHeight, nil } -// updateCatchUpState determines whether the sequencer is catching up to the DA head. +// updateCatchUpState determines whether the sequencer needs to catch up to the +// DA head by comparing the sequencer's checkpoint DA height against the latest +// DA height. // -// The sequencer is considered to be catching up when the DA epoch it just fetched -// has a timestamp that is significantly in the past — specifically, more than one -// full epoch's wall-clock duration ago. 
This means other nodes likely switched to -// base sequencing during the sequencer's downtime, and the sequencer must replay -// those missed epochs before resuming normal block production. +// The detection is purely height-based: we query GetLatestDAHeight once (on the +// first epoch fetch) and calculate how many epochs the sequencer has missed. If +// the gap exceeds one epoch, the sequencer enters catch-up mode and replays +// missed epochs with forced-inclusion transactions only (no mempool). It remains +// in catch-up until fetchNextDAEpoch hits ErrHeightFromFuture, meaning we've +// reached the DA head. // -// When the epoch timestamp is recent (within one epoch duration), the sequencer -// has reached the DA head and can resume normal operation. -func (c *Sequencer) updateCatchUpState(event *block.ForcedInclusionEvent) { - if event == nil || event.Timestamp.IsZero() { - // No timestamp available (e.g., empty epoch) — don't change catch-up state. - // If we were already catching up, we remain in that state until we see a - // recent timestamp or hit HeightFromFuture. +// This check is performed only once per sequencer lifecycle. If the downtime was +// short enough that the sequencer is still within the current or next epoch, no +// catch-up is needed and the (lightweight) GetLatestDAHeight call is the only +// overhead. +func (c *Sequencer) updateCatchUpState(ctx context.Context) { + // Already catching up — nothing to do. We'll exit via ErrHeightFromFuture. + if c.catchingUp { return } - if c.genesis.DAEpochForcedInclusion == 0 { + epochSize := c.genesis.DAEpochForcedInclusion + if epochSize == 0 { // No epoch-based forced inclusion configured — catch-up is irrelevant. - c.catchingUp = false return } - // Calculate how long one DA epoch takes in wall-clock time. 
- epochWallDuration := time.Duration(c.genesis.DAEpochForcedInclusion) * c.cfg.DA.BlockTime.Duration + currentDAHeight := c.checkpoint.DAHeight + daStartHeight := c.genesis.DAStartHeight + + latestDAHeight, err := c.daClient.GetLatestDAHeight(ctx) + if err != nil { + c.logger.Warn().Err(err). + Msg("failed to get latest DA height for catch-up detection, skipping check") + return + } - // Use a minimum threshold to avoid false positives from minor delays. - catchUpThreshold := epochWallDuration - if catchUpThreshold < 30*time.Second { - catchUpThreshold = 30 * time.Second + if latestDAHeight <= currentDAHeight { + // DA hasn't moved beyond our position — nothing to catch up. + c.logger.Debug(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Msg("sequencer is at or ahead of DA head, no catch-up needed") + return } - timeSinceEpoch := time.Since(event.Timestamp) - wasCatchingUp := c.catchingUp + // Calculate epoch numbers for current position and DA head. + currentEpoch := types.CalculateEpochNumber(currentDAHeight, daStartHeight, epochSize) + latestEpoch := types.CalculateEpochNumber(latestDAHeight, daStartHeight, epochSize) + missedEpochs := latestEpoch - currentEpoch - if timeSinceEpoch > catchUpThreshold { - c.catchingUp = true - if !wasCatchingUp { - c.logger.Warn(). - Dur("time_since_epoch", timeSinceEpoch). - Dur("threshold", catchUpThreshold). - Uint64("epoch_start", event.StartDaHeight). - Uint64("epoch_end", event.EndDaHeight). - Msg("entering catch-up mode: DA epoch is behind head, replaying missed epochs with forced inclusion txs only") - } - } else { - c.catchingUp = false - if wasCatchingUp { - c.logger.Info(). - Dur("time_since_epoch", timeSinceEpoch). - Uint64("epoch_start", event.StartDaHeight). - Uint64("epoch_end", event.EndDaHeight). 
- Msg("exiting catch-up mode: reached DA head, resuming normal sequencing") - } + if missedEpochs <= 1 { + // Within the current or next epoch — normal operation, no catch-up. + c.logger.Debug(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Uint64("current_epoch", currentEpoch). + Uint64("latest_epoch", latestEpoch). + Msg("sequencer within one epoch of DA head, no catch-up needed") + return } + + // The DA layer is more than one epoch ahead. Enter catch-up mode. + c.catchingUp = true + c.logger.Warn(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Uint64("current_epoch", currentEpoch). + Uint64("latest_epoch", latestEpoch). + Uint64("missed_epochs", missedEpochs). + Msg("entering catch-up mode: DA layer is multiple epochs ahead, replaying missed epochs with forced inclusion txs only") } diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index d5780542df..e4dcfb1250 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -381,6 +381,9 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // Create forced inclusion txs that are 50 and 60 bytes forcedTx1 := make([]byte, 50) forcedTx2 := make([]byte, 60) @@ -469,6 +472,9 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + 
mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // Create forced inclusion txs where combined they exceed maxBytes forcedTx1 := make([]byte, 100) forcedTx2 := make([]byte, 80) // This would be deferred @@ -549,6 +555,9 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // First call returns large forced txs largeForcedTx1, largeForcedTx2 := make([]byte, 75), make([]byte, 75) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ @@ -887,6 +896,10 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 101 — close to sequencer start (100), no catch-up needed. + // Use Maybe() since two sequencer instances share this mock. 
+ mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(101), nil).Maybe() + // Create forced inclusion txs at DA height 100 // Use sizes that all fit in one batch to test checkpoint advancing forcedTx1 := make([]byte, 50) @@ -986,6 +999,9 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // First DA epoch returns empty transactions mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, @@ -1239,27 +1255,17 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() - // DA epoch at height 100 with a timestamp far in the past (simulating sequencer downtime) + // DA head is at height 105 — sequencer starts at 100 with epoch size 1, + // so it has missed 5 epochs (>1), triggering catch-up. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // DA epoch at height 100 oldTimestamp := time.Now().Add(-10 * time.Minute) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, Data: [][]byte{[]byte("forced-tx-1")}, }).Once() - // Next DA epoch at height 101 also in the past (still catching up) - // Use .Maybe() since this test only calls GetNextBatch once (processing epoch 100), - // so epoch 101 may not be retrieved. 
- oldTimestamp2 := time.Now().Add(-9 * time.Minute) - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, - Data: [][]byte{[]byte("forced-tx-2")}, - }).Maybe() - - // DA epoch at height 102 is from the future (DA head reached) - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() - gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, @@ -1287,7 +1293,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { assert.False(t, seq.IsCatchingUp(), "should not be catching up initially") - // First GetNextBatch — epoch 100 is far in the past, should enter catch-up + // First GetNextBatch — DA head is far ahead, should enter catch-up req := coresequencer.GetNextBatchRequest{ Id: []byte("test-chain"), MaxBytes: 1000000, @@ -1297,7 +1303,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp.Batch) - assert.True(t, seq.IsCatchingUp(), "should be catching up after fetching old epoch") + assert.True(t, seq.IsCatchingUp(), "should be catching up after detecting epoch gap") // During catch-up, batch should contain only forced inclusion tx, no mempool tx assert.Equal(t, 1, len(resp.Batch.Transactions), "should have only forced inclusion tx during catch-up") @@ -1319,21 +1325,26 @@ func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() - // Epoch at height 100: old timestamp (catching up) + // DA head is at 105 — sequencer starts at 100 with epoch size 1, + // so it has missed multiple epochs, triggering catch-up. 
+ // Called once on first fetchNextDAEpoch; subsequent fetches skip the check. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch at height 100: two forced txs oldTimestamp := time.Now().Add(-5 * time.Minute) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, Data: [][]byte{[]byte("forced-1"), []byte("forced-2")}, }).Once() - // Epoch at height 101: also old (still catching up) + // Epoch at height 101: one forced tx oldTimestamp2 := time.Now().Add(-4 * time.Minute) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, Data: [][]byte{[]byte("forced-3")}, }).Once() - // Epoch at height 102: from the future (head) + // Epoch at height 102: from the future (head reached during replay) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, }).Maybe() @@ -1406,6 +1417,9 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + // Epoch at height 100: timestamp 5 minutes ago epochTimestamp := time.Now().Add(-5 * time.Minute).UTC() mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ @@ -1413,11 +1427,6 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { Data: [][]byte{[]byte("forced-tx")}, }).Once() - // Next epoch from future - 
mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() - gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, @@ -1467,22 +1476,19 @@ func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + // Epoch 100: old (catch-up) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, Data: [][]byte{[]byte("forced-old")}, }).Once() - // Epoch 101: recent timestamp (current epoch at DA head) + // Epoch 101: fetched during catch-up, but returns HeightFromFuture to exit catch-up mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, - Data: [][]byte{[]byte("forced-current")}, - }).Once() - - // Epoch 102: from the future - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() + }).Once() gen := genesis.Genesis{ ChainID: "test-chain", @@ -1522,23 +1528,18 @@ func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { assert.Equal(t, 1, len(resp1.Batch.Transactions), "catch-up: only forced tx") assert.Equal(t, []byte("forced-old"), resp1.Batch.Transactions[0]) - // Second batch: recent epoch 101 — should exit catch-up + // Second batch: epoch 101 returns HeightFromFuture 
— should exit catch-up resp2, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should have exited catch-up after reaching recent epoch") + assert.False(t, seq.IsCatchingUp(), "should have exited catch-up after reaching DA head") - // Should include both forced tx and mempool tx now - hasForcedTx := false + // Should include mempool tx now (no forced txs available) hasMempoolTx := false for _, tx := range resp2.Batch.Transactions { - if bytes.Equal(tx, []byte("forced-current")) { - hasForcedTx = true - } if bytes.Equal(tx, []byte("mempool-tx")) { hasMempoolTx = true } } - assert.True(t, hasForcedTx, "should contain forced tx from current epoch") assert.True(t, hasMempoolTx, "should contain mempool tx after exiting catch-up") } @@ -1556,16 +1557,19 @@ func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() - // Epoch 100: old, triggers catch-up + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch 100: success, fetched during catch-up mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, Data: [][]byte{[]byte("forced-tx")}, }).Once() - // Epoch 101: from the future — DA head reached + // Epoch 101: from the future — DA head reached, exits catch-up mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() + }).Once() gen := genesis.Genesis{ ChainID: "test-chain", @@ -1591,7 +1595,7 @@ func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t 
*testing.T) { LastBatchData: nil, } - // First call: fetches epoch 100 (old), enters catch-up + // First call: fetches epoch 100, enters catch-up via epoch gap detection resp1, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) assert.True(t, seq.IsCatchingUp()) @@ -1619,17 +1623,16 @@ func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() - // Epoch at height 100: RECENT timestamp (sequencer was NOT down for long) + // DA head is at 100 — sequencer starts at 100 with epoch size 1, + // so it is within the same epoch (0 missed). No catch-up. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Once() + + // Epoch at height 100: current epoch mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, Data: [][]byte{[]byte("forced-tx")}, }).Once() - // Next epoch from the future - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() - gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, @@ -1663,7 +1666,7 @@ func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { resp, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should NOT be catching up when epoch is recent") + assert.False(t, seq.IsCatchingUp(), "should NOT be catching up when within one epoch of DA head") // Should have both forced and mempool txs (normal operation) assert.Equal(t, 2, len(resp.Batch.Transactions), "should have forced + mempool tx in normal mode") @@ -1686,6 +1689,11 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { 
mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 106 — sequencer starts at 100 with epoch size 1, + // so it has missed 6 epochs (>1), triggering catch-up. + // Called once on first fetchNextDAEpoch. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(106), nil).Once() + // 3 old epochs (100, 101, 102) — all with timestamps far in the past for h := uint64(100); h <= 102; h++ { ts := time.Now().Add(-time.Duration(103-h) * time.Minute) // older epochs further in the past @@ -1696,16 +1704,10 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { }).Once() } - // Epoch 103: recent (DA head) + // Epoch 103: returns HeightFromFuture — DA head reached, exits catch-up mockDA.MockClient.On("Retrieve", mock.Anything, uint64(103), forcedInclusionNS).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, - Data: [][]byte{[]byte("forced-current")}, - }).Once() - - // Epoch 104: from the future - mockDA.MockClient.On("Retrieve", mock.Anything, uint64(104), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, - }).Maybe() + }).Once() gen := genesis.Genesis{ ChainID: "test-chain", @@ -1755,27 +1757,24 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { // DA height should have advanced through the 3 old epochs assert.Equal(t, uint64(103), seq.GetDAHeight(), "DA height should be at 103 after replaying 3 epochs") - // Next batch: epoch 103 is recent — should exit catch-up and include mempool + // Next batch: epoch 103 returns HeightFromFuture — should exit catch-up and include mempool resp4, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should have exited catch-up at recent epoch") + assert.False(t, seq.IsCatchingUp(), "should have exited 
catch-up at DA head") - hasForcedCurrent := false hasMempoolTx := false for _, tx := range resp4.Batch.Transactions { - if bytes.Equal(tx, []byte("forced-current")) { - hasForcedCurrent = true - } if bytes.Equal(tx, []byte("mempool-1")) || bytes.Equal(tx, []byte("mempool-2")) { hasMempoolTx = true } } - assert.True(t, hasForcedCurrent, "should include forced tx from current epoch") assert.True(t, hasMempoolTx, "should include mempool txs after exiting catch-up") } func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { // When forced inclusion is not configured, catch-up should never activate. + // GetLatestDAHeight should NOT be called because DAEpochForcedInclusion == 0 + // causes updateCatchUpState to bail out early. ctx := context.Background() db := ds.NewMapDatastore() @@ -1791,7 +1790,7 @@ func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, - DAEpochForcedInclusion: 1, + DAEpochForcedInclusion: 0, // no epoch-based forced inclusion } seq, err := NewSequencer( @@ -1841,6 +1840,9 @@ func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + // Epoch 100: old mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, diff --git a/test/mocks/da.go b/test/mocks/da.go index 0b5c71a49c..c7d17d5bbf 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -7,7 +7,7 @@ package mocks import ( "context" - "github.com/evstack/ev-node/pkg/da/types" + da 
"github.com/evstack/ev-node/pkg/da/types" mock "github.com/stretchr/testify/mock" ) @@ -251,6 +251,63 @@ func (_c *MockClient_GetHeaderNamespace_Call) RunAndReturn(run func() []byte) *M } // HasForcedInclusionNamespace provides a mock function for the type MockClient +func (_mock *MockClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetLatestDAHeight") + } + + var r0 uint64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_GetLatestDAHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestDAHeight' +type MockClient_GetLatestDAHeight_Call struct { + *mock.Call +} + +// GetLatestDAHeight is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockClient_Expecter) GetLatestDAHeight(ctx interface{}) *MockClient_GetLatestDAHeight_Call { + return &MockClient_GetLatestDAHeight_Call{Call: _e.mock.On("GetLatestDAHeight", ctx)} +} + +func (_c *MockClient_GetLatestDAHeight_Call) Run(run func(ctx context.Context)) *MockClient_GetLatestDAHeight_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run(arg0) + }) + return _c +} + +func (_c *MockClient_GetLatestDAHeight_Call) Return(height uint64, err error) *MockClient_GetLatestDAHeight_Call { + _c.Call.Return(height, err) + return _c +} + +func (_c *MockClient_GetLatestDAHeight_Call) RunAndReturn(run func(context.Context) (uint64, error)) *MockClient_GetLatestDAHeight_Call { + _c.Call.Return(run) + 
return _c +} + func (_mock *MockClient) HasForcedInclusionNamespace() bool { ret := _mock.Called() diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 684d3fcee5..648021b76a 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -184,6 +184,11 @@ func (d *DummyDA) GetForcedInclusionNamespace() []byte { return nil } // HasForcedInclusionNamespace reports whether forced inclusion is configured. func (d *DummyDA) HasForcedInclusionNamespace() bool { return false } +// GetLatestDAHeight returns the current DA height (the latest height available). +func (d *DummyDA) GetLatestDAHeight(_ context.Context) (uint64, error) { + return d.height.Load(), nil +} + // Get retrieves blobs by ID (stub implementation). func (d *DummyDA) Get(_ context.Context, _ []datypes.ID, _ []byte) ([]datypes.Blob, error) { return nil, nil From 42f04058156dfac1db5fc498df837a1b1926dbe6 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 10 Feb 2026 15:43:33 +0100 Subject: [PATCH 03/13] cl --- CHANGELOG.md | 6 ++++++ apps/evm/go.mod | 8 ++++---- apps/evm/go.sum | 4 ---- apps/grpc/go.mod | 8 ++++---- apps/grpc/go.sum | 4 ---- apps/testapp/go.mod | 2 +- apps/testapp/go.sum | 2 -- 7 files changed, 15 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf7ce9a519..0f6d66dabc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add disaster recovery for sequencer + - Catch up possible DA-only blocks when restarting. [#3057](https://github.com/evstack/ev-node/pull/3057) + - Verify DA and P2P state on restart (prevent double-signing). 
[#3061](https://github.com/evstack/ev-node/pull/3061) + ## v1.0.0-rc.4 ### Changes diff --git a/apps/evm/go.mod b/apps/evm/go.mod index ed4b6c5126..4052c3afb2 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm go 1.25.6 -//replace ( -// github.com/evstack/ev-node => ../../ -// github.com/evstack/ev-node/execution/evm => ../../execution/evm -//) +replace ( + github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/execution/evm => ../../execution/evm +) require ( github.com/ethereum/go-ethereum v1.16.8 diff --git a/apps/evm/go.sum b/apps/evm/go.sum index 00e5995e9a..49e723062b 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -411,12 +411,8 @@ github.com/ethereum/go-ethereum v1.16.8 h1:LLLfkZWijhR5m6yrAXbdlTeXoqontH+Ga2f9i github.com/ethereum/go-ethereum v1.16.8/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/evstack/ev-node v1.0.0-rc.4 h1:Ju7pSETFdadBZxmAj0//4z7hHkXbSRDy9iTzhF60Dew= -github.com/evstack/ev-node v1.0.0-rc.4/go.mod h1:xGCH5NCdGiYk6v3GVPm4NhzAtcKQgnaVnORg8b4tbOk= github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= -github.com/evstack/ev-node/execution/evm v1.0.0-rc.3 h1:3o8H1TNywnst56lo2RlS2SXulDfp9yZJtkYYh7ZJrdM= -github.com/evstack/ev-node/execution/evm v1.0.0-rc.3/go.mod h1:VUEEklKoclg45GL7dzLoDwu3UQ4ptT3rF8bw5zUmnRk= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= diff --git a/apps/grpc/go.mod b/apps/grpc/go.mod index e7e57b2bfe..19611a2624 
100644 --- a/apps/grpc/go.mod +++ b/apps/grpc/go.mod @@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/grpc go 1.25.6 -//replace ( -// github.com/evstack/ev-node => ../../ -// github.com/evstack/ev-node/execution/grpc => ../../execution/grpc -//) +replace ( + github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/execution/grpc => ../../execution/grpc +) require ( github.com/evstack/ev-node v1.0.0-rc.4 diff --git a/apps/grpc/go.sum b/apps/grpc/go.sum index cd9dc009b9..13b8f9adce 100644 --- a/apps/grpc/go.sum +++ b/apps/grpc/go.sum @@ -367,12 +367,8 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= -github.com/evstack/ev-node v1.0.0-rc.4 h1:Ju7pSETFdadBZxmAj0//4z7hHkXbSRDy9iTzhF60Dew= -github.com/evstack/ev-node v1.0.0-rc.4/go.mod h1:xGCH5NCdGiYk6v3GVPm4NhzAtcKQgnaVnORg8b4tbOk= github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= -github.com/evstack/ev-node/execution/grpc v1.0.0-rc.1 h1:OzrWLDDY6/9+LWx0XmUqPzxs/CHZRJICOwQ0Me/i6dY= -github.com/evstack/ev-node/execution/grpc v1.0.0-rc.1/go.mod h1:Pr/sF6Zx8am9ZeWFcoz1jYPs0kXmf+OmL8Tz2Gyq7E4= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= diff --git a/apps/testapp/go.mod b/apps/testapp/go.mod index befa3aa536..d4e7306a95 100644 --- a/apps/testapp/go.mod +++ b/apps/testapp/go.mod @@ -2,7 +2,7 @@ module 
github.com/evstack/ev-node/apps/testapp go 1.25.6 -//replace github.com/evstack/ev-node => ../../ +replace github.com/evstack/ev-node => ../../ require ( github.com/evstack/ev-node v1.0.0-rc.4 diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index f07cb58dc1..13b8f9adce 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -367,8 +367,6 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= -github.com/evstack/ev-node v1.0.0-rc.4 h1:Ju7pSETFdadBZxmAj0//4z7hHkXbSRDy9iTzhF60Dew= -github.com/evstack/ev-node v1.0.0-rc.4/go.mod h1:xGCH5NCdGiYk6v3GVPm4NhzAtcKQgnaVnORg8b4tbOk= github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= From 014510be905fab25e1058a1f6c5f266e4e8cfcd6 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 10 Feb 2026 17:20:50 +0100 Subject: [PATCH 04/13] align timestamping --- pkg/sequencers/single/sequencer.go | 9 +- pkg/sequencers/single/sequencer_test.go | 206 ++++++++++++++++++++++++ 2 files changed, 212 insertions(+), 3 deletions(-) diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 4874e77fc1..fdf21e4024 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -363,11 +363,14 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB batchTxs = append(batchTxs, validMempoolTxs...) // During catch-up, use the DA epoch end timestamp to match based sequencing behavior. 
- // This ensures blocks produced during catch-up have timestamps consistent with - // what base sequencing nodes would have produced. + // Replicates based sequencing nodes' behavior of timestamping blocks during catchingUp. timestamp := time.Now() if c.catchingUp && !c.currentDAEndTime.IsZero() { - timestamp = c.currentDAEndTime + var remainingForcedTxs uint64 + if len(c.cachedForcedInclusionTxs) > 0 { + remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex + } + timestamp = c.currentDAEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond) } return &coresequencer.GetNextBatchResponse{ diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index e4dcfb1250..fc71f6bf08 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -1909,6 +1909,212 @@ func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { assert.Equal(t, uint64(102), seq.GetDAHeight()) } +func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { + // When a single DA epoch has more forced txs than fit in one block, + // catch-up must produce strictly monotonic timestamps across the + // resulting blocks. This uses the same jitter scheme as the based + // sequencer: timestamp = DAEndTime - (remainingForcedTxs * 1ms). 
+ ctx := context.Background() + logger := zerolog.New(zerolog.NewConsoleWriter()) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is far ahead — triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() + + // Epoch at height 100: 3 forced txs, each 100 bytes + epochTimestamp := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + tx1 := make([]byte, 100) + tx2 := make([]byte, 100) + tx3 := make([]byte, 100) + copy(tx1, "forced-tx-1") + copy(tx2, "forced-tx-2") + copy(tx3, "forced-tx-3") + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epochTimestamp}, + Data: [][]byte{tx1, tx2, tx3}, + }).Once() + + // Epoch at height 101: single tx (to verify cross-epoch monotonicity) + epoch2Timestamp := time.Date(2025, 1, 1, 12, 0, 10, 0, time.UTC) // 10 seconds later + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epoch2Timestamp}, + Data: [][]byte{[]byte("forced-tx-4")}, + }).Once() + + // Epoch 102: future — exits catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + // Custom executor: 
only 1 tx fits per block (gas-limited) + mockExec := mocks.NewMockExecutor(t) + mockExec.On("GetExecutionInfo", mock.Anything).Return(execution.ExecutionInfo{MaxGas: 1000000}, nil).Maybe() + mockExec.On("FilterTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + func(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) []execution.FilterStatus { + result := make([]execution.FilterStatus, len(txs)) + // Only first tx fits, rest are postponed + for i := range result { + if i == 0 { + result[i] = execution.FilterOK + } else { + result[i] = execution.FilterPostpone + } + } + return result + }, + nil, + ).Maybe() + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + mockExec, + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Produce 3 blocks from epoch 100 (1 tx each due to gas filter) + var timestamps []time.Time + for i := 0; i < 3; i++ { + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp(), "should be catching up during block %d", i) + assert.Equal(t, 1, len(resp.Batch.Transactions), "block %d: exactly 1 forced tx", i) + timestamps = append(timestamps, resp.Timestamp) + } + + // All 3 timestamps must be strictly monotonically increasing + for i := 1; i < len(timestamps); i++ { + assert.True(t, timestamps[i].After(timestamps[i-1]), + "timestamp[%d] (%v) must be strictly after timestamp[%d] (%v)", + i, timestamps[i], i-1, timestamps[i-1]) + } + + // Verify exact jitter values: + // Block 0: 3 txs total, 1 consumed → 2 remaining → T - 2ms + // Block 1: 1 consumed → 1 remaining → T - 1ms + // Block 2: 1 consumed → 0 remaining → T + assert.Equal(t, epochTimestamp.Add(-2*time.Millisecond), timestamps[0], "block 0: T - 2ms") + assert.Equal(t, 
epochTimestamp.Add(-1*time.Millisecond), timestamps[1], "block 1: T - 1ms") + assert.Equal(t, epochTimestamp, timestamps[2], "block 2: T (exact epoch end time)") + + // Block from epoch 101 should also be monotonically after epoch 100's last block + resp4, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp(), "should still be catching up") + assert.Equal(t, 1, len(resp4.Batch.Transactions)) + assert.True(t, resp4.Timestamp.After(timestamps[2]), + "epoch 101 timestamp (%v) must be after epoch 100 last timestamp (%v)", + resp4.Timestamp, timestamps[2]) + assert.Equal(t, epoch2Timestamp, resp4.Timestamp, "single-tx epoch gets exact DA end time") +} + +func TestSequencer_CatchUp_MonotonicTimestamps_EmptyEpoch(t *testing.T) { + // Verify that an empty DA epoch (no forced txs) still advances the + // checkpoint and updates currentDAEndTime so subsequent epochs get + // correct timestamps. + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() + + // Epoch 100: empty (no forced txs) but valid timestamp + emptyEpochTimestamp := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: emptyEpochTimestamp}, + Data: [][]byte{}, + }).Once() + + // Epoch 101: has a forced tx with a later timestamp + epoch2Timestamp := time.Date(2025, 1, 1, 12, 0, 15, 0, 
time.UTC) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epoch2Timestamp}, + Data: [][]byte{[]byte("forced-tx-after-empty")}, + }).Once() + + // Epoch 102: future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First call processes the empty epoch 100 — empty batch, but checkpoint advances + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + assert.Equal(t, 0, len(resp1.Batch.Transactions), "empty epoch should produce empty batch") + assert.Equal(t, emptyEpochTimestamp, resp1.Timestamp, + "empty epoch batch should use epoch DA end time (0 remaining)") + + // Second call processes epoch 101 — should have later timestamp + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.IsCatchingUp()) + assert.Equal(t, 1, len(resp2.Batch.Transactions)) + assert.True(t, resp2.Timestamp.After(resp1.Timestamp), + "epoch 101 timestamp (%v) must be after empty epoch 100 timestamp (%v)", + resp2.Timestamp, resp1.Timestamp) +} + func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T) { db := ds.NewMapDatastore() logger := zerolog.New(zerolog.NewTestWriter(t)) From a30ea6885a63299e3d3b48e43841e86f54c5bd97 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 11 Feb 
2026 13:28:29 +0100 Subject: [PATCH 05/13] updates --- pkg/sequencers/single/sequencer.go | 36 ++++++++++++++----------- pkg/sequencers/single/sequencer_test.go | 18 ++++++------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index fdf21e4024..27a36c71ad 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -63,7 +63,8 @@ type Sequencer struct { // It is set when we detect (via GetLatestDAHeight) that the DA layer is more // than one epoch ahead of our checkpoint, and cleared when we hit // ErrHeightFromFuture (meaning we've reached the DA head). - catchingUp bool + + catchingUp atomic.Bool // currentDAEndTime is the DA epoch end timestamp from the last fetched epoch. // Used as the block timestamp during catch-up to match based sequencing behavior. currentDAEndTime time.Time @@ -235,7 +236,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB // During catch-up, the sequencer must produce blocks identical to what base // sequencing would produce (forced inclusion txs only, no mempool). var mempoolBatch *coresequencer.Batch - if !c.catchingUp { + if !c.catchingUp.Load() { var err error mempoolBatch, err = c.queue.Next(ctx) if err != nil { @@ -353,7 +354,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB Uint64("consumed_count", forcedTxConsumedCount). Uint64("checkpoint_tx_index", c.checkpoint.TxIndex). Uint64("checkpoint_da_height", c.checkpoint.DAHeight). - Bool("catching_up", c.catchingUp). + Bool("catching_up", c.catchingUp.Load()). Msg("updated checkpoint after processing forced inclusion transactions") } @@ -365,12 +366,15 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB // During catch-up, use the DA epoch end timestamp to match based sequencing behavior. 
// Replicates based sequencing nodes' behavior of timestamping blocks during catchingUp. timestamp := time.Now() - if c.catchingUp && !c.currentDAEndTime.IsZero() { - var remainingForcedTxs uint64 - if len(c.cachedForcedInclusionTxs) > 0 { - remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex + if c.catchingUp.Load() { + daEndTime := c.currentDAEndTime + if !daEndTime.IsZero() { + var remainingForcedTxs uint64 + if len(c.cachedForcedInclusionTxs) > 0 { + remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex + } + timestamp = daEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond) } - timestamp = c.currentDAEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond) } return &coresequencer.GetNextBatchResponse{ @@ -426,7 +430,7 @@ func (c *Sequencer) GetDAHeight() uint64 { // with only forced inclusion transactions (no mempool), matching the blocks // that base sequencing nodes would have produced during sequencer downtime. func (c *Sequencer) IsCatchingUp() bool { - return c.catchingUp + return c.catchingUp.Load() } // fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint. @@ -447,7 +451,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). - Bool("catching_up", c.catchingUp). + Bool("catching_up", c.catchingUp.Load()). Msg("fetching forced inclusion transactions from DA") forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) @@ -458,18 +462,18 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Msg("DA height from future, waiting for DA to produce block") // We've reached the DA head — exit catch-up mode - if c.catchingUp { + if c.catchingUp.Load() { c.logger.Info(). Uint64("da_height", currentDAHeight). 
Msg("catch-up complete: reached DA head, resuming normal sequencing") - c.catchingUp = false + c.catchingUp.Store(false) } return 0, nil } else if errors.Is(err, block.ErrForceInclusionNotConfigured) { // Forced inclusion not configured, continue without forced txs c.cachedForcedInclusionTxs = [][]byte{} - c.catchingUp = false + c.catchingUp.Store(false) return 0, nil } @@ -502,7 +506,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Int("skipped_tx_count", skippedTxs). Uint64("da_height_start", forcedTxsEvent.StartDaHeight). Uint64("da_height_end", forcedTxsEvent.EndDaHeight). - Bool("catching_up", c.catchingUp). + Bool("catching_up", c.catchingUp.Load()). Msg("fetched forced inclusion transactions from DA") // Cache the transactions @@ -528,7 +532,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint // overhead. func (c *Sequencer) updateCatchUpState(ctx context.Context) { // Already catching up — nothing to do. We'll exit via ErrHeightFromFuture. - if c.catchingUp { + if c.catchingUp.Load() { return } @@ -574,7 +578,7 @@ func (c *Sequencer) updateCatchUpState(ctx context.Context) { } // The DA layer is more than one epoch ahead. Enter catch-up mode. - c.catchingUp = true + c.catchingUp.Store(true) c.logger.Warn(). Uint64("checkpoint_da_height", currentDAHeight). Uint64("latest_da_height", latestDAHeight). 
diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index fc71f6bf08..f3ac6a01c0 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -365,7 +365,7 @@ func TestSequencer_GetNextBatch_BeforeDASubmission(t *testing.T) { func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) // Create in-memory datastore db := ds.NewMapDatastore() @@ -458,7 +458,7 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -541,7 +541,7 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -882,7 +882,7 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) { func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1242,7 +1242,7 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) { // This test uses maxBytes to limit how many txs are fetched, triggering the unprocessed txs scenario. 
func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1312,7 +1312,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1463,7 +1463,7 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1676,7 +1676,7 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { // Simulates a sequencer that missed 3 DA epochs and must replay them all // before resuming normal operation. ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -1915,7 +1915,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { // resulting blocks. This uses the same jitter scheme as the based // sequencer: timestamp = DAEndTime - (remainingForcedTxs * 1ms). 
ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() From 97f055fec5ea15b6f9e71751c50d9f3eb2874b4f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 16 Feb 2026 17:50:06 +0100 Subject: [PATCH 06/13] updates --- apps/evm/go.mod | 1 + apps/evm/go.sum | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/evm/go.mod b/apps/evm/go.mod index 315d466934..24c089d866 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -4,6 +4,7 @@ go 1.25.6 replace ( github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/core => ../../core github.com/evstack/ev-node/execution/evm => ../../execution/evm ) diff --git a/apps/evm/go.sum b/apps/evm/go.sum index f3867c1bfb..499ccdec59 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -411,8 +411,6 @@ github.com/ethereum/go-ethereum v1.16.8 h1:LLLfkZWijhR5m6yrAXbdlTeXoqontH+Ga2f9i github.com/ethereum/go-ethereum v1.16.8/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE= -github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= From 9288b292df1e83b14f72ad6431dbb5b1753b0190 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 16 Feb 2026 18:09:17 +0100 Subject: [PATCH 07/13] fixes --- pkg/sequencers/single/sequencer.go | 68 ++++++++++++------------- pkg/sequencers/single/sequencer_test.go | 34 ++++++------- 2 files changed, 
51 insertions(+), 51 deletions(-) diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 27a36c71ad..ada324e127 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -27,6 +27,14 @@ import ( // ErrInvalidId is returned when the chain id is invalid var ErrInvalidId = errors.New("invalid chain id") +// Catch-up state machine. The sequencer transitions through these once per +// lifecycle: unchecked → (inProgress | done). +const ( + catchUpUnchecked int32 = iota // haven't checked DA height yet + catchUpInProgress // actively replaying missed DA epochs + catchUpDone // caught up or was never behind +) + var _ coresequencer.Sequencer = (*Sequencer)(nil) // Sequencer implements core sequencing interface @@ -53,18 +61,11 @@ type Sequencer struct { // Cached forced inclusion transactions from the current epoch cachedForcedInclusionTxs [][]byte - // Catch-up state: when the sequencer restarts after being down for more than - // one DA epoch, it must replay missed epochs (producing blocks with only forced - // inclusion transactions, no mempool) before resuming normal sequencing. - // This ensures the sequencer produces the same blocks that nodes running in - // base sequencing mode would have produced during the downtime. - // - // catchingUp is true when the sequencer is replaying missed DA epochs. - // It is set when we detect (via GetLatestDAHeight) that the DA layer is more - // than one epoch ahead of our checkpoint, and cleared when we hit - // ErrHeightFromFuture (meaning we've reached the DA head). - - catchingUp atomic.Bool + // catchUpState tracks the catch-up lifecycle (see constants above). + // Checked once on the first epoch fetch via updateCatchUpState. If catch-up + // is needed, transitions to catchUpInProgress until ErrHeightFromFuture + // moves it to catchUpDone. + catchUpState atomic.Int32 // currentDAEndTime is the DA epoch end timestamp from the last fetched epoch. 
// Used as the block timestamp during catch-up to match based sequencing behavior. currentDAEndTime time.Time @@ -236,7 +237,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB // During catch-up, the sequencer must produce blocks identical to what base // sequencing would produce (forced inclusion txs only, no mempool). var mempoolBatch *coresequencer.Batch - if !c.catchingUp.Load() { + if c.catchUpState.Load() != catchUpInProgress { var err error mempoolBatch, err = c.queue.Next(ctx) if err != nil { @@ -354,7 +355,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB Uint64("consumed_count", forcedTxConsumedCount). Uint64("checkpoint_tx_index", c.checkpoint.TxIndex). Uint64("checkpoint_da_height", c.checkpoint.DAHeight). - Bool("catching_up", c.catchingUp.Load()). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). Msg("updated checkpoint after processing forced inclusion transactions") } @@ -364,9 +365,8 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB batchTxs = append(batchTxs, validMempoolTxs...) // During catch-up, use the DA epoch end timestamp to match based sequencing behavior. - // Replicates based sequencing nodes' behavior of timestamping blocks during catchingUp. timestamp := time.Now() - if c.catchingUp.Load() { + if c.catchUpState.Load() == catchUpInProgress { daEndTime := c.currentDAEndTime if !daEndTime.IsZero() { var remainingForcedTxs uint64 @@ -425,12 +425,9 @@ func (c *Sequencer) GetDAHeight() uint64 { return c.daHeight.Load() } -// IsCatchingUp returns whether the sequencer is in catch-up mode. -// During catch-up, the sequencer replays missed DA epochs producing blocks -// with only forced inclusion transactions (no mempool), matching the blocks -// that base sequencing nodes would have produced during sequencer downtime. 
-func (c *Sequencer) IsCatchingUp() bool { - return c.catchingUp.Load() +// isCatchingUp returns whether the sequencer is in catch-up mode. +func (c *Sequencer) isCatchingUp() bool { + return c.catchUpState.Load() == catchUpInProgress } // fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint. @@ -451,7 +448,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). - Bool("catching_up", c.catchingUp.Load()). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). Msg("fetching forced inclusion transactions from DA") forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) @@ -462,18 +459,18 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Msg("DA height from future, waiting for DA to produce block") // We've reached the DA head — exit catch-up mode - if c.catchingUp.Load() { + if c.catchUpState.Load() == catchUpInProgress { c.logger.Info(). Uint64("da_height", currentDAHeight). Msg("catch-up complete: reached DA head, resuming normal sequencing") - c.catchingUp.Store(false) + c.catchUpState.Store(catchUpDone) } return 0, nil } else if errors.Is(err, block.ErrForceInclusionNotConfigured) { // Forced inclusion not configured, continue without forced txs c.cachedForcedInclusionTxs = [][]byte{} - c.catchingUp.Store(false) + c.catchUpState.Store(catchUpDone) return 0, nil } @@ -506,7 +503,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Int("skipped_tx_count", skippedTxs). Uint64("da_height_start", forcedTxsEvent.StartDaHeight). Uint64("da_height_end", forcedTxsEvent.EndDaHeight). - Bool("catching_up", c.catchingUp.Load()). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). 
Msg("fetched forced inclusion transactions from DA") // Cache the transactions @@ -526,15 +523,18 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint // in catch-up until fetchNextDAEpoch hits ErrHeightFromFuture, meaning we've // reached the DA head. // -// This check is performed only once per sequencer lifecycle. If the downtime was -// short enough that the sequencer is still within the current or next epoch, no -// catch-up is needed and the (lightweight) GetLatestDAHeight call is the only -// overhead. +// This check runs exactly once per sequencer lifecycle, enforced by the +// catchUpState field: any state other than catchUpUnchecked causes an immediate +// return. If the downtime was short enough that the sequencer is still within +// the current or next epoch, no catch-up is needed and the single +// GetLatestDAHeight call is the only overhead. func (c *Sequencer) updateCatchUpState(ctx context.Context) { - // Already catching up — nothing to do. We'll exit via ErrHeightFromFuture. - if c.catchingUp.Load() { + if c.catchUpState.Load() != catchUpUnchecked { return } + // Optimistically mark as done; overridden to catchUpInProgress below if + // catch-up is actually needed. + c.catchUpState.Store(catchUpDone) epochSize := c.genesis.DAEpochForcedInclusion if epochSize == 0 { @@ -578,7 +578,7 @@ func (c *Sequencer) updateCatchUpState(ctx context.Context) { } // The DA layer is more than one epoch ahead. Enter catch-up mode. - c.catchingUp.Store(true) + c.catchUpState.Store(catchUpInProgress) c.logger.Warn(). Uint64("checkpoint_da_height", currentDAHeight). Uint64("latest_da_height", latestDAHeight). 
diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index f3ac6a01c0..2f479dedd4 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -1291,7 +1291,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { }) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should not be catching up initially") + assert.False(t, seq.isCatchingUp(), "should not be catching up initially") // First GetNextBatch — DA head is far ahead, should enter catch-up req := coresequencer.GetNextBatchRequest{ @@ -1303,7 +1303,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp.Batch) - assert.True(t, seq.IsCatchingUp(), "should be catching up after detecting epoch gap") + assert.True(t, seq.isCatchingUp(), "should be catching up after detecting epoch gap") // During catch-up, batch should contain only forced inclusion tx, no mempool tx assert.Equal(t, 1, len(resp.Batch.Transactions), "should have only forced inclusion tx during catch-up") @@ -1385,7 +1385,7 @@ func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { // First batch (epoch 100): only forced txs resp1, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp()) + assert.True(t, seq.isCatchingUp()) for _, tx := range resp1.Batch.Transactions { assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") @@ -1395,7 +1395,7 @@ func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { // Second batch (epoch 101): only forced txs resp2, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp()) + assert.True(t, seq.isCatchingUp()) for _, tx := range resp2.Batch.Transactions { assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") @@ -1454,7 +1454,7 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { 
resp, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) require.NotNil(t, resp) - assert.True(t, seq.IsCatchingUp(), "should be in catch-up mode") + assert.True(t, seq.isCatchingUp(), "should be in catch-up mode") // During catch-up, the timestamp should be the DA epoch end time, not time.Now() assert.Equal(t, epochTimestamp, resp.Timestamp, @@ -1524,14 +1524,14 @@ func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { // First batch: catch-up (old epoch 100) resp1, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp(), "should be catching up during old epoch") + assert.True(t, seq.isCatchingUp(), "should be catching up during old epoch") assert.Equal(t, 1, len(resp1.Batch.Transactions), "catch-up: only forced tx") assert.Equal(t, []byte("forced-old"), resp1.Batch.Transactions[0]) // Second batch: epoch 101 returns HeightFromFuture — should exit catch-up resp2, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should have exited catch-up after reaching DA head") + assert.False(t, seq.isCatchingUp(), "should have exited catch-up after reaching DA head") // Should include mempool tx now (no forced txs available) hasMempoolTx := false @@ -1598,13 +1598,13 @@ func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t *testing.T) { // First call: fetches epoch 100, enters catch-up via epoch gap detection resp1, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp()) + assert.True(t, seq.isCatchingUp()) assert.Equal(t, 1, len(resp1.Batch.Transactions)) // Second call: epoch 101 is from the future, should exit catch-up resp2, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should exit catch-up when DA returns HeightFromFuture") + assert.False(t, seq.isCatchingUp(), "should exit catch-up when DA returns HeightFromFuture") // No forced txs available, batch is empty assert.Equal(t, 0, 
len(resp2.Batch.Transactions)) } @@ -1666,7 +1666,7 @@ func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { resp, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should NOT be catching up when within one epoch of DA head") + assert.False(t, seq.isCatchingUp(), "should NOT be catching up when within one epoch of DA head") // Should have both forced and mempool txs (normal operation) assert.Equal(t, 2, len(resp.Batch.Transactions), "should have forced + mempool tx in normal mode") @@ -1744,7 +1744,7 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { for i := 0; i < 3; i++ { resp, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp(), "should be catching up during epoch %d", 100+i) + assert.True(t, seq.isCatchingUp(), "should be catching up during epoch %d", 100+i) assert.Equal(t, 1, len(resp.Batch.Transactions), "epoch %d: should have exactly 1 forced tx", 100+i) @@ -1760,7 +1760,7 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { // Next batch: epoch 103 returns HeightFromFuture — should exit catch-up and include mempool resp4, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should have exited catch-up at DA head") + assert.False(t, seq.isCatchingUp(), "should have exited catch-up at DA head") hasMempoolTx := false for _, tx := range resp4.Batch.Transactions { @@ -1820,7 +1820,7 @@ func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { resp, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.False(t, seq.IsCatchingUp(), "should never catch up when forced inclusion not configured") + assert.False(t, seq.isCatchingUp(), "should never catch up when forced inclusion not configured") assert.Equal(t, 1, len(resp.Batch.Transactions)) assert.Equal(t, []byte("mempool-tx"), resp.Batch.Transactions[0]) } @@ -2004,7 +2004,7 @@ func 
TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { for i := 0; i < 3; i++ { resp, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp(), "should be catching up during block %d", i) + assert.True(t, seq.isCatchingUp(), "should be catching up during block %d", i) assert.Equal(t, 1, len(resp.Batch.Transactions), "block %d: exactly 1 forced tx", i) timestamps = append(timestamps, resp.Timestamp) } @@ -2027,7 +2027,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { // Block from epoch 101 should also be monotonically after epoch 100's last block resp4, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp(), "should still be catching up") + assert.True(t, seq.isCatchingUp(), "should still be catching up") assert.Equal(t, 1, len(resp4.Batch.Transactions)) assert.True(t, resp4.Timestamp.After(timestamps[2]), "epoch 101 timestamp (%v) must be after epoch 100 last timestamp (%v)", @@ -2100,7 +2100,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps_EmptyEpoch(t *testing.T) { // First call processes the empty epoch 100 — empty batch, but checkpoint advances resp1, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp()) + assert.True(t, seq.isCatchingUp()) assert.Equal(t, 0, len(resp1.Batch.Transactions), "empty epoch should produce empty batch") assert.Equal(t, emptyEpochTimestamp, resp1.Timestamp, "empty epoch batch should use epoch DA end time (0 remaining)") @@ -2108,7 +2108,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps_EmptyEpoch(t *testing.T) { // Second call processes epoch 101 — should have later timestamp resp2, err := seq.GetNextBatch(ctx, req) require.NoError(t, err) - assert.True(t, seq.IsCatchingUp()) + assert.True(t, seq.isCatchingUp()) assert.Equal(t, 1, len(resp2.Batch.Transactions)) assert.True(t, resp2.Timestamp.After(resp1.Timestamp), "epoch 101 timestamp (%v) must be after empty epoch 100 timestamp (%v)", From 
bef0bef06d3ad579e4b3253ace545c887f79b3c8 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 17 Feb 2026 22:42:41 +0100 Subject: [PATCH 08/13] rm test --- scripts/test-catchup.sh | 393 ---------------------------------------- 1 file changed, 393 deletions(-) delete mode 100755 scripts/test-catchup.sh diff --git a/scripts/test-catchup.sh b/scripts/test-catchup.sh deleted file mode 100755 index 153aeece81..0000000000 --- a/scripts/test-catchup.sh +++ /dev/null @@ -1,393 +0,0 @@ -#!/usr/bin/env bash -# -# test-catchup.sh — Automated test suite for PR #3057 (sequencer catch-up from base) -# -# This script exercises all code paths introduced by the catch-up feature: -# 1. Sequencer catch-up unit tests (detection, mempool skipping, timestamps, exit, etc.) -# 2. DA client interface changes (GetLatestDAHeight, tracing) -# 3. Syncer DA height advancement logic (epoch-based stepping) -# 4. Mock/testda updates -# 5. Force inclusion e2e tests (exercises catch-up path end-to-end) -# -# Usage: -# ./scripts/test-catchup.sh # Run all test stages -# ./scripts/test-catchup.sh --unit # Run only unit tests -# ./scripts/test-catchup.sh --e2e # Run only e2e tests (requires building binaries) -# ./scripts/test-catchup.sh --verbose # Verbose output (-v flag to go test) -# ./scripts/test-catchup.sh --race # Enable race detector -# -# Exit codes: -# 0 All tests passed -# 1 One or more test stages failed - -set -euo pipefail - -# --- Configuration ----------------------------------------------------------- - -REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)" -cd "$REPO_ROOT" - -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -CYAN='\033[0;36m' -BOLD='\033[1m' -RESET='\033[0m' - -VERBOSE="" -RACE="" -RUN_UNIT=true -RUN_E2E=false -FAILURES=0 -PASSED=0 -SKIPPED=0 -STAGE_RESULTS=() - -# --- Argument parsing --------------------------------------------------------- - -for arg in "$@"; do - case "$arg" in - --unit) - RUN_UNIT=true - RUN_E2E=false - ;; - --e2e) - RUN_UNIT=false - RUN_E2E=true - ;; - --all) - RUN_UNIT=true - RUN_E2E=true - ;; - --verbose|-v) - VERBOSE="-v" - ;; - --race) - RACE="-race" - ;; - --help|-h) - echo "Usage: $0 [--unit|--e2e|--all] [--verbose|-v] [--race]" - echo "" - echo "Runs the test suite for the sequencer catch-up feature (PR #3057)." - echo "" - echo "Options:" - echo " --unit Run only unit/integration tests (default)" - echo " --e2e Run only e2e tests (builds binaries first)" - echo " --all Run both unit and e2e tests" - echo " --verbose Pass -v to go test" - echo " --race Enable Go race detector" - echo " --help Show this help message" - exit 0 - ;; - *) - echo "Unknown argument: $arg (use --help for usage)" - exit 1 - ;; - esac -done - -# --- Helpers ------------------------------------------------------------------ - -log_header() { - echo "" - echo -e "${CYAN}${BOLD}========================================${RESET}" - echo -e "${CYAN}${BOLD} $1${RESET}" - echo -e "${CYAN}${BOLD}========================================${RESET}" -} - -log_stage() { - echo "" - echo -e "${YELLOW}--- Stage: $1${RESET}" -} - -log_pass() { - echo -e "${GREEN} PASS${RESET} $1" - PASSED=$((PASSED + 1)) - STAGE_RESULTS+=("PASS: $1") -} - -log_fail() { - echo -e "${RED} FAIL${RESET} $1" - FAILURES=$((FAILURES + 1)) - STAGE_RESULTS+=("FAIL: $1") -} - -log_skip() { - echo -e "${YELLOW} SKIP${RESET} $1" - SKIPPED=$((SKIPPED + 1)) - STAGE_RESULTS+=("SKIP: $1") -} - -# run_tests [extra go test args...] 
-run_tests() { - local stage_name="$1" - shift - local pkg="$1" - shift - - log_stage "$stage_name" - - if go test $VERBOSE $RACE -timeout=5m -count=1 "$pkg" "$@" 2>&1; then - log_pass "$stage_name" - else - log_fail "$stage_name" - fi -} - -# run_tests_with_run <-run pattern> -run_tests_with_run() { - local stage_name="$1" - local pkg="$2" - local pattern="$3" - - log_stage "$stage_name" - - if go test $VERBOSE $RACE -timeout=5m -count=1 -run "$pattern" "$pkg" 2>&1; then - log_pass "$stage_name" - else - log_fail "$stage_name" - fi -} - -# --- Banner ------------------------------------------------------------------- - -log_header "PR #3057: Sequencer Catch-Up From Base — Test Suite" -echo "" -echo " Branch: $(git branch --show-current 2>/dev/null || echo 'detached')" -echo " Commit: $(git rev-parse --short HEAD 2>/dev/null || echo 'unknown')" -echo " Date: $(date -Iseconds)" -echo " Flags: unit=$RUN_UNIT e2e=$RUN_E2E verbose=${VERBOSE:-off} race=${RACE:-off}" - -# ============================================================================== -# UNIT & INTEGRATION TESTS -# ============================================================================== - -if [ "$RUN_UNIT" = true ]; then - - log_header "Unit & Integration Tests" - - # -------------------------------------------------------------------------- - # Stage 1: Sequencer catch-up unit tests - # - # These are the core tests added by the PR. 
They test: - # - Detecting old epochs and entering catch-up mode - # - Skipping mempool transactions during catch-up - # - Using DA timestamps for catch-up block timestamps - # - Exiting catch-up when HeightFromFuture is returned - # - Multi-epoch replay - # - No catch-up when epoch is recent - # - No catch-up when forced inclusion is not configured - # - Checkpoint advancement during catch-up - # - Monotonic timestamp generation - # -------------------------------------------------------------------------- - run_tests_with_run \ - "Sequencer catch-up tests (all TestSequencer_CatchUp_*)" \ - "./pkg/sequencers/single/..." \ - "TestSequencer_CatchUp_" - - # -------------------------------------------------------------------------- - # Stage 2: Existing sequencer tests (ensure no regressions) - # - # The PR modifies GetNextBatch, fetchNextDAEpoch, and adds new fields to - # the Sequencer struct. Run ALL sequencer tests to catch regressions. - # -------------------------------------------------------------------------- - run_tests \ - "Sequencer full test suite (regression check)" \ - "./pkg/sequencers/single/..." - - # -------------------------------------------------------------------------- - # Stage 3: DA client interface & tracing tests - # - # The PR adds GetLatestDAHeight to the DA Client interface and adds tracing - # for it. This verifies the new method integrates correctly. - # -------------------------------------------------------------------------- - run_tests \ - "DA client tracing tests" \ - "./block/internal/da/..." - - # -------------------------------------------------------------------------- - # Stage 4: Syncer tests - # - # The PR modifies the DA height advancement logic in the syncer to handle - # catch-up blocks by stepping one epoch at a time instead of jumping. - # -------------------------------------------------------------------------- - run_tests \ - "Syncer tests (DA height advancement)" \ - "./block/internal/syncing/..." 
- - # -------------------------------------------------------------------------- - # Stage 5: Mock & test DA - # - # The PR adds GetLatestDAHeight to the mock DA client and the DummyDA. - # Verify mocks compile and are consistent. - # -------------------------------------------------------------------------- - run_tests \ - "Mock DA (build check)" \ - "./test/mocks/..." \ - -run "^$" # No test functions, just compile check - - run_tests \ - "Test DA (DummyDA)" \ - "./test/testda/..." - - # -------------------------------------------------------------------------- - # Stage 6: EVM force inclusion mock update - # - # The PR adds GetLatestDAHeight to the mockDA in apps/evm/server test. - # This is a separate Go module, so we run from within its directory. - # -------------------------------------------------------------------------- - log_stage "EVM force inclusion server tests" - if (cd apps/evm && go test $VERBOSE $RACE -timeout=5m -count=1 ./server/...) 2>&1; then - log_pass "EVM force inclusion server tests" - else - log_fail "EVM force inclusion server tests" - fi - - # -------------------------------------------------------------------------- - # Stage 7: Types package — epoch calculations - # - # The catch-up logic relies on CalculateEpochNumber and - # CalculateEpochBoundaries from the types package. - # -------------------------------------------------------------------------- - run_tests_with_run \ - "Epoch calculation tests" \ - "./types/..." \ - "Epoch" - - # -------------------------------------------------------------------------- - # Stage 8: Block package (full build & test) - # - # The block package contains the DA client, syncer, and forced inclusion - # retrieval. Run the full suite to catch any integration issues. - # -------------------------------------------------------------------------- - run_tests \ - "Block package full suite" \ - "./block/..." 
- -fi - -# ============================================================================== -# E2E TESTS -# ============================================================================== - -if [ "$RUN_E2E" = true ]; then - - log_header "End-to-End Tests" - - # -------------------------------------------------------------------------- - # Build binaries required for e2e tests - # -------------------------------------------------------------------------- - log_stage "Building binaries for e2e tests" - if make build build-da build-evm 2>&1; then - log_pass "Binary build" - else - log_fail "Binary build" - echo -e "${RED}Cannot run e2e tests without binaries. Aborting e2e stage.${RESET}" - fi - - if [ -f "./build/testapp" ] && [ -f "./build/evm" ]; then - # Build Docker image for Docker e2e tests (if not already built) - log_stage "Building Docker image for e2e tests" - if make docker-build-if-local 2>&1; then - log_pass "Docker image build" - else - log_skip "Docker image build (non-fatal)" - fi - - # -------------------------------------------------------------------------- - # Stage E1: Force inclusion e2e tests - # - # These tests exercise the end-to-end forced inclusion flow, which is the - # mechanism that catch-up relies on. The sequencer fetches forced inclusion - # transactions from DA epochs and includes them in blocks. - # -------------------------------------------------------------------------- - log_stage "Force inclusion e2e tests" - if (cd test/e2e && go test -mod=readonly -failfast -timeout=15m \ - -tags='e2e evm' $VERBOSE $RACE \ - -run "TestEvmSequencerForceInclusionE2E|TestEvmFullNodeForceInclusionE2E" \ - ./... 
--binary=../../build/testapp --evm-binary=../../build/evm) 2>&1; then - log_pass "Force inclusion e2e tests" - else - log_fail "Force inclusion e2e tests" - fi - - # -------------------------------------------------------------------------- - # Stage E2: DA restart e2e test - # - # Tests DA layer failure and recovery — related to catch-up because the - # sequencer must handle accumulated blocks during DA downtime. - # -------------------------------------------------------------------------- - log_stage "DA restart e2e test" - if (cd test/e2e && go test -mod=readonly -failfast -timeout=15m \ - -tags='e2e evm' $VERBOSE $RACE \ - -run "TestEvmDARestartWithPendingBlocksE2E" \ - ./... --binary=../../build/testapp --evm-binary=../../build/evm) 2>&1; then - log_pass "DA restart e2e test" - else - log_fail "DA restart e2e test" - fi - - # -------------------------------------------------------------------------- - # Stage E3: Sequencer restart e2e test - # - # Tests sequencer restart and state persistence — the catch-up feature - # activates during sequencer restart when DA has advanced. - # -------------------------------------------------------------------------- - log_stage "Sequencer restart e2e test" - if (cd test/e2e && go test -mod=readonly -failfast -timeout=15m \ - -tags='e2e evm' $VERBOSE $RACE \ - -run "TestEvmSequencerFullNodeRestartE2E" \ - ./... --binary=../../build/testapp --evm-binary=../../build/evm) 2>&1; then - log_pass "Sequencer restart e2e test" - else - log_fail "Sequencer restart e2e test" - fi - - # -------------------------------------------------------------------------- - # Stage E4: Basic testapp e2e (aggregator + fullnode sanity) - # - # Sanity check that the DA interface changes don't break basic operation. 
- # -------------------------------------------------------------------------- - log_stage "Basic testapp e2e test" - if (cd test/e2e && go test -mod=readonly -failfast -timeout=15m \ - -tags='e2e' $VERBOSE $RACE \ - -run "TestBasic|TestNodeRestartPersistence" \ - ./... --binary=../../build/testapp --evm-binary=../../build/evm) 2>&1; then - log_pass "Basic testapp e2e test" - else - log_fail "Basic testapp e2e test" - fi - - else - log_skip "E2E tests (binaries not found)" - fi -fi - -# ============================================================================== -# SUMMARY -# ============================================================================== - -log_header "Test Summary" - -for result in "${STAGE_RESULTS[@]}"; do - case "$result" in - PASS:*) echo -e " ${GREEN}$result${RESET}" ;; - FAIL:*) echo -e " ${RED}$result${RESET}" ;; - SKIP:*) echo -e " ${YELLOW}$result${RESET}" ;; - esac -done - -echo "" -echo -e " Total: $((PASSED + FAILURES + SKIPPED))" -echo -e " ${GREEN}Passed: $PASSED${RESET}" -echo -e " ${RED}Failed: $FAILURES${RESET}" -echo -e " ${YELLOW}Skipped: $SKIPPED${RESET}" -echo "" - -if [ "$FAILURES" -gt 0 ]; then - echo -e "${RED}${BOLD}RESULT: FAILED${RESET} ($FAILURES stage(s) failed)" - exit 1 -else - echo -e "${GREEN}${BOLD}RESULT: PASSED${RESET}" - exit 0 -fi From 2f05cb9a2301f90256b32fa7e50423babc1b9dd6 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 18 Feb 2026 10:02:53 +0100 Subject: [PATCH 09/13] ai test --- test/e2e/evm_force_inclusion_e2e_test.go | 305 +++++++++++++++++++++++ 1 file changed, 305 insertions(+) diff --git a/test/e2e/evm_force_inclusion_e2e_test.go b/test/e2e/evm_force_inclusion_e2e_test.go index ceabc78c15..582a7979b2 100644 --- a/test/e2e/evm_force_inclusion_e2e_test.go +++ b/test/e2e/evm_force_inclusion_e2e_test.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "path/filepath" + "syscall" "testing" "time" @@ -560,3 +561,307 @@ func TestEvmSyncerMaliciousSequencerForceInclusionE2E(t *testing.T) { require.False(t, 
evm.CheckTxIncluded(seqClient, txForce.Hash()), "Malicious sequencer should NOT have included the forced inclusion transaction") } + +// setDAStartHeightInGenesis modifies the genesis file to set da_start_height. +// This is needed because the based sequencer requires non-zero DAStartHeight, +// and catch-up detection via CalculateEpochNumber also depends on it. +func setDAStartHeightInGenesis(t *testing.T, homeDir string, height uint64) { + t.Helper() + genesisPath := filepath.Join(homeDir, "config", "genesis.json") + data, err := os.ReadFile(genesisPath) + require.NoError(t, err) + + var genesis map[string]interface{} + err = json.Unmarshal(data, &genesis) + require.NoError(t, err) + + genesis["da_start_height"] = height + + newData, err := json.MarshalIndent(genesis, "", " ") + require.NoError(t, err) + + err = os.WriteFile(genesisPath, newData, 0644) + require.NoError(t, err) +} + +// TestEvmSequencerCatchUpBasedSequencerE2E tests that when a sequencer restarts after +// extended downtime (multiple DA epochs), it correctly enters catch-up mode, replays +// missed forced inclusion transactions from DA (matching what a based sequencer would +// produce), and then resumes normal operation. +// +// Test Scenario: +// A sequencer goes down. During downtime, forced inclusion txs accumulate on DA. +// A based sequencer keeps the chain alive using those DA txs. When the original +// sequencer restarts, it must catch up by replaying the same DA epochs — producing +// identical forced-inclusion-only blocks — before resuming normal mempool-based operation. +// +// Architecture: +// - 2 Reth instances (via setupCommonEVMTest with needsFullNode=true) +// - Reth1 -> Sequencer (phases 1 and 4) +// - Reth2 -> Based sequencer (phase 3, reuses full node Reth/port slots) +// +// Phases: +// 0. Setup: DA, 2 Reth containers, init sequencer with force inclusion epoch=2 and da_start_height=1 +// 1. Normal sequencer operation: submit normal txs +// 2. 
Sequencer downtime: stop sequencer, submit forced inclusion txs directly to DA +// 3. Start based sequencer: verify it includes forced inclusion txs +// 4. Restart original sequencer: it catches up via DA replay +// 5. Verification: forced txs on both nodes, sequencer resumes normal operation +func TestEvmSequencerCatchUpBasedSequencerE2E(t *testing.T) { + sut := NewSystemUnderTest(t) + workDir := t.TempDir() + sequencerHome := filepath.Join(workDir, "sequencer") + basedSeqHome := filepath.Join(workDir, "based-sequencer") + + // ===== PHASE 0: Setup ===== + t.Log("Phase 0: Setup") + + jwtSecret, fullNodeJwtSecret, genesisHash, endpoints := setupCommonEVMTest(t, sut, true) + + // Create passphrase and JWT secret files for sequencer + passphraseFile := createPassphraseFile(t, sequencerHome) + jwtSecretFile := createJWTSecretFile(t, sequencerHome, jwtSecret) + + // Initialize sequencer node + output, err := sut.RunCmd(evmSingleBinaryPath, + "init", + "--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", passphraseFile, + "--home", sequencerHome, + ) + require.NoError(t, err, "failed to init sequencer", output) + + // Modify genesis: enable force inclusion with epoch=2, set da_start_height=1 + enableForceInclusionInGenesis(t, sequencerHome, 2) + setDAStartHeightInGenesis(t, sequencerHome, 1) + + // Start sequencer with forced inclusion namespace + seqProcess := sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", jwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--evnode.node.block_time", DefaultBlockTime, + "--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", passphraseFile, + "--home", sequencerHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetRollkitRPCListen(), + "--evnode.p2p.listen_address", 
endpoints.GetRollkitP2PAddress(), + "--evm.engine-url", endpoints.GetSequencerEngineURL(), + "--evm.eth-url", endpoints.GetSequencerEthURL(), + ) + sut.AwaitNodeUp(t, endpoints.GetRollkitRPCAddress(), NodeStartupTimeout) + t.Log("Sequencer is up with force inclusion enabled") + + // ===== PHASE 1: Normal Sequencer Operation ===== + t.Log("Phase 1: Normal Sequencer Operation") + + seqClient, err := ethclient.Dial(endpoints.GetSequencerEthURL()) + require.NoError(t, err) + defer seqClient.Close() + + ctx := context.Background() + var nonce uint64 = 0 + + // Submit 2 normal transactions + var normalTxHashes []common.Hash + for i := 0; i < 2; i++ { + tx := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + err = seqClient.SendTransaction(ctx, tx) + require.NoError(t, err) + normalTxHashes = append(normalTxHashes, tx.Hash()) + t.Logf("Submitted normal tx %d: %s (nonce=%d)", i+1, tx.Hash().Hex(), tx.Nonce()) + } + + // Wait for normal txs to be included + for i, txHash := range normalTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(seqClient, txHash) + }, 15*time.Second, 500*time.Millisecond, "Normal tx %d not included", i+1) + t.Logf("Normal tx %d included", i+1) + } + + // Record sequencer height + seqHeader, err := seqClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + preDowntimeHeight := seqHeader.Number.Uint64() + t.Logf("Sequencer height before downtime: %d", preDowntimeHeight) + + // ===== PHASE 2: Sequencer Downtime + Submit Forced Inclusion Txs to DA ===== + t.Log("Phase 2: Sequencer Downtime + Submit Forced Inclusion Txs to DA") + + // Stop sequencer process (SIGTERM to just this process, not all evm processes) + err = seqProcess.Signal(syscall.SIGTERM) + require.NoError(t, err, "failed to stop sequencer process") + time.Sleep(1 * time.Second) // Wait for process to exit + + // Submit forced inclusion transactions directly to DA + blobClient, err := blobrpc.NewClient(ctx, 
endpoints.GetDAAddress(), "", "") + require.NoError(t, err, "Failed to create blob RPC client") + defer blobClient.Close() + + daClient := block.NewDAClient( + blobClient, + config.Config{ + DA: config.DAConfig{ + Namespace: DefaultDANamespace, + ForcedInclusionNamespace: "forced-inc", + }, + }, + zerolog.Nop(), + ) + + // Create and submit 3 forced inclusion txs to DA + var forcedTxHashes []common.Hash + for i := 0; i < 3; i++ { + txForce := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + txBytes, err := txForce.MarshalBinary() + require.NoError(t, err) + + result := daClient.Submit(ctx, [][]byte{txBytes}, -1, daClient.GetForcedInclusionNamespace(), nil) + require.Equal(t, da.StatusSuccess, result.Code, "Failed to submit forced tx %d to DA: %s", i+1, result.Message) + + forcedTxHashes = append(forcedTxHashes, txForce.Hash()) + t.Logf("Submitted forced inclusion tx %d to DA: %s (nonce=%d)", i+1, txForce.Hash().Hex(), txForce.Nonce()) + } + + // Wait for DA to advance past multiple epochs (epoch=2, DA block time=1s) + // Need missedEpochs > 1, so DA must be at least 2 full epochs ahead + t.Log("Waiting for DA to advance past multiple epochs...") + time.Sleep(6 * time.Second) + + // ===== PHASE 3: Start Based Sequencer ===== + t.Log("Phase 3: Start Based Sequencer") + + // Initialize based sequencer node + output, err = sut.RunCmd(evmSingleBinaryPath, + "init", + "--home", basedSeqHome, + ) + require.NoError(t, err, "failed to init based sequencer", output) + + // Copy genesis from sequencer to based sequencer (shares chain config) + MustCopyFile(t, + filepath.Join(sequencerHome, "config", "genesis.json"), + filepath.Join(basedSeqHome, "config", "genesis.json"), + ) + + // Create JWT secret file for based sequencer using fullNodeJwtSecret + basedSeqJwtSecretFile := createJWTSecretFile(t, basedSeqHome, fullNodeJwtSecret) + + // Start based sequencer (uses full node Reth/port slots) + 
sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evnode.node.aggregator=true", + "--evnode.node.based_sequencer=true", + "--evm.jwt-secret-file", basedSeqJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--evnode.node.block_time", DefaultBlockTime, + "--home", basedSeqHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evm.engine-url", endpoints.GetFullNodeEngineURL(), + "--evm.eth-url", endpoints.GetFullNodeEthURL(), + ) + + // Based sequencer may take longer to be ready (processing DA from start height) + sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout) + t.Log("Based sequencer is live") + + // Connect ethclient to based sequencer + basedSeqClient, err := ethclient.Dial(endpoints.GetFullNodeEthURL()) + require.NoError(t, err) + defer basedSeqClient.Close() + + // Verify based sequencer includes forced inclusion txs + t.Log("Waiting for based sequencer to include forced inclusion txs...") + for i, txHash := range forcedTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(basedSeqClient, txHash) + }, 60*time.Second, 1*time.Second, + "Forced inclusion tx %d (%s) not included in based sequencer", i+1, txHash.Hex()) + t.Logf("Based sequencer included forced tx %d: %s", i+1, txHash.Hex()) + } + t.Log("All forced inclusion txs verified on based sequencer") + + // ===== PHASE 4: Restart Original Sequencer (Catch-Up) ===== + t.Log("Phase 4: Restart Original Sequencer (Catch-Up)") + + // Restart the sequencer using existing home directory (no init needed) + sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", jwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--evnode.node.block_time", DefaultBlockTime, + 
"--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", passphraseFile, + "--home", sequencerHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetRollkitRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetRollkitP2PAddress(), + "--evm.engine-url", endpoints.GetSequencerEngineURL(), + "--evm.eth-url", endpoints.GetSequencerEthURL(), + ) + sut.AwaitNodeUp(t, endpoints.GetRollkitRPCAddress(), NodeStartupTimeout) + t.Log("Sequencer restarted successfully") + + // Reconnect ethclient to sequencer + seqClient.Close() + seqClient, err = ethclient.Dial(endpoints.GetSequencerEthURL()) + require.NoError(t, err) + + // ===== PHASE 5: Verification ===== + t.Log("Phase 5: Verification") + + // 5a. Verify sequencer includes forced inclusion txs after catch-up + t.Log("Verifying sequencer includes forced inclusion txs after catch-up...") + for i, txHash := range forcedTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(seqClient, txHash) + }, 30*time.Second, 1*time.Second, + "Forced inclusion tx %d (%s) should be included after catch-up", i+1, txHash.Hex()) + t.Logf("Sequencer caught up with forced tx %d: %s", i+1, txHash.Hex()) + } + t.Log("All forced inclusion txs verified on sequencer after catch-up") + + // 5b. 
Verify sequencer resumes normal operation + t.Log("Verifying sequencer resumes normal mempool-based operation...") + txNormal := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + err = seqClient.SendTransaction(ctx, txNormal) + require.NoError(t, err) + t.Logf("Submitted post-catchup normal tx: %s (nonce=%d)", txNormal.Hash().Hex(), txNormal.Nonce()) + + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(seqClient, txNormal.Hash()) + }, 15*time.Second, 500*time.Millisecond, + "Normal tx after catch-up should be included") + t.Log("Post-catchup normal tx included - sequencer resumed normal operation") + + // 5c. Verify both nodes have the forced inclusion txs + t.Log("Verifying both nodes have all forced inclusion txs...") + for i, txHash := range forcedTxHashes { + seqIncluded := evm.CheckTxIncluded(seqClient, txHash) + basedIncluded := evm.CheckTxIncluded(basedSeqClient, txHash) + require.True(t, seqIncluded, "Forced tx %d should be on sequencer", i+1) + require.True(t, basedIncluded, "Forced tx %d should be on based sequencer", i+1) + t.Logf("Forced tx %d verified on both nodes: %s", i+1, txHash.Hex()) + } + + t.Log("Test PASSED: Sequencer catch-up with based sequencer verified successfully") + t.Logf(" - Sequencer processed %d normal txs before downtime", len(normalTxHashes)) + t.Logf(" - %d forced inclusion txs submitted to DA during downtime", len(forcedTxHashes)) + t.Logf(" - Based sequencer included all forced txs from DA") + t.Logf(" - Sequencer caught up and replayed all forced txs after restart") + t.Logf(" - Sequencer resumed normal mempool-based operation") + t.Logf(" - Both nodes have identical forced inclusion tx set") +} From 728bd4c8b032ce400c321cc41c5271cd73a2f67c Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 18 Feb 2026 10:03:05 +0100 Subject: [PATCH 10/13] cleanup --- pkg/store/cached_store.go | 6 ------ 1 file changed, 6 deletions(-) diff --git 
a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 86f81129da..8a5cca5b38 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -169,12 +169,6 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { return nil } -// DeleteStateAtHeight removes the state entry at the given height from the underlying store. -func (cs *CachedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error { - // This value is not cached, so nothing to invalidate. - return cs.Store.DeleteStateAtHeight(ctx, height) -} - // Close closes the underlying store. func (cs *CachedStore) Close() error { cs.ClearCache() From d7b8b2ccd0376a9f86f2aa52d2804ac6396e1092 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 18 Feb 2026 14:48:37 +0100 Subject: [PATCH 11/13] fixes --- test/e2e/evm_force_inclusion_e2e_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/e2e/evm_force_inclusion_e2e_test.go b/test/e2e/evm_force_inclusion_e2e_test.go index 582a7979b2..1bf060f3c6 100644 --- a/test/e2e/evm_force_inclusion_e2e_test.go +++ b/test/e2e/evm_force_inclusion_e2e_test.go @@ -737,8 +737,11 @@ func TestEvmSequencerCatchUpBasedSequencerE2E(t *testing.T) { t.Log("Phase 3: Start Based Sequencer") // Initialize based sequencer node + basedSeqPassphraseFile := createPassphraseFile(t, basedSeqHome) output, err = sut.RunCmd(evmSingleBinaryPath, "init", + "--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", basedSeqPassphraseFile, "--home", basedSeqHome, ) require.NoError(t, err, "failed to init based sequencer", output) @@ -757,6 +760,7 @@ func TestEvmSequencerCatchUpBasedSequencerE2E(t *testing.T) { "start", "--evnode.node.aggregator=true", "--evnode.node.based_sequencer=true", + "--evnode.signer.passphrase_file", basedSeqPassphraseFile, "--evm.jwt-secret-file", basedSeqJwtSecretFile, "--evm.genesis-hash", genesisHash, "--evnode.node.block_time", DefaultBlockTime, From 489fc33b81b0449c6425d14c9ff436c7c448ffa9 Mon 
Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 18 Feb 2026 16:09:42 +0100 Subject: [PATCH 12/13] bump --- pkg/config/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 4bf4ae9658..fd1b94ba78 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -255,9 +255,9 @@ func (d *DAConfig) GetForcedInclusionNamespace() string { // NodeConfig contains all Rollkit specific configuration parameters type NodeConfig struct { // Node mode configuration - Aggregator bool `yaml:"aggregator" comment:"Run node in aggregator mode"` - BasedSequencer bool `yaml:"based_sequencer" comment:"Run node with based sequencer (fetches transactions only from DA forced inclusion namespace). Requires aggregator mode to be enabled."` - Light bool `yaml:"light" comment:"Run node in light mode"` + Aggregator bool `mapstructure:"aggregator" yaml:"aggregator" comment:"Run node in aggregator mode"` + BasedSequencer bool `mapstructure:"based_sequencer" yaml:"based_sequencer" comment:"Run node with based sequencer (fetches transactions only from DA forced inclusion namespace). Requires aggregator mode to be enabled."` + Light bool `mapstructure:"light" yaml:"light" comment:"Run node in light mode"` // Block management configuration BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Block time (duration). 
Examples: \"500ms\", \"1s\", \"5s\", \"1m\", \"2m30s\", \"10m\"."` From 551c81816331da349906c39fc3033b6d04415c28 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 18 Feb 2026 16:30:53 +0100 Subject: [PATCH 13/13] base sequencer don't sign --- block/internal/executing/executor.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index ac68f2cd85..49e8ad7e00 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -774,12 +774,15 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea // ValidateBlock validates the created block. func (e *Executor) ValidateBlock(_ context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - // Set custom verifier for aggregator node signature - header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider) - - // Basic header validation - if err := header.ValidateBasic(); err != nil { - return fmt.Errorf("invalid header: %w", err) + if e.config.Node.BasedSequencer { + if err := header.Header.ValidateBasic(); err != nil { + return fmt.Errorf("invalid header: %w", err) + } + } else { + header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider) + if err := header.ValidateBasic(); err != nil { + return fmt.Errorf("invalid header: %w", err) + } } return lastState.AssertValidForNextState(header, data)