59 changes: 57 additions & 2 deletions block/components.go
@@ -212,10 +212,10 @@ func NewSyncComponents(
}, nil
}

// NewAggregatorComponents creates components for an aggregator full node that can produce and sync blocks.
// newAggregatorComponents creates components for an aggregator full node that can produce and sync blocks.
// Aggregator nodes have full capabilities - they can produce blocks, sync from P2P and DA,
// and submit headers/data to DA. Requires a signer for block production and DA submission.
func NewAggregatorComponents(
func newAggregatorComponents(
config config.Config,
genesis genesis.Genesis,
store store.Store,
@@ -323,3 +323,58 @@ func NewAggregatorComponents(
errorCh: errorCh,
}, nil
}

// NewAggregatorWithCatchupComponents creates aggregator components that include a Syncer
// for DA/P2P catchup before block production begins.
//
// The caller should:
// 1. Start the Syncer and wait for DA head + P2P catchup
// 2. Stop the Syncer and set Components.Syncer = nil
// 3. Call Components.Start() — which will start the Executor and other components
func NewAggregatorWithCatchupComponents(
config config.Config,
genesis genesis.Genesis,
store store.Store,
exec coreexecutor.Executor,
sequencer coresequencer.Sequencer,
daClient da.Client,
signer signer.Signer,
headerSyncService *sync.HeaderSyncService,
dataSyncService *sync.DataSyncService,
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
raftNode common.RaftNode,
) (*Components, error) {
bc, err := newAggregatorComponents(
config, genesis, store, exec, sequencer, daClient, signer,
headerSyncService, dataSyncService, logger, metrics, blockOpts, raftNode,
)
if err != nil {
return nil, err
}

// Create a catchup syncer that shares the same cache manager
catchupErrCh := make(chan error, 1)
catchupSyncer := syncing.NewSyncer(
store,
exec,
daClient,
bc.Cache,
metrics,
config,
genesis,
headerSyncService.Store(),
dataSyncService.Store(),
logger,
blockOpts,
catchupErrCh,
raftNode,
)
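// Wrap the catchup syncer's block sync operations with tracing when tracing instrumentation is enabled.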
if config.Instrumentation.IsTracingEnabled() {
catchupSyncer.SetBlockSyncer(syncing.WithTracingBlockSyncer(catchupSyncer))
}

bc.Syncer = catchupSyncer
return bc, nil
}
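
For orientation, a minimal caller sketch of the contract described in the doc comment above. It is not part of this diff: the helper name, the polling interval, and the error handling are illustrative assumptions, and the exact Components.Start signature is not shown here. Only NewAggregatorWithCatchupComponents, Syncer.Start/Stop, HasReachedDAHead, and the Components.Syncer field come from this change; see runCatchupPhase in node/failover.go below for the real usage.

// runAggregatorWithCatchup is a hypothetical helper, not part of this PR.
// It assumes "context", "fmt", "time", and the block package are imported,
// and that bc was built with NewAggregatorWithCatchupComponents.
func runAggregatorWithCatchup(ctx context.Context, bc *block.Components, daBlockTime time.Duration) error {
	// 1. Start the catchup syncer and wait until the DA worker reports the DA head.
	if err := bc.Syncer.Start(ctx); err != nil {
		return fmt.Errorf("catchup syncer start: %w", err)
	}
	for !bc.Syncer.HasReachedDAHead() {
		select {
		case <-ctx.Done():
			_ = bc.Syncer.Stop()
			return ctx.Err()
		case <-time.After(daBlockTime): // poll roughly once per DA block
		}
	}
	// 2. Stop the syncer and clear it so it is not started again with the other components.
	_ = bc.Syncer.Stop()
	bc.Syncer = nil
	// 3. Call Components.Start (per the doc comment above) to begin block production.
	return nil
}
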
4 changes: 2 additions & 2 deletions block/components_test.go
@@ -163,7 +163,7 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
daClient.On("HasForcedInclusionNamespace").Return(false).Maybe()

components, err := NewAggregatorComponents(
components, err := newAggregatorComponents(
cfg,
gen,
memStore,
@@ -247,7 +247,7 @@ func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {
Return(nil, criticalError).Maybe()

// Create aggregator node
components, err := NewAggregatorComponents(
components, err := newAggregatorComponents(
cfg,
gen,
memStore,
10 changes: 10 additions & 0 deletions block/internal/syncing/syncer.go
@@ -123,6 +123,9 @@ type Syncer struct {
// P2P wait coordination
p2pWaitState atomic.Value // stores p2pWaitState

// DA head-reached signal for recovery mode (stays true once DA head is seen)
daHeadReached atomic.Bool

// blockSyncer is the interface used for block sync operations.
// defaults to self, but can be wrapped with tracing.
blockSyncer BlockSyncer
@@ -405,6 +408,7 @@ func (s *Syncer) daWorkerLoop() {
var backoff time.Duration
if err == nil {
// No error means we are caught up with the DA head.
s.daHeadReached.Store(true)
backoff = s.config.DA.BlockTime.Duration
} else {
// Error, back off for a shorter duration.
@@ -422,6 +426,12 @@
}
}

// HasReachedDAHead returns true once the DA worker has reached the DA head.
// Once set, it stays true.
func (s *Syncer) HasReachedDAHead() bool {
return s.daHeadReached.Load()
}

func (s *Syncer) fetchDAUntilCaughtUp() error {
for {
select {
111 changes: 105 additions & 6 deletions node/failover.go
@@ -32,6 +32,12 @@ type failoverState struct {
dataSyncService *evsync.DataSyncService
rpcServer *http.Server
bc *block.Components

// catchup fields — used when the aggregator needs to sync before producing blocks
catchupEnabled bool
catchupTimeout time.Duration
daBlockTime time.Duration
store store.Store
}

func newSyncMode(
@@ -63,7 +69,7 @@ func newSyncMode(
raftNode,
)
}
return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient)
return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient, false)
}

func newAggregatorMode(
@@ -81,7 +87,7 @@
p2pClient *p2p.Client,
) (*failoverState, error) {
blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) {
return block.NewAggregatorComponents(
return block.NewAggregatorWithCatchupComponents(
nodeConfig,
genesis,
rktStore,
Expand All @@ -97,8 +103,7 @@ func newAggregatorMode(
raftNode,
)
}

return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient)
return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient, true)
}

func setupFailoverState(
@@ -109,6 +114,7 @@
buildComponentsFn func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error),
raftNode *raft.Node,
p2pClient *p2p.Client,
isAggregator bool,
) (*failoverState, error) {
headerSyncService, err := evsync.NewHeaderSyncService(rktStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger())
if err != nil {
@@ -152,13 +158,23 @@ func setupFailoverState(
return nil, fmt.Errorf("build follower components: %w", err)
}

// Catchup only applies to aggregator nodes that need to sync before producing blocks.
catchupEnabled := isAggregator && nodeConfig.Node.CatchupTimeout.Duration > 0
if isAggregator && !catchupEnabled {
bc.Syncer = nil
}

return &failoverState{
logger: logger,
p2pClient: p2pClient,
headerSyncService: headerSyncService,
dataSyncService: dataSyncService,
rpcServer: rpcServer,
bc: bc,
store: rktStore,
catchupEnabled: catchupEnabled,
catchupTimeout: nodeConfig.Node.CatchupTimeout.Duration,
daBlockTime: nodeConfig.DA.BlockTime.Duration,
}, nil
}

@@ -209,6 +225,12 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
defer stopService(f.headerSyncService.Stop, "header sync")
defer stopService(f.dataSyncService.Stop, "data sync")

if f.catchupEnabled && f.bc.Syncer != nil {
if err := f.runCatchupPhase(ctx); err != nil {
return err
}
}

wg.Go(func() error {
defer func() {
shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second)
@@ -224,6 +246,85 @@
return wg.Wait()
}

// runCatchupPhase starts the catchup syncer, waits until DA head is reached and P2P
// is caught up, then stops the syncer so the executor can take over.
func (f *failoverState) runCatchupPhase(ctx context.Context) error {
f.logger.Info().Msg("catchup: syncing from DA and P2P before producing blocks")

if err := f.bc.Syncer.Start(ctx); err != nil {
return fmt.Errorf("catchup syncer start: %w", err)
}
defer f.bc.Syncer.Stop() // nolint:errcheck // not critical

caughtUp, err := f.waitForCatchup(ctx)
if err != nil {
return err
}
if !caughtUp {
return ctx.Err()
}
f.logger.Info().Msg("catchup: fully caught up, stopping syncer and starting block production")
f.bc.Syncer = nil
return nil
}

// waitForCatchup polls DA and P2P catchup status until both sources indicate the node is caught up.
func (f *failoverState) waitForCatchup(ctx context.Context) (bool, error) {
pollInterval := f.daBlockTime
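// Fall back to a 100ms poll interval when the DA block time is not configured.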
if pollInterval <= 0 {
pollInterval = time.Second / 10
}

ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

var timeoutCh <-chan time.Time
if f.catchupTimeout > 0 {
f.logger.Debug().Dur("p2p_timeout", f.catchupTimeout).Msg("P2P catchup timeout configured")
timeoutCh = time.After(f.catchupTimeout)
} else {
f.logger.Debug().Msg("P2P catchup timeout disabled, relying on DA only")
}
ignoreP2P := false

for {
select {
case <-ctx.Done():
return false, nil
case <-timeoutCh:
f.logger.Info().Msg("catchup: P2P timeout reached, ignoring P2P status")
ignoreP2P = true
timeoutCh = nil
case <-ticker.C:
daCaughtUp := f.bc.Syncer != nil && f.bc.Syncer.HasReachedDAHead()

storeHeight, err := f.store.Height(ctx)
if err != nil {
f.logger.Warn().Err(err).Msg("failed to get store height during catchup")
continue
}

maxP2PHeight := max(
f.headerSyncService.Store().Height(),
f.dataSyncService.Store().Height(),
)

p2pCaughtUp := ignoreP2P || (maxP2PHeight > 0 && storeHeight >= maxP2PHeight)
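// If no P2P timeout is configured and neither sync store has reported a height yet,
// treat P2P as caught up rather than waiting indefinitely for peers.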
if !ignoreP2P && f.catchupTimeout == 0 && maxP2PHeight == 0 {
p2pCaughtUp = true
}

if daCaughtUp && p2pCaughtUp {
f.logger.Info().
Uint64("store_height", storeHeight).
Uint64("max_p2p_height", maxP2PHeight).
Msg("catchup: fully caught up")
return true, nil
}
}
}
}

func (f *failoverState) IsSynced(s *raft.RaftBlockState) (int, error) {
if f.bc.Syncer != nil {
return f.bc.Syncer.IsSyncedWithRaft(s)
@@ -238,8 +339,6 @@ func (f *failoverState) Recover(ctx context.Context, state *raft.RaftBlockState) {
if f.bc.Syncer != nil {
return f.bc.Syncer.RecoverFromRaft(ctx, state)
}
// For aggregator mode without syncer (e.g. based sequencer only?), recovery logic might differ or be implicit.
// But failure to recover means we are stuck.
return errors.New("recovery not supported in this mode")
}
