diff --git a/ethfinalizer/ethfinalizer.go b/ethfinalizer/ethfinalizer.go index ce67e7dd..26e5a38d 100644 --- a/ethfinalizer/ethfinalizer.go +++ b/ethfinalizer/ethfinalizer.go @@ -49,6 +49,15 @@ type FinalizerOptions[T any] struct { // Recommended to be at least 15, 10 is the default chosen by go-ethereum. PriceBump int + // NonceStuckTimeout is the period after which the finalizer is considered stuck on a nonce. + NonceStuckTimeout time.Duration + // TransactionStuckTimeout is the period after which a transaction is considered stuck. + TransactionStuckTimeout time.Duration + // OnStuck is called when the finalizer is stuck. + OnStuck func(first, latest *Status[T]) + // OnUnstuck is called when the finalizer is unstuck. + OnUnstuck func() + // SubscriptionBuffer is the size of the buffer for transaction events. SubscriptionBuffer int } @@ -86,6 +95,14 @@ func (o FinalizerOptions[T]) IsValid() error { return fmt.Errorf("negative price bump %v", o.PriceBump) } + if o.NonceStuckTimeout < 0 { + return fmt.Errorf("negative nonce stuck timeout %v", o.NonceStuckTimeout) + } + + if o.TransactionStuckTimeout < 0 { + return fmt.Errorf("negative transaction stuck timeout %v", o.TransactionStuckTimeout) + } + if o.SubscriptionBuffer < 0 { return fmt.Errorf("negative subscription buffer %v", o.SubscriptionBuffer) } @@ -100,7 +117,7 @@ func (o FinalizerOptions[T]) IsValid() error { type Finalizer[T any] struct { FinalizerOptions[T] - isRunning atomic.Bool + isRunning, isStuck atomic.Bool subscriptions map[chan Event[T]]struct{} subscriptionsMu sync.RWMutex @@ -121,6 +138,18 @@ type Event[T any] struct { Added *Transaction[T] } +// Status is the result of sending a Transaction on chain. +// +// Type parameters: +// - T: transaction metadata type +type Status[T any] struct { + Transaction *Transaction[T] + // Time is when the transaction was first committed. + Time time.Time + // Error is the most recent error from sending the transaction on chain. + Error error +} + // Transaction is a transaction with metadata of type T. // // Type parameters: @@ -256,6 +285,19 @@ func (f *Finalizer[T]) Run(ctx context.Context) error { return fmt.Errorf("unable to read chain nonce: %w", err) } + first, latest, err := f.Mempool.Status(ctx, chainNonce) + if err == nil { + if first != nil && latest != nil && (f.NonceStuckTimeout != 0 && time.Since(first.Time) >= f.NonceStuckTimeout || f.TransactionStuckTimeout != 0 && time.Since(latest.Time) >= f.TransactionStuckTimeout) { + if f.isStuck.CompareAndSwap(false, true) && f.OnStuck != nil { + f.OnStuck(first, latest) + } + } else if f.isStuck.CompareAndSwap(true, false) && f.OnUnstuck != nil { + f.OnUnstuck() + } + } else { + f.Logger.ErrorContext(ctx, "unable to read status", slog.Any("error", err), slog.Uint64("nonce", chainNonce)) + } + transactions, err := f.Mempool.PriciestTransactions(ctx, chainNonce, time.Now().Add(-f.RetryDelay)) if err != nil { return fmt.Errorf("unable to read mempool transactions: %w", err) @@ -445,6 +487,10 @@ func (f *Finalizer[T]) Run(ctx context.Context) error { } if err := f.Chain.Send(ctx, transaction.Transaction); err != nil { + if err := f.Mempool.SetError(ctx, transaction.Hash(), err); err != nil { + f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", transaction.Hash().String())) + } + f.Logger.ErrorContext(ctx, "unable to resend transaction to chain", slog.Any("error", err), slog.String("transaction", transaction.Hash().String())) } } else { @@ -455,6 +501,10 @@ func (f *Finalizer[T]) Run(ctx context.Context) error { } if err := f.Chain.Send(ctx, replacement); err != nil { + if err := f.Mempool.SetError(ctx, replacement.Hash(), err); err != nil { + f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", replacement.Hash().String())) + } + f.Logger.ErrorContext(ctx, "unable to send replacement transaction to chain", slog.Any("error", err), slog.String("transaction", replacement.Hash().String())) } } else { @@ -646,6 +696,10 @@ func (f *Finalizer[T]) Send(ctx context.Context, transaction *types.Transaction, } if err := f.Chain.Send(ctx, transaction); err != nil { + if err := f.Mempool.SetError(ctx, transaction.Hash(), err); err != nil { + f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", transaction.Hash().String())) + } + f.Logger.ErrorContext(ctx, "unable to send transaction to chain", slog.Any("error", err), slog.String("transaction", transaction.Hash().String())) } diff --git a/ethfinalizer/ethfinalizer_test.go b/ethfinalizer/ethfinalizer_test.go index 8b52ceeb..2e26c9a8 100644 --- a/ethfinalizer/ethfinalizer_test.go +++ b/ethfinalizer/ethfinalizer_test.go @@ -19,17 +19,21 @@ import ( ) const ( - TestDuration = 10 * time.Second - MonitorPollInterval = 100 * time.Millisecond - FinalizerPollInterval = 1 * time.Second - FinalizerPollTimeout = 1 * time.Second - FinalizerRetryDelay = 5 * time.Second - TransactionsPerSecond = 2 - BlockPeriod = 2 * time.Second - StallPeriod = 20 * time.Second - MineProbability = 0.8 - ReorgProbability = 0.1 - ReorgLimit = 10 + TestDuration = 10 * time.Second + MonitorPollInterval = 100 * time.Millisecond + FinalizerPollInterval = 1 * time.Second + FinalizerPollTimeout = 1 * time.Second + FinalizerRetryDelay = 5 * time.Second + FinalizerFeeMargin = 20 + FinalizerPriceBump = 10 + FinalizerNonceStuckTimeout = 10 * time.Second + FinalizerTransactionStuckTimeout = 5 * time.Second + TransactionsPerSecond = 2 + BlockPeriod = 2 * time.Second + StallPeriod = 20 * time.Second + MineProbability = 0.8 + ReorgProbability = 0.1 + ReorgLimit = 10 ) func TestFinalizerNoEIP1559(t *testing.T) { @@ -62,22 +66,39 @@ func test(t *testing.T, isEIP1559 bool) { mempool := ethfinalizer.NewMemoryMempool[struct{}]() + ctx, cancel := context.WithTimeout(context.Background(), TestDuration) + defer cancel() + finalizer, err := ethfinalizer.NewFinalizer(ethfinalizer.FinalizerOptions[struct{}]{ - Wallet: wallet, - Chain: chain, - Mempool: mempool, - Logger: logger, - PollInterval: FinalizerPollInterval, - PollTimeout: FinalizerPollTimeout, - RetryDelay: FinalizerRetryDelay, - FeeMargin: 20, - PriceBump: 10, + Wallet: wallet, + Chain: chain, + Mempool: mempool, + Logger: logger, + PollInterval: FinalizerPollInterval, + PollTimeout: FinalizerPollTimeout, + RetryDelay: FinalizerRetryDelay, + FeeMargin: FinalizerFeeMargin, + PriceBump: FinalizerPriceBump, + NonceStuckTimeout: FinalizerNonceStuckTimeout, + TransactionStuckTimeout: FinalizerTransactionStuckTimeout, + OnStuck: func(first, latest *ethfinalizer.Status[struct{}]) { + logger.DebugContext( + ctx, + "stuck", + slog.String("first", first.Transaction.Hash().String()), + slog.Uint64("firstNonce", first.Transaction.Nonce()), + slog.Duration("firstAge", time.Since(first.Time)), + slog.String("latest", latest.Transaction.Hash().String()), + slog.Uint64("latestNonce", latest.Transaction.Nonce()), + slog.Duration("latestAge", time.Since(latest.Time)), + ) + }, + OnUnstuck: func() { + logger.DebugContext(ctx, "unstuck") + }, }) assert.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), TestDuration) - defer cancel() - var wg sync.WaitGroup wg.Go(func() { diff --git a/ethfinalizer/mempool.go b/ethfinalizer/mempool.go index 44dfb7e2..474c5ebf 100644 --- a/ethfinalizer/mempool.go +++ b/ethfinalizer/mempool.go @@ -2,6 +2,7 @@ package ethfinalizer import ( "context" + "fmt" "sync" "time" @@ -15,35 +16,40 @@ type Mempool[T any] interface { Nonce(ctx context.Context) (uint64, error) // Commit persists the signed transaction with its metadata in the store. - // The transaction must be persisted with a timestamp of the current time. - // If the transaction already exists in the mempool, the timestamp must be updated. + // The transaction must be persisted with timestamps of the first and latest submissions. + // If the transaction already exists in the mempool, the latest timestamp must be updated. Commit(ctx context.Context, transaction *types.Transaction, metadata T) error + // SetError sets the error for a previously committed transaction. + SetError(ctx context.Context, transaction common.Hash, err error) error + // Transactions returns the transactions for the specified hashes which are signed by this specific wallet for this specific chain. Transactions(ctx context.Context, hashes map[common.Hash]struct{}) (map[common.Hash]*Transaction[T], error) // PriciestTransactions returns, by nonce, the most expensive transactions signed by this specific wallet for this specific chain, with a minimum nonce and a latest timestamp. PriciestTransactions(ctx context.Context, fromNonce uint64, before time.Time) (map[uint64]*Transaction[T], error) + + // Status returns the statuses for the first and latest transactions for a given nonce. + Status(ctx context.Context, nonce uint64) (*Status[T], *Status[T], error) } type memoryMempool[T any] struct { - transactions map[common.Hash]*Transaction[T] - priciestTransactions map[uint64]*timestampedTransaction[T] - highestNonce *uint64 - mu sync.RWMutex + transactions map[common.Hash]*Status[T] + nonces map[uint64]*nonceStatus[T] + highestNonce *uint64 + mu sync.RWMutex } -type timestampedTransaction[T any] struct { - *Transaction[T] - - timestamp time.Time +type nonceStatus[T any] struct { + first, latest *Status[T] + time time.Time } // NewMemoryMempool creates a minimal in-memory Mempool. func NewMemoryMempool[T any]() Mempool[T] { return &memoryMempool[T]{ - transactions: map[common.Hash]*Transaction[T]{}, - priciestTransactions: map[uint64]*timestampedTransaction[T]{}, + transactions: map[common.Hash]*Status[T]{}, + nonces: map[uint64]*nonceStatus[T]{}, } } @@ -62,28 +68,52 @@ func (m *memoryMempool[T]) Commit(ctx context.Context, transaction *types.Transa m.mu.Lock() defer m.mu.Unlock() - transaction_ := Transaction[T]{ - Transaction: transaction, - Metadata: metadata, - } - - m.transactions[transaction.Hash()] = &transaction_ + now := time.Now() - previous := m.priciestTransactions[transaction.Nonce()] - if previous == nil || transaction.GasFeeCapCmp(previous.Transaction.Transaction) > 0 && transaction.GasTipCapCmp(previous.Transaction.Transaction) > 0 { - m.priciestTransactions[transaction.Nonce()] = ×tampedTransaction[T]{ - Transaction: &transaction_, - timestamp: time.Now(), + status_ := Status[T]{ + Transaction: &Transaction[T]{ + Transaction: transaction, + Metadata: metadata, + }, + Time: now, + } + m.transactions[transaction.Hash()] = &status_ + + status := m.nonces[transaction.Nonce()] + if status == nil { + status = &nonceStatus[T]{ + first: &status_, + latest: &status_, + time: now, } + m.nonces[transaction.Nonce()] = status if m.highestNonce == nil || transaction.Nonce() > *m.highestNonce { m.highestNonce = new(uint64) *m.highestNonce = transaction.Nonce() } - } else if previous.Hash() == transaction.Hash() { - previous.timestamp = time.Now() } + if transaction.Hash() == status.latest.Transaction.Hash() { + status.time = now + } else if transaction.GasFeeCapCmp(status.latest.Transaction.Transaction) > 0 && transaction.GasTipCapCmp(status.latest.Transaction.Transaction) > 0 { + status.latest = &status_ + status.time = now + } + + return nil +} + +func (m *memoryMempool[T]) SetError(ctx context.Context, transaction common.Hash, err error) error { + m.mu.Lock() + defer m.mu.Unlock() + + status := m.transactions[transaction] + if status == nil { + return fmt.Errorf("unknown transaction %v", transaction) + } + + status.Error = err return nil } @@ -93,9 +123,9 @@ func (m *memoryMempool[T]) Transactions(ctx context.Context, hashes map[common.H transactions := make(map[common.Hash]*Transaction[T], len(hashes)) for hash := range hashes { - transaction := m.transactions[hash] - if transaction != nil { - transactions[hash] = transaction + status := m.transactions[hash] + if status != nil { + transactions[hash] = status.Transaction } } @@ -113,12 +143,25 @@ func (m *memoryMempool[T]) PriciestTransactions(ctx context.Context, fromNonce u transactions := make(map[uint64]*Transaction[T], capacity) for nonce := fromNonce; ; nonce++ { - transaction := m.priciestTransactions[nonce] - if transaction == nil || !transaction.timestamp.Before(before) { + status := m.nonces[nonce] + if status == nil || !status.time.Before(before) { break } - transactions[nonce] = transaction.Transaction + + transactions[nonce] = status.latest.Transaction } return transactions, nil } + +func (m *memoryMempool[T]) Status(ctx context.Context, nonce uint64) (*Status[T], *Status[T], error) { + m.mu.RLock() + defer m.mu.RUnlock() + + status := m.nonces[nonce] + if status == nil { + return nil, nil, nil + } + + return status.first, status.latest, nil +}