diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index eb09b03021a..cffdeeb7782 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "math/big" "os" "path/filepath" "strings" @@ -17,6 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/node" "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/transaction" p2pforge "github.com/ipshipyard/p2p-forge/client" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -103,6 +105,13 @@ const ( configKeyBlockchainRpcTLSTimeout = "blockchain-rpc.tls-timeout" configKeyBlockchainRpcIdleTimeout = "blockchain-rpc.idle-timeout" configKeyBlockchainRpcKeepalive = "blockchain-rpc.keepalive" + + // transaction retry + optionNameTransactionRetryDelay = "transaction-retry-delay" + optionNameTransactionFeePriority = "transaction-fee-priority" + optionNameTransactionFeePriorityMax = "transaction-fee-priority-max" + optionNameTransactionFeeMaxTxPriceWei = "transaction-fee-max-tx-price-wei" + optionNameFeeHistoryBlockCount = "fee-history-block-count" ) var blockchainRpcConfigPairs = []struct{ flat, dotted string }{ @@ -314,6 +323,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().String(optionNameStakingAddress, "", "staking contract address") cmd.Flags().Uint64(optionNameBlockTime, 5, "chain block time") cmd.Flags().Uint64(optionNameBlockSyncInterval, 10, "block number cache sync interval in blocks") + cmd.Flags().Uint64(optionNameFeeHistoryBlockCount, 100, "eth_feeHistory block count for fee hints") cmd.Flags().Duration(optionWarmUpTime, time.Minute*5, "maximum node warmup duration; proceeds when stable or after this time") cmd.Flags().Bool(optionNameMainNet, true, "triggers connect to main net bootnodes.") cmd.Flags().Bool(optionNameRetrievalCaching, true, "enable forwarded content caching") @@ -333,6 +343,10 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().Bool(optionSkipPostageSnapshot, false, "skip postage snapshot") cmd.Flags().Uint64(optionNameMinimumGasTipCap, 0, "minimum gas tip cap in wei for transactions, 0 means use suggested gas tip cap") cmd.Flags().Uint64(optionNameGasLimitFallback, 500_000, "gas limit fallback when estimation fails for contract transactions") + cmd.Flags().Duration(optionNameTransactionRetryDelay, time.Minute, "how long to wait for a receipt before escalating fees in transactions with retry") + cmd.Flags().String(optionNameTransactionFeePriority, "market", "starting fee priority for transaction broadcasts (low, market, aggressive)") + cmd.Flags().String(optionNameTransactionFeePriorityMax, "aggressive", "maximum fee priority for transaction escalation (low, market, aggressive)") + cmd.Flags().Uint64(optionNameTransactionFeeMaxTxPriceWei, 0, "maximum maxFeePerGas in wei per gas for transactions with retry; 0 means no limit") cmd.Flags().Bool(optionNameP2PWSSEnable, false, "Enable Secure WebSocket P2P connections") cmd.Flags().String(optionP2PWSSAddr, ":1635", "p2p wss address") cmd.Flags().String(optionNATWSSAddr, "", "WSS NAT exposed address") @@ -380,6 +394,18 @@ func (c *command) bindBlockchainRpcConfig(cmd *cobra.Command) { } } +func txRetryConfigFromCommand(c *command) transaction.TransactionsRetryConfig { + cfg := transaction.TransactionsRetryConfig{ + RetryDelay: c.config.GetDuration(optionNameTransactionRetryDelay), + StartTier: c.config.GetString(optionNameTransactionFeePriority), + EndTier: c.config.GetString(optionNameTransactionFeePriorityMax), + } + if v := c.config.GetUint64(optionNameTransactionFeeMaxTxPriceWei); v != 0 { + cfg.MaxTxPrice = new(big.Int).SetUint64(v) + } + return cfg +} + func newLogger(cmd *cobra.Command, verbosity string) (log.Logger, error) { var ( sink = cmd.OutOrStdout() diff --git a/cmd/bee/cmd/deploy.go b/cmd/bee/cmd/deploy.go index 874dcc38a8b..60aab3c73b4 100644 --- a/cmd/bee/cmd/deploy.go +++ b/cmd/bee/cmd/deploy.go @@ -60,6 +60,8 @@ func (c *command) initDeployCmd() error { Keepalive: c.config.GetDuration(configKeyBlockchainRpcKeepalive), }, c.config.GetUint64(optionNameBlockSyncInterval), + c.config.GetUint64(optionNameFeeHistoryBlockCount), + txRetryConfigFromCommand(c), ) if err != nil { return err diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index 0e13c75ccb3..38f84fa7f41 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -300,6 +300,8 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo BlockProfile: c.config.GetBool(optionNamePProfBlock), BlockTime: networkConfig.blockTime, BlockSyncInterval: c.config.GetUint64(optionNameBlockSyncInterval), + FeeHistoryBlockCount: c.config.GetUint64(optionNameFeeHistoryBlockCount), + TransactionRetry: txRetryConfigFromCommand(c), BootnodeMode: bootNode, Bootnodes: networkConfig.bootNodes, CacheCapacity: c.config.GetUint64(optionNameCacheCapacity), diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index ffcbbac3b8e..c0d01c05414 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -1126,7 +1126,9 @@ components: schema: $ref: "SwarmCommon.yaml#/components/schemas/GasPrice" required: false - description: "Gas price for transaction" + description: > + Maximum gas price for the transaction. When set, the node + uses the legacy send path, otherwise transaction is sent with retry and automatically escalated fees. GasLimitParameter: in: header diff --git a/pkg/api/api.go b/pkg/api/api.go index acd838a3ff6..ac63c71b84f 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -94,10 +94,11 @@ const ( SwarmActPublisherHeader = "Swarm-Act-Publisher" SwarmActHistoryAddressHeader = "Swarm-Act-History-Address" - ImmutableHeader = "Immutable" - GasPriceHeader = "Gas-Price" - GasLimitHeader = "Gas-Limit" - ETagHeader = "ETag" + ImmutableHeader = "Immutable" + GasPriceHeader = "Gas-Price" + GasLimitHeader = "Gas-Limit" + FeePriorityHeader = "Fee-Priority" + ETagHeader = "ETag" AuthorizationHeader = "Authorization" AcceptEncodingHeader = "Accept-Encoding" @@ -557,8 +558,9 @@ func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) h logger := s.logger.WithName(handlerName).Build() headers := struct { - GasPrice *big.Int `map:"Gas-Price"` - GasLimit uint64 `map:"Gas-Limit"` + GasPrice *big.Int `map:"Gas-Price"` + GasLimit uint64 `map:"Gas-Limit"` + FeePriority string `map:"Fee-Priority"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -567,6 +569,22 @@ func (s *Service) gasConfigMiddleware(handlerName string) func(h http.Handler) h ctx := r.Context() ctx = sctx.SetGasPrice(ctx, headers.GasPrice) ctx = sctx.SetGasLimit(ctx, headers.GasLimit) + if headers.FeePriority != "" { + tier, err := transaction.ParseFeeTier(headers.FeePriority) + if err != nil { + logger.Debug("invalid fee priority header", "error", err) + jsonhttp.BadRequest(w, jsonhttp.StatusResponse{ + Message: "invalid header params", + Code: http.StatusBadRequest, + Reasons: []jsonhttp.Reason{{ + Field: FeePriorityHeader, + Error: err.Error(), + }}, + }) + return + } + ctx = sctx.SetFeePriority(ctx, tier.String()) + } h.ServeHTTP(w, r.WithContext(ctx)) }) @@ -581,7 +599,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler { SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, - SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader, + SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, FeePriorityHeader, ImmutableHeader, SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader, } allowedHeadersStr := strings.Join(allowedHeaders, ", ") diff --git a/pkg/node/chain.go b/pkg/node/chain.go index ef1a63c9881..c714c8c9fca 100644 --- a/pkg/node/chain.go +++ b/pkg/node/chain.go @@ -64,6 +64,8 @@ func InitChain( fallbackGasLimit uint64, rpcCfg BlockchainRPCConfig, blockSyncInterval uint64, + feeHistoryBlockCount uint64, + retryCfg transaction.TransactionsRetryConfig, ) (transaction.Backend, common.Address, int64, transaction.Monitor, transaction.Service, error) { backend := backendnoop.New(chainID) @@ -98,7 +100,7 @@ func InitChain( logger.Info("connected to blockchain backend", "version", versionString) - backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockSyncInterval) + backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockSyncInterval, feeHistoryBlockCount) } backendChainID, err := backend.ChainID(ctx) @@ -117,7 +119,7 @@ func InitChain( transactionMonitor := transaction.NewMonitor(logger, backend, overlayEthAddress, pollingInterval, cancellationDepth) - transactionService, err := transaction.NewService(logger, overlayEthAddress, backend, signer, stateStore, backendChainID, transactionMonitor, fallbackGasLimit) + transactionService, err := transaction.NewService(logger, overlayEthAddress, backend, signer, stateStore, backendChainID, transactionMonitor, fallbackGasLimit, retryCfg) if err != nil { return nil, common.Address{}, 0, nil, nil, fmt.Errorf("transaction service: %w", err) } diff --git a/pkg/node/node.go b/pkg/node/node.go index 0b3c6f12bd4..4180fc2d5d4 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -142,6 +142,8 @@ type Options struct { BlockProfile bool BlockTime time.Duration BlockSyncInterval uint64 + FeeHistoryBlockCount uint64 + TransactionRetry transaction.TransactionsRetryConfig BootnodeMode bool Bootnodes []string CacheCapacity uint64 @@ -438,6 +440,8 @@ func NewBee( Keepalive: o.BlockchainRpcKeepalive, }, o.BlockSyncInterval, + o.FeeHistoryBlockCount, + o.TransactionRetry, ) if err != nil { return nil, fmt.Errorf("init chain: %w", err) @@ -1409,6 +1413,9 @@ func NewBee( if swapBackendMetrics, ok := chainBackend.(metrics.Collector); ok { apiService.MustRegisterMetrics(swapBackendMetrics.Metrics()...) } + if txMetrics, ok := transactionService.(metrics.Collector); ok { + apiService.MustRegisterMetrics(txMetrics.Metrics()...) + } if l, ok := logger.(metrics.Collector); ok { apiService.MustRegisterMetrics(l.Metrics()...) diff --git a/pkg/postage/postagecontract/contract.go b/pkg/postage/postagecontract/contract.go index 42317014b38..bab10f4d0cd 100644 --- a/pkg/postage/postagecontract/contract.go +++ b/pkg/postage/postagecontract/contract.go @@ -180,21 +180,7 @@ func (c *postageContract) sendApproveTransaction(ctx context.Context, amount *bi ) }() - txHash, err := c.transactionService.Send(ctx, request, transaction.DefaultTipBoostPercent) - if err != nil { - return nil, err - } - - receipt, err = c.transactionService.WaitForReceipt(ctx, txHash) - if err != nil { - return nil, err - } - - if receipt.Status == 0 { - return nil, transaction.ErrTransactionReverted - } - - return receipt, nil + return c.sendRequest(ctx, request) } func (c *postageContract) sendTransaction(ctx context.Context, callData []byte, desc string) (receipt *types.Receipt, err error) { @@ -216,20 +202,32 @@ func (c *postageContract) sendTransaction(ctx context.Context, callData []byte, ) }() - txHash, err := c.transactionService.Send(ctx, request, transaction.DefaultTipBoostPercent) - if err != nil { - return nil, err + return c.sendRequest(ctx, request) +} + +// sendRequest sends a postage transaction. When Gas-Price is set in the request +// context the legacy Send path is used (client spend ceiling); otherwise +// SendWithRetry applies automatic fee escalation. +func (c *postageContract) sendRequest(ctx context.Context, request *transaction.TxRequest) (*types.Receipt, error) { + if sctx.GetGasPrice(ctx) != nil { + txHash, err := c.transactionService.Send(ctx, request, transaction.DefaultTipBoostPercent) + if err != nil { + return nil, err + } + receipt, err := c.transactionService.WaitForReceipt(ctx, txHash) + if err != nil { + return nil, err + } + if receipt.Status == 0 { + return nil, transaction.ErrTransactionReverted + } + return receipt, nil } - receipt, err = c.transactionService.WaitForReceipt(ctx, txHash) + _, receipt, err := c.transactionService.SendWithRetry(ctx, request) if err != nil { return nil, err } - - if receipt.Status == 0 { - return nil, transaction.ErrTransactionReverted - } - return receipt, nil } diff --git a/pkg/postage/postagecontract/contract_test.go b/pkg/postage/postagecontract/contract_test.go index 92020efbe9a..6592a7f23a7 100644 --- a/pkg/postage/postagecontract/contract_test.go +++ b/pkg/postage/postagecontract/contract_test.go @@ -74,36 +74,25 @@ func TestCreateBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { switch *request.To { case bzzTokenAddress: - return txHashApprove, nil + return txHashApprove, &types.Receipt{Status: 1}, nil case postageStampAddress: if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { - return txHashApprove, nil + return txHashApprove, &types.Receipt{Status: 1}, nil } if !bytes.Equal(expectedCallData[:100], request.Data[:100]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashCreate, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - switch txHash { - case txHashApprove: - return &types.Receipt{ - Status: 1, - }, nil - case txHashCreate: - return &types.Receipt{ + return txHashCreate, &types.Receipt{ Logs: []*types.Log{ newCreateEvent(postageStampAddress, batchID), }, Status: 1, }, nil } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), transactionMock.WithCallFunc(func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) { if *request.To == bzzTokenAddress { @@ -319,33 +308,22 @@ func TestTopUpBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { switch *request.To { case bzzTokenAddress: - return txHashApprove, nil + return txHashApprove, &types.Receipt{Status: 1}, nil case postageStampAddress: if !bytes.Equal(expectedCallData[:64], request.Data[:64]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashTopup, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - switch txHash { - case txHashApprove: - return &types.Receipt{ - Status: 1, - }, nil - case txHashTopup: - return &types.Receipt{ + return txHashTopup, &types.Receipt{ Logs: []*types.Log{ newTopUpEvent(postageStampAddress, batch), }, Status: 1, }, nil } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), transactionMock.WithCallFunc(func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) { if *request.To == bzzTokenAddress { @@ -490,33 +468,22 @@ func TestDiluteBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == postageStampAddress { if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { - return txHashApprove, nil + return txHashApprove, &types.Receipt{Status: 1}, nil } if !bytes.Equal(expectedCallData[:64], request.Data[:64]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDilute, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDilute { - return &types.Receipt{ + return txHashDilute, &types.Receipt{ Logs: []*types.Log{ newDiluteEvent(postageStampAddress, batch), }, Status: 1, }, nil } - if txHash == txHashApprove { - return &types.Receipt{ - Status: 1, - }, nil - } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), transactionMock.WithCallFunc(func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) { if *request.To == postageStampAddress { @@ -663,12 +630,8 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, i int) (txHash common.Hash, err error) { - return common.Hash{}, nil - }), transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - return &types.Receipt{ - Status: 1, - }, nil + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + return common.Hash{}, &types.Receipt{Status: 1}, nil }), ), postageMock, @@ -805,13 +768,13 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, i int) (txHash common.Hash, err error) { + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == postageContractAddress { if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { - return txHash, fmt.Errorf("some error") + return common.Hash{}, nil, fmt.Errorf("some error") } } - return txHash, errors.New("unexpected call") + return common.Hash{}, nil, errors.New("unexpected call") }), ), postageMock, @@ -928,12 +891,8 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, i int) (txHash common.Hash, err error) { - return common.Hash{}, nil - }), transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - return &types.Receipt{ - Status: 0, - }, nil + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + return common.Hash{}, &types.Receipt{Status: 0}, transaction.ErrTransactionReverted }), ), postageMock, diff --git a/pkg/sctx/sctx.go b/pkg/sctx/sctx.go index e2f1d3aefd4..8f2d8ccde66 100644 --- a/pkg/sctx/sctx.go +++ b/pkg/sctx/sctx.go @@ -20,6 +20,7 @@ type ( requestHostKey struct{} gasPriceKey struct{} gasLimitKey struct{} + feePriorityKey struct{} ) // SetHost sets the http request host in the context @@ -67,3 +68,15 @@ func GetGasPrice(ctx context.Context) *big.Int { } return nil } + +func SetFeePriority(ctx context.Context, priority string) context.Context { + return context.WithValue(ctx, feePriorityKey{}, priority) +} + +func GetFeePriority(ctx context.Context) string { + v, ok := ctx.Value(feePriorityKey{}).(string) + if ok { + return v + } + return "" +} diff --git a/pkg/storageincentives/redistribution/redistribution.go b/pkg/storageincentives/redistribution/redistribution.go index 77013a8a990..18f3b888262 100644 --- a/pkg/storageincentives/redistribution/redistribution.go +++ b/pkg/storageincentives/redistribution/redistribution.go @@ -6,6 +6,7 @@ package redistribution import ( "context" + "errors" "fmt" "math/big" @@ -115,7 +116,7 @@ func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs) (comm Value: big.NewInt(0), Description: "claim win transaction", } - txHash, err := c.sendAndWait(ctx, request, BoostTipPercent) + txHash, err := c.sendAndWait(ctx, request) if err != nil { return txHash, fmt.Errorf("claim: %w", err) } @@ -138,7 +139,7 @@ func (c *contract) Commit(ctx context.Context, obfusHash []byte, round uint64) ( Value: big.NewInt(0), Description: "commit transaction", } - txHash, err := c.sendAndWait(ctx, request, BoostTipPercent) + txHash, err := c.sendAndWait(ctx, request) if err != nil { return txHash, fmt.Errorf("commit: obfusHash %v: %w", common.BytesToHash(obfusHash), err) } @@ -161,7 +162,7 @@ func (c *contract) Reveal(ctx context.Context, storageDepth uint8, reserveCommit Value: big.NewInt(0), Description: "reveal transaction", } - txHash, err := c.sendAndWait(ctx, request, BoostTipPercent) + txHash, err := c.sendAndWait(ctx, request) if err != nil { return txHash, fmt.Errorf("reveal: storageDepth %d reserveCommitmentHash %v RandomNonce %v: %w", storageDepth, common.BytesToHash(reserveCommitmentHash), common.BytesToHash(RandomNonce), err) } @@ -189,7 +190,7 @@ func (c *contract) ReserveSalt(ctx context.Context) ([]byte, error) { return salt[:], nil } -func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxRequest, boostPercent int) (txHash common.Hash, err error) { +func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error) { defer func() { err = c.txService.UnwrapABIError( ctx, @@ -199,17 +200,12 @@ func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxReque ) }() - txHash, err = c.txService.Send(ctx, request, boostPercent) + txHash, receipt, err := c.txService.SendWithRetry(ctx, request) if err != nil { return txHash, err } - receipt, err := c.txService.WaitForReceipt(ctx, txHash) - if err != nil { - return txHash, err - } - - if receipt.Status == 0 { - return txHash, transaction.ErrTransactionReverted + if receipt == nil { + return txHash, errors.New("missing receipt after send with retry") } return txHash, nil } diff --git a/pkg/storageincentives/redistribution/redistribution_test.go b/pkg/storageincentives/redistribution/redistribution_test.go index b48e9014b02..85ec99b7b19 100644 --- a/pkg/storageincentives/redistribution/redistribution_test.go +++ b/pkg/storageincentives/redistribution/redistribution_test.go @@ -203,22 +203,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil + return txHashDeposited, &types.Receipt{Status: 1}, nil } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDeposited { - return &types.Receipt{ - Status: 1, - }, nil - } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -245,22 +237,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDeposited { - return &types.Receipt{ - Status: 0, - }, nil + return txHashDeposited, &types.Receipt{Status: 0}, transaction.ErrTransactionReverted } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -288,22 +272,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil + return txHashDeposited, &types.Receipt{Status: 1}, nil } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDeposited { - return &types.Receipt{ - Status: 1, - }, nil - } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -333,22 +309,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, _ int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil - } - return common.Hash{}, errors.New("sent to wrong contract") - }), - transactionMock.WithWaitForReceiptFunc(func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - if txHash == txHashDeposited { - return &types.Receipt{ - Status: 1, - }, nil + return txHashDeposited, &types.Receipt{Status: 1}, nil } - return nil, errors.New("unknown tx hash") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -421,7 +389,10 @@ func TestRedistribution(t *testing.T) { t.Run("invalid call data", func(t *testing.T) { t.Parallel() - expectedCallData, err := redistributionContractABI.Pack("commit", common.BytesToHash(common.Hex2Bytes("some hash")), uint64(0)) + // Use valid distinct hashes: Hex2Bytes("some hash") and Hex2Bytes("hash") both decode to empty bytes. + expectedHash := common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + actualHash := common.Hex2Bytes("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + expectedCallData, err := redistributionContractABI.Pack("commit", expectedHash, uint64(0)) if err != nil { t.Fatal(err) } @@ -430,14 +401,14 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendFunc(func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { - if !bytes.Equal(expectedCallData[:], request.Data[:]) { - return common.Hash{}, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) + if !bytes.Equal(expectedCallData, request.Data) { + return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) } - return txHashDeposited, nil + return txHashDeposited, &types.Receipt{Status: 1}, nil } - return common.Hash{}, errors.New("sent to wrong contract") + return common.Hash{}, nil, errors.New("sent to wrong contract") }), ), redistributionContractAddress, @@ -445,7 +416,7 @@ func TestRedistribution(t *testing.T) { 0, ) - _, err = contract.Commit(ctx, common.Hex2Bytes("hash"), 0) + _, err = contract.Commit(ctx, actualHash, 0) if err == nil { t.Fatal("expected error") } diff --git a/pkg/transaction/backend.go b/pkg/transaction/backend.go index 075dbfe19b8..48a1c64abee 100644 --- a/pkg/transaction/backend.go +++ b/pkg/transaction/backend.go @@ -17,10 +17,19 @@ import ( "github.com/ethersphere/bee/v2/pkg/transaction/backend" ) +// FeeHistorySuggestedFeeAndTips are max-fee-per-gas style estimates from eth_feeHistory over the last N blocks (by default 100) +// Low, Market, and Aggressive are the median per-block priority fee at the 10th, 50th, and 90th reward percentiles respectively (each priority tier is floored by the configured minimum tip). +type FeeHistorySuggestedFeeAndTips struct { + LowTip *big.Int + MarketTip *big.Int + AggressiveTip *big.Int +} + // Backend is the minimum of blockchain backend functions we need. type Backend interface { backend.Geth SuggestedFeeAndTip(ctx context.Context, gasPrice *big.Int, boostPercent int) (*big.Int, *big.Int, error) + SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*FeeHistorySuggestedFeeAndTips, error) } // IsSynced will check if we are synced with the given blockchain backend. This diff --git a/pkg/transaction/backend/backend.go b/pkg/transaction/backend/backend.go index e0d723f5f45..c08dcdfcc2f 100644 --- a/pkg/transaction/backend/backend.go +++ b/pkg/transaction/backend/backend.go @@ -22,6 +22,7 @@ type Geth interface { Close() CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) + FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) diff --git a/pkg/transaction/backendmock/backend.go b/pkg/transaction/backendmock/backend.go index ed4d1ae08b1..c939edd0ee5 100644 --- a/pkg/transaction/backendmock/backend.go +++ b/pkg/transaction/backendmock/backend.go @@ -18,19 +18,21 @@ import ( var ErrNotImplemented = errors.New("not implemented") type backendMock struct { - callContract func(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) - sendTransaction func(ctx context.Context, tx *types.Transaction) error - suggestedFeeAndTip func(ctx context.Context, gasPrice *big.Int, boostPercent int) (*big.Int, *big.Int, error) - suggestGasTipCap func(ctx context.Context) (*big.Int, error) - estimateGas func(ctx context.Context, msg ethereum.CallMsg) (gas uint64, err error) - transactionReceipt func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) - pendingNonceAt func(ctx context.Context, account common.Address) (uint64, error) - transactionByHash func(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) - blockNumber func(ctx context.Context) (uint64, error) - headerByNumber func(ctx context.Context, number *big.Int) (*types.Header, error) - balanceAt func(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) - nonceAt func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) - codeAt func(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) + callContract func(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) + sendTransaction func(ctx context.Context, tx *types.Transaction) error + suggestedFeeAndTip func(ctx context.Context, gasPrice *big.Int, boostPercent int) (*big.Int, *big.Int, error) + suggestedFeeAndTipsFromHistory func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) + suggestGasTipCap func(ctx context.Context) (*big.Int, error) + estimateGas func(ctx context.Context, msg ethereum.CallMsg) (gas uint64, err error) + transactionReceipt func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + pendingNonceAt func(ctx context.Context, account common.Address) (uint64, error) + transactionByHash func(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) + blockNumber func(ctx context.Context) (uint64, error) + headerByNumber func(ctx context.Context, number *big.Int) (*types.Header, error) + balanceAt func(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) + nonceAt func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) + codeAt func(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) + feeHistory func(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) } func (m *backendMock) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { @@ -54,6 +56,13 @@ func (m *backendMock) SuggestedFeeAndTip(ctx context.Context, gasPrice *big.Int, return nil, nil, ErrNotImplemented } +func (m *backendMock) SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + if m.suggestedFeeAndTipsFromHistory != nil { + return m.suggestedFeeAndTipsFromHistory(ctx, lastBlock) + } + return nil, ErrNotImplemented +} + func (m *backendMock) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { if m.estimateGas != nil { return m.estimateGas(ctx, msg) @@ -128,6 +137,13 @@ func (m *backendMock) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { return nil, ErrNotImplemented } +func (m *backendMock) FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) { + if m.feeHistory != nil { + return m.feeHistory(ctx, blockCount, lastBlock, rewardPercentiles) + } + return nil, ErrNotImplemented +} + func (m *backendMock) ChainID(ctx context.Context) (*big.Int, error) { return nil, ErrNotImplemented } @@ -228,3 +244,15 @@ func WithCodeAtFunc(f func(ctx context.Context, contract common.Address, blockNu s.codeAt = f }) } + +func WithFeeHistoryFunc(f func(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error)) Option { + return optionFunc(func(s *backendMock) { + s.feeHistory = f + }) +} + +func WithSuggestedFeeAndTipsFromHistoryFunc(f func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error)) Option { + return optionFunc(func(s *backendMock) { + s.suggestedFeeAndTipsFromHistory = f + }) +} diff --git a/pkg/transaction/backendnoop/backend.go b/pkg/transaction/backendnoop/backend.go index 9b1eec670bd..6dc16e3c89f 100644 --- a/pkg/transaction/backendnoop/backend.go +++ b/pkg/transaction/backendnoop/backend.go @@ -55,6 +55,10 @@ func (b *Backend) SuggestedFeeAndTip(ctx context.Context, gasPrice *big.Int, boo return nil, nil, postagecontract.ErrChainDisabled } +func (b *Backend) SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + return nil, postagecontract.ErrChainDisabled +} + func (b *Backend) SuggestGasTipCap(context.Context) (*big.Int, error) { return nil, postagecontract.ErrChainDisabled } @@ -91,6 +95,10 @@ func (b *Backend) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log return nil, postagecontract.ErrChainDisabled } +func (b *Backend) FeeHistory(context.Context, uint64, *big.Int, []float64) (*ethereum.FeeHistory, error) { + return nil, postagecontract.ErrChainDisabled +} + func (b *Backend) ChainID(context.Context) (*big.Int, error) { return big.NewInt(b.chainID), nil } diff --git a/pkg/transaction/backendsimulation/backend.go b/pkg/transaction/backendsimulation/backend.go index 53c68d9f259..2ac86692102 100644 --- a/pkg/transaction/backendsimulation/backend.go +++ b/pkg/transaction/backendsimulation/backend.go @@ -101,6 +101,10 @@ func (m *simulatedBackend) SuggestedFeeAndTip(ctx context.Context, gasPrice *big return nil, nil, ErrNotImplemented } +func (m *simulatedBackend) SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + return nil, ErrNotImplemented +} + func (m *simulatedBackend) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { return 0, ErrNotImplemented } @@ -152,6 +156,10 @@ func (m *simulatedBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, erro return nil, ErrNotImplemented } +func (m *simulatedBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) { + return nil, ErrNotImplemented +} + func (m *simulatedBackend) ChainID(ctx context.Context) (*big.Int, error) { return nil, ErrNotImplemented } diff --git a/pkg/transaction/export_test.go b/pkg/transaction/export_test.go index 3d723b4fb19..b80ff2ff612 100644 --- a/pkg/transaction/export_test.go +++ b/pkg/transaction/export_test.go @@ -4,4 +4,41 @@ package transaction -var StoredTransactionKey = storedTransactionKey +import ( + "context" + "math/big" + + "github.com/ethersphere/bee/v2/pkg/log" +) + +var ( + StoredTransactionKey = storedTransactionKey + RetryStateKey = retryStateKey + PendingTransactionKey = pendingTransactionKey + ApplyMempoolBump = applyMempoolBump +) + +const ( + FeeTierLow = feeTierLow + FeeTierMarket = feeTierMarket + FeeTierAggressive = feeTierAggressive + + MempoolBumpPercent = mempoolBumpPercent + DefaultAttemptsPerTier = defaultAttemptsPerTier +) + +// SuggestGasFeeForTier exposes suggestGasFeeForTier for tests. +func SuggestGasFeeForTier( + backend Backend, + maxTxPrice *big.Int, + ctx context.Context, + tier int, + previousTip *big.Int, +) (gasFeeCap, gasTipCap *big.Int, err error) { + svc := &transactionService{ + logger: log.Noop, + backend: backend, + maxTxPrice: maxTxPrice, + } + return svc.suggestGasFeeForTier(ctx, feeTier(tier), previousTip) +} diff --git a/pkg/transaction/fee_tier.go b/pkg/transaction/fee_tier.go new file mode 100644 index 00000000000..b9ff3c3b8db --- /dev/null +++ b/pkg/transaction/fee_tier.go @@ -0,0 +1,70 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction + +import ( + "fmt" + "math/big" + "strings" +) + +// feeTier selects which priority-fee percentile from eth_feeHistory to use. +type feeTier int + +const ( + feeTierLow feeTier = iota + 1 + feeTierMarket + feeTierAggressive +) + +func (ft feeTier) String() string { + switch ft { + case feeTierLow: + return "low" + case feeTierMarket: + return "market" + case feeTierAggressive: + return "aggressive" + default: + return "unknown" + } +} + +// ParseFeeTier validates a fee priority tier name (for API headers and config). +func ParseFeeTier(s string) (feeTier, error) { + switch strings.ToLower(strings.TrimSpace(s)) { + case "low": + return feeTierLow, nil + case "market", "": + return feeTierMarket, nil + case "aggressive": + return feeTierAggressive, nil + default: + return 0, fmt.Errorf("unknown fee tier %q (valid: low, market, aggressive)", s) + } +} + +// tierRange returns the ordered slice of tiers from start to end inclusive. +func tierRange(start, end feeTier) []feeTier { + var tiers []feeTier + for t := start; t <= end; t++ { + tiers = append(tiers, t) + } + return tiers +} + +// tierTip selects the appropriate tip from a FeeHistorySuggestedFeeAndTips for the given tier. +func tierTip(tier feeTier, fh *FeeHistorySuggestedFeeAndTips) *big.Int { + switch tier { + case feeTierLow: + return fh.LowTip + case feeTierMarket: + return fh.MarketTip + case feeTierAggressive: + return fh.AggressiveTip + default: + return fh.MarketTip + } +} diff --git a/pkg/transaction/fee_tier_test.go b/pkg/transaction/fee_tier_test.go new file mode 100644 index 00000000000..3d528191ec1 --- /dev/null +++ b/pkg/transaction/fee_tier_test.go @@ -0,0 +1,41 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction_test + +import ( + "testing" + + "github.com/ethersphere/bee/v2/pkg/transaction" +) + +func TestParseFeeTier(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + in string + want string + err bool + }{ + {"low", "low", false}, + {" Market ", "market", false}, + {"AGGRESSIVE", "aggressive", false}, + {"", "market", false}, + {"banana", "", true}, + } { + tier, err := transaction.ParseFeeTier(tc.in) + if tc.err { + if err == nil { + t.Fatalf("ParseFeeTier(%q): want error", tc.in) + } + continue + } + if err != nil { + t.Fatalf("ParseFeeTier(%q): %v", tc.in, err) + } + if got := tier.String(); got != tc.want { + t.Fatalf("ParseFeeTier(%q) = %q, want %q", tc.in, got, tc.want) + } + } +} diff --git a/pkg/transaction/mock/transaction.go b/pkg/transaction/mock/transaction.go index 072f47cf8f2..66ad081fbe4 100644 --- a/pkg/transaction/mock/transaction.go +++ b/pkg/transaction/mock/transaction.go @@ -19,6 +19,7 @@ import ( type transactionServiceMock struct { send func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) + sendWithRetry func(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) waitForReceipt func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) watchSentTransaction func(txHash common.Hash) (chan types.Receipt, chan error, error) call func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) @@ -29,6 +30,13 @@ type transactionServiceMock struct { transactionFee func(ctx context.Context, txHash common.Hash) (*big.Int, error) } +func (m *transactionServiceMock) SendWithRetry(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + if m.sendWithRetry != nil { + return m.sendWithRetry(ctx, request) + } + return common.Hash{}, nil, errors.New("not implemented") +} + func (m *transactionServiceMock) Send(ctx context.Context, request *transaction.TxRequest, boostPercent int) (txHash common.Hash, err error) { if m.send != nil { return m.send(ctx, request, boostPercent) @@ -110,6 +118,12 @@ type optionFunc func(*transactionServiceMock) func (f optionFunc) apply(r *transactionServiceMock) { f(r) } +func WithSendWithRetryFunc(f func(context.Context, *transaction.TxRequest) (common.Hash, *types.Receipt, error)) Option { + return optionFunc(func(s *transactionServiceMock) { + s.sendWithRetry = f + }) +} + func WithSendFunc(f func(context.Context, *transaction.TxRequest, int) (txHash common.Hash, err error)) Option { return optionFunc(func(s *transactionServiceMock) { s.send = f diff --git a/pkg/transaction/retry_recorder.go b/pkg/transaction/retry_recorder.go new file mode 100644 index 00000000000..b212e844b70 --- /dev/null +++ b/pkg/transaction/retry_recorder.go @@ -0,0 +1,62 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction + +import ( + "context" + "errors" +) + +// RetryMetricsRecorder records SendWithRetry completion metrics. +type RetryMetricsRecorder interface { + RecordRetryComplete(broadcastAttempts int, err error) +} + +type noopRetryMetricsRecorder struct{} + +func (noopRetryMetricsRecorder) RecordRetryComplete(int, error) {} + +type retryMetricsBackend interface { + RetryMetrics() RetryMetricsRecorder +} + +func retryMetricsFromBackend(backend Backend) RetryMetricsRecorder { + if p, ok := backend.(retryMetricsBackend); ok { + return p.RetryMetrics() + } + return noopRetryMetricsRecorder{} +} + +// RetryOutcomeLabel maps a SendWithRetry terminal error to a stable Prometheus label. +func RetryOutcomeLabel(err error) string { + if err == nil { + return "success" + } + if errors.Is(err, ErrTransactionReverted) { + return "reverted" + } + if errors.Is(err, ErrSignTransaction) { + return "sign_failed" + } + if errors.Is(err, ErrTransactionCancelled) { + return "cancelled" + } + if errors.Is(err, ErrTxMaxPriceExceeded) { + return "max_price_exceeded" + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return "context_canceled" + } + if errors.Is(err, ErrAllAttemptsExhausted) { + return "attempts_exhausted" + } + if containsNormalized(err.Error(), "sendtxswithretryrequiresautomaticgaspricing") { + return "manual_gas_price" + } + if isNonRetryable(err) { + return "critical" + } + return "other" +} diff --git a/pkg/transaction/send_tx_with_retry.go b/pkg/transaction/send_tx_with_retry.go new file mode 100644 index 00000000000..767d63022d9 --- /dev/null +++ b/pkg/transaction/send_tx_with_retry.go @@ -0,0 +1,529 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethersphere/bee/v2/pkg/sctx" +) + +const retryStatePrefix = "transaction_retry_" + +// mempoolBumpPercent is the minimum percent increase required by EIP-1559 +// mempools to accept a replacement transaction for the same nonce. +const mempoolBumpPercent = 15 + +// defaultAttemptsPerTier is the number of broadcast attempts at each fee tier +// before escalating to the next tier. +const defaultAttemptsPerTier = 2 + +func retryStateKey(nonce uint64) string { + return fmt.Sprintf("%s%020d", retryStatePrefix, nonce) +} + +// retryState holds in-memory state for a single sendWithRetry session. +type retryState struct { + nonce uint64 + nonceAssigned bool + lastTxHash common.Hash +} + +// SendWithRetry sends an EIP-1559 transaction using fee-history tiers with automatic +// escalation. Each tier gets attemptsPerTier broadcast rounds with fresh eth_feeHistory +// data. A +15% mempool bump floor is applied to ensure replacement transactions are accepted. +func (t *transactionService) SendWithRetry(ctx context.Context, request *TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) { + if request.GasPrice != nil { + err = errors.New("send txs with retry requires automatic gas pricing") + t.retryMetrics.RecordRetryComplete(1, err) + return common.Hash{}, nil, err + } + return t.sendWithRetry(ctx, request, nil) +} + +// applyMempoolBump returns tip bumped by mempoolBumpPercent. +func applyMempoolBump(tip *big.Int) *big.Int { + return new(big.Int).Div( + new(big.Int).Mul(new(big.Int).Set(tip), big.NewInt(int64(100+mempoolBumpPercent))), + big.NewInt(100), + ) +} + +// suggestGasFeeForTier fetches fresh fee history, picks the tip for the given tier, +// applies the mempool bump floor relative to previousTip, and computes gasFeeCap. +func (t *transactionService) suggestGasFeeForTier(ctx context.Context, tier feeTier, previousTip *big.Int) (gasFeeCap, gasTipCap *big.Int, err error) { + header, err := t.backend.HeaderByNumber(ctx, nil) + if err != nil { + return nil, nil, err + } + if header == nil || header.BaseFee == nil { + return nil, nil, errors.New("latest block header or base fee unavailable") + } + + fh, err := t.backend.SuggestedFeeAndTipsFromHistory(ctx, nil) + if err != nil { + return nil, nil, fmt.Errorf("fee history: %w", err) + } + if fh == nil { + return nil, nil, errors.New("fee history: empty response") + } + + tip := tierTip(tier, fh) + + if previousTip != nil && previousTip.Sign() > 0 { + bumpedTip := applyMempoolBump(previousTip) + if tip.Cmp(bumpedTip) < 0 { + tip = bumpedTip + } + } + + gasFeeCap = new(big.Int).Mul(header.BaseFee, big.NewInt(2)) + gasFeeCapWithTip := new(big.Int).Add(new(big.Int).Set(gasFeeCap), tip) + + t.logger.Debug("suggest gas fees for retry", + "tier", tier.String(), + "base_fee", header.BaseFee, + "previous_tip", previousTip, + "selected_tip", tip, + "gas_fee_cap", gasFeeCapWithTip, + "max_tx_price", t.maxTxPrice) + + if t.maxTxPrice != nil && gasFeeCapWithTip.Cmp(t.maxTxPrice) > 0 { + return nil, nil, fmt.Errorf("%w: max_fee_per_gas %s exceeds limit %s", ErrTxMaxPriceExceeded, gasFeeCapWithTip, t.maxTxPrice) + } + return gasFeeCapWithTip, tip, nil +} + +func (t *transactionService) tierRangeForRequest(ctx context.Context) ([]feeTier, error) { + start := t.startTier + if override := sctx.GetFeePriority(ctx); override != "" { + parsed, err := ParseFeeTier(override) + if err != nil { + return nil, fmt.Errorf("fee priority: %w", err) + } + if parsed > t.endTier { + t.logger.Warning("fee priority exceeds configured maximum, clamping", + "requested", parsed.String(), + "maximum", t.endTier.String()) + parsed = t.endTier + } + start = parsed + } + return tierRange(start, t.endTier), nil +} + +// broadcastTx prepares, signs, and sends a transaction. +// When fixedNonce is nil a new nonce is allocated (first attempt); +// otherwise the supplied nonce is reused (replacement transaction). +func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest, fixedNonce *uint64, tier feeTier, previousTip *big.Int) (*types.Transaction, error) { + var nonce uint64 + + if fixedNonce != nil { + nonce = *fixedNonce + } else { + t.lock.Lock() + defer t.lock.Unlock() + + n, err := t.nextNonce(ctx) + if err != nil { + return nil, err + } + nonce = n + } + + gasLimit, err := t.estimateGasLimit(ctx, request) + if err != nil { + return nil, err + } + + gasFeeCap, gasTipCap, err := t.suggestGasFeeForTier(ctx, tier, previousTip) + if err != nil { + return nil, err + } + + tx := types.NewTx(&types.DynamicFeeTx{ + Nonce: nonce, + ChainID: t.chainID, + To: request.To, + Value: request.Value, + Gas: gasLimit, + GasFeeCap: gasFeeCap, + GasTipCap: gasTipCap, + Data: request.Data, + }) + + signedTx, err := t.signer.SignTx(tx, t.chainID) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrSignTransaction, err) + } + + t.logger.Info("send with retry: broadcast", + "tier", tier.String(), + "tx", signedTx.Hash(), + "nonce", nonce, + "to", addressForLog(request.To), + "gas_limit", tx.Gas(), + "gas_fee_cap", tx.GasFeeCap(), + "gas_tip_cap", tx.GasTipCap(), + "value", tx.Value(), + "data_len", len(tx.Data()), + "description", request.Description, + ) + err = t.backend.SendTransaction(ctx, signedTx) + return signedTx, err +} + +// persistReplaceTx atomically persists a new stored+pending tx, updates retry_key, and removes the previous stored+pending entries. +func (t *transactionService) persistReplaceTx(signedTx *types.Transaction, rs *retryState, description string) error { + if signedTx == nil { + return nil + } + + txHash := signedTx.Hash() + now := time.Now().Unix() + + // Safe ordering: write new → update pointer → delete old. + if err := t.store.Put(storedTransactionKey(txHash), StoredTransaction{ + To: signedTx.To(), + Data: signedTx.Data(), + GasPrice: signedTx.GasPrice(), + GasLimit: signedTx.Gas(), + GasTipCap: signedTx.GasTipCap(), + GasFeeCap: signedTx.GasFeeCap(), + Value: signedTx.Value(), + Nonce: signedTx.Nonce(), + Created: now, + Description: description, + }); err != nil { + return err + } + + if err := t.store.Put(pendingTransactionKey(txHash), struct{}{}); err != nil { + return err + } + + if !rs.nonceAssigned { + rs.nonce = signedTx.Nonce() + rs.nonceAssigned = true + } + + retryKey := retryStateKey(rs.nonce) + if err := t.store.Put(retryKey, txHash); err != nil { + return err + } + + oldHash := rs.lastTxHash + rs.lastTxHash = txHash + + if oldHash != (common.Hash{}) { + _ = t.store.Delete(pendingTransactionKey(oldHash)) + _ = t.store.Delete(storedTransactionKey(oldHash)) + } + + return nil +} + +// attemptResult is the outcome of a single broadcast+wait cycle. +type attemptResult struct { + signedTx *types.Transaction // successfully broadcast tx (nil if broadcast failed before send) + receipt *types.Receipt // non-nil when receipt was received within the wait window + err error // non-nil on any error; check isNonRetryable to decide whether to stop +} + +// sendWithRetry is the core retry loop. When resuming, pass a pre-populated retryState. +func (t *transactionService) sendWithRetry(ctx context.Context, request *TxRequest, rs *retryState) (common.Hash, *types.Receipt, error) { + if rs == nil { + rs = &retryState{} + } + + tiers, err := t.tierRangeForRequest(ctx) + if err != nil { + return common.Hash{}, nil, err + } + + t.logger.Debug("send with retry: started", + "description", request.Description, + "to", addressForLog(request.To), + "start_tier", tiers[0].String(), + "end_tier", t.endTier.String(), + "attempts_per_tier", t.attemptsPerTier, + "retry_delay", t.txRetryDelay, + "nonce_assigned", rs.nonceAssigned) + + var ( + previousTip *big.Int + terminateTxErr error + nonce *uint64 + ) + attempt := 1 + defer func() { t.finishRetry(rs, request, attempt, terminateTxErr) }() + + for _, tier := range tiers { + for k := 0; k < t.attemptsPerTier; k++ { + if rs.nonceAssigned { + nonce = &rs.nonce + } + + res := t.attempt(ctx, rs, request, previousTip, tier, nonce) + if res.err != nil && (isNonRetryable(res.err) || isNonceTooLow(res.err)) { + terminateTxErr = res.err + return common.Hash{}, nil, terminateTxErr + } + + if res.receipt != nil { + t.logger.Debug("send with retry: receipt received", + "tx_hash", rs.lastTxHash, + "status", res.receipt.Status, + "gas_used", res.receipt.GasUsed, + "block_number", res.receipt.BlockNumber, + "nonce", rs.nonce, + "description", request.Description) + + if res.receipt.Status == 0 { + terminateTxErr = ErrTransactionReverted + return rs.lastTxHash, res.receipt, terminateTxErr + } + return rs.lastTxHash, res.receipt, nil + } + + if res.signedTx != nil { + previousTip = res.signedTx.GasTipCap() + } + attempt++ + } + } + + terminateTxErr = ErrAllAttemptsExhausted + return rs.lastTxHash, nil, terminateTxErr +} + +// finishRetry performs final cleanup +func (t *transactionService) finishRetry(rs *retryState, request *TxRequest, attempt int, err error) { + if err != nil { + t.logger.Error(err, + "send with retry: finished with error", + "attempt", attempt, + "tx_hash", rs.lastTxHash, + "nonce", rs.nonce, + "to", addressForLog(request.To), + "description", request.Description) + } + + // remove the retry_key + if rs.nonceAssigned { + _ = t.store.Delete(retryStateKey(rs.nonce)) + } + + monitorLast := errors.Is(err, ErrTxMaxPriceExceeded) || errors.Is(err, ErrAllAttemptsExhausted) + if monitorLast { + // ErrTxMaxPriceExceeded and ErrAllAttemptsExhausted mean retry must be finished even without a receipt. + // If at least one broadcast succeeded, the tx may still be pending in the mempool and get mined; keep monitoring in the background. + if rs.lastTxHash != (common.Hash{}) { + t.waitForPendingTx(rs.lastTxHash) + } + } else { + if rs.lastTxHash != (common.Hash{}) { + _ = t.store.Delete(pendingTransactionKey(rs.lastTxHash)) + } + } + + t.retryMetrics.RecordRetryComplete(attempt, err) +} + +// attempt performs a single broadcast+wait cycle: broadcast, persist state, wait for receipt. +func (t *transactionService) attempt(ctx context.Context, rs *retryState, request *TxRequest, previousTip *big.Int, tier feeTier, nonce *uint64) attemptResult { + replaced := true + + signedTx, broadCastErr := t.broadcastTx(ctx, request, nonce, tier, previousTip) + if broadCastErr != nil { + switch { + case isNonceTooLow(broadCastErr): + // Between retry attempts the transaction was likely mined. + // Fetch its receipt once and stop retrying regardless of the outcome. + if rs.lastTxHash != (common.Hash{}) { + if rec, recErr := t.backend.TransactionReceipt(ctx, rs.lastTxHash); recErr == nil && rec != nil { + return attemptResult{receipt: rec, err: recErr} + } + } + return attemptResult{err: broadCastErr} + case isReplacementUnderpriced(broadCastErr): + // Base fee dropped between attempts within the same tier, + // so the bumped tip was not enough for the mempool to accept the replacement. + replaced = false + case isNonRetryable(broadCastErr): + return attemptResult{err: broadCastErr} + case signedTx == nil: + // Any other error that occurred before the SendTransaction RPC call. + replaced = false + } + } + + if replaced { + // Persist state only when the transaction was replaced (new tx hash). Otherwise keep monitoring the same pending tx. + if err := t.persistReplaceTx(signedTx, rs, request.Description); err != nil { + return attemptResult{err: fmt.Errorf("%w: %w", ErrUpdateRetryState, err)} + } + } + + if rs.lastTxHash == (common.Hash{}) { + // Rare case: no successful broadcast yet, nothing to monitor. Wait to avoid spamming attempts. + select { + case <-ctx.Done(): + return attemptResult{err: ctx.Err()} + case <-time.After(t.txRetryDelay): + return attemptResult{err: broadCastErr} + } + } + + waitCtx, cancel := context.WithTimeout(ctx, t.txRetryDelay) + rec, waitErr := t.WaitForReceipt(waitCtx, rs.lastTxHash) + cancel() + if waitErr != nil { + return attemptResult{signedTx: signedTx, err: waitErr} + } + return attemptResult{receipt: rec} +} + +func isReplacementUnderpriced(err error) bool { + return err != nil && containsNormalized(err.Error(), "replacementtransactionunderpriced") +} + +func isNonceTooLow(err error) bool { + return err != nil && containsNormalized(err.Error(), "noncetoolow") +} + +// normalizeForMatch lowercases s and strips spaces so that both +// "insufficient funds" and "InsufficientFunds" match the same needle. +func normalizeForMatch(s string) string { + return strings.ToLower(strings.ReplaceAll(s, " ", "")) +} + +func containsNormalized(haystack, needle string) bool { + return strings.Contains(normalizeForMatch(haystack), needle) +} + +func isNonRetryable(err error) bool { + if errors.Is(err, ErrTransactionReverted) || + errors.Is(err, ErrTransactionCancelled) || + errors.Is(err, ErrSignTransaction) || + errors.Is(err, ErrTxMaxPriceExceeded) || + errors.Is(err, ErrUpdateRetryState) || + errors.Is(err, context.Canceled) { + return true + } + + s := normalizeForMatch(err.Error()) + nonRetryable := []string{ + "specifiedgasprice", + "alreadycommitted", + "alreadyrevealed", + "alreadyclaimed", + "notcommitphase", + "notrevealphase", + "notclaimphase", + "commitroundover", + "commitroundnotstarted", + "phaselastblock", + "outofdepth", + "outofdepthreveal", + "outofdepthclaim", + "notstaked", + "muststake2rounds", + "noreveals", + "nocommitsreceived", + "executionreverted", + "insufficientfunds", + } + for _, sub := range nonRetryable { + if strings.Contains(s, sub) { + return true + } + } + return false +} + +// pendingRetryTransactions returns hashes managed by active retry sessions +// so that waitForAllPendingTx does not double-watch them. +func (t *transactionService) pendingRetryTransactions() (map[common.Hash]string, error) { + out := make(map[common.Hash]string) + err := t.store.Iterate(retryStatePrefix, func(key, val []byte) (stop bool, err error) { + var txHash common.Hash + if err := json.Unmarshal(val, &txHash); err != nil { + return false, err + } + out[txHash] = string(key) + return false, nil + }) + return out, err +} + +func (t *transactionService) resumeRetryTransactions() error { + entries, err := t.pendingRetryTransactions() + if err != nil { + return err + } + + confirmed, err := t.backend.NonceAt(t.ctx, t.sender, nil) + if err != nil { + t.logger.Warning("resume send with retry: failed to get confirmed nonce, resuming all", "error", err) + } + + t.logger.Debug("resume send with retry: scanning persisted retry states", "count", len(entries), "confirmed_nonce", confirmed) + + for txHash, key := range entries { + stored, sErr := t.StoredTransaction(txHash) + if sErr != nil { + t.logger.Warning("resume send with retry: stored tx not found, cleaning up", "tx_hash", txHash, "error", sErr) + _ = t.store.Delete(key) + continue + } + + if confirmed > stored.Nonce { + t.logger.Debug("resume send with retry: skipping already confirmed transaction", "nonce", stored.Nonce) + _ = t.store.Delete(key) + _ = t.store.Delete(pendingTransactionKey(txHash)) + _ = t.store.Delete(storedTransactionKey(txHash)) + continue + } + + t.logger.Debug("resume send with retry: resuming", "nonce", stored.Nonce, "tx_hash", txHash) + + request := &TxRequest{ + To: stored.To, + Data: stored.Data, + GasLimit: stored.GasLimit, + Value: stored.Value, + Description: stored.Description, + } + + rs := &retryState{ + nonce: stored.Nonce, + nonceAssigned: true, + lastTxHash: txHash, + } + + t.wg.Go(func() { + if _, _, err := t.sendWithRetry(t.ctx, request, rs); err != nil { + t.logger.Error(err, "resumed transaction send with finished with error", "nonce", stored.Nonce) + } + }) + } + return nil +} + +func addressForLog(addr *common.Address) string { + if addr == nil { + return "" + } + return addr.Hex() +} diff --git a/pkg/transaction/send_tx_with_retry_test.go b/pkg/transaction/send_tx_with_retry_test.go new file mode 100644 index 00000000000..008d8617e53 --- /dev/null +++ b/pkg/transaction/send_tx_with_retry_test.go @@ -0,0 +1,1142 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction_test + +import ( + "context" + "errors" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + signermock "github.com/ethersphere/bee/v2/pkg/crypto/mock" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/sctx" + storemock "github.com/ethersphere/bee/v2/pkg/statestore/mock" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/transaction" + "github.com/ethersphere/bee/v2/pkg/transaction/backendmock" + "github.com/ethersphere/bee/v2/pkg/transaction/monitormock" + "github.com/ethersphere/bee/v2/pkg/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSuggestGasFeeForTier(t *testing.T) { + t.Parallel() + + const ( + baseFee = int64(1000) + tipBase = int64(100) + marketTip = int64(200) // tipBase * 2 + prevTip = int64(1000) + escalatedTip = int64(1150) // prevTip * 1.15 + baseFeeCap = int64(2000) // baseFee * 2 + ) + + headerOption := func() backendmock.Option { + return backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + return &types.Header{BaseFee: big.NewInt(baseFee)}, nil + }) + } + + feeHistoryOption := func(called *atomic.Int32) backendmock.Option { + return backendmock.WithSuggestedFeeAndTipsFromHistoryFunc(func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + if called != nil { + called.Add(1) + } + return &transaction.FeeHistorySuggestedFeeAndTips{ + LowTip: big.NewInt(tipBase), + MarketTip: big.NewInt(marketTip), + AggressiveTip: big.NewInt(tipBase * 3), + }, nil + }) + } + + t.Run("previous tip nil uses market tip from fee history", func(t *testing.T) { + t.Parallel() + + var feeHistoryCalls atomic.Int32 + backend := backendmock.New(headerOption(), feeHistoryOption(&feeHistoryCalls)) + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeForTier( + backend, nil, context.Background(), int(transaction.FeeTierMarket), nil, + ) + + require.NoError(t, err) + assert.Equal(t, int32(1), feeHistoryCalls.Load()) + assert.Equal(t, marketTip, gasTipCap.Int64()) + assert.Equal(t, baseFeeCap+marketTip, gasFeeCap.Int64()) + }) + + t.Run("escalates previous tip by configured percent", func(t *testing.T) { + t.Parallel() + + var feeHistoryCalls atomic.Int32 + backend := backendmock.New(headerOption(), feeHistoryOption(&feeHistoryCalls)) + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeForTier( + backend, nil, context.Background(), int(transaction.FeeTierMarket), big.NewInt(prevTip), + ) + + require.NoError(t, err) + assert.Equal(t, int32(1), feeHistoryCalls.Load(), "fee history is always queried") + assert.Equal(t, escalatedTip, gasTipCap.Int64()) + assert.Equal(t, baseFeeCap+escalatedTip, gasFeeCap.Int64()) + }) + + t.Run("max tx price exceeded returns error", func(t *testing.T) { + t.Parallel() + + // escalated: 2000+1150=3150 + maxTxPrice := big.NewInt(baseFeeCap + prevTip + 100) + + backend := backendmock.New(headerOption(), feeHistoryOption(nil)) + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeForTier( + backend, maxTxPrice, context.Background(), int(transaction.FeeTierMarket), big.NewInt(prevTip), + ) + + assert.ErrorIs(t, err, transaction.ErrTxMaxPriceExceeded) + assert.Nil(t, gasFeeCap) + assert.Nil(t, gasTipCap) + }) +} + +// capturedBroadcast records the parameters of a transaction as seen by SendTransaction. +type capturedBroadcast struct { + Hash common.Hash + Nonce uint64 + GasTipCap *big.Int + GasFeeCap *big.Int + GasLimit uint64 + To *common.Address + Data []byte + Value *big.Int +} + +func captureTx(tx *types.Transaction) capturedBroadcast { + return capturedBroadcast{ + Hash: tx.Hash(), + Nonce: tx.Nonce(), + GasTipCap: new(big.Int).Set(tx.GasTipCap()), + GasFeeCap: new(big.Int).Set(tx.GasFeeCap()), + GasLimit: tx.Gas(), + To: tx.To(), + Data: tx.Data(), + Value: new(big.Int).Set(tx.Value()), + } +} + +// assertTxDataUnchanged verifies that nonce, to, data, value, and gas limit +// are identical across all broadcast attempts (only fees should change). +func assertTxDataUnchanged(t *testing.T, broadcasts []capturedBroadcast) { + t.Helper() + for i := 1; i < len(broadcasts); i++ { + assert.Equal(t, broadcasts[0].Nonce, broadcasts[i].Nonce, + "attempt %d: nonce must not change across retries", i) + assert.Equal(t, broadcasts[0].To, broadcasts[i].To, + "attempt %d: To must not change across retries", i) + assert.Equal(t, broadcasts[0].Data, broadcasts[i].Data, + "attempt %d: Data must not change across retries", i) + assert.True(t, broadcasts[0].Value.Cmp(broadcasts[i].Value) == 0, + "attempt %d: Value must not change across retries (got %s, want %s)", i, broadcasts[i].Value, broadcasts[0].Value) + assert.Equal(t, broadcasts[0].GasLimit, broadcasts[i].GasLimit, + "attempt %d: GasLimit must not change across retries", i) + } +} + +// retryTestSetup holds shared constants and helpers for SendWithRetry tests. +type retryTestSetup struct { + sender common.Address + recipient common.Address + chainID *big.Int + nonce uint64 + txData []byte + value *big.Int + tipBase *big.Int // base value for fee tiers: LowTip=tipBase, MarketTip=tipBase*2, AggressiveTip=tipBase*3 + baseFee *big.Int + gasLimit uint64 +} + +func newRetryTestSetup() retryTestSetup { + return retryTestSetup{ + sender: common.HexToAddress("0xddff"), + recipient: common.HexToAddress("0xabcd"), + chainID: big.NewInt(5), + nonce: uint64(2), + txData: common.Hex2Bytes("abcdee"), + value: big.NewInt(1), + tipBase: big.NewInt(100), + baseFee: big.NewInt(1000), + gasLimit: uint64(50000), + } +} + +func (s retryTestSetup) expectedMarketTip() *big.Int { + return new(big.Int).Mul(s.tipBase, big.NewInt(2)) +} + +func (s retryTestSetup) expectedGasFeeCap(tip *big.Int) *big.Int { + return new(big.Int).Add(new(big.Int).Mul(s.baseFee, big.NewInt(2)), tip) +} + +func (s retryTestSetup) retryConfig() transaction.TransactionsRetryConfig { + return transaction.TransactionsRetryConfig{ + AttemptsPerTier: 3, + StartTier: "market", + EndTier: "market", + RetryDelay: 50 * time.Millisecond, + MaxTxPrice: big.NewInt(100_000_000), + } +} + +func (s retryTestSetup) request() *transaction.TxRequest { + return &transaction.TxRequest{ + To: &s.recipient, + Data: s.txData, + Value: s.value, + GasLimit: s.gasLimit, + } +} + +func (s retryTestSetup) passThroughSigner() signermock.Option { + return signermock.WithSignTxFunc(func(tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) { + return tx, nil + }) +} + +func (s retryTestSetup) signerAddr() signermock.Option { + return signermock.WithEthereumAddressFunc(func() (common.Address, error) { + return s.sender, nil + }) +} + +func (s retryTestSetup) feeHistoryOption(counter *atomic.Int32) backendmock.Option { + return backendmock.WithSuggestedFeeAndTipsFromHistoryFunc(func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + if counter != nil { + counter.Add(1) + } + return &transaction.FeeHistorySuggestedFeeAndTips{ + LowTip: new(big.Int).Set(s.tipBase), + MarketTip: new(big.Int).Mul(s.tipBase, big.NewInt(2)), + AggressiveTip: new(big.Int).Mul(s.tipBase, big.NewInt(3)), + }, nil + }) +} + +func (s retryTestSetup) headerOption() backendmock.Option { + return backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + return &types.Header{BaseFee: new(big.Int).Set(s.baseFee)}, nil + }) +} + +func (s retryTestSetup) nonceOption() backendmock.Option { + var counter atomic.Uint64 + counter.Store(s.nonce) + return backendmock.WithPendingNonceAtFunc(func(ctx context.Context, account common.Address) (uint64, error) { + return counter.Add(1) - 1, nil + }) +} + +func (s retryTestSetup) estimateGasOption() backendmock.Option { + return backendmock.WithEstimateGasFunc(func(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { + return s.gasLimit, nil + }) +} + +// receiptWatchTimeout returns a monitor option that never returns a receipt (for testing timeout). +func receiptWatchTimeout() monitormock.Option { + return monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + return make(chan types.Receipt), make(chan error), nil + }) +} + +// receiptWatchErr returns a monitor option that returns an error on the error channel. +func receiptWatchErr(err error) monitormock.Option { + return monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan error, 1) + ch <- err + return nil, ch, nil + }) +} + +// Broadcast returns critical error → immediate exit, verify tx was built correctly. +func TestSendWithRetry_BroadcastCriticalError(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return errors.New("execution reverted") + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "execution reverted") + assert.Equal(t, common.Hash{}, txHash) + assert.Nil(t, receipt) + + assert.Equal(t, int32(1), feeHistoryCalls.Load(), "fee history must be called once") + + require.Len(t, broadcasts, 1, "exactly one broadcast before critical error") + marketTip := s.expectedMarketTip() + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "initial tip must be MarketTip") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64(), + "gasFeeCap must be baseFee*2 + MarketTip") + assert.Equal(t, s.recipient, *broadcasts[0].To) + assert.Equal(t, s.txData, broadcasts[0].Data) + assert.Equal(t, s.value.Int64(), broadcasts[0].Value.Int64()) + assert.Equal(t, s.gasLimit, broadcasts[0].GasLimit) + + var v string + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(s.nonce), &v), storage.ErrNotFound, + "retry state should be cleaned up after critical error") +} + +// WaitForReceipt returns critical error → immediate exit, verify tx params. +func TestSendWithRetry_WaitForReceiptCriticalError(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(receiptWatchErr(transaction.ErrTransactionCancelled)), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + _, _, err = svc.SendWithRetry(context.Background(), s.request()) + assert.Error(t, err) + + assert.Equal(t, int32(1), feeHistoryCalls.Load(), + "fee history called once: tip was set after first broadcast, no more calls needed") + + require.Len(t, broadcasts, 1, "exactly one broadcast before critical wait error") + marketTip := s.expectedMarketTip() + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "initial tip must be MarketTip") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64(), + "gasFeeCap must be baseFee*2 + MarketTip") + + var v string + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(s.nonce), &v), storage.ErrNotFound, + "retry state should be cleaned up after critical WaitForReceipt error") +} + +// updateStates returns any error → immediate exit. +func TestSendWithRetry_UpdateStateError(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + + putErr := errors.New("disk write failed") + callCount := 0 + failingStore := &failOnNthPutStore{ + StateStorer: storemock.NewStateStore(), + failOnPut: 1, + putErr: putErr, + callCount: &callCount, + } + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + failingStore, + s.chainID, + monitormock.New(), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + _, _, err = svc.SendWithRetry(context.Background(), s.request()) + assert.ErrorIs(t, err, putErr) +} + +// First broadcast fails (non-critical, signedTx nil because prepare fails), second succeeds. +// On the first attempt HeaderByNumber fails → prepareTransactionWithRetry fails → broadcastTxWithRetry +// returns (nil, err) with a non-critical error. +// UpdateStates receives nil signedTx → state is not updated, only number of attempt increased in-memory +// After sendWithretry delay, second broadcast attempt succeeds → receipt → exit. +func TestSendWithRetry_NonCriticalThenSuccess(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var headerCalls atomic.Int32 + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.estimateGasOption(), + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + n := headerCalls.Add(1) + if n == 1 { + // non-critical error + return nil, errors.New("temporary RPC error") + } + return &types.Header{BaseFee: new(big.Int).Set(s.baseFee)}, nil + }), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + require.NotNil(t, receipt) + assert.NotEqual(t, common.Hash{}, txHash) + assert.Equal(t, uint64(1), receipt.Status) + + assert.GreaterOrEqual(t, int(headerCalls.Load()), 2, "should have retried after non-critical error") + assert.Equal(t, int32(1), feeHistoryCalls.Load(), + "fee history called once: first attempt failed at HeaderByNumber before reaching fee history") + + require.Len(t, broadcasts, 1, "only one successful broadcast (first attempt failed before SendTransaction)") + + marketTip := s.expectedMarketTip() + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "tip must be MarketTip (no previous tip was set since first attempt failed)") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64(), + "gasFeeCap must be baseFee*2 + MarketTip") + assert.Equal(t, s.recipient, *broadcasts[0].To) + assert.Equal(t, s.txData, broadcasts[0].Data) + + var v string + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(broadcasts[0].Nonce), &v), storage.ErrNotFound, + "retry state should be cleaned up on success") + assert.ErrorIs(t, store.Get(transaction.PendingTransactionKey(txHash), &struct{}{}), storage.ErrNotFound, + "pending tx entry should be cleaned up on success") +} + +// First broadcast OK, receipt not found (timeout), second broadcast with escalated gas → receipt found. +// Verifies exact fee values, nonce immutability, and tx data immutability. +func TestSendWithRetry_EscalateGasThenSuccess(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var broadcastCount atomic.Int32 + var feeHistoryCalls atomic.Int32 + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcastCount.Add(1) + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + if broadcastCount.Load() <= 1 { + return make(chan types.Receipt), make(chan error), nil + } + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + assert.NotEqual(t, common.Hash{}, txHash) + require.NotNil(t, receipt) + assert.Equal(t, uint64(1), receipt.Status) + + require.Len(t, broadcasts, 2, "should have exactly 2 broadcast attempts") + + assertTxDataUnchanged(t, broadcasts) + + marketTip := s.expectedMarketTip() + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "first attempt must use MarketTip from fee history") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64(), + "first attempt gasFeeCap = baseFee*2 + MarketTip") + + escalatedTip := transaction.ApplyMempoolBump(marketTip) + assert.Equal(t, escalatedTip.Int64(), broadcasts[1].GasTipCap.Int64(), + "second attempt must use escalated tip (MarketTip * 1.15)") + assert.Equal(t, s.expectedGasFeeCap(escalatedTip).Int64(), broadcasts[1].GasFeeCap.Int64(), + "second attempt gasFeeCap = baseFee*2 + escalated tip") + + assert.Equal(t, int32(2), feeHistoryCalls.Load(), + "fee history is called for each attempt") + + var v string + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(broadcasts[0].Nonce), &v), storage.ErrNotFound, + "retry state should be cleaned up on success") + + firstTxHash := broadcasts[0].Hash + lastTxHash := broadcasts[len(broadcasts)-1].Hash + assert.ErrorIs(t, store.Get(transaction.StoredTransactionKey(firstTxHash), &struct{}{}), storage.ErrNotFound, + "superseded stored tx should be removed on success") + var stored transaction.StoredTransaction + assert.NoError(t, store.Get(transaction.StoredTransactionKey(lastTxHash), &stored), + "final stored tx should be kept") +} + +// After receipt timeout at market tier, fees escalate to the aggressive tier on the next broadcast. +func TestSendWithRetry_TierEscalation(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var ( + broadcasts []capturedBroadcast + watchCount atomic.Int32 + ) + + cfg := s.retryConfig() + cfg.AttemptsPerTier = 1 + cfg.StartTier = "market" + cfg.EndTier = "aggressive" + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + if watchCount.Add(1) == 1 { + return make(chan types.Receipt), make(chan error), nil + } + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + require.NotNil(t, receipt) + assert.NotEqual(t, common.Hash{}, txHash) + + require.Len(t, broadcasts, 2, "market then aggressive tier, one attempt each") + + marketTip := s.expectedMarketTip() + aggressiveTip := new(big.Int).Mul(s.tipBase, big.NewInt(3)) + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64(), + "first broadcast must use market tier tip") + assert.Equal(t, aggressiveTip.Int64(), broadcasts[1].GasTipCap.Int64(), + "second broadcast must use aggressive tier tip") + assert.Equal(t, s.expectedGasFeeCap(marketTip).Int64(), broadcasts[0].GasFeeCap.Int64()) + assert.Equal(t, s.expectedGasFeeCap(aggressiveTip).Int64(), broadcasts[1].GasFeeCap.Int64()) +} + +// Underpriced replacement keeps watching the pending tx hash instead of switching to the rejected one. +func TestSendWithRetry_UnderpricedKeepsPendingTxHash(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var ( + broadcastCount atomic.Int32 + watchCount atomic.Int32 + firstTxHash common.Hash + ) + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + if broadcastCount.Add(1) == 1 { + firstTxHash = tx.Hash() + return nil + } + return errors.New("replacement transaction underpriced") + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + switch watchCount.Add(1) { + case 1: + assert.Equal(t, firstTxHash, txHash, "first wait must watch the accepted broadcast") + return make(chan types.Receipt), make(chan error), nil + default: + assert.Equal(t, firstTxHash, txHash, "after underpriced must keep watching the pending tx") + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + } + }), + ), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + require.NotNil(t, receipt) + assert.Equal(t, firstTxHash, txHash, "must return receipt for the original pending tx") + assert.Equal(t, int32(2), broadcastCount.Load(), "second broadcast should still be attempted") + assert.Equal(t, int32(2), watchCount.Load(), "must wait for receipt again after underpriced broadcast") +} + +// All attempts exhausted, receipt never found → error. +// Verifies compound escalation chain, nonce immutability, and gasFeeCap on every attempt. +func TestSendWithRetry_AllAttemptsExhausted(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var ( + feeHistoryCalls atomic.Int32 + broadcasts []capturedBroadcast + ) + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(receiptWatchTimeout()), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + assert.ErrorIs(t, err, transaction.ErrAllAttemptsExhausted) + assert.NotEqual(t, common.Hash{}, txHash, "should return last tx hash even on exhaustion") + assert.Nil(t, receipt) + + require.Len(t, broadcasts, 3, "should have made exactly maxRetries attempts") + + assertTxDataUnchanged(t, broadcasts) + + tip0 := s.expectedMarketTip() // tipBase*2 = 200 + tip1 := transaction.ApplyMempoolBump(tip0) // 230 + tip2 := transaction.ApplyMempoolBump(tip1) // 264 + expectedTips := []*big.Int{tip0, tip1, tip2} + + for i, expectedTip := range expectedTips { + assert.Equal(t, expectedTip.Int64(), broadcasts[i].GasTipCap.Int64(), + "attempt %d: tip must match compound escalation chain", i) + assert.Equal(t, s.expectedGasFeeCap(expectedTip).Int64(), broadcasts[i].GasFeeCap.Int64(), + "attempt %d: gasFeeCap must be baseFee*2 + tip", i) + } + + assert.Equal(t, int32(3), feeHistoryCalls.Load(), + "fee history is called for each attempt") + + lastTxHash := broadcasts[len(broadcasts)-1].Hash + assert.Equal(t, lastTxHash, txHash, "last tx hash must match the final broadcast") + + var v string + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(broadcasts[0].Nonce), &v), storage.ErrNotFound, + "retry state should be cleaned up after exhaustion") + + var pending struct{} + assert.NoError(t, store.Get(transaction.PendingTransactionKey(lastTxHash), &pending), + "last pending tx should be kept for monitoring") + for _, oldHash := range broadcasts[:len(broadcasts)-1] { + assert.ErrorIs(t, store.Get(transaction.PendingTransactionKey(oldHash.Hash), &pending), storage.ErrNotFound, + "superseded pending tx should be removed") + assert.ErrorIs(t, store.Get(transaction.StoredTransactionKey(oldHash.Hash), &struct{}{}), storage.ErrNotFound, + "superseded stored tx should be removed") + } + var stored transaction.StoredTransaction + assert.NoError(t, store.Get(transaction.StoredTransactionKey(lastTxHash), &stored), + "last stored tx should be kept for resend/cancel") +} + +// "nonce too low" on a rebroadcast means the nonce was consumed between the +// last receipt check and this broadcast: the previously broadcast tx was most +// likely mined. The service reads its receipt exactly once and stops retrying. +func TestSendWithRetry_NonceTooLow(t *testing.T) { + t.Parallel() + + newSvc := func(t *testing.T, store storage.StateStorer, firstTxHash *common.Hash, broadcastCount, receiptCalls *atomic.Int32, receiptFn func(common.Hash) (*types.Receipt, error)) transaction.Service { + t.Helper() + s := newRetryTestSetup() + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + if broadcastCount.Add(1) == 1 { + *firstTxHash = tx.Hash() + return nil + } + return errors.New("nonce too low") + }), + backendmock.WithTransactionReceiptFunc(func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + receiptCalls.Add(1) + assert.Equal(t, *firstTxHash, txHash, "must read receipt of the previously broadcast tx") + return receiptFn(txHash) + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(receiptWatchTimeout()), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + return svc + } + + t.Run("receipt found stops sendWithRetry and returns it", func(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var ( + firstTxHash common.Hash + broadcastCount atomic.Int32 + receiptCalls atomic.Int32 + ) + + svc := newSvc(t, store, &firstTxHash, &broadcastCount, &receiptCalls, + func(txHash common.Hash) (*types.Receipt, error) { + return &types.Receipt{TxHash: txHash, Status: 1}, nil + }) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + require.NotNil(t, receipt) + assert.Equal(t, firstTxHash, txHash, "must return the mined tx hash") + assert.Equal(t, uint64(1), receipt.Status) + assert.Equal(t, int32(2), broadcastCount.Load(), "exactly one rebroadcast, no further retries after nonce too low") + assert.Equal(t, int32(1), receiptCalls.Load(), "receipt must be read exactly once") + + var v string + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(s.nonce), &v), storage.ErrNotFound, + "retry state should be cleaned up after success") + }) + + t.Run("receipt not found stops sendWithRetry and returns error", func(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var ( + firstTxHash common.Hash + broadcastCount atomic.Int32 + receiptCalls atomic.Int32 + ) + + svc := newSvc(t, store, &firstTxHash, &broadcastCount, &receiptCalls, + func(common.Hash) (*types.Receipt, error) { + return nil, ethereum.NotFound + }) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "nonce too low") + assert.Equal(t, common.Hash{}, txHash) + assert.Nil(t, receipt) + assert.Equal(t, int32(2), broadcastCount.Load(), "exactly one rebroadcast, no further retries after nonce too low") + assert.Equal(t, int32(1), receiptCalls.Load(), "receipt must be read exactly once even when not found") + + var v string + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(s.nonce), &v), storage.ErrNotFound, + "retry state should be cleaned up after error") + }) +} + +// Resume after node restart — transaction is re-sent starting from persisted attempt. +// Verifies nonce, escalated tip, gasFeeCap, and that fee history is NOT called. +func TestSendWithRetry_ResumeAfterRestart(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + previousTip := new(big.Int).Set(s.tipBase) + lastTxHash := common.HexToHash("0xdeadbeef") + + retryKey := transaction.RetryStateKey(s.nonce) + require.NoError(t, store.Put(retryKey, lastTxHash)) + + require.NoError(t, store.Put(transaction.StoredTransactionKey(lastTxHash), transaction.StoredTransaction{ + To: &s.recipient, + Data: s.txData, + GasLimit: s.gasLimit, + Value: s.value, + Nonce: s.nonce, + GasTipCap: previousTip, + GasFeeCap: big.NewInt(5000), + GasPrice: big.NewInt(0), + Created: time.Now().Unix(), + Description: "test-resume", + })) + require.NoError(t, store.Put(transaction.PendingTransactionKey(lastTxHash), struct{}{})) + + var ( + broadcastsMu sync.Mutex + broadcasts []capturedBroadcast + ) + var feeHistoryCalls atomic.Int32 + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.feeHistoryOption(&feeHistoryCalls), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithPendingNonceAtFunc(func(ctx context.Context, account common.Address) (uint64, error) { + return s.nonce, nil + }), + backendmock.WithNonceAtFunc(func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return s.nonce, nil + }), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + captured := captureTx(tx) + broadcastsMu.Lock() + broadcasts = append(broadcasts, captured) + broadcastsMu.Unlock() + return nil + }), + backendmock.WithTransactionByHashFunc(func(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) { + return nil, false, ethereum.NotFound + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + require.Eventually(t, func() bool { + broadcastsMu.Lock() + defer broadcastsMu.Unlock() + return len(broadcasts) > 0 + }, 5*time.Second, 10*time.Millisecond, "resume should have triggered a broadcast") + + require.NoError(t, svc.Close()) + + broadcastsMu.Lock() + require.Len(t, broadcasts, 1) + resumed := broadcasts[0] + gasTipCap := new(big.Int).Set(resumed.GasTipCap) + gasFeeCap := new(big.Int).Set(resumed.GasFeeCap) + resumedNonce := resumed.Nonce + broadcastsMu.Unlock() + + assert.Equal(t, s.nonce, resumedNonce, "resumed transaction must use the same nonce") + + expectedTip := s.expectedMarketTip() + assert.Equal(t, expectedTip.Int64(), gasTipCap.Int64(), + "resumed transaction should use market tip from fresh fee history") + + assert.Equal(t, s.expectedGasFeeCap(expectedTip).Int64(), gasFeeCap.Int64(), + "resumed gasFeeCap must be baseFee*2 + escalated tip") + + assert.Equal(t, int32(1), feeHistoryCalls.Load(), + "fee history should be called on resume") + + var v string + assert.Eventually(t, func() bool { + return errors.Is(store.Get(retryKey, &v), storage.ErrNotFound) + }, 5*time.Second, 10*time.Millisecond, "retry state should be cleaned up after success") +} + +// MaxTxPrice cap prevents escalation beyond the configured limit. +func TestSendWithRetry_MaxTxPriceCap(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + marketTip := s.expectedMarketTip() + // Set maxTxPrice below baseFee*2 + MarketTip so even the first attempt fails. + maxTxPrice := new(big.Int).Sub(s.expectedGasFeeCap(marketTip), big.NewInt(1)) // 2199 + + cfg := s.retryConfig() + cfg.MaxTxPrice = maxTxPrice + + var broadcasts []capturedBroadcast + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New(receiptWatchTimeout()), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + _, _, err = svc.SendWithRetry(context.Background(), s.request()) + assert.Error(t, err) + assert.Len(t, broadcasts, 0, + "no transaction should be sent when maxTxPrice is below the minimum fee") +} + +func TestSendWithRetry_FeePriorityContextOverride(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var broadcasts []capturedBroadcast + + cfg := s.retryConfig() + cfg.StartTier = "market" + cfg.EndTier = "aggressive" + cfg.AttemptsPerTier = 1 + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + ctx := sctx.SetFeePriority(context.Background(), "low") + _, receipt, err := svc.SendWithRetry(ctx, s.request()) + require.NoError(t, err) + require.NotNil(t, receipt) + require.Len(t, broadcasts, 1) + assert.Equal(t, s.tipBase.Int64(), broadcasts[0].GasTipCap.Int64(), + "request fee priority must override node start tier") +} + +func TestSendWithRetry_FeePriorityClampedToNodeMax(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var broadcasts []capturedBroadcast + + cfg := s.retryConfig() + cfg.StartTier = "low" + cfg.EndTier = "market" + cfg.AttemptsPerTier = 1 + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + ctx := sctx.SetFeePriority(context.Background(), "aggressive") + _, receipt, err := svc.SendWithRetry(ctx, s.request()) + require.NoError(t, err) + require.NotNil(t, receipt) + require.Len(t, broadcasts, 1) + assert.Equal(t, s.expectedMarketTip().Int64(), broadcasts[0].GasTipCap.Int64(), + "request fee priority above node ceiling must clamp to end tier") +} + +// failOnNthPutStore wraps a StateStorer and fails the Nth Put call with putErr. +type failOnNthPutStore struct { + storage.StateStorer + failOnPut int + putErr error + callCount *int +} + +func (s *failOnNthPutStore) Put(key string, i any) error { + *s.callCount++ + if *s.callCount >= s.failOnPut { + return s.putErr + } + return s.StateStorer.Put(key, i) +} diff --git a/pkg/transaction/transaction.go b/pkg/transaction/transaction.go index 68fc74d6f53..95db388192e 100644 --- a/pkg/transaction/transaction.go +++ b/pkg/transaction/transaction.go @@ -40,6 +40,12 @@ var ( ErrTransactionReverted = errors.New("transaction reverted") ErrUnknownTransaction = errors.New("unknown transaction") ErrAlreadyImported = errors.New("already imported") + ErrTxMaxPriceExceeded = errors.New("transaction retry: exceeds maximum tx price (max fee per gas)") + // ErrSignTransaction is returned when signing a transaction fails. + ErrSignTransaction = errors.New("sign transaction") + ErrAllAttemptsExhausted = errors.New("all attempts exhausted") + // ErrUpdateRetryState is returned when persisting send-with-retry state fails. + ErrUpdateRetryState = errors.New("update transaction retry state") ) const ( @@ -49,8 +55,31 @@ const ( MinGasLimit = 21_000 // Minimum gas for any transaction GasBufferPercent = 33 // Add 33% buffer to estimated gas FallbackGasLimit = 500_000 // Fallback when estimation fails and no minimum is set + + // DefaultSendWithRetryDelay is the default wait for a receipt before escalating fees in SendWithRetry. + DefaultSendWithRetryDelay = 1 * time.Minute ) +// TransactionsRetryConfig configures SendWithRetry behaviour. Zero values are replaced by defaults in NewService. +type TransactionsRetryConfig struct { + RetryDelay time.Duration + AttemptsPerTier int + StartTier string + EndTier string + MaxTxPrice *big.Int +} + +// normalizeServiceRetryConfig fills zero fields with package defaults. +func normalizeServiceRetryConfig(c TransactionsRetryConfig) TransactionsRetryConfig { + if c.RetryDelay <= 0 { + c.RetryDelay = DefaultSendWithRetryDelay + } + if c.AttemptsPerTier <= 0 { + c.AttemptsPerTier = defaultAttemptsPerTier + } + return c +} + // TxRequest describes a request for a transaction that can be executed. type TxRequest struct { To *common.Address // recipient of the transaction @@ -83,6 +112,8 @@ type Service interface { io.Closer // Send creates a transaction based on the request (with gasprice increased by provided percentage) and sends it. Send(ctx context.Context, request *TxRequest, tipCapBoostPercent int) (txHash common.Hash, err error) + // SendWithRetry sends a transaction using fee-history tiers and automatic fee escalation; see send_tx_with_retry.go. + SendWithRetry(ctx context.Context, request *TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) // Call simulate a transaction based on the request. Call(ctx context.Context, request *TxRequest) (result []byte, err error) // WaitForReceipt waits until either the transaction with the given hash has been mined or the context is cancelled. @@ -122,10 +153,18 @@ type transactionService struct { chainID *big.Int monitor Monitor fallbackGasLimit uint64 + + txRetryDelay time.Duration + attemptsPerTier int + startTier feeTier + endTier feeTier + maxTxPrice *big.Int + + retryMetrics RetryMetricsRecorder } // NewService creates a new transaction service. -func NewService(logger log.Logger, overlayEthAddress common.Address, backend Backend, signer crypto.Signer, store storage.StateStorer, chainID *big.Int, monitor Monitor, fallbackGasLimit uint64) (Service, error) { +func NewService(logger log.Logger, overlayEthAddress common.Address, backend Backend, signer crypto.Signer, store storage.StateStorer, chainID *big.Int, monitor Monitor, fallbackGasLimit uint64, retryCfg TransactionsRetryConfig) (Service, error) { senderAddress, err := signer.EthereumAddress() if err != nil { return nil, err @@ -135,7 +174,28 @@ func NewService(logger log.Logger, overlayEthAddress common.Address, backend Bac fallbackGasLimit = FallbackGasLimit } + rc := normalizeServiceRetryConfig(retryCfg) + + startTier, err := ParseFeeTier(rc.StartTier) + if err != nil { + return nil, fmt.Errorf("start tier: %w", err) + } + endTier, err := ParseFeeTier(rc.EndTier) + if err != nil { + return nil, fmt.Errorf("end tier: %w", err) + } + if startTier > endTier { + return nil, fmt.Errorf("start tier %q must not be higher than end tier %q", rc.StartTier, rc.EndTier) + } + ctx, cancel := context.WithCancel(context.Background()) + logger.Info("transaction retry configuration", + "start_tier", startTier.String(), + "end_tier", endTier.String(), + "attempts_per_tier", rc.AttemptsPerTier, + "retry_delay", rc.RetryDelay, + "max_tx_price_wei", rc.MaxTxPrice, + ) t := &transactionService{ ctx: ctx, @@ -148,22 +208,45 @@ func NewService(logger log.Logger, overlayEthAddress common.Address, backend Bac chainID: chainID, monitor: monitor, fallbackGasLimit: fallbackGasLimit, + txRetryDelay: rc.RetryDelay, + attemptsPerTier: rc.AttemptsPerTier, + startTier: startTier, + endTier: endTier, + maxTxPrice: rc.MaxTxPrice, + retryMetrics: retryMetricsFromBackend(backend), } if err = t.waitForAllPendingTx(); err != nil { return nil, err } + if err = t.resumeRetryTransactions(); err != nil { + return nil, err + } + return t, nil } func (t *transactionService) waitForAllPendingTx() error { + retryHashes, err := t.pendingRetryTransactions() + if err != nil { + return err + } + pendingTxs, err := t.PendingTransactions() if err != nil { return err } - pending := t.filterPendingTransactions(t.ctx, pendingTxs) + nonRetry := make([]common.Hash, 0, len(pendingTxs)) + for _, txHash := range pendingTxs { + if _, skip := retryHashes[txHash]; skip { + continue + } + nonRetry = append(nonRetry, txHash) + } + + pending := t.filterPendingTransactions(t.ctx, nonRetry) for txHash := range pending { t.waitForPendingTx(txHash) @@ -278,9 +361,7 @@ func (t *transactionService) StoredTransaction(txHash common.Hash) (*StoredTrans return &tx, nil } -// prepareTransaction creates a signable transaction based on a request. -func (t *transactionService) prepareTransaction(ctx context.Context, request *TxRequest, nonce uint64, boostPercent int) (tx *types.Transaction, err error) { - var gasLimit uint64 +func (t *transactionService) estimateGasLimit(ctx context.Context, request *TxRequest) (gasLimit uint64, err error) { if request.GasLimit == 0 { // Estimate gas using pending state for consistency with PendingNonceAt gasLimit, err = t.backend.EstimateGas(ctx, ethereum.CallMsg{ @@ -330,7 +411,16 @@ func (t *transactionService) prepareTransaction(ctx context.Context, request *Tx } if gasLimit == 0 { - return nil, errors.New("gas limit cannot be zero") + return 0, errors.New("gas limit cannot be zero") + } + return gasLimit, nil +} + +// prepareTransaction creates a signable transaction based on a request. +func (t *transactionService) prepareTransaction(ctx context.Context, request *TxRequest, nonce uint64, boostPercent int) (tx *types.Transaction, err error) { + gasLimit, err := t.estimateGasLimit(ctx, request) + if err != nil { + return nil, err } /* diff --git a/pkg/transaction/transaction_test.go b/pkg/transaction/transaction_test.go index 0a43b055d98..a1668dc6a44 100644 --- a/pkg/transaction/transaction_test.go +++ b/pkg/transaction/transaction_test.go @@ -176,6 +176,7 @@ func TestTransactionSend(t *testing.T) { }), ), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -257,6 +258,7 @@ func TestTransactionSend(t *testing.T) { }), ), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -344,6 +346,7 @@ func TestTransactionSend(t *testing.T) { }), ), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -423,6 +426,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -489,6 +493,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -556,6 +561,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -625,6 +631,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -694,6 +701,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -766,6 +774,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -840,6 +849,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -914,6 +924,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -983,6 +994,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1052,6 +1064,7 @@ func TestTransactionSend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1124,6 +1137,7 @@ func TestTransactionWaitForReceipt(t *testing.T) { }), ), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1198,6 +1212,7 @@ func TestTransactionResend(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1286,6 +1301,7 @@ func TestTransactionCancel(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1337,6 +1353,7 @@ func TestTransactionCancel(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) @@ -1423,6 +1440,7 @@ func TestTransactionService_UnwrapABIError(t *testing.T) { chainID, monitormock.New(), 0, + transaction.TransactionsRetryConfig{}, ) if err != nil { t.Fatal(err) diff --git a/pkg/transaction/wrapped/fee.go b/pkg/transaction/wrapped/fee.go index dd8f5b80e80..2e382544d7a 100644 --- a/pkg/transaction/wrapped/fee.go +++ b/pkg/transaction/wrapped/fee.go @@ -9,6 +9,8 @@ import ( "errors" "fmt" "math/big" + + "github.com/ethersphere/bee/v2/pkg/transaction" ) const ( @@ -81,3 +83,20 @@ func (b *wrappedBackend) SuggestedFeeAndTip(ctx context.Context, gasPrice *big.I return gasFeeCap, gasTipCap, nil } + +func (b *wrappedBackend) SuggestedFeeAndTipsFromHistory(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + fh, err := b.FeeHistory(ctx, b.feeHistoryBlockCount, lastBlock, b.feeHistoryRewardPercentiles) + if err != nil { + return nil, err + } + low, market, aggressive, err := suggestedFeesFromFeeHistoryResult(fh) + if err != nil { + b.metrics.FeeHistoryParseErrors.Inc() + return nil, err + } + return &transaction.FeeHistorySuggestedFeeAndTips{ + LowTip: low, + MarketTip: market, + AggressiveTip: aggressive, + }, nil +} diff --git a/pkg/transaction/wrapped/fee_history.go b/pkg/transaction/wrapped/fee_history.go new file mode 100644 index 00000000000..b741a755a4e --- /dev/null +++ b/pkg/transaction/wrapped/fee_history.go @@ -0,0 +1,61 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package wrapped + +import ( + "errors" + "fmt" + "math/big" + "sort" + + "github.com/ethereum/go-ethereum" +) + +func suggestedFeesFromFeeHistoryResult(fh *ethereum.FeeHistory) (low, market, aggressive *big.Int, err error) { + if fh == nil { + return nil, nil, nil, errors.New("fee history: empty response") + } + if len(fh.BaseFee) == 0 { + return nil, nil, nil, errors.New("fee history: no base fees") + } + low, err = medianPriorityTipAtPercentileIndex(fh.Reward, 0) + if err != nil { + return nil, nil, nil, err + } + market, err = medianPriorityTipAtPercentileIndex(fh.Reward, 1) + if err != nil { + return nil, nil, nil, err + } + aggressive, err = medianPriorityTipAtPercentileIndex(fh.Reward, 2) + if err != nil { + return nil, nil, nil, err + } + return low, market, aggressive, nil +} + +func medianPriorityTipAtPercentileIndex(reward [][]*big.Int, idx int) (*big.Int, error) { + var vals []*big.Int + for _, row := range reward { + if idx >= len(row) { + continue + } + if row[idx] == nil { + continue + } + vals = append(vals, new(big.Int).Set(row[idx])) + } + if len(vals) == 0 { + return nil, fmt.Errorf("fee history: no reward entries for percentile index %d", idx) + } + sort.Slice(vals, func(i, j int) bool { + return vals[i].Cmp(vals[j]) < 0 + }) + mid := len(vals) / 2 + if len(vals)%2 == 0 { + sum := new(big.Int).Add(vals[mid-1], vals[mid]) + return sum.Div(sum, big.NewInt(2)), nil + } + return vals[mid], nil +} diff --git a/pkg/transaction/wrapped/fee_history_test.go b/pkg/transaction/wrapped/fee_history_test.go new file mode 100644 index 00000000000..57e69058589 --- /dev/null +++ b/pkg/transaction/wrapped/fee_history_test.go @@ -0,0 +1,56 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package wrapped + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum" +) + +func TestSuggestedFeeAndTipsFromFeeHistoryResult(t *testing.T) { + t.Parallel() + + base := big.NewInt(1000) + fh := ðereum.FeeHistory{ + BaseFee: []*big.Int{big.NewInt(1), base}, + Reward: [][]*big.Int{ + {big.NewInt(10), big.NewInt(50), big.NewInt(90)}, + {big.NewInt(20), big.NewInt(60), big.NewInt(100)}, + }, + } + + low, market, agg, err := suggestedFeesFromFeeHistoryResult(fh) + if err != nil { + t.Fatal(err) + } + if got, want := low.String(), "15"; got != want { + t.Fatalf("low: got %s want %s", got, want) + } + if got, want := market.String(), "55"; got != want { + t.Fatalf("market: got %s want %s", got, want) + } + if got, want := agg.String(), "95"; got != want { + t.Fatalf("aggressive: got %s want %s", got, want) + } +} + +func TestSuggestedFeeAndTipsFromFeeHistoryResult_NoRewardEntries(t *testing.T) { + t.Parallel() + + fh := ðereum.FeeHistory{ + BaseFee: []*big.Int{big.NewInt(1000)}, + Reward: [][]*big.Int{}, + } + + low, market, aggressive, err := suggestedFeesFromFeeHistoryResult(fh) + if err == nil { + t.Fatal("expected error when fee history has no reward entries") + } + if low != nil || market != nil || aggressive != nil { + t.Fatal("expected nil tips on error") + } +} diff --git a/pkg/transaction/wrapped/fee_test.go b/pkg/transaction/wrapped/fee_test.go index 33ddc31da88..b5905eaac79 100644 --- a/pkg/transaction/wrapped/fee_test.go +++ b/pkg/transaction/wrapped/fee_test.go @@ -108,6 +108,7 @@ func TestSuggestedFeeAndTip(t *testing.T) { minimumGasTipCap, 5*time.Second, 90, + 0, ) gasFeeCap, gasTipCap, err := backend.SuggestedFeeAndTip(ctx, tc.gasPrice, tc.boostPercent) diff --git a/pkg/transaction/wrapped/metrics.go b/pkg/transaction/wrapped/metrics.go index e4a0f8ee3bd..6b6ffd68a1b 100644 --- a/pkg/transaction/wrapped/metrics.go +++ b/pkg/transaction/wrapped/metrics.go @@ -6,9 +6,12 @@ package wrapped import ( m "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/ethersphere/bee/v2/pkg/transaction" "github.com/prometheus/client_golang/prometheus" ) +var _ transaction.RetryMetricsRecorder = (*metrics)(nil) + type metrics struct { TotalRPCCalls prometheus.Counter TotalRPCErrors prometheus.Counter @@ -27,10 +30,16 @@ type metrics struct { SendTransactionCalls prometheus.Counter FilterLogsCalls prometheus.Counter ChainIDCalls prometheus.Counter + FeeHistoryCalls prometheus.Counter + FeeHistoryParseErrors prometheus.Counter + + SendWithRetryAttemptsPerTransaction prometheus.Histogram + SendWithRetryOutcomesTotal *prometheus.CounterVec } func newMetrics() metrics { subsystem := "eth_backend" + retrySubsystem := "transaction_retry" return metrics{ TotalRPCCalls: prometheus.NewCounter(prometheus.CounterOpts{ @@ -129,6 +138,38 @@ func newMetrics() metrics { Name: "calls_chain_id", Help: "Count of eth_chainId rpc calls", }), + FeeHistoryCalls: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "calls_fee_history", + Help: "Count of eth_feeHistory rpc calls", + }), + FeeHistoryParseErrors: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "fee_history_parse_errors", + Help: "Count of failures to derive suggested fees from fee history response", + }), + SendWithRetryAttemptsPerTransaction: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: retrySubsystem, + Name: "attempts_per_transaction", + Help: "Broadcast attempts per SendWithRetry invocation (1 = no retry needed).", + Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10}, + }), + SendWithRetryOutcomesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: retrySubsystem, + Name: "outcomes_total", + Help: "Finished SendWithRetry invocations by outcome.", + }, []string{"result"}), + } +} + +func (m *metrics) RecordRetryComplete(broadcastAttempts int, err error) { + m.SendWithRetryOutcomesTotal.WithLabelValues(transaction.RetryOutcomeLabel(err)).Inc() + if broadcastAttempts > 0 { + m.SendWithRetryAttemptsPerTransaction.Observe(float64(broadcastAttempts)) } } @@ -137,3 +178,7 @@ func (b *wrappedBackend) Metrics() []prometheus.Collector { collectors = append(collectors, b.blockNumberCache.Collectors()...) return collectors } + +func (b *wrappedBackend) RetryMetrics() transaction.RetryMetricsRecorder { + return &b.metrics +} diff --git a/pkg/transaction/wrapped/wrapped.go b/pkg/transaction/wrapped/wrapped.go index c8afd8599ab..e1f637bb54c 100644 --- a/pkg/transaction/wrapped/wrapped.go +++ b/pkg/transaction/wrapped/wrapped.go @@ -8,6 +8,7 @@ import ( "context" "errors" "math/big" + "slices" "time" "github.com/ethereum/go-ethereum" @@ -20,18 +21,24 @@ import ( var _ transaction.Backend = (*wrappedBackend)(nil) +const feeHistoryDefaultBlockCount = 100 + +var feeHistoryDefaultRewardPercentiles = []float64{10, 50, 90} + type blockNumberAnchor struct { number uint64 timestamp time.Time } type wrappedBackend struct { - backend backend.Geth - metrics metrics - minimumGasTipCap int64 - blockTime time.Duration - blockSyncInterval uint64 - blockNumberCache *cache.SingleFlightCache[blockNumberAnchor] + backend backend.Geth + metrics metrics + minimumGasTipCap int64 + blockTime time.Duration + blockSyncInterval uint64 + blockNumberCache *cache.SingleFlightCache[blockNumberAnchor] + feeHistoryBlockCount uint64 + feeHistoryRewardPercentiles []float64 } func NewBackend( @@ -39,18 +46,25 @@ func NewBackend( minimumGasTipCap uint64, blockTime time.Duration, blockSyncInterval uint64, + feeHistoryBlockCount uint64, ) transaction.Backend { if blockSyncInterval == 0 { blockSyncInterval = 1 } + if feeHistoryBlockCount == 0 { + feeHistoryBlockCount = feeHistoryDefaultBlockCount + } + return &wrappedBackend{ - backend: backend, - minimumGasTipCap: int64(minimumGasTipCap), - blockTime: blockTime, - metrics: newMetrics(), - blockSyncInterval: blockSyncInterval, - blockNumberCache: cache.NewSingleFlightCache[blockNumberAnchor]("block_number"), + backend: backend, + metrics: newMetrics(), + minimumGasTipCap: int64(minimumGasTipCap), + blockTime: blockTime, + blockSyncInterval: blockSyncInterval, + blockNumberCache: cache.NewSingleFlightCache[blockNumberAnchor]("block_number"), + feeHistoryBlockCount: feeHistoryBlockCount, + feeHistoryRewardPercentiles: slices.Clone(feeHistoryDefaultRewardPercentiles), } } @@ -253,6 +267,17 @@ func (b *wrappedBackend) ChainID(ctx context.Context) (*big.Int, error) { return chainID, nil } +func (b *wrappedBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) { + b.metrics.TotalRPCCalls.Inc() + b.metrics.FeeHistoryCalls.Inc() + fh, err := b.backend.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) + if err != nil { + b.metrics.TotalRPCErrors.Inc() + return nil, err + } + return fh, nil +} + func (b *wrappedBackend) Close() { b.backend.Close() } diff --git a/pkg/transaction/wrapped/wrapped_test.go b/pkg/transaction/wrapped/wrapped_test.go index b0af463f6a6..14c5b54f983 100644 --- a/pkg/transaction/wrapped/wrapped_test.go +++ b/pkg/transaction/wrapped/wrapped_test.go @@ -188,6 +188,7 @@ func newTestWrappedBackend(t *testing.T, opts ...backendmock.Option) *wrappedBac testMinimumGasTipCap, testBlockTime, testBlockSyncInterval, + 0, ).(*wrappedBackend) assert.True(t, ok)