Skip to content
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d63daed
miner, core, consensus/bor: pipelined state root computation (PoC)
pratikspatil024 Mar 30, 2026
ceca519
miner: run speculative fillTransactions concurrently with SRC and rem…
pratikspatil024 Apr 1, 2026
3dcdf80
miner: async DB write, concurrent fill, and interrupt timer improvements
pratikspatil024 Apr 1, 2026
07345ad
llint fix
pratikspatil024 Apr 1, 2026
a86db50
addressed comments and fix test, lint
pratikspatil024 Apr 2, 2026
8d5ed1b
core/stateless: (fix unit test) fix NewWitness zeroing breaking witne…
pratikspatil024 Apr 2, 2026
0e2da86
core, consensus/bor, eth, triedb: pipelined state root computation fo…
pratikspatil024 Apr 9, 2026
b283227
tests/bor: add pipelined import SRC self-destruct integration test
pratikspatil024 Apr 10, 2026
acc0ef7
core/state, triedb/pathdb: fix prefetcher race during pipelined SRC
pratikspatil024 Apr 10, 2026
5d45f02
miner, consensus/bor, core, eth: harden pipelined SRC abort handling
pratikspatil024 Apr 16, 2026
46e9ea4
core, miner, core/state: added metrics for pipelined SRC
pratikspatil024 Apr 21, 2026
caac30e
core, core/state, eth, tests/bor, miner: refactor pipelined-src funct…
pratikspatil024 Apr 22, 2026
d2d6641
core, core/txpool, miner: PR review fixes + pipelined import hardening
pratikspatil024 Apr 23, 2026
946f137
core: added metrics for preloadFlatDiffReads in pipelined SRC
pratikspatil024 Apr 28, 2026
d00791d
core: added metrics - cheap exec and auto-collection phases for pipel…
pratikspatil024 Apr 28, 2026
68e80ac
core: stop execution prefetcher in pipelined import path
pratikspatil024 Apr 30, 2026
8ef3578
core, miner: honour producewitnesses in pipelined import SRC path
pratikspatil024 May 1, 2026
bbaa0d6
core: use multi-reader StateDB on pipelined SRC witness-off path
pratikspatil024 May 4, 2026
fcab1bc
core, core/stateless, miner: share execution witness with pipelined SRC
pratikspatil024 May 5, 2026
ecf6ebf
core, core/state, eth, internal/cli, miner: add warm-snapshot handoff…
pratikspatil024 May 5, 2026
d9e22cd
core/state: use warm snapshot for SRC commit trie opens
pratikspatil024 May 6, 2026
696c3c1
core, core/state: add import phase observability
pratikspatil024 May 6, 2026
1bf3f2b
core, core/state: split pipelined prefetch-stop observability
pratikspatil024 May 6, 2026
e68753e
core, core/state: build warm snapshot inside SRC goroutine
pratikspatil024 May 7, 2026
8a24f4e
core/state: make pipelined SRC prefetch stop snapshot-fast
pratikspatil024 May 7, 2026
665746d
core/state: bound snapshot-fast prefetch drain latency
pratikspatil024 May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 108 additions & 63 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,18 @@
inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory
inmemorySignatures = 4096 // Number of recent block signatures to keep in memory
veblopBlockTimeout = time.Second * 8 // Timeout for new span check. DO NOT CHANGE THIS VALUE.
minBlockBuildTime = 1 * time.Second // Minimum remaining time before extending the block deadline to avoid empty blocks
// minBlockBuildTime is the minimum remaining time before Prepare() extends
// the block deadline to avoid producing empty blocks. If time.Until(target)
// is less than this value, the target timestamp is pushed forward by one
// blockTime period.
//
// This interacts with pipelined SRC: when a speculative block is aborted,
// the pipeline triggers a fresh commitWork. On chains where blockTime ==
// minBlockBuildTime (e.g., 1-second devnets), the remaining time after the
// abort (~990ms) is always less than minBlockBuildTime, so the timestamp is
// always pushed — adding an extra 1s gap. On mainnet (2s blocks), the
// remaining time (~1.99s) exceeds minBlockBuildTime, so no push occurs.
minBlockBuildTime = 1 * time.Second
)

