4 changes: 4 additions & 0 deletions .gitattributes
@@ -1,3 +1,7 @@
# Auto detect text files and perform LF normalization
* text=auto
*.sol linguist-language=Solidity
core/blockstm/testdata/*.witness.gz filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/*.block filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/codes.tar.gz filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/codes/*.bin filter=lfs diff=lfs merge=lfs -text
9 changes: 8 additions & 1 deletion core/block_validator.go
@@ -19,14 +19,18 @@ package core
import (
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

var intermediateRootTimer = metrics.NewRegisteredTimer("chain/intermediateroot", nil)

// BlockValidator is responsible for validating block headers, uncles and
// processed state.
//
@@ -168,7 +172,10 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
}
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
irStart := time.Now()
root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number))
intermediateRootTimer.UpdateSince(irStart)
if header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}

253 changes: 176 additions & 77 deletions core/blockchain.go
@@ -119,6 +119,7 @@
blockExecutionParallelErrorCounter = metrics.NewRegisteredCounter("chain/execution/parallel/error", nil)
blockExecutionParallelTimer = metrics.NewRegisteredTimer("chain/execution/parallel/timer", nil)
blockExecutionSerialTimer = metrics.NewRegisteredTimer("chain/execution/serial/timer", nil)
blockMgaspsMeter = metrics.NewRegisteredHistogram("chain/execution/mgasps", nil, metrics.NewUniformSample(10240))

statelessParallelImportTimer = metrics.NewRegisteredTimer("chain/imports/stateless/parallel", nil)
statelessSequentialImportTimer = metrics.NewRegisteredTimer("chain/imports/stateless/sequential", nil)
@@ -700,91 +701,151 @@
return nil, err
}

bc.parallelProcessor = NewParallelStateProcessor(bc.hc, bc)
bc.parallelProcessor = NewV2StateProcessor(bc.hc, bc, numprocs)
bc.parallelSpeculativeProcesses = numprocs
bc.enforceParallelProcessor = enforce

return bc, nil
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if followupInterrupt == nil {
followupInterrupt = &atomic.Bool{}
}

if bc.logger != nil && bc.logger.OnBlockStart != nil {
td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
bc.logger.OnBlockStart(tracing.BlockEvent{
Block: block,
TD: td,
Finalized: bc.CurrentFinalBlock(),
Safe: bc.CurrentSafeBlock(),
})
}

if bc.logger != nil && bc.logger.OnBlockEnd != nil {
defer func() {
bc.logger.OnBlockEnd(blockEndErr)
}()
// fireBlockStart emits the OnBlockStart tracing event when a tracer is set.
func (bc *BlockChain) fireBlockStart(block *types.Block) {
if bc.logger == nil || bc.logger.OnBlockStart == nil {
return
}
td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
bc.logger.OnBlockStart(tracing.BlockEvent{
Block: block,
TD: td,
Finalized: bc.CurrentFinalBlock(),
Safe: bc.CurrentSafeBlock(),
})
}

parentRoot := parent.Root
prefetch, process, err := bc.statedb.ReadersWithCacheStats(parentRoot)
// setupBlockReaders builds the three StateDBs needed for parallel block
// processing: throwaway (for prefetcher), statedb (for serial processor),
// and parallelStatedb (for V2). The V2 statedb has concurrent reads
// enabled before the prefetcher runs so the underlying trieReader uses
// muSubTries throughout — switching mid-flight would race.
func (bc *BlockChain) setupBlockReaders(parentRoot common.Hash) (
throwaway, statedb, parallelStatedb *state.StateDB,
prefetch, process, parallel state.ReaderWithStats, err error,
) {
prefetch, process, parallel, err = bc.statedb.ReadersWithCacheStatsTriple(parentRoot)
if err != nil {
return nil, nil, 0, nil, 0, err
return nil, nil, nil, nil, nil, nil, err
}
throwaway, err := state.NewWithReader(parentRoot, bc.statedb, prefetch)
if err != nil {
return nil, nil, 0, nil, 0, err
if throwaway, err = state.NewWithReader(parentRoot, bc.statedb, prefetch); err != nil {
return nil, nil, nil, nil, nil, nil, err
}
statedb, err := state.NewWithReader(parentRoot, bc.statedb, process)
if err != nil {
return nil, nil, 0, nil, 0, err
if statedb, err = state.NewWithReader(parentRoot, bc.statedb, process); err != nil {
return nil, nil, nil, nil, nil, nil, err
}
parallelStatedb, err := state.NewWithReader(parentRoot, bc.statedb, process)
if err != nil {
return nil, nil, 0, nil, 0, err
if parallelStatedb, err = state.NewWithReader(parentRoot, bc.statedb, parallel); err != nil {
return nil, nil, nil, nil, nil, nil, err
}
parallelStatedb.EnableConcurrentReads()
return throwaway, statedb, parallelStatedb, prefetch, process, parallel, nil
}

// Upload the statistics of reader at the end
defer func() {
stats := prefetch.GetStats()
accountCacheHitPrefetchMeter.Mark(stats.AccountHit)
accountCacheMissPrefetchMeter.Mark(stats.AccountMiss)
storageCacheHitPrefetchMeter.Mark(stats.StorageHit)
storageCacheMissPrefetchMeter.Mark(stats.StorageMiss)
stats = process.GetStats()
accountCacheHitMeter.Mark(stats.AccountHit)
accountCacheMissMeter.Mark(stats.AccountMiss)
storageCacheHitMeter.Mark(stats.StorageHit)
storageCacheMissMeter.Mark(stats.StorageMiss)

// Report additional prefetch attribution metrics
prefetchStats := prefetch.GetPrefetchStats()
accountInsertPrefetchMeter.Mark(prefetchStats.AccountInsert)
storageInsertPrefetchMeter.Mark(prefetchStats.StorageInsert)

processStats := process.GetPrefetchStats()
accountHitFromPrefetchMeter.Mark(processStats.AccountHitFromPrefetch)
storageHitFromPrefetchMeter.Mark(processStats.StorageHitFromPrefetch)
accountHitFromPrefetchUniqueMeter.Mark(processStats.AccountHitFromPrefetchUnique)
}()
// reportReaderStats marks per-block cache hit/miss meters from prefetch,
// process, and parallel readers. Intended to be called via defer at the
// end of ProcessBlock.
//
// process and parallel both use the roleProcess label internally and
// share the same underlying cache, but ReadersWithCacheStatsTriple
// returns independent ReaderWithStats wrappers, so V2's reads accumulate
// in `parallel`'s atomic counters separately from V1's `process` counters.
// We merge them into the same meter set here so the cache-hit-rate
// dashboards reflect the work the winning processor (typically V2) did,
// rather than only the losing serial path's interrupted reads.
func reportReaderStats(prefetch, process, parallel state.ReaderWithStats) {
stats := prefetch.GetStats()
accountCacheHitPrefetchMeter.Mark(stats.AccountHit)
accountCacheMissPrefetchMeter.Mark(stats.AccountMiss)
storageCacheHitPrefetchMeter.Mark(stats.StorageHit)
storageCacheMissPrefetchMeter.Mark(stats.StorageMiss)

procStats := process.GetStats()
parStats := parallel.GetStats()
accountCacheHitMeter.Mark(procStats.AccountHit + parStats.AccountHit)
accountCacheMissMeter.Mark(procStats.AccountMiss + parStats.AccountMiss)
storageCacheHitMeter.Mark(procStats.StorageHit + parStats.StorageHit)
storageCacheMissMeter.Mark(procStats.StorageMiss + parStats.StorageMiss)

prefetchStats := prefetch.GetPrefetchStats()
accountInsertPrefetchMeter.Mark(prefetchStats.AccountInsert)
storageInsertPrefetchMeter.Mark(prefetchStats.StorageInsert)

procPF := process.GetPrefetchStats()
parPF := parallel.GetPrefetchStats()
accountHitFromPrefetchMeter.Mark(procPF.AccountHitFromPrefetch + parPF.AccountHitFromPrefetch)
storageHitFromPrefetchMeter.Mark(procPF.StorageHitFromPrefetch + parPF.StorageHitFromPrefetch)
accountHitFromPrefetchUniqueMeter.Mark(procPF.AccountHitFromPrefetchUnique + parPF.AccountHitFromPrefetchUnique)
}

// sharedBlockCaches holds VM-level caches that are shared between the
// prefetcher goroutine and the V2 BlockSTM workers for a single block.
type sharedBlockCaches struct {
jumpDests vm.JumpDestCache
keccak *sync.Map
ecrecover *sync.Map
}

func newSharedBlockCaches() *sharedBlockCaches {
return &sharedBlockCaches{
jumpDests: vm.NewSyncJumpDestCache(),
keccak: &sync.Map{},
ecrecover: &sync.Map{},
}
}

// applyTo populates a vm.Config with the shared caches.
func (c *sharedBlockCaches) applyTo(cfg *vm.Config) {
cfg.SharedJumpDestCache = c.jumpDests
cfg.Keccak256Cache = c.keccak
cfg.EcrecoverCache = c.ecrecover
}

go func(start time.Time, throwaway *state.StateDB, block *types.Block) {
// Disable tracing for prefetcher executions.
// startPrefetchGoroutine launches the throwaway-statedb prefetcher in
// the background. It runs the block with tracing disabled to warm caches
// for the real processors.
func (bc *BlockChain) startPrefetchGoroutine(block *types.Block, throwaway *state.StateDB,
caches *sharedBlockCaches, followupInterrupt *atomic.Bool) {
go func(start time.Time) {
vmCfg := bc.cfg.VmConfig
vmCfg.Tracer = nil
caches.applyTo(&vmCfg)
bc.prefetcher.Prefetch(block, throwaway, vmCfg, false, followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), throwaway, block)
}(time.Now())
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if followupInterrupt == nil {
followupInterrupt = &atomic.Bool{}
}
bc.fireBlockStart(block)
if bc.logger != nil && bc.logger.OnBlockEnd != nil {
defer func() { bc.logger.OnBlockEnd(blockEndErr) }()
}

throwaway, statedb, parallelStatedb, prefetch, process, parallel, err := bc.setupBlockReaders(parent.Root)
if err != nil {
return nil, nil, 0, nil, 0, err
}
defer reportReaderStats(prefetch, process, parallel)

// Shared caches for this block — used by both prefetcher and V2 workers.
sharedCaches := newSharedBlockCaches()
bc.startPrefetchGoroutine(block, throwaway, sharedCaches, followupInterrupt)

Check annotation (Claude / Claude Code Review) on line 848 in core/blockchain.go:

V2 jumpdest cache override discards shared prefetcher cache

🟡 V2 workers discard the prefetcher-warmed shared jumpdest cache. `sharedCaches.applyTo(&v2VmCfg)` (core/blockchain.go:877) wires `SharedJumpDestCache` correctly, and `vm.NewEVM` honours it (evm.go:207-209), but `v2Env.Execute` (core/parallel_state_processor.go:650-651) immediately calls `evm.SetJumpDestCache(e.jumpDests)` with the per-v2Env cache allocated at line 889 — overriding the shared one. `Keccak256Cache` and `EcrecoverCache` are read directly from `vm.Config` and ARE shared as the comm […]

cffls marked this conversation as resolved.
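The change that annotation points at would live in core/parallel_state_processor.go, which is not shown in this diff. As a minimal sketch of the idea, using only names the diff and the review mention (`vm.Config.SharedJumpDestCache`, `vm.NewSyncJumpDestCache`, `evm.SetJumpDestCache`) and treating the helper name and its wiring as assumptions:

package main

import "github.com/ethereum/go-ethereum/core/vm"

// pickJumpDestCache prefers the block-shared jumpdest cache wired through
// vm.Config over a per-worker cache, so entries warmed by the prefetcher
// goroutine are reused by the BlockSTM workers instead of being discarded.
func pickJumpDestCache(cfg vm.Config, perWorker vm.JumpDestCache) vm.JumpDestCache {
	if cfg.SharedJumpDestCache != nil {
		return cfg.SharedJumpDestCache // shared for the whole block
	}
	return perWorker // fall back when nothing was shared
}

func main() {
	var cfg vm.Config
	cfg.SharedJumpDestCache = vm.NewSyncJumpDestCache() // as newSharedBlockCaches does above
	perWorker := vm.NewSyncJumpDestCache()
	_ = pickJumpDestCache(cfg, perWorker) // returns the shared cache here
}

In `v2Env.Execute`, calling `evm.SetJumpDestCache(pickJumpDestCache(cfg, e.jumpDests))` instead of the unconditional `evm.SetJumpDestCache(e.jumpDests)` would keep the shared cache in play; whether that is the intended fix is for the PR authors to confirm.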

type Result struct {
receipts types.Receipts
@@ -796,13 +857,6 @@
parallel bool
}

// Only disable Parallel Processor for witness producers
// TODO: work on enabling witness production for parallel processor
if witness != nil {
bc.parallelProcessor = nil
bc.enforceParallelProcessor = false
}

var resultChanLen int = 2
if bc.enforceParallelProcessor {
log.Debug("Processing block using Block STM only", "number", block.NumberU64())
@@ -811,20 +865,29 @@
resultChan := make(chan Result, resultChanLen)

processorCount := 0
execStart := time.Now()

if bc.parallelProcessor != nil {
processorCount++

go func() {
pstart := time.Now()
parallelStatedb.StartPrefetcher("chain", witness, nil)
res, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.cfg.VmConfig, nil, ctx)
v2VmCfg := bc.cfg.VmConfig
sharedCaches.applyTo(&v2VmCfg)
res, err := bc.parallelProcessor.Process(block, parallelStatedb, v2VmCfg, nil, ctx)
claude[bot] marked this conversation as resolved.
blockExecutionParallelTimer.UpdateSince(pstart)
if err == nil {
vstart := time.Now()
err = bc.validator.ValidateState(block, parallelStatedb, res, false)
vtime = time.Since(vstart)
}
// If context was cancelled (we lost the race), stop prefetcher
// before sending result. This prevents "layer stale" errors when
// the winner's commit advances the pathdb layer.
if ctx.Err() != nil {
parallelStatedb.StopPrefetcher()
}
if res == nil {
res = &ProcessResult{}
}
@@ -845,6 +908,9 @@
err = bc.validator.ValidateState(block, statedb, res, false)
vtime = time.Since(vstart)
}
if ctx.Err() != nil {
statedb.StopPrefetcher()
}
if res == nil {
res = &ProcessResult{}
}
@@ -854,8 +920,14 @@

result := <-resultChan

// If V2 returned an error (panic, ApplyMessage consensus error, etc.)
// and the serial processor is also running, fall back to the serial
// result BEFORE cancelling — cancelling first would interrupt the
// still-running serial processor at its next tx boundary and the
// fallback would receive context.Canceled instead of a usable
// recovery. The fallback IS the recovery; it must run to completion.
if result.parallel && result.err != nil {
log.Warn("Parallel state processor failed", "err", result.err)
log.Warn("Parallel state processor failed", "number", block.NumberU64(), "hash", block.Hash(), "err", result.err)
blockExecutionParallelErrorCounter.Inc(1)
// If the parallel processor failed, we will fallback to the serial processor if enabled
if processorCount == 2 {
@@ -865,14 +937,41 @@
}
}

// With the result we plan to keep in hand, cancel the shared context
// so the loser (if any) stops at its next tx boundary, and signal the
// throwaway prefetcher to stop. This must happen BEFORE ProcessBlock
// returns, because the caller will commit the block (advancing the
// pathdb layer), which would invalidate any trie references still
// held by the loser's prefetcher.
cancel()
followupInterrupt.Store(true)

result.counter.Inc(1)

// Make sure we are not leaking any prefetchers
// Report per-block mgasps for the winning processor.
// Value is scaled by 1000 (stored as µgasps) to preserve 3 decimal places,
// e.g. 210.357 mgasps → 210357. Divide by 1000 when reading.
// Exclude sprint-end blocks (with state sync tx) — their Finalize overhead
// (Heimdall state sync ~164ms) distorts the execution throughput metric.
hasStateSync := false
if txs := block.Transactions(); len(txs) > 0 {
hasStateSync = txs[len(txs)-1].Type() == types.StateSyncTxType
}
if elapsed := time.Since(execStart); elapsed > 0 && result.usedGas > 0 && !hasStateSync {
mgasps := int64(float64(result.usedGas) * 1e6 / float64(elapsed)) // µgasps (mgasps * 1000)
blockMgaspsMeter.Update(mgasps)
}

// Wait for the losing processor to finish and stop its prefetcher.
// Must be synchronous: the caller will commit the block (advancing the
// pathdb layer), which invalidates trie references held by the loser's
// prefetcher subfetchers. The context is already cancelled and both V1
// and V2 honour it at task-boundary level (V1 in its task loop; V2 in
// the executor's dispatcher and validation loop), so the loser stops
// promptly — typically within one tx execution.
if processorCount == 2 {
go func() {
second_result := <-resultChan
second_result.statedb.StopPrefetcher()
}()
second_result := <-resultChan
second_result.statedb.StopPrefetcher()
}

return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err
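
For readers of the `chain/execution/mgasps` histogram, the µgasps encoding used in ProcessBlock above round-trips as shown below. This is a standalone illustration with made-up sample numbers, not part of the diff:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Made-up sample: a 15 Mgas block executed in 71.3 ms.
	usedGas := uint64(15_000_000)
	elapsed := 71300 * time.Microsecond

	// Stored value: mgasps scaled by 1000 (µgasps), matching ProcessBlock above.
	scaled := int64(float64(usedGas) * 1e6 / float64(elapsed))

	// Reading it back: divide by 1000 to recover mgasps with 3 decimal places.
	fmt.Printf("stored %d -> %.3f mgasps\n", scaled, float64(scaled)/1000)
}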
2 changes: 1 addition & 1 deletion core/blockstm/executor_test.go
@@ -117,7 +117,7 @@ func (t *testExecTask) Execute(mvh *MVHashMap, incarnation int) error {

sleep(op.duration)

t.readMap[k] = ReadDescriptor{k, readKind, Version{TxnIndex: result.depIdx, Incarnation: result.incarnation}}
t.readMap[k] = ReadDescriptor{Path: k, Kind: readKind, V: Version{TxnIndex: result.depIdx, Incarnation: result.incarnation}}
case writeType:
t.writeMap[k] = WriteDescriptor{k, version, op.val}
case otherType: