4 changes: 4 additions & 0 deletions .gitattributes
@@ -1,3 +1,7 @@
# Auto detect text files and perform LF normalization
* text=auto
*.sol linguist-language=Solidity
core/blockstm/testdata/*.witness.gz filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/*.block filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/codes.tar.gz filter=lfs diff=lfs merge=lfs -text
core/blockstm/testdata/codes/*.bin filter=lfs diff=lfs merge=lfs -text
9 changes: 8 additions & 1 deletion core/block_validator.go
@@ -19,14 +19,18 @@ package core
import (
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

var intermediateRootTimer = metrics.NewRegisteredTimer("chain/intermediateroot", nil)

// BlockValidator is responsible for validating block headers, uncles and
// processed state.
//
@@ -168,7 +172,10 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
}
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
irStart := time.Now()
root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number))
intermediateRootTimer.UpdateSince(irStart)
if header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}

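
The block_validator.go hunk above wraps IntermediateRoot in the usual time.Now()/UpdateSince metrics pattern. A minimal standalone sketch of that pattern and of reading the timer back, assuming go-ethereum's metrics snapshot API; the sleep is a stand-in for the real call:

```go
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// Same registration the diff adds at package level.
	timer := metrics.NewRegisteredTimer("chain/intermediateroot", nil)

	// The wrapping pattern the diff applies to statedb.IntermediateRoot.
	start := time.Now()
	time.Sleep(3 * time.Millisecond) // stand-in for the real call
	timer.UpdateSince(start)

	// Reading back; values are zero if metrics collection is disabled
	// in the process.
	snap := timer.Snapshot()
	fmt.Printf("count=%d mean=%.2fms p95=%.2fms\n",
		snap.Count(), snap.Mean()/1e6, snap.Percentile(0.95)/1e6)
}
```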
239 changes: 162 additions & 77 deletions core/blockchain.go
@@ -119,6 +119,7 @@
blockExecutionParallelErrorCounter = metrics.NewRegisteredCounter("chain/execution/parallel/error", nil)
blockExecutionParallelTimer = metrics.NewRegisteredTimer("chain/execution/parallel/timer", nil)
blockExecutionSerialTimer = metrics.NewRegisteredTimer("chain/execution/serial/timer", nil)
blockMgaspsMeter = metrics.NewRegisteredHistogram("chain/execution/mgasps", nil, metrics.NewUniformSample(10240))

statelessParallelImportTimer = metrics.NewRegisteredTimer("chain/imports/stateless/parallel", nil)
statelessSequentialImportTimer = metrics.NewRegisteredTimer("chain/imports/stateless/sequential", nil)
@@ -413,6 +414,10 @@
parallelProcessor Processor // Parallel block transaction processor interface
parallelSpeculativeProcesses int // Number of parallel speculative processes
enforceParallelProcessor bool
AblationSkipFlush bool // Ablation: skip FlushMVWriteSet
AblationSkipSettle bool // Ablation: skip Settle
AblationSkipFinalise bool // Ablation: skip worker Finalise
AblationSkipMVRead bool // Ablation: flush normally but MVRead returns None

Check warning on line 420 in core/blockchain.go (Claude / Claude Code Review):

BlockChain.AblationSkip* fields declared but never read

The four exported `AblationSkip*` fields added to `BlockChain` at core/blockchain.go:417-420 are declared but never read or written anywhere in the repo. As exported fields on a public type they enter the API surface and become a breaking change to remove later, while toggling them today does nothing — the wiring to MVHashMap.Skip* (which is consumed downstream) was lost. Either delete the four fields or thread them through to the V2 processor's MVHashMap on each ProcessBlock call.
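
One possible shape for the wiring this comment asks for, sketched with stand-in types; the Skip* fields on MVHashMap and the hook point are assumptions for illustration, not the repo's definitions:

```go
package main

import "fmt"

// Stand-in shapes; the real MVHashMap and processor live in core/blockstm
// and core. The Skip* field names mirror what the review comment calls
// "MVHashMap.Skip*" but are assumptions here.
type MVHashMap struct {
	SkipFlush    bool
	SkipSettle   bool
	SkipFinalise bool
	SkipMVRead   bool
}

type BlockChain struct {
	AblationSkipFlush    bool
	AblationSkipSettle   bool
	AblationSkipFinalise bool
	AblationSkipMVRead   bool
}

// applyAblation copies the chain-level toggles onto the per-block
// MVHashMap, which is what would make flipping the fields observable.
func (bc *BlockChain) applyAblation(mvh *MVHashMap) {
	mvh.SkipFlush = bc.AblationSkipFlush
	mvh.SkipSettle = bc.AblationSkipSettle
	mvh.SkipFinalise = bc.AblationSkipFinalise
	mvh.SkipMVRead = bc.AblationSkipMVRead
}

func main() {
	bc := &BlockChain{AblationSkipFlush: true}
	mvh := &MVHashMap{}
	bc.applyAblation(mvh) // would run once per ProcessBlock, before dispatch
	fmt.Printf("%+v\n", *mvh)
}
```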
parallelStatelessImportEnabled atomic.Bool // Whether parallel stateless import is enabled via config
parallelStatelessImportWorkers int // Number of workers to use for parallel stateless import
forker *ForkChoice
@@ -700,91 +705,140 @@
return nil, err
}

bc.parallelProcessor = NewParallelStateProcessor(bc.hc, bc)
bc.parallelProcessor = NewV2StateProcessor(bc.hc, bc, numprocs)
bc.parallelSpeculativeProcesses = numprocs
bc.enforceParallelProcessor = enforce

return bc, nil
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if followupInterrupt == nil {
followupInterrupt = &atomic.Bool{}
}

if bc.logger != nil && bc.logger.OnBlockStart != nil {
td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
bc.logger.OnBlockStart(tracing.BlockEvent{
Block: block,
TD: td,
Finalized: bc.CurrentFinalBlock(),
Safe: bc.CurrentSafeBlock(),
})
}

if bc.logger != nil && bc.logger.OnBlockEnd != nil {
defer func() {
bc.logger.OnBlockEnd(blockEndErr)
}()
// fireBlockStart emits the OnBlockStart tracing event when a tracer is set.
func (bc *BlockChain) fireBlockStart(block *types.Block) {
if bc.logger == nil || bc.logger.OnBlockStart == nil {
return
}
td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
bc.logger.OnBlockStart(tracing.BlockEvent{
Block: block,
TD: td,
Finalized: bc.CurrentFinalBlock(),
Safe: bc.CurrentSafeBlock(),
})
}

parentRoot := parent.Root
prefetch, process, err := bc.statedb.ReadersWithCacheStats(parentRoot)
// setupBlockReaders builds the three StateDBs needed for parallel block
// processing: throwaway (for prefetcher), statedb (for serial processor),
// and parallelStatedb (for V2). The V2 statedb has concurrent reads
// enabled before the prefetcher runs so the underlying trieReader uses
// muSubTries throughout — switching mid-flight would race.
func (bc *BlockChain) setupBlockReaders(parentRoot common.Hash) (
throwaway, statedb, parallelStatedb *state.StateDB,
prefetch, process state.ReaderWithStats, err error,
) {
prefetch, process, parallel, err := bc.statedb.ReadersWithCacheStatsTriple(parentRoot)
if err != nil {
return nil, nil, 0, nil, 0, err
return nil, nil, nil, nil, nil, err
}
throwaway, err := state.NewWithReader(parentRoot, bc.statedb, prefetch)
if err != nil {
return nil, nil, 0, nil, 0, err
if throwaway, err = state.NewWithReader(parentRoot, bc.statedb, prefetch); err != nil {
return nil, nil, nil, nil, nil, err
}
statedb, err := state.NewWithReader(parentRoot, bc.statedb, process)
if err != nil {
return nil, nil, 0, nil, 0, err
if statedb, err = state.NewWithReader(parentRoot, bc.statedb, process); err != nil {
return nil, nil, nil, nil, nil, err
}
parallelStatedb, err := state.NewWithReader(parentRoot, bc.statedb, process)
if err != nil {
return nil, nil, 0, nil, 0, err
if parallelStatedb, err = state.NewWithReader(parentRoot, bc.statedb, parallel); err != nil {
return nil, nil, nil, nil, nil, err
}
parallelStatedb.EnableConcurrentReads()
return throwaway, statedb, parallelStatedb, prefetch, process, nil
}

// Upload the statistics of reader at the end
defer func() {
stats := prefetch.GetStats()
accountCacheHitPrefetchMeter.Mark(stats.AccountHit)
accountCacheMissPrefetchMeter.Mark(stats.AccountMiss)
storageCacheHitPrefetchMeter.Mark(stats.StorageHit)
storageCacheMissPrefetchMeter.Mark(stats.StorageMiss)
stats = process.GetStats()
accountCacheHitMeter.Mark(stats.AccountHit)
accountCacheMissMeter.Mark(stats.AccountMiss)
storageCacheHitMeter.Mark(stats.StorageHit)
storageCacheMissMeter.Mark(stats.StorageMiss)

// Report additional prefetch attribution metrics
prefetchStats := prefetch.GetPrefetchStats()
accountInsertPrefetchMeter.Mark(prefetchStats.AccountInsert)
storageInsertPrefetchMeter.Mark(prefetchStats.StorageInsert)

processStats := process.GetPrefetchStats()
accountHitFromPrefetchMeter.Mark(processStats.AccountHitFromPrefetch)
storageHitFromPrefetchMeter.Mark(processStats.StorageHitFromPrefetch)
accountHitFromPrefetchUniqueMeter.Mark(processStats.AccountHitFromPrefetchUnique)
}()
// reportReaderStats marks per-block cache hit/miss meters from prefetch and
// process readers. Intended to be called via defer at the end of ProcessBlock.
func reportReaderStats(prefetch, process state.ReaderWithStats) {
stats := prefetch.GetStats()
accountCacheHitPrefetchMeter.Mark(stats.AccountHit)
accountCacheMissPrefetchMeter.Mark(stats.AccountMiss)
storageCacheHitPrefetchMeter.Mark(stats.StorageHit)
storageCacheMissPrefetchMeter.Mark(stats.StorageMiss)

stats = process.GetStats()
accountCacheHitMeter.Mark(stats.AccountHit)
accountCacheMissMeter.Mark(stats.AccountMiss)
storageCacheHitMeter.Mark(stats.StorageHit)
storageCacheMissMeter.Mark(stats.StorageMiss)

prefetchStats := prefetch.GetPrefetchStats()
accountInsertPrefetchMeter.Mark(prefetchStats.AccountInsert)
storageInsertPrefetchMeter.Mark(prefetchStats.StorageInsert)

processStats := process.GetPrefetchStats()
accountHitFromPrefetchMeter.Mark(processStats.AccountHitFromPrefetch)
storageHitFromPrefetchMeter.Mark(processStats.StorageHitFromPrefetch)
accountHitFromPrefetchUniqueMeter.Mark(processStats.AccountHitFromPrefetchUnique)
}

// sharedBlockCaches holds VM-level caches that are shared between the
// prefetcher goroutine and the V2 BlockSTM workers for a single block.
type sharedBlockCaches struct {
jumpDests vm.JumpDestCache
keccak *sync.Map
ecrecover *sync.Map
}

func newSharedBlockCaches() *sharedBlockCaches {
return &sharedBlockCaches{
jumpDests: vm.NewSyncJumpDestCache(),
keccak: &sync.Map{},
ecrecover: &sync.Map{},
}
}

// applyTo populates a vm.Config with the shared caches.
func (c *sharedBlockCaches) applyTo(cfg *vm.Config) {
cfg.SharedJumpDestCache = c.jumpDests
cfg.Keccak256Cache = c.keccak
cfg.EcrecoverCache = c.ecrecover
}

go func(start time.Time, throwaway *state.StateDB, block *types.Block) {
// Disable tracing for prefetcher executions.
// startPrefetchGoroutine launches the throwaway-statedb prefetcher in
// the background. It runs the block with tracing disabled to warm caches
// for the real processors.
func (bc *BlockChain) startPrefetchGoroutine(block *types.Block, throwaway *state.StateDB,
caches *sharedBlockCaches, followupInterrupt *atomic.Bool) {
go func(start time.Time) {
vmCfg := bc.cfg.VmConfig
vmCfg.Tracer = nil
caches.applyTo(&vmCfg)
bc.prefetcher.Prefetch(block, throwaway, vmCfg, false, followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), throwaway, block)
}(time.Now())
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if followupInterrupt == nil {
followupInterrupt = &atomic.Bool{}
}
bc.fireBlockStart(block)
if bc.logger != nil && bc.logger.OnBlockEnd != nil {
defer func() { bc.logger.OnBlockEnd(blockEndErr) }()
}

throwaway, statedb, parallelStatedb, prefetch, process, err := bc.setupBlockReaders(parent.Root)
if err != nil {
return nil, nil, 0, nil, 0, err
}
defer reportReaderStats(prefetch, process)

// Shared caches for this block — used by both prefetcher and V2 workers.
sharedCaches := newSharedBlockCaches()
bc.startPrefetchGoroutine(block, throwaway, sharedCaches, followupInterrupt)

type Result struct {
receipts types.Receipts
@@ -796,13 +850,6 @@
parallel bool
}

// Only disable Parallel Processor for witness producers
// TODO: work on enabling witness production for parallel processor
if witness != nil {
bc.parallelProcessor = nil
bc.enforceParallelProcessor = false
}

var resultChanLen int = 2
if bc.enforceParallelProcessor {
log.Debug("Processing block using Block STM only", "number", block.NumberU64())
@@ -811,20 +858,29 @@
resultChan := make(chan Result, resultChanLen)

processorCount := 0
execStart := time.Now()

if bc.parallelProcessor != nil {
processorCount++

go func() {
pstart := time.Now()
parallelStatedb.StartPrefetcher("chain", witness, nil)
res, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.cfg.VmConfig, nil, ctx)
v2VmCfg := bc.cfg.VmConfig
sharedCaches.applyTo(&v2VmCfg)
res, err := bc.parallelProcessor.Process(block, parallelStatedb, v2VmCfg, nil, ctx)
blockExecutionParallelTimer.UpdateSince(pstart)
if err == nil {
vstart := time.Now()
err = bc.validator.ValidateState(block, parallelStatedb, res, false)
vtime = time.Since(vstart)
}
// If context was cancelled (we lost the race), stop prefetcher
// before sending result. This prevents "layer stale" errors when
// the winner's commit advances the pathdb layer.
if ctx.Err() != nil {
parallelStatedb.StopPrefetcher()
}
if res == nil {
res = &ProcessResult{}
}
@@ -845,6 +901,9 @@
err = bc.validator.ValidateState(block, statedb, res, false)
vtime = time.Since(vstart)
}
if ctx.Err() != nil {
statedb.StopPrefetcher()
}
if res == nil {
res = &ProcessResult{}
}
@@ -854,11 +913,19 @@

result := <-resultChan

// Cancel context immediately so the losing processor stops at the next
// tx boundary, and signal the throwaway prefetcher to stop. This must
// happen BEFORE ProcessBlock returns, because the caller will commit
// the block (advancing the pathdb layer), which would invalidate any
// trie references still held by the loser's prefetcher.
cancel()
followupInterrupt.Store(true)

if result.parallel && result.err != nil {
log.Warn("Parallel state processor failed", "err", result.err)
log.Warn("Parallel state processor failed", "number", block.NumberU64(), "hash", block.Hash(), "err", result.err)
blockExecutionParallelErrorCounter.Inc(1)
// If the parallel processor failed, we will fallback to the serial processor if enabled
if processorCount == 2 {

Check failure on line 928 in core/blockchain.go (Claude / Claude Code Review):

V2-failure fallback recovery cancelled before serial completes

**V2-failure fallback recovery is broken.** When the parallel processor finishes first with an error (e.g., panic, ApplyMessage consensus error), `cancel()` is called before the `if result.parallel && result.err != nil` block, which interrupts the still-running serial processor at its next tx boundary. The fallback `result = <-resultChan` then receives a `context.Canceled` serial result instead of the legitimate recovery — the very recovery the PR description advertises ("falls back to serial on
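
A hedged sketch of one way the ordering could be fixed, using simplified stand-in types: delay cancel() until after the fallback decision so the serial run can complete and supply the recovery result:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Result struct {
	parallel bool
	err      error
}

// pickResult receives the winner, falls back to the serial result if the
// parallel processor failed, and only then cancels the losing goroutine.
func pickResult(cancel context.CancelFunc, resultChan chan Result, processorCount int) Result {
	result := <-resultChan
	if result.parallel && result.err != nil && processorCount == 2 {
		// Let the serial processor finish BEFORE cancelling, so the
		// fallback receives a real result rather than context.Canceled.
		result = <-resultChan
	}
	cancel() // now safe to stop whatever is still running
	return result
}

func main() {
	_, cancel := context.WithCancel(context.Background())
	ch := make(chan Result, 2)
	ch <- Result{parallel: true, err: errors.New("boom")} // V2 fails first
	ch <- Result{parallel: false}                         // serial recovery
	fmt.Printf("%+v\n", pickResult(cancel, ch, 2))
}
```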
result = <-resultChan
result.statedb.StopPrefetcher()
processorCount--
Expand All @@ -867,12 +934,30 @@

result.counter.Inc(1)

// Make sure we are not leaking any prefetchers
// Report per-block mgasps for the winning processor.
// Value is scaled by 1000 (stored as µgasps) to preserve 3 decimal places,
// e.g. 210.357 mgasps → 210357. Divide by 1000 when reading.
// Exclude sprint-end blocks (with state sync tx) — their Finalize overhead
// (Heimdall state sync ~164ms) distorts the execution throughput metric.
hasStateSync := false
if txs := block.Transactions(); len(txs) > 0 {
hasStateSync = txs[len(txs)-1].Type() == types.StateSyncTxType
}
if elapsed := time.Since(execStart); elapsed > 0 && result.usedGas > 0 && !hasStateSync {
mgasps := int64(float64(result.usedGas) * 1e6 / float64(elapsed)) // µgasps (mgasps * 1000)
blockMgaspsMeter.Update(mgasps)
}

// Wait for the losing processor to finish and stop its prefetcher.
// Must be synchronous: the caller will commit the block (advancing the
// pathdb layer), which invalidates trie references held by the loser's
// prefetcher subfetchers. The context is already cancelled and both V1
// and V2 honour it at task-boundary level (V1 in its task loop; V2 in
// the executor's dispatcher and validation loop), so the loser stops
// promptly — typically within one tx execution.
if processorCount == 2 {
go func() {
second_result := <-resultChan
second_result.statedb.StopPrefetcher()
}()
second_result := <-resultChan
second_result.statedb.StopPrefetcher()
}

return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err
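
The blockMgaspsMeter comment in the hunk above stores throughput scaled by 1000 to keep three decimal places in an integer histogram sample. A standalone arithmetic check of that encoding, with illustrative numbers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	usedGas := uint64(15_000_000)       // gas executed in the block
	elapsed := 71300 * time.Microsecond // wall time for execution

	// Encode, matching the diff's expression: gas * 1e6 / elapsed(ns)
	// yields mgasps scaled by 1000 (µgasps).
	stored := int64(float64(usedGas) * 1e6 / float64(elapsed))
	fmt.Println(stored) // 210378, i.e. ~210.378 mgasps

	// Decode when reading the histogram back.
	fmt.Printf("%.3f mgasps\n", float64(stored)/1000)
}
```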
2 changes: 1 addition & 1 deletion core/blockstm/executor_test.go
@@ -117,7 +117,7 @@ func (t *testExecTask) Execute(mvh *MVHashMap, incarnation int) error {

sleep(op.duration)

t.readMap[k] = ReadDescriptor{k, readKind, Version{TxnIndex: result.depIdx, Incarnation: result.incarnation}}
t.readMap[k] = ReadDescriptor{Path: k, Kind: readKind, V: Version{TxnIndex: result.depIdx, Incarnation: result.incarnation}}
case writeType:
t.writeMap[k] = WriteDescriptor{k, version, op.val}
case otherType:
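
The test change switches ReadDescriptor to a named-field literal. A tiny standalone illustration of the benefit, with assumed stand-in types mirroring the diff:

```go
package main

import "fmt"

// Stand-in shapes mirroring core/blockstm's types as used in the test.
type Version struct{ TxnIndex, Incarnation int }

type ReadDescriptor struct {
	Path []byte
	Kind int
	V    Version
}

func main() {
	// Named fields stay correct if the struct's field order ever changes;
	// the old positional form ReadDescriptor{k, kind, v} would then
	// mis-assign or fail to compile without any edit to the test.
	rd := ReadDescriptor{Path: []byte("k"), Kind: 1, V: Version{TxnIndex: 2}}
	fmt.Printf("%+v\n", rd)
}
```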