// Bor protocol constants.
Expand Down Expand Up @@ -1193,7 +1204,7 @@
// check and commit span
if !c.config.IsRio(header.Number) {
if err := c.checkAndCommitSpan(wrappedState, header, cx); err != nil {
log.Error("Error while committing span", "error", err)

Check failure on line 1207 in consensus/bor/bor.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Error while committing span" 3 times.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ1JxLOFMy6llO4qwxL2&open=AZ1JxLOFMy6llO4qwxL2&pullRequest=2180
return nil
}
}
Expand All @@ -1202,7 +1213,7 @@
// commit states
stateSyncData, err = c.CommitStates(wrappedState, header, cx)
if err != nil {
log.Error("Error while committing states", "error", err)

Check failure on line 1216 in consensus/bor/bor.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Error while committing states" 3 times.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ1JxLOFMy6llO4qwxL4&open=AZ1JxLOFMy6llO4qwxL4&pullRequest=2180
return nil
}
}
Expand All @@ -1215,7 +1226,7 @@
// the wrapped state here as it may have a hooked state db instance which can help
// in tracing if it's enabled.
if err = c.changeContractCodeIfNeeded(headerNumber, wrappedState); err != nil {
log.Error("Error changing contract code", "error", err)

Check failure on line 1229 in consensus/bor/bor.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Error changing contract code" 3 times.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ1JxLOFMy6llO4qwxL3&open=AZ1JxLOFMy6llO4qwxL3&pullRequest=2180
return nil
}

Expand Down Expand Up @@ -1361,25 +1372,9 @@
return nil, nil, 0, err
}

// No block rewards in PoA, so the state remains as it is
start := time.Now()

// No block rewards in PoA, so the state remains as it is.
// Under delayed SRC, header.Root stores the parent block's actual state root;
// the goroutine in BlockChain.spawnSRCGoroutine handles this block's root.
if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
dsrcReader, ok := chain.(core.DelayedSRCReader)
if !ok {
return nil, nil, 0, fmt.Errorf("chain does not implement DelayedSRCReader")
}
parentRoot := dsrcReader.GetPostStateRoot(header.ParentHash)
if parentRoot == (common.Hash{}) {
return nil, nil, 0, fmt.Errorf("delayed state root unavailable for parent %s", header.ParentHash)
}
header.Root = parentRoot
} else {
header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
}

header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
commitTime := time.Since(start)

// Uncles are dropped
Expand All @@ -1404,6 +1399,81 @@
return block, receipts, commitTime, nil
}

// FinalizeForPipeline runs the same post-transaction state modifications as
// FinalizeAndAssemble (state sync, span commits, contract code changes) but
// does NOT compute IntermediateRoot or assemble the block. It returns the
// stateSyncData so the caller can pass it to AssembleBlock later after the
// background SRC goroutine has computed the state root.
//
// This is the pipelined SRC equivalent of the first half of FinalizeAndAssemble.
func (c *Bor) FinalizeForPipeline(chain consensus.ChainHeaderReader, header *types.Header, statedb *state.StateDB, body *types.Body, receipts []*types.Receipt) ([]*types.StateSyncData, error) {
headerNumber := header.Number.Uint64()
if body.Withdrawals != nil || header.WithdrawalsHash != nil {
return nil, consensus.ErrUnexpectedWithdrawals
}
if header.RequestsHash != nil {
return nil, consensus.ErrUnexpectedRequests
}

var (
stateSyncData []*types.StateSyncData
err error
)

if IsSprintStart(headerNumber, c.config.CalculateSprint(headerNumber)) {
cx := statefull.ChainContext{Chain: chain, Bor: c}

if !c.config.IsRio(header.Number) {
if err = c.checkAndCommitSpan(statedb, header, cx); err != nil {
log.Error("Error while committing span", "error", err)
return nil, err
}
}

if c.HeimdallClient != nil {
stateSyncData, err = c.CommitStates(statedb, header, cx)
if err != nil {
log.Error("Error while committing states", "error", err)
return nil, err
}
}
}

if err = c.changeContractCodeIfNeeded(headerNumber, statedb); err != nil {
log.Error("Error changing contract code", "error", err)
return nil, err
}

return stateSyncData, nil
}

