4 changes: 4 additions & 0 deletions .gitattributes
@@ -1,3 +1,7 @@
# Auto detect text files and perform LF normalization
* text=auto
*.sol linguist-language=Solidity
core/blockstm/testdata/*.witness.gz filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/*.block filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/codes.tar.gz filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/codes/*.bin filter=lfs diff=lfs merge=lfs -text
9 changes: 8 additions & 1 deletion core/block_validator.go
@@ -19,14 +19,18 @@ package core
import (
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

var intermediateRootTimer = metrics.NewRegisteredTimer("chain/intermediateroot", nil)

// BlockValidator is responsible for validating block headers, uncles and
// processed state.
//
@@ -168,7 +172,10 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
}
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
irStart := time.Now()
root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number))
intermediateRootTimer.UpdateSince(irStart)
if header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}

242 changes: 165 additions & 77 deletions core/blockchain.go
@@ -119,6 +119,7 @@
blockExecutionParallelErrorCounter = metrics.NewRegisteredCounter("chain/execution/parallel/error", nil)
blockExecutionParallelTimer = metrics.NewRegisteredTimer("chain/execution/parallel/timer", nil)
blockExecutionSerialTimer = metrics.NewRegisteredTimer("chain/execution/serial/timer", nil)
blockMgaspsMeter = metrics.NewRegisteredHistogram("chain/execution/mgasps", nil, metrics.NewUniformSample(10240))

statelessParallelImportTimer = metrics.NewRegisteredTimer("chain/imports/stateless/parallel", nil)
statelessSequentialImportTimer = metrics.NewRegisteredTimer("chain/imports/stateless/sequential", nil)
@@ -700,91 +701,140 @@
return nil, err
}

bc.parallelProcessor = NewParallelStateProcessor(bc.hc, bc)
bc.parallelProcessor = NewV2StateProcessor(bc.hc, bc, numprocs)
bc.parallelSpeculativeProcesses = numprocs
bc.enforceParallelProcessor = enforce

return bc, nil
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if followupInterrupt == nil {
followupInterrupt = &atomic.Bool{}
}

if bc.logger != nil && bc.logger.OnBlockStart != nil {
td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
bc.logger.OnBlockStart(tracing.BlockEvent{
Block: block,
TD: td,
Finalized: bc.CurrentFinalBlock(),
Safe: bc.CurrentSafeBlock(),
})
}

if bc.logger != nil && bc.logger.OnBlockEnd != nil {
defer func() {
bc.logger.OnBlockEnd(blockEndErr)
}()
// fireBlockStart emits the OnBlockStart tracing event when a tracer is set.
func (bc *BlockChain) fireBlockStart(block *types.Block) {
if bc.logger == nil || bc.logger.OnBlockStart == nil {
return
}
td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
bc.logger.OnBlockStart(tracing.BlockEvent{
Block: block,
TD: td,
Finalized: bc.CurrentFinalBlock(),
Safe: bc.CurrentSafeBlock(),
})
}

parentRoot := parent.Root
prefetch, process, err := bc.statedb.ReadersWithCacheStats(parentRoot)
// setupBlockReaders builds the three StateDBs needed for parallel block
// processing: throwaway (for prefetcher), statedb (for serial processor),
// and parallelStatedb (for V2). The V2 statedb has concurrent reads
// enabled before the prefetcher runs so the underlying trieReader uses
// muSubTries throughout — switching mid-flight would race.
func (bc *BlockChain) setupBlockReaders(parentRoot common.Hash) (
throwaway, statedb, parallelStatedb *state.StateDB,
prefetch, process state.ReaderWithStats, err error,
) {
prefetch, process, parallel, err := bc.statedb.ReadersWithCacheStatsTriple(parentRoot)
if err != nil {
return nil, nil, 0, nil, 0, err
return nil, nil, nil, nil, nil, err
}
throwaway, err := state.NewWithReader(parentRoot, bc.statedb, prefetch)
if err != nil {
return nil, nil, 0, nil, 0, err
if throwaway, err = state.NewWithReader(parentRoot, bc.statedb, prefetch); err != nil {
return nil, nil, nil, nil, nil, err
}
statedb, err := state.NewWithReader(parentRoot, bc.statedb, process)
if err != nil {
return nil, nil, 0, nil, 0, err
if statedb, err = state.NewWithReader(parentRoot, bc.statedb, process); err != nil {
return nil, nil, nil, nil, nil, err
}
parallelStatedb, err := state.NewWithReader(parentRoot, bc.statedb, process)
if err != nil {
return nil, nil, 0, nil, 0, err
if parallelStatedb, err = state.NewWithReader(parentRoot, bc.statedb, parallel); err != nil {
return nil, nil, nil, nil, nil, err
}
parallelStatedb.EnableConcurrentReads()
return throwaway, statedb, parallelStatedb, prefetch, process, nil
}

// Upload the statistics of reader at the end
defer func() {
stats := prefetch.GetStats()
accountCacheHitPrefetchMeter.Mark(stats.AccountHit)
accountCacheMissPrefetchMeter.Mark(stats.AccountMiss)
storageCacheHitPrefetchMeter.Mark(stats.StorageHit)
storageCacheMissPrefetchMeter.Mark(stats.StorageMiss)
stats = process.GetStats()
accountCacheHitMeter.Mark(stats.AccountHit)
accountCacheMissMeter.Mark(stats.AccountMiss)
storageCacheHitMeter.Mark(stats.StorageHit)
storageCacheMissMeter.Mark(stats.StorageMiss)

// Report additional prefetch attribution metrics
prefetchStats := prefetch.GetPrefetchStats()
accountInsertPrefetchMeter.Mark(prefetchStats.AccountInsert)
storageInsertPrefetchMeter.Mark(prefetchStats.StorageInsert)

processStats := process.GetPrefetchStats()
accountHitFromPrefetchMeter.Mark(processStats.AccountHitFromPrefetch)
storageHitFromPrefetchMeter.Mark(processStats.StorageHitFromPrefetch)
accountHitFromPrefetchUniqueMeter.Mark(processStats.AccountHitFromPrefetchUnique)
}()
// reportReaderStats marks per-block cache hit/miss meters from prefetch and
// process readers. Intended to be called via defer at the end of ProcessBlock.
func reportReaderStats(prefetch, process state.ReaderWithStats) {
stats := prefetch.GetStats()
accountCacheHitPrefetchMeter.Mark(stats.AccountHit)
accountCacheMissPrefetchMeter.Mark(stats.AccountMiss)
storageCacheHitPrefetchMeter.Mark(stats.StorageHit)
storageCacheMissPrefetchMeter.Mark(stats.StorageMiss)

stats = process.GetStats()
accountCacheHitMeter.Mark(stats.AccountHit)
accountCacheMissMeter.Mark(stats.AccountMiss)
storageCacheHitMeter.Mark(stats.StorageHit)
storageCacheMissMeter.Mark(stats.StorageMiss)

prefetchStats := prefetch.GetPrefetchStats()
accountInsertPrefetchMeter.Mark(prefetchStats.AccountInsert)
storageInsertPrefetchMeter.Mark(prefetchStats.StorageInsert)

processStats := process.GetPrefetchStats()
accountHitFromPrefetchMeter.Mark(processStats.AccountHitFromPrefetch)
storageHitFromPrefetchMeter.Mark(processStats.StorageHitFromPrefetch)
accountHitFromPrefetchUniqueMeter.Mark(processStats.AccountHitFromPrefetchUnique)
}

Claude / Claude Code Review (check warning on line 774 in core/blockchain.go): V2 reader cache hit/miss stats are silently dropped

V2 (parallel) reader cache hit/miss stats are silently dropped. `setupBlockReaders` calls `ReadersWithCacheStatsTriple` to create three independent `ReaderWithStats` wrappers (prefetch, process, parallel) and wires the parallel one into `parallelStatedb` (core/blockchain.go:744), but `reportReaderStats` (line 753) only reads stats from prefetch and process — the parallel reader's atomic counters are accumulated and discarded each block. Pure metrics regression (no correctness impact); fix is to

(claude[bot] marked this conversation as resolved; outdated.)
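One possible shape for the fix the review comment points at (a sketch only, not part of this diff): have `setupBlockReaders` also return the parallel `ReaderWithStats` and mark it from a small companion reporter. The `*ParallelMeter` names below are hypothetical; they do not exist in this PR and would need to be registered next to the existing `chain/account` and `chain/storage` cache meters.

```go
// Hypothetical meters for the V2 (parallel) reader; names are illustrative.
var (
	accountCacheHitParallelMeter  = metrics.NewRegisteredMeter("chain/account/hit/parallel", nil)  // hypothetical
	accountCacheMissParallelMeter = metrics.NewRegisteredMeter("chain/account/miss/parallel", nil) // hypothetical
	storageCacheHitParallelMeter  = metrics.NewRegisteredMeter("chain/storage/hit/parallel", nil)  // hypothetical
	storageCacheMissParallelMeter = metrics.NewRegisteredMeter("chain/storage/miss/parallel", nil) // hypothetical
)

// reportParallelReaderStats marks per-block cache hit/miss meters for the V2
// reader so its counters are no longer discarded each block. It assumes
// setupBlockReaders is extended to also return the parallel ReaderWithStats,
// so ProcessBlock can do:
//
//	defer reportReaderStats(prefetch, process)
//	defer reportParallelReaderStats(parallel)
func reportParallelReaderStats(parallel state.ReaderWithStats) {
	stats := parallel.GetStats()
	accountCacheHitParallelMeter.Mark(stats.AccountHit)
	accountCacheMissParallelMeter.Mark(stats.AccountMiss)
	storageCacheHitParallelMeter.Mark(stats.StorageHit)
	storageCacheMissParallelMeter.Mark(stats.StorageMiss)
}
```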

// sharedBlockCaches holds VM-level caches that are shared between the
// prefetcher goroutine and the V2 BlockSTM workers for a single block.
type sharedBlockCaches struct {
jumpDests vm.JumpDestCache
keccak *sync.Map
ecrecover *sync.Map
}

func newSharedBlockCaches() *sharedBlockCaches {
return &sharedBlockCaches{
jumpDests: vm.NewSyncJumpDestCache(),
keccak: &sync.Map{},
ecrecover: &sync.Map{},
}
}

// applyTo populates a vm.Config with the shared caches.
func (c *sharedBlockCaches) applyTo(cfg *vm.Config) {
cfg.SharedJumpDestCache = c.jumpDests
cfg.Keccak256Cache = c.keccak
cfg.EcrecoverCache = c.ecrecover
}

go func(start time.Time, throwaway *state.StateDB, block *types.Block) {
// Disable tracing for prefetcher executions.
// startPrefetchGoroutine launches the throwaway-statedb prefetcher in
// the background. It runs the block with tracing disabled to warm caches
// for the real processors.
func (bc *BlockChain) startPrefetchGoroutine(block *types.Block, throwaway *state.StateDB,
caches *sharedBlockCaches, followupInterrupt *atomic.Bool) {
go func(start time.Time) {
vmCfg := bc.cfg.VmConfig
vmCfg.Tracer = nil
caches.applyTo(&vmCfg)
bc.prefetcher.Prefetch(block, throwaway, vmCfg, false, followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), throwaway, block)
}(time.Now())
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if followupInterrupt == nil {
followupInterrupt = &atomic.Bool{}
}
bc.fireBlockStart(block)
if bc.logger != nil && bc.logger.OnBlockEnd != nil {
defer func() { bc.logger.OnBlockEnd(blockEndErr) }()
}

throwaway, statedb, parallelStatedb, prefetch, process, err := bc.setupBlockReaders(parent.Root)
if err != nil {
return nil, nil, 0, nil, 0, err
}
defer reportReaderStats(prefetch, process)

// Shared caches for this block — used by both prefetcher and V2 workers.
sharedCaches := newSharedBlockCaches()
bc.startPrefetchGoroutine(block, throwaway, sharedCaches, followupInterrupt)

type Result struct {
receipts types.Receipts
@@ -796,13 +846,6 @@
parallel bool
}

// Only disable Parallel Processor for witness producers
// TODO: work on enabling witness production for parallel processor
if witness != nil {
bc.parallelProcessor = nil
bc.enforceParallelProcessor = false
}

var resultChanLen int = 2
if bc.enforceParallelProcessor {
log.Debug("Processing block using Block STM only", "number", block.NumberU64())
@@ -811,20 +854,29 @@
resultChan := make(chan Result, resultChanLen)

processorCount := 0
execStart := time.Now()

if bc.parallelProcessor != nil {
processorCount++

go func() {
pstart := time.Now()
parallelStatedb.StartPrefetcher("chain", witness, nil)
res, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.cfg.VmConfig, nil, ctx)
v2VmCfg := bc.cfg.VmConfig
sharedCaches.applyTo(&v2VmCfg)
res, err := bc.parallelProcessor.Process(block, parallelStatedb, v2VmCfg, nil, ctx)

Claude / Claude Code Review (check failure on line 867 in core/blockchain.go): V2 silently loses stateless witness data when ProcessBlock is called with a witness

V2 silently drops the stateless-witness pointer when ProcessBlock is called with a non-nil witness. blockchain.go:864 wires the witness via `parallelStatedb.StartPrefetcher("chain", witness, nil)`, but inside V2 `parallel_state_processor.go:1034-1035` immediately calls `finalDB.StopPrefetcher(); finalDB.StartPrefetcher("v2-settle", nil, nil)`, and `StateDB.StartPrefetcher` unconditionally executes `s.witness = witness` — overwriting the pointer to nil for the rest of execution. Every `s.witness

(claude[bot] marked this conversation as resolved.)
blockExecutionParallelTimer.UpdateSince(pstart)
if err == nil {
vstart := time.Now()
err = bc.validator.ValidateState(block, parallelStatedb, res, false)
vtime = time.Since(vstart)
}
// If context was cancelled (we lost the race), stop prefetcher
// before sending result. This prevents "layer stale" errors when
// the winner's commit advances the pathdb layer.
if ctx.Err() != nil {
parallelStatedb.StopPrefetcher()
}
if res == nil {
res = &ProcessResult{}
}
@@ -845,6 +897,9 @@
err = bc.validator.ValidateState(block, statedb, res, false)
vtime = time.Since(vstart)
}
if ctx.Err() != nil {
statedb.StopPrefetcher()
}
if res == nil {
res = &ProcessResult{}
}
@@ -854,8 +909,14 @@

result := <-resultChan

// If V2 returned an error (panic, ApplyMessage consensus error, etc.)
// and the serial processor is also running, fall back to the serial
// result BEFORE cancelling — cancelling first would interrupt the
// still-running serial processor at its next tx boundary and the
// fallback would receive context.Canceled instead of a usable
// recovery. The fallback IS the recovery; it must run to completion.
if result.parallel && result.err != nil {
log.Warn("Parallel state processor failed", "err", result.err)
log.Warn("Parallel state processor failed", "number", block.NumberU64(), "hash", block.Hash(), "err", result.err)
blockExecutionParallelErrorCounter.Inc(1)
// If the parallel processor failed, we will fallback to the serial processor if enabled
if processorCount == 2 {
@@ -865,14 +926,41 @@
}
}

// With the result we plan to keep in hand, cancel the shared context
// so the loser (if any) stops at its next tx boundary, and signal the
// throwaway prefetcher to stop. This must happen BEFORE ProcessBlock
// returns, because the caller will commit the block (advancing the
// pathdb layer), which would invalidate any trie references still
// held by the loser's prefetcher.
cancel()
followupInterrupt.Store(true)

result.counter.Inc(1)

// Make sure we are not leaking any prefetchers
// Report per-block mgasps for the winning processor.
// Value is scaled by 1000 (stored as µgasps) to preserve 3 decimal places,
// e.g. 210.357 mgasps → 210357. Divide by 1000 when reading.
// Exclude sprint-end blocks (with state sync tx) — their Finalize overhead
// (Heimdall state sync ~164ms) distorts the execution throughput metric.
hasStateSync := false
if txs := block.Transactions(); len(txs) > 0 {
hasStateSync = txs[len(txs)-1].Type() == types.StateSyncTxType
}
if elapsed := time.Since(execStart); elapsed > 0 && result.usedGas > 0 && !hasStateSync {
mgasps := int64(float64(result.usedGas) * 1e6 / float64(elapsed)) // µgasps (mgasps * 1000)
blockMgaspsMeter.Update(mgasps)
}

// Wait for the losing processor to finish and stop its prefetcher.
// Must be synchronous: the caller will commit the block (advancing the
// pathdb layer), which invalidates trie references held by the loser's
// prefetcher subfetchers. The context is already cancelled and both V1
// and V2 honour it at task-boundary level (V1 in its task loop; V2 in
// the executor's dispatcher and validation loop), so the loser stops
// promptly — typically within one tx execution.
if processorCount == 2 {
go func() {
second_result := <-resultChan
second_result.statedb.StopPrefetcher()
}()
second_result := <-resultChan
second_result.statedb.StopPrefetcher()
}

return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err
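To make the scaling of the new chain/execution/mgasps histogram concrete, here is a small standalone illustration of converting a stored sample back to Mgas/s; the 210357 value is made up for the example and is not taken from this PR.

```go
package main

import "fmt"

func main() {
	// The histogram stores mgasps scaled by 1000 (i.e. µgasps) so three
	// decimal places survive the integer sample, e.g. 210.357 Mgas/s -> 210357.
	sampleValue := int64(210357) // illustrative value read from chain/execution/mgasps
	mgasps := float64(sampleValue) / 1000
	fmt.Printf("%.3f Mgas/s\n", mgasps) // prints: 210.357 Mgas/s
}
```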
2 changes: 1 addition & 1 deletion core/blockstm/executor_test.go
@@ -117,7 +117,7 @@ func (t *testExecTask) Execute(mvh *MVHashMap, incarnation int) error {

sleep(op.duration)

t.readMap[k] = ReadDescriptor{k, readKind, Version{TxnIndex: result.depIdx, Incarnation: result.incarnation}}
t.readMap[k] = ReadDescriptor{Path: k, Kind: readKind, V: Version{TxnIndex: result.depIdx, Incarnation: result.incarnation}}
case writeType:
t.writeMap[k] = WriteDescriptor{k, version, op.val}
case otherType:
21 changes: 21 additions & 0 deletions core/blockstm/invariants_off.go
@@ -0,0 +1,21 @@
//go:build !invariants

package blockstm

// Invariants build tag — production builds use these zero-cost stubs.
// Build with `-tags invariants` to enable the runtime checks defined in
// invariants_on.go. Inlined by the compiler; no perf cost in prod.

// assertSettleOrder verifies the load-bearing inductive invariant of the
// V2 validation loop: when validateOne(idx) runs, every reexecDone[j] for
// j < idx-1 must be nil, because validateOne(j+1) finalised it on the
// previous iteration. A future "skip earlier validations" optimisation
// that violates this would silently break settle order and split state
// roots — keep the check on in CI.
func (x *v2ExecCtx) assertSettleOrder(reexecDone []chan struct{}, idx int) {}

SonarQubeCloud / SonarCloud Code Analysis (check failure on line 15 in core/blockstm/invariants_off.go): Add a nested comment explaining why this function is empty or complete the implementation. See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ32n4tEQTy0jLCvpuDp&open=AZ32n4tEQTy0jLCvpuDp&pullRequest=2210

// assertReexecVisitedExactlyOnce ensures runValidationLoop's drain loop
// observes every dispatched re-execution exactly once. Catches a future
// off-by-one in the drain that would either skip a settle or send twice
// (deadlocking on chSettle's bounded buffer).
func (x *v2ExecCtx) assertReexecVisitedExactlyOnce(reexecDone []chan struct{}) {}

SonarQubeCloud / SonarCloud Code Analysis (check failure on line 21 in core/blockstm/invariants_off.go): Add a nested comment explaining why this function is empty or complete the implementation. See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ32n4tEQTy0jLCvpuDq&open=AZ32n4tEQTy0jLCvpuDq&pullRequest=2210
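The stubs above are the no-op half of the invariants build-tag pattern; the enabled checks live in invariants_on.go, which is not included in this excerpt. As a rough sketch only (the real file may differ; the "visited exactly once" check below assumes the drain loop nils out each reexecDone entry after observing it, and detecting a double visit would need an extra per-entry counter not shown here), the `-tags invariants` counterparts could look like this:

```go
//go:build invariants

package blockstm

import "fmt"

// Sketch of the runtime checks enabled with `go build -tags invariants`
// (e.g. in CI). Not the PR's actual invariants_on.go.

// assertSettleOrder panics if any re-execution older than idx-1 has not been
// finalised (nil'd out) by the time validateOne(idx) runs.
func (x *v2ExecCtx) assertSettleOrder(reexecDone []chan struct{}, idx int) {
	for j := 0; j < idx-1; j++ {
		if reexecDone[j] != nil {
			panic(fmt.Sprintf("settle-order invariant violated: reexecDone[%d] still pending at validateOne(%d)", j, idx))
		}
	}
}

// assertReexecVisitedExactlyOnce panics if any dispatched re-execution was
// never drained. Assumes the drain loop sets reexecDone[i] = nil after
// observing entry i.
func (x *v2ExecCtx) assertReexecVisitedExactlyOnce(reexecDone []chan struct{}) {
	for i, ch := range reexecDone {
		if ch != nil {
			panic(fmt.Sprintf("re-execution %d was dispatched but never drained", i))
		}
	}
}
```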