core, blockstm, state: add BlockSTM v2 parallel transaction execution #2210
base: develop
Changes from all commits: 268e976, a7c0b00, 7e99f1f, df8a4bb, 4c688e4, e8e367a, 0c26b08, 0ce45f0
`.gitattributes`

```diff
@@ -1,3 +1,7 @@
 # Auto detect text files and perform LF normalization
 * text=auto
 *.sol linguist-language=Solidity
+core/blockstm/testdata/*.witness.gz filter=lfs diff=lfs merge=lfs -text
+core/blockstm/testdata/*.block filter=lfs diff=lfs merge=lfs -text
+core/blockstm/testdata/codes.tar.gz filter=lfs diff=lfs merge=lfs -text
+core/blockstm/testdata/codes/*.bin filter=lfs diff=lfs merge=lfs -text
```
`core/blockchain.go`

```diff
@@ -119,6 +119,7 @@
 	blockExecutionParallelErrorCounter = metrics.NewRegisteredCounter("chain/execution/parallel/error", nil)
 	blockExecutionParallelTimer = metrics.NewRegisteredTimer("chain/execution/parallel/timer", nil)
 	blockExecutionSerialTimer = metrics.NewRegisteredTimer("chain/execution/serial/timer", nil)
+	blockMgaspsMeter = metrics.NewRegisteredHistogram("chain/execution/mgasps", nil, metrics.NewUniformSample(10240))

 	statelessParallelImportTimer = metrics.NewRegisteredTimer("chain/imports/stateless/parallel", nil)
 	statelessSequentialImportTimer = metrics.NewRegisteredTimer("chain/imports/stateless/sequential", nil)
```
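For context on the new meter: a minimal sketch of how a go-ethereum metrics histogram registered this way can be updated and read back. The values are made up and the snippet is not part of the diff; it only uses the standard `metrics` package constructors and snapshot API.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// Same shape as the PR's meter: a uniform reservoir of 10240 samples.
	hist := metrics.NewRegisteredHistogram("chain/execution/mgasps", nil, metrics.NewUniformSample(10240))

	// The PR stores values scaled by 1000 (µgasps), e.g. 210.357 Mgas/s -> 210357.
	hist.Update(210357)
	hist.Update(180512)

	snap := hist.Snapshot()
	fmt.Printf("mean: %.3f Mgas/s\n", snap.Mean()/1000)
	fmt.Printf("p95:  %.3f Mgas/s\n", snap.Percentile(0.95)/1000)
}
```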
```diff
@@ -700,91 +701,151 @@
 		return nil, err
 	}

-	bc.parallelProcessor = NewParallelStateProcessor(bc.hc, bc)
+	bc.parallelProcessor = NewV2StateProcessor(bc.hc, bc, numprocs)
 	bc.parallelSpeculativeProcesses = numprocs
 	bc.enforceParallelProcessor = enforce

 	return bc, nil
 }

-func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
-	// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	if followupInterrupt == nil {
-		followupInterrupt = &atomic.Bool{}
-	}
-
-	if bc.logger != nil && bc.logger.OnBlockStart != nil {
-		td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
-		bc.logger.OnBlockStart(tracing.BlockEvent{
-			Block:     block,
-			TD:        td,
-			Finalized: bc.CurrentFinalBlock(),
-			Safe:      bc.CurrentSafeBlock(),
-		})
-	}
-
-	if bc.logger != nil && bc.logger.OnBlockEnd != nil {
-		defer func() {
-			bc.logger.OnBlockEnd(blockEndErr)
-		}()
+// fireBlockStart emits the OnBlockStart tracing event when a tracer is set.
+func (bc *BlockChain) fireBlockStart(block *types.Block) {
+	if bc.logger == nil || bc.logger.OnBlockStart == nil {
+		return
 	}
+	td := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
+	bc.logger.OnBlockStart(tracing.BlockEvent{
+		Block:     block,
+		TD:        td,
+		Finalized: bc.CurrentFinalBlock(),
+		Safe:      bc.CurrentSafeBlock(),
+	})
+}

-	parentRoot := parent.Root
-	prefetch, process, err := bc.statedb.ReadersWithCacheStats(parentRoot)
+// setupBlockReaders builds the three StateDBs needed for parallel block
+// processing: throwaway (for prefetcher), statedb (for serial processor),
+// and parallelStatedb (for V2). The V2 statedb has concurrent reads
+// enabled before the prefetcher runs so the underlying trieReader uses
+// muSubTries throughout — switching mid-flight would race.
+func (bc *BlockChain) setupBlockReaders(parentRoot common.Hash) (
+	throwaway, statedb, parallelStatedb *state.StateDB,
+	prefetch, process, parallel state.ReaderWithStats, err error,
+) {
+	prefetch, process, parallel, err = bc.statedb.ReadersWithCacheStatsTriple(parentRoot)
 	if err != nil {
-		return nil, nil, 0, nil, 0, err
+		return nil, nil, nil, nil, nil, nil, err
 	}
-	throwaway, err := state.NewWithReader(parentRoot, bc.statedb, prefetch)
-	if err != nil {
-		return nil, nil, 0, nil, 0, err
+	if throwaway, err = state.NewWithReader(parentRoot, bc.statedb, prefetch); err != nil {
+		return nil, nil, nil, nil, nil, nil, err
 	}
-	statedb, err := state.NewWithReader(parentRoot, bc.statedb, process)
-	if err != nil {
-		return nil, nil, 0, nil, 0, err
+	if statedb, err = state.NewWithReader(parentRoot, bc.statedb, process); err != nil {
+		return nil, nil, nil, nil, nil, nil, err
 	}
-	parallelStatedb, err := state.NewWithReader(parentRoot, bc.statedb, process)
-	if err != nil {
-		return nil, nil, 0, nil, 0, err
+	if parallelStatedb, err = state.NewWithReader(parentRoot, bc.statedb, parallel); err != nil {
+		return nil, nil, nil, nil, nil, nil, err
 	}
+	parallelStatedb.EnableConcurrentReads()
+	return throwaway, statedb, parallelStatedb, prefetch, process, parallel, nil
+}

-	// Upload the statistics of reader at the end
-	defer func() {
-		stats := prefetch.GetStats()
-		accountCacheHitPrefetchMeter.Mark(stats.AccountHit)
-		accountCacheMissPrefetchMeter.Mark(stats.AccountMiss)
-		storageCacheHitPrefetchMeter.Mark(stats.StorageHit)
-		storageCacheMissPrefetchMeter.Mark(stats.StorageMiss)
-		stats = process.GetStats()
-		accountCacheHitMeter.Mark(stats.AccountHit)
-		accountCacheMissMeter.Mark(stats.AccountMiss)
-		storageCacheHitMeter.Mark(stats.StorageHit)
-		storageCacheMissMeter.Mark(stats.StorageMiss)
-
-		// Report additional prefetch attribution metrics
-		prefetchStats := prefetch.GetPrefetchStats()
-		accountInsertPrefetchMeter.Mark(prefetchStats.AccountInsert)
-		storageInsertPrefetchMeter.Mark(prefetchStats.StorageInsert)
-
-		processStats := process.GetPrefetchStats()
-		accountHitFromPrefetchMeter.Mark(processStats.AccountHitFromPrefetch)
-		storageHitFromPrefetchMeter.Mark(processStats.StorageHitFromPrefetch)
-		accountHitFromPrefetchUniqueMeter.Mark(processStats.AccountHitFromPrefetchUnique)
-	}()
+// reportReaderStats marks per-block cache hit/miss meters from prefetch,
+// process, and parallel readers. Intended to be called via defer at the
+// end of ProcessBlock.
+//
+// process and parallel both use the roleProcess label internally and
+// share the same underlying cache, but ReadersWithCacheStatsTriple
+// returns independent ReaderWithStats wrappers, so V2's reads accumulate
+// in `parallel`'s atomic counters separately from V1's `process` counters.
+// We merge them into the same meter set here so the cache-hit-rate
+// dashboards reflect the work the winning processor (typically V2) did,
+// rather than only the losing serial path's interrupted reads.
+func reportReaderStats(prefetch, process, parallel state.ReaderWithStats) {
+	stats := prefetch.GetStats()
+	accountCacheHitPrefetchMeter.Mark(stats.AccountHit)
+	accountCacheMissPrefetchMeter.Mark(stats.AccountMiss)
+	storageCacheHitPrefetchMeter.Mark(stats.StorageHit)
+	storageCacheMissPrefetchMeter.Mark(stats.StorageMiss)
+
+	procStats := process.GetStats()
+	parStats := parallel.GetStats()
+	accountCacheHitMeter.Mark(procStats.AccountHit + parStats.AccountHit)
+	accountCacheMissMeter.Mark(procStats.AccountMiss + parStats.AccountMiss)
+	storageCacheHitMeter.Mark(procStats.StorageHit + parStats.StorageHit)
+	storageCacheMissMeter.Mark(procStats.StorageMiss + parStats.StorageMiss)
+
+	prefetchStats := prefetch.GetPrefetchStats()
+	accountInsertPrefetchMeter.Mark(prefetchStats.AccountInsert)
+	storageInsertPrefetchMeter.Mark(prefetchStats.StorageInsert)
+
+	procPF := process.GetPrefetchStats()
+	parPF := parallel.GetPrefetchStats()
+	accountHitFromPrefetchMeter.Mark(procPF.AccountHitFromPrefetch + parPF.AccountHitFromPrefetch)
+	storageHitFromPrefetchMeter.Mark(procPF.StorageHitFromPrefetch + parPF.StorageHitFromPrefetch)
+	accountHitFromPrefetchUniqueMeter.Mark(procPF.AccountHitFromPrefetchUnique + parPF.AccountHitFromPrefetchUnique)
+}

+// sharedBlockCaches holds VM-level caches that are shared between the
+// prefetcher goroutine and the V2 BlockSTM workers for a single block.
+type sharedBlockCaches struct {
+	jumpDests vm.JumpDestCache
+	keccak    *sync.Map
+	ecrecover *sync.Map
+}
+
+func newSharedBlockCaches() *sharedBlockCaches {
+	return &sharedBlockCaches{
+		jumpDests: vm.NewSyncJumpDestCache(),
+		keccak:    &sync.Map{},
+		ecrecover: &sync.Map{},
+	}
+}
+
+// applyTo populates a vm.Config with the shared caches.
+func (c *sharedBlockCaches) applyTo(cfg *vm.Config) {
+	cfg.SharedJumpDestCache = c.jumpDests
+	cfg.Keccak256Cache = c.keccak
+	cfg.EcrecoverCache = c.ecrecover
+}

-	go func(start time.Time, throwaway *state.StateDB, block *types.Block) {
-		// Disable tracing for prefetcher executions.
+// startPrefetchGoroutine launches the throwaway-statedb prefetcher in
+// the background. It runs the block with tracing disabled to warm caches
+// for the real processors.
+func (bc *BlockChain) startPrefetchGoroutine(block *types.Block, throwaway *state.StateDB,
+	caches *sharedBlockCaches, followupInterrupt *atomic.Bool) {
+	go func(start time.Time) {
 		vmCfg := bc.cfg.VmConfig
 		vmCfg.Tracer = nil
+		caches.applyTo(&vmCfg)
 		bc.prefetcher.Prefetch(block, throwaway, vmCfg, false, followupInterrupt)

 		blockPrefetchExecuteTimer.Update(time.Since(start))
 		if followupInterrupt.Load() {
 			blockPrefetchInterruptMeter.Mark(1)
 		}
-	}(time.Now(), throwaway, block)
+	}(time.Now())
+}

+func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
+	// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	if followupInterrupt == nil {
+		followupInterrupt = &atomic.Bool{}
+	}
+	bc.fireBlockStart(block)
+	if bc.logger != nil && bc.logger.OnBlockEnd != nil {
+		defer func() { bc.logger.OnBlockEnd(blockEndErr) }()
+	}
+
+	throwaway, statedb, parallelStatedb, prefetch, process, parallel, err := bc.setupBlockReaders(parent.Root)
+	if err != nil {
+		return nil, nil, 0, nil, 0, err
+	}
+	defer reportReaderStats(prefetch, process, parallel)
+
+	// Shared caches for this block — used by both prefetcher and V2 workers.
+	sharedCaches := newSharedBlockCaches()
+	bc.startPrefetchGoroutine(block, throwaway, sharedCaches, followupInterrupt)

 	type Result struct {
 		receipts types.Receipts
```
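The ordering constraint documented on setupBlockReaders, that EnableConcurrentReads must run before the prefetcher or any V2 worker touches the statedb, comes down to a general Go rule: a lock cannot be switched on halfway through concurrent use. A self-contained sketch of the hazard, using an illustrative type rather than the PR's actual trieReader:

```go
package main

import "sync"

// reader guards cache with mu only while concurrent is set, mirroring a
// lock that can be switched on. The flag must be set before any goroutine
// calls get: flipping it mid-flight races on both the flag and the map.
type reader struct {
	mu         sync.Mutex
	concurrent bool
	cache      map[string]int
}

func (r *reader) get(k string) int {
	if r.concurrent {
		r.mu.Lock()
		defer r.mu.Unlock()
	}
	return r.cache[k]
}

func main() {
	r := &reader{cache: map[string]int{"a": 1}}
	r.concurrent = true // enable up front, as setupBlockReaders does

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.get("a")
		}()
	}
	wg.Wait()
}
```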
```diff
@@ -796,13 +857,6 @@
 		parallel bool
 	}

-	// Only disable Parallel Processor for witness producers
-	// TODO: work on enabling witness production for parallel processor
-	if witness != nil {
-		bc.parallelProcessor = nil
-		bc.enforceParallelProcessor = false
-	}
-
 	var resultChanLen int = 2
 	if bc.enforceParallelProcessor {
 		log.Debug("Processing block using Block STM only", "number", block.NumberU64())
```
```diff
@@ -811,23 +865,32 @@
 	resultChan := make(chan Result, resultChanLen)

 	processorCount := 0
+	execStart := time.Now()

 	if bc.parallelProcessor != nil {
 		processorCount++

 		go func() {
 			pstart := time.Now()
 			parallelStatedb.StartPrefetcher("chain", witness, nil)
-			res, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.cfg.VmConfig, nil, ctx)
+			v2VmCfg := bc.cfg.VmConfig
+			sharedCaches.applyTo(&v2VmCfg)
+			res, err := bc.parallelProcessor.Process(block, parallelStatedb, v2VmCfg, nil, ctx)

 			blockExecutionParallelTimer.UpdateSince(pstart)
 			if err == nil {
 				vstart := time.Now()
 				err = bc.validator.ValidateState(block, parallelStatedb, res, false)
 				vtime = time.Since(vstart)
 			}
+			// If context was cancelled (we lost the race), stop prefetcher
+			// before sending result. This prevents "layer stale" errors when
+			// the winner's commit advances the pathdb layer.
+			if ctx.Err() != nil {
+				parallelStatedb.StopPrefetcher()
+			}
 			if res == nil {
 				res = &ProcessResult{}
 			}

 			resultChan <- Result{res.Receipts, res.Logs, res.GasUsed, err, parallelStatedb, blockExecutionParallelCounter, true}
 		}()
 	}
```

Comment on lines 873 to 893:

> 🔴 🟡 **V2 prefetcher leaks on the documented V2-failure fallback path.** When V2 returns an error (PanickedIdx, ExecErrIdx) and ProcessBlock falls back to serial, the V2 statedb's prefetcher is never stopped.
>
> **What the bug is:** in the V2 goroutine in blockchain.go, the only cleanup gate is the `if ctx.Err() != nil` check.
>
> **Why the fallback drain doesn't catch it:** when V2 sends its error result, the main goroutine enters the fallback at blockchain.go:929 […]
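If the leak the comment describes is real, one narrow fix is to broaden the cleanup gate in the V2 goroutine so an error result also stops the prefetcher; on the fallback path the serial statedb wins the race, so nothing else will ever stop the parallel one. Sketch only, assuming (as in upstream go-ethereum) that StopPrefetcher is idempotent and safe to call when no prefetcher is running:

```diff
-			if ctx.Err() != nil {
+			// Also stop on the error/fallback path: the serial result wins,
+			// so nobody else will stop this statedb's prefetcher.
+			if ctx.Err() != nil || err != nil {
 				parallelStatedb.StopPrefetcher()
 			}
```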
```diff
@@ -845,6 +908,9 @@
 				err = bc.validator.ValidateState(block, statedb, res, false)
 				vtime = time.Since(vstart)
 			}
+			if ctx.Err() != nil {
+				statedb.StopPrefetcher()
+			}
 			if res == nil {
 				res = &ProcessResult{}
 			}
```
```diff
@@ -854,8 +920,14 @@

 	result := <-resultChan

+	// If V2 returned an error (panic, ApplyMessage consensus error, etc.)
+	// and the serial processor is also running, fall back to the serial
+	// result BEFORE cancelling — cancelling first would interrupt the
+	// still-running serial processor at its next tx boundary and the
+	// fallback would receive context.Canceled instead of a usable
+	// recovery. The fallback IS the recovery; it must run to completion.
 	if result.parallel && result.err != nil {
-		log.Warn("Parallel state processor failed", "err", result.err)
+		log.Warn("Parallel state processor failed", "number", block.NumberU64(), "hash", block.Hash(), "err", result.err)
 		blockExecutionParallelErrorCounter.Inc(1)
 		// If the parallel processor failed, we will fallback to the serial processor if enabled
 		if processorCount == 2 {
```
```diff
@@ -865,14 +937,41 @@
 		}
 	}

+	// With the result we plan to keep in hand, cancel the shared context
+	// so the loser (if any) stops at its next tx boundary, and signal the
+	// throwaway prefetcher to stop. This must happen BEFORE ProcessBlock
+	// returns, because the caller will commit the block (advancing the
+	// pathdb layer), which would invalidate any trie references still
+	// held by the loser's prefetcher.
 	cancel()
 	followupInterrupt.Store(true)

 	result.counter.Inc(1)

-	// Make sure we are not leaking any prefetchers
+	// Report per-block mgasps for the winning processor.
+	// Value is scaled by 1000 (stored as µgasps) to preserve 3 decimal places,
+	// e.g. 210.357 mgasps → 210357. Divide by 1000 when reading.
+	// Exclude sprint-end blocks (with state sync tx) — their Finalize overhead
+	// (Heimdall state sync ~164ms) distorts the execution throughput metric.
+	hasStateSync := false
+	if txs := block.Transactions(); len(txs) > 0 {
+		hasStateSync = txs[len(txs)-1].Type() == types.StateSyncTxType
+	}
+	if elapsed := time.Since(execStart); elapsed > 0 && result.usedGas > 0 && !hasStateSync {
+		mgasps := int64(float64(result.usedGas) * 1e6 / float64(elapsed)) // µgasps (mgasps * 1000)
+		blockMgaspsMeter.Update(mgasps)
+	}
+
+	// Wait for the losing processor to finish and stop its prefetcher.
+	// Must be synchronous: the caller will commit the block (advancing the
+	// pathdb layer), which invalidates trie references held by the loser's
+	// prefetcher subfetchers. The context is already cancelled and both V1
+	// and V2 honour it at task-boundary level (V1 in its task loop; V2 in
+	// the executor's dispatcher and validation loop), so the loser stops
+	// promptly — typically within one tx execution.
 	if processorCount == 2 {
-		go func() {
-			second_result := <-resultChan
-			second_result.statedb.StopPrefetcher()
-		}()
+		second_result := <-resultChan
+		second_result.statedb.StopPrefetcher()
 	}

 	return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err
```
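A small standalone check of the µgasps arithmetic in the comment above, with made-up numbers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// 21 Mgas executed in 100 ms should report 210 Mgas/s.
	usedGas := uint64(21_000_000)
	elapsed := 100 * time.Millisecond // time.Duration counts nanoseconds

	// Same expression as the PR: gas * 1e6 / ns equals Mgas/s scaled by 1000.
	mgasps := int64(float64(usedGas) * 1e6 / float64(elapsed))
	fmt.Println(mgasps)                               // 210000
	fmt.Printf("%.3f Mgas/s\n", float64(mgasps)/1000) // 210.000
}
```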
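Taken together, ProcessBlock's control flow races two executors, keeps the first usable result, falls back to the serial result before cancelling when the parallel side errors, and synchronously drains the loser. The pattern distills to the self-contained sketch below; the names and the simulated work are illustrative, not bor APIs:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type result struct {
	val      int
	err      error
	parallel bool
}

// raceExecutors runs a parallel and a serial worker and keeps the first
// usable result. On a parallel-side error it waits for the serial result
// instead (fallback BEFORE cancel), then cancels and drains the loser
// synchronously so no worker outlives the caller.
func raceExecutors(run func(ctx context.Context, parallel bool) (int, error)) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resultChan := make(chan result, 2) // buffered: neither worker can leak
	for _, par := range []bool{true, false} {
		go func(par bool) {
			v, err := run(ctx, par)
			resultChan <- result{v, err, par}
		}(par)
	}

	res := <-resultChan
	if res.parallel && res.err != nil {
		// Fallback: let the serial worker run to completion before the
		// deferred cancel fires; both results are consumed, nothing to drain.
		res = <-resultChan
		return res.val, res.err
	}
	cancel()      // stop the loser at its next cancellation check
	<-resultChan  // drain synchronously so no worker outlives us
	return res.val, res.err
}

func main() {
	v, err := raceExecutors(func(ctx context.Context, parallel bool) (int, error) {
		d := 50 * time.Millisecond
		if parallel {
			d = 10 * time.Millisecond // the parallel worker wins this race
		}
		select {
		case <-time.After(d):
			return 42, nil
		case <-ctx.Done():
			return 0, ctx.Err()
		}
	})
	fmt.Println(v, err) // 42 <nil>
}
```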