// AssembleBlock constructs the final block from a pre-computed state root,
// without calling IntermediateRoot. This is used by pipelined SRC where the
// state root is computed by a background goroutine.
//
// stateSyncData is the state sync data collected during Finalize(). If non-nil
// and the Madhugiri fork is active, a StateSyncTx is appended to the body.
func (c *Bor) AssembleBlock(chain consensus.ChainHeaderReader, header *types.Header, statedb *state.StateDB, body *types.Body, receipts []*types.Receipt, stateRoot common.Hash, stateSyncData []*types.StateSyncData) (*types.Block, []*types.Receipt, error) {
headerNumber := header.Number.Uint64()

header.Root = stateRoot
header.UncleHash = types.CalcUncleHash(nil)

if len(stateSyncData) > 0 && c.config != nil && c.config.IsMadhugiri(big.NewInt(int64(headerNumber))) {
stateSyncTx := types.NewTx(&types.StateSyncTx{
StateSyncData: stateSyncData,
})
body.Transactions = append(body.Transactions, stateSyncTx)
receipts = insertStateSyncTransactionAndCalculateReceipt(stateSyncTx, header, body, statedb, receipts)
} else {
bc := chain.(core.BorStateSyncer)
bc.SetStateSync(stateSyncData)
}

block := types.NewBlock(header, body, receipts, trie.NewStackTrie(nil))
return block, receipts, nil
}

// Authorize injects a private key into the consensus engine to mint new blocks
// with.
func (c *Bor) Authorize(currentSigner common.Address, signFn SignerFn) {
Expand Down Expand Up @@ -1597,38 +1667,22 @@
headerNumber := header.Number.Uint64()

tempState := state.Inner().Copy()
if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
// Under delayed SRC, skip ResetPrefetcher + StartPrefetcher.
// The full-node state is at root_{N-2} with a FlatDiff overlay
// approximating root_{N-1}. ResetPrefetcher clears that overlay,
// causing GetCurrentSpan to read stale root_{N-2} values — different
// from what the stateless node sees at root_{N-1}. The mismatch leads
// to different storage-slot access patterns, so the SRC goroutine
// captures the wrong trie nodes.
//
// StartPrefetcher is also unnecessary: the witness is built by the
// SRC goroutine, and tempState's reads are captured via
// CommitSnapshot + TouchAllAddresses below.
} else {
tempState.ResetPrefetcher()
tempState.StartPrefetcher("bor", state.Witness(), nil)
}
tempState.ResetPrefetcher()
tempState.StartPrefetcher("bor", state.Witness(), nil)

span, err := c.spanner.GetCurrentSpan(ctx, header.ParentHash, tempState)
if err != nil {
return err
}

if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
// Under delayed SRC, use CommitSnapshot instead of IntermediateRoot
// to capture all accesses without computing a trie root. Touch
// every address on the main state so they appear in the block's
// FlatDiff and the SRC goroutine includes their trie paths in
// the witness.
tempState.CommitSnapshot(false).TouchAllAddresses(state.Inner())
} else {
tempState.IntermediateRoot(false)
}
tempState.IntermediateRoot(false)

// Propagate addresses accessed during GetCurrentSpan back to the original
// state so they appear in the FlatDiff ReadSet. Without this, the pipelined
// SRC goroutine's witness won't capture their trie proof nodes (the copy's
// reads aren't tracked on the original), causing stateless execution to fail
// with missing trie nodes for the validator contract.
tempState.PropagateReadsTo(state.Inner())

