Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion ethfinalizer/ethfinalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ type FinalizerOptions[T any] struct {
// Recommended to be at least 15, 10 is the default chosen by go-ethereum.
PriceBump int

// NonceStuckTimeout is the period after which the finalizer is considered stuck on a nonce.
NonceStuckTimeout time.Duration
// TransactionStuckTimeout is the period after which a transaction is considered stuck.
TransactionStuckTimeout time.Duration
// OnStuck is called when the finalizer is stuck.
OnStuck func(first, latest *Status[T])
// OnUnstuck is called when the finalizer is unstuck.
OnUnstuck func()

// SubscriptionBuffer is the size of the buffer for transaction events.
SubscriptionBuffer int
}
Expand Down Expand Up @@ -86,6 +95,14 @@ func (o FinalizerOptions[T]) IsValid() error {
return fmt.Errorf("negative price bump %v", o.PriceBump)
}

if o.NonceStuckTimeout < 0 {
return fmt.Errorf("negative nonce stuck timeout %v", o.NonceStuckTimeout)
}

if o.TransactionStuckTimeout < 0 {
return fmt.Errorf("negative transaction stuck timeout %v", o.TransactionStuckTimeout)
}

if o.SubscriptionBuffer < 0 {
return fmt.Errorf("negative subscription buffer %v", o.SubscriptionBuffer)
}
Expand All @@ -100,7 +117,7 @@ func (o FinalizerOptions[T]) IsValid() error {
type Finalizer[T any] struct {
FinalizerOptions[T]

isRunning atomic.Bool
isRunning, isStuck atomic.Bool

subscriptions map[chan Event[T]]struct{}
subscriptionsMu sync.RWMutex
Expand All @@ -121,6 +138,18 @@ type Event[T any] struct {
Added *Transaction[T]
}

// Status is the result of sending a Transaction on chain.
//
// Type parameters:
// - T: transaction metadata type
type Status[T any] struct {
Transaction *Transaction[T]
// Time is when the transaction was first committed.
Time time.Time
// Error is the most recent error from sending the transaction on chain.
Error error
}

// Transaction is a transaction with metadata of type T.
//
// Type parameters:
Expand Down Expand Up @@ -256,6 +285,19 @@ func (f *Finalizer[T]) Run(ctx context.Context) error {
return fmt.Errorf("unable to read chain nonce: %w", err)
}

first, latest, err := f.Mempool.Status(ctx, chainNonce)
if err == nil {
if first != nil && latest != nil && (f.NonceStuckTimeout != 0 && time.Since(first.Time) >= f.NonceStuckTimeout || f.TransactionStuckTimeout != 0 && time.Since(latest.Time) >= f.TransactionStuckTimeout) {
if f.isStuck.CompareAndSwap(false, true) && f.OnStuck != nil {
f.OnStuck(first, latest)
}
} else if f.isStuck.CompareAndSwap(true, false) && f.OnUnstuck != nil {
f.OnUnstuck()
}
} else {
f.Logger.ErrorContext(ctx, "unable to read status", slog.Any("error", err), slog.Uint64("nonce", chainNonce))
}

transactions, err := f.Mempool.PriciestTransactions(ctx, chainNonce, time.Now().Add(-f.RetryDelay))
if err != nil {
return fmt.Errorf("unable to read mempool transactions: %w", err)
Expand Down Expand Up @@ -445,6 +487,10 @@ func (f *Finalizer[T]) Run(ctx context.Context) error {
}

if err := f.Chain.Send(ctx, transaction.Transaction); err != nil {
if err := f.Mempool.SetError(ctx, transaction.Hash(), err); err != nil {
f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", transaction.Hash().String()))
}

f.Logger.ErrorContext(ctx, "unable to resend transaction to chain", slog.Any("error", err), slog.String("transaction", transaction.Hash().String()))
}
} else {
Expand All @@ -455,6 +501,10 @@ func (f *Finalizer[T]) Run(ctx context.Context) error {
}

if err := f.Chain.Send(ctx, replacement); err != nil {
if err := f.Mempool.SetError(ctx, replacement.Hash(), err); err != nil {
f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", replacement.Hash().String()))
}

f.Logger.ErrorContext(ctx, "unable to send replacement transaction to chain", slog.Any("error", err), slog.String("transaction", replacement.Hash().String()))
}
} else {
Expand Down Expand Up @@ -646,6 +696,10 @@ func (f *Finalizer[T]) Send(ctx context.Context, transaction *types.Transaction,
}

if err := f.Chain.Send(ctx, transaction); err != nil {
if err := f.Mempool.SetError(ctx, transaction.Hash(), err); err != nil {
f.Logger.ErrorContext(ctx, "unable to set transaction error", slog.Any("error", err), slog.String("transaction", transaction.Hash().String()))
}

f.Logger.ErrorContext(ctx, "unable to send transaction to chain", slog.Any("error", err), slog.String("transaction", transaction.Hash().String()))
}

Expand Down
67 changes: 44 additions & 23 deletions ethfinalizer/ethfinalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ import (
)

const (
TestDuration = 10 * time.Second
MonitorPollInterval = 100 * time.Millisecond
FinalizerPollInterval = 1 * time.Second
FinalizerPollTimeout = 1 * time.Second
FinalizerRetryDelay = 5 * time.Second
TransactionsPerSecond = 2
BlockPeriod = 2 * time.Second
StallPeriod = 20 * time.Second
MineProbability = 0.8
ReorgProbability = 0.1
ReorgLimit = 10
TestDuration = 10 * time.Second
MonitorPollInterval = 100 * time.Millisecond
FinalizerPollInterval = 1 * time.Second
FinalizerPollTimeout = 1 * time.Second
FinalizerRetryDelay = 5 * time.Second
FinalizerFeeMargin = 20
FinalizerPriceBump = 10
FinalizerNonceStuckTimeout = 10 * time.Second
FinalizerTransactionStuckTimeout = 5 * time.Second
TransactionsPerSecond = 2
BlockPeriod = 2 * time.Second
StallPeriod = 20 * time.Second
MineProbability = 0.8
ReorgProbability = 0.1
ReorgLimit = 10
)

func TestFinalizerNoEIP1559(t *testing.T) {
Expand Down Expand Up @@ -62,22 +66,39 @@ func test(t *testing.T, isEIP1559 bool) {

mempool := ethfinalizer.NewMemoryMempool[struct{}]()

ctx, cancel := context.WithTimeout(context.Background(), TestDuration)
defer cancel()

finalizer, err := ethfinalizer.NewFinalizer(ethfinalizer.FinalizerOptions[struct{}]{
Wallet: wallet,
Chain: chain,
Mempool: mempool,
Logger: logger,
PollInterval: FinalizerPollInterval,
PollTimeout: FinalizerPollTimeout,
RetryDelay: FinalizerRetryDelay,
FeeMargin: 20,
PriceBump: 10,
Wallet: wallet,
Chain: chain,
Mempool: mempool,
Logger: logger,
PollInterval: FinalizerPollInterval,
PollTimeout: FinalizerPollTimeout,
RetryDelay: FinalizerRetryDelay,
FeeMargin: FinalizerFeeMargin,
PriceBump: FinalizerPriceBump,
NonceStuckTimeout: FinalizerNonceStuckTimeout,
TransactionStuckTimeout: FinalizerTransactionStuckTimeout,
OnStuck: func(first, latest *ethfinalizer.Status[struct{}]) {
logger.DebugContext(
ctx,
"stuck",
slog.String("first", first.Transaction.Hash().String()),
slog.Uint64("firstNonce", first.Transaction.Nonce()),
slog.Duration("firstAge", time.Since(first.Time)),
slog.String("latest", latest.Transaction.Hash().String()),
slog.Uint64("latestNonce", latest.Transaction.Nonce()),
slog.Duration("latestAge", time.Since(latest.Time)),
)
},
OnUnstuck: func() {
logger.DebugContext(ctx, "unstuck")
},
})
assert.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), TestDuration)
defer cancel()

var wg sync.WaitGroup

wg.Go(func() {
Expand Down
105 changes: 74 additions & 31 deletions ethfinalizer/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ethfinalizer

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -15,35 +16,40 @@ type Mempool[T any] interface {
Nonce(ctx context.Context) (uint64, error)

// Commit persists the signed transaction with its metadata in the store.
// The transaction must be persisted with a timestamp of the current time.
// If the transaction already exists in the mempool, the timestamp must be updated.
// The transaction must be persisted with timestamps of the first and latest submissions.
// If the transaction already exists in the mempool, the latest timestamp must be updated.
Commit(ctx context.Context, transaction *types.Transaction, metadata T) error

// SetError sets the error for a previously committed transaction.
SetError(ctx context.Context, transaction common.Hash, err error) error

// Transactions returns the transactions for the specified hashes which are signed by this specific wallet for this specific chain.
Transactions(ctx context.Context, hashes map[common.Hash]struct{}) (map[common.Hash]*Transaction[T], error)

// PriciestTransactions returns, by nonce, the most expensive transactions signed by this specific wallet for this specific chain, with a minimum nonce and a latest timestamp.
PriciestTransactions(ctx context.Context, fromNonce uint64, before time.Time) (map[uint64]*Transaction[T], error)

// Status returns the statuses for the first and latest transactions for a given nonce.
Status(ctx context.Context, nonce uint64) (*Status[T], *Status[T], error)
}

type memoryMempool[T any] struct {
transactions map[common.Hash]*Transaction[T]
priciestTransactions map[uint64]*timestampedTransaction[T]
highestNonce *uint64
mu sync.RWMutex
transactions map[common.Hash]*Status[T]
nonces map[uint64]*nonceStatus[T]
highestNonce *uint64
mu sync.RWMutex
}

type timestampedTransaction[T any] struct {
*Transaction[T]

timestamp time.Time
type nonceStatus[T any] struct {
first, latest *Status[T]
time time.Time
}

// NewMemoryMempool creates a minimal in-memory Mempool.
func NewMemoryMempool[T any]() Mempool[T] {
return &memoryMempool[T]{
transactions: map[common.Hash]*Transaction[T]{},
priciestTransactions: map[uint64]*timestampedTransaction[T]{},
transactions: map[common.Hash]*Status[T]{},
nonces: map[uint64]*nonceStatus[T]{},
}
}

Expand All @@ -62,28 +68,52 @@ func (m *memoryMempool[T]) Commit(ctx context.Context, transaction *types.Transa
m.mu.Lock()
defer m.mu.Unlock()

transaction_ := Transaction[T]{
Transaction: transaction,
Metadata: metadata,
}

m.transactions[transaction.Hash()] = &transaction_
now := time.Now()

previous := m.priciestTransactions[transaction.Nonce()]
if previous == nil || transaction.GasFeeCapCmp(previous.Transaction.Transaction) > 0 && transaction.GasTipCapCmp(previous.Transaction.Transaction) > 0 {
m.priciestTransactions[transaction.Nonce()] = &timestampedTransaction[T]{
Transaction: &transaction_,
timestamp: time.Now(),
status_ := Status[T]{
Transaction: &Transaction[T]{
Transaction: transaction,
Metadata: metadata,
},
Time: now,
}
m.transactions[transaction.Hash()] = &status_

status := m.nonces[transaction.Nonce()]
if status == nil {
status = &nonceStatus[T]{
first: &status_,
latest: &status_,
time: now,
}
m.nonces[transaction.Nonce()] = status

if m.highestNonce == nil || transaction.Nonce() > *m.highestNonce {
m.highestNonce = new(uint64)
*m.highestNonce = transaction.Nonce()
}
} else if previous.Hash() == transaction.Hash() {
previous.timestamp = time.Now()
}

if transaction.Hash() == status.latest.Transaction.Hash() {
status.time = now
} else if transaction.GasFeeCapCmp(status.latest.Transaction.Transaction) > 0 && transaction.GasTipCapCmp(status.latest.Transaction.Transaction) > 0 {
status.latest = &status_
status.time = now
}

return nil
}

func (m *memoryMempool[T]) SetError(ctx context.Context, transaction common.Hash, err error) error {
m.mu.Lock()
defer m.mu.Unlock()

status := m.transactions[transaction]
if status == nil {
return fmt.Errorf("unknown transaction %v", transaction)
}

status.Error = err
return nil
}

Expand All @@ -93,9 +123,9 @@ func (m *memoryMempool[T]) Transactions(ctx context.Context, hashes map[common.H

transactions := make(map[common.Hash]*Transaction[T], len(hashes))
for hash := range hashes {
transaction := m.transactions[hash]
if transaction != nil {
transactions[hash] = transaction
status := m.transactions[hash]
if status != nil {
transactions[hash] = status.Transaction
}
}

Expand All @@ -113,12 +143,25 @@ func (m *memoryMempool[T]) PriciestTransactions(ctx context.Context, fromNonce u

transactions := make(map[uint64]*Transaction[T], capacity)
for nonce := fromNonce; ; nonce++ {
transaction := m.priciestTransactions[nonce]
if transaction == nil || !transaction.timestamp.Before(before) {
status := m.nonces[nonce]
if status == nil || !status.time.Before(before) {
break
}
transactions[nonce] = transaction.Transaction

transactions[nonce] = status.latest.Transaction
}

return transactions, nil
}

func (m *memoryMempool[T]) Status(ctx context.Context, nonce uint64) (*Status[T], *Status[T], error) {
m.mu.RLock()
defer m.mu.RUnlock()

status := m.nonces[nonce]
if status == nil {
return nil, nil, nil
}

return status.first, status.latest, nil
}
Loading