if c.needToCommitSpan(span, headerNumber) {
return c.FetchAndCommitSpan(ctx, span.Id+1, state, header, chain)
Expand Down Expand Up @@ -1765,30 +1819,21 @@
if c.config.IsIndore(header.Number) {
// Fetch the LastStateId from contract via current state instance
tempState := state.Inner().Copy()
if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
// See comment in checkAndCommitSpan: under delayed SRC,
// skip ResetPrefetcher + StartPrefetcher to preserve the
// FlatDiff overlay and avoid stale root_{N-2} reads.
} else {
tempState.ResetPrefetcher()
tempState.StartPrefetcher("bor", state.Witness(), nil)
}
tempState.ResetPrefetcher()
tempState.StartPrefetcher("bor", state.Witness(), nil)

lastStateIDBig, err = c.GenesisContractsClient.LastStateId(tempState, number-1, header.ParentHash)
if err != nil {
return nil, err
}

if c.chainConfig.Bor != nil && c.chainConfig.Bor.IsDelayedSRC(header.Number) {
// Under delayed SRC, use CommitSnapshot instead of
// IntermediateRoot to capture all accesses without computing
// a trie root. Touch every address on the main state so they
// appear in the block's FlatDiff and the SRC goroutine
// includes their trie paths in the witness.
tempState.CommitSnapshot(false).TouchAllAddresses(state.Inner())
} else {
tempState.IntermediateRoot(false)
}
tempState.IntermediateRoot(false)

// Propagate addresses accessed during LastStateId back to the original
// state so they appear in the FlatDiff ReadSet. Without this, the
// pipelined SRC goroutine's witness won't capture their trie proof
// nodes, causing stateless execution to fail with missing trie nodes.
tempState.PropagateReadsTo(state.Inner())

stateSyncDelay := c.config.CalculateStateSyncDelay(number)
to = time.Unix(int64(header.Time-stateSyncDelay), 0)
Expand Down
46 changes: 31 additions & 15 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -128,6 +127,37 @@
return nil
}

// ValidateStateCheap validates the cheap (non-trie) post-state checks: gas used,
// bloom filter, receipt root, and requests hash. It does NOT compute the state
// root (IntermediateRoot), which is the expensive operation. Used by the pipelined
// import path where IntermediateRoot is deferred to a background SRC goroutine.
func (v *BlockValidator) ValidateStateCheap(block *types.Block, statedb *state.StateDB, res *ProcessResult) error {
if res == nil {
return errors.New("nil ProcessResult value")
}
header := block.Header()
if block.GasUsed() != res.GasUsed {
return fmt.Errorf("%w (remote: %d local: %d)", ErrGasUsedMismatch, block.GasUsed(), res.GasUsed)
}
rbloom := types.MergeBloom(res.Receipts)
if rbloom != header.Bloom {
return fmt.Errorf("%w (remote: %x local: %x)", ErrBloomMismatch, header.Bloom, rbloom)
}
receiptSha := types.DeriveSha(res.Receipts, trie.NewStackTrie(nil))
if receiptSha != header.ReceiptHash {
return fmt.Errorf("%w (remote: %x local: %x)", ErrReceiptRootMismatch, header.ReceiptHash, receiptSha)

Check failure on line 148 in core/block_validator.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "%w (remote: %x local: %x)" 4 times.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ1x0w37i9Z6KQlE8hGd&open=AZ1x0w37i9Z6KQlE8hGd&pullRequest=2180
}
if header.RequestsHash != nil {
reqhash := types.CalcRequestsHash(res.Requests)
if reqhash != *header.RequestsHash {
return fmt.Errorf("%w (remote: %x local: %x)", ErrRequestsHashMismatch, *header.RequestsHash, reqhash)
}
} else if res.Requests != nil {
return errors.New("block has requests before prague fork")
}
return nil
}

// ValidateState validates the various changes that happen after a state transition,
// such as amount of used gas, the receipt roots and the state root itself.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, res *ProcessResult, stateless bool) error {
Expand Down Expand Up @@ -167,20 +197,6 @@
} else if res.Requests != nil {
return errors.New("block has requests before prague fork")
}
// Under delayed SRC, header.Root = state root of the PARENT block.
// Verify it matches the persisted delayed root and skip IntermediateRoot —
// the background goroutine spawned by spawnSRCGoroutine computes root_N.
if v.config.Bor != nil && v.config.Bor.IsDelayedSRC(header.Number) {
parentActualRoot := v.bc.GetPostStateRoot(header.ParentHash)
if parentActualRoot == (common.Hash{}) {
return fmt.Errorf("delayed state root unavailable for parent %x", header.ParentHash)
}
if header.Root != parentActualRoot {
return fmt.Errorf("invalid delayed state root (header: %x, parent actual: %x)", header.Root, parentActualRoot)
}
return nil
}

// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
Expand Down
Loading
Loading