From ac89ac9302f8f99b8e42887fff429333e7764069 Mon Sep 17 00:00:00 2001 From: Alright Date: Thu, 4 Jun 2026 15:42:31 -0400 Subject: [PATCH 1/9] use sync.RWMutex and bbolt View() for read-only methods --- chain/db.go | 30 +++++ chain/manager.go | 292 +++++++++++++++++++++++++++++------------------ db.go | 33 ++++++ 3 files changed, 242 insertions(+), 113 deletions(-) diff --git a/chain/db.go b/chain/db.go index a3ab8f23..7a844b1d 100644 --- a/chain/db.go +++ b/chain/db.go @@ -65,6 +65,14 @@ type DB interface { CreateBucket(name []byte) (DBBucket, error) Flush() error Cancel() + + // View calls fn with a read-only DB. Implementations must guarantee + // that the DB passed to fn observes a consistent snapshot for the + // duration of fn, and that fn may run concurrently with other View + // calls. Buckets obtained via the DB inside fn must not be used + // after fn returns. Writes via the DB inside fn are not durable + // and may panic. + View(fn func(DB) error) error } // A DBBucket is a set of key-value pairs. @@ -105,6 +113,12 @@ func (db *MemDB) Flush() error { return nil } +// View implements DB. MemDB has no MVCC; callers must ensure that no writes +// occur concurrently with the View call. +func (db *MemDB) View(fn func(DB) error) error { + return fn(db) +} + // Cancel implements DB. func (db *MemDB) Cancel() { for k := range db.puts { @@ -336,6 +350,12 @@ func (db *CacheDB) Cancel() { db.db.Cancel() } +// View implements DB. CacheDB has no MVCC; callers must ensure that no writes +// occur concurrently with the View call. +func (db *CacheDB) View(fn func(DB) error) error { + return fn(db) +} + // NewCacheDB returns a new CacheDB that wraps the given DB. func NewCacheDB(db DB) DB { return &CacheDB{ @@ -948,6 +968,16 @@ func (db *DBStore) Flush() error { return err } +// View implements Store. It calls fn with a read-only Store backed by a +// consistent snapshot of the underlying DB. The Store passed to fn must not be +// used after fn returns. Writes performed via the Store will fail. +func (db *DBStore) View(fn func(Store)) error { + return db.db.View(func(rdb DB) error { + fn(&DBStore{db: rdb, n: db.n}) + return nil + }) +} + // NewDBStore creates a new DBStore using the provided database. The tip state // is also returned. The DB will be automatically migrated if necessary. The // provided logger may be nil. diff --git a/chain/manager.go b/chain/manager.go index 1a355ea1..433fcad3 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -65,6 +65,11 @@ type Store interface { ApplyBlock(s consensus.State, cau consensus.ApplyUpdate) RevertBlock(s consensus.State, cru consensus.RevertUpdate) Flush() error + + // View calls fn with a read-only Store backed by a consistent snapshot + // of the underlying storage. The Store passed to fn must not be used + // after fn returns and may execute concurrently with other View calls. + View(fn func(Store)) error } // blockAndParent returns the block with the specified ID, along with its parent @@ -98,13 +103,31 @@ type Manager struct { lastRevertedV2 []types.V2Transaction } - mu sync.Mutex + mu sync.RWMutex +} + +// flushStore commits any pending store writes. It must be called before +// releasing m.mu in writer paths so that concurrent View readers observe a +// consistent snapshot. +func (m *Manager) flushStore() { + if err := m.store.Flush(); err != nil { + panic(err) + } +} + +// viewStore runs fn against a read-only snapshot of the store. It panics on +// any tx-level error, consistent with the rest of Manager's store error +// handling. +func (m *Manager) viewStore(fn func(Store)) { + if err := m.store.View(fn); err != nil { + panic(err) + } } // TipState returns the consensus state for the current tip. func (m *Manager) TipState() consensus.State { - m.mu.Lock() - defer m.mu.Unlock() + m.mu.RLock() + defer m.mu.RUnlock() return m.tipState } @@ -114,51 +137,61 @@ func (m *Manager) Tip() types.ChainIndex { } // Block returns the block with the specified ID. -func (m *Manager) Block(id types.BlockID) (types.Block, bool) { - m.mu.Lock() - defer m.mu.Unlock() - b, _, ok := m.store.Block(id) - return b, ok +func (m *Manager) Block(id types.BlockID) (b types.Block, ok bool) { + m.mu.RLock() + defer m.mu.RUnlock() + m.viewStore(func(s Store) { + b, _, ok = s.Block(id) + }) + return } // State returns the state with the specified ID. -func (m *Manager) State(id types.BlockID) (consensus.State, bool) { - m.mu.Lock() - defer m.mu.Unlock() - return m.store.State(id) +func (m *Manager) State(id types.BlockID) (cs consensus.State, ok bool) { + m.mu.RLock() + defer m.mu.RUnlock() + m.viewStore(func(s Store) { + cs, ok = s.State(id) + }) + return } // BestIndex returns the index of the block at the specified height within the // best chain. -func (m *Manager) BestIndex(height uint64) (types.ChainIndex, bool) { - m.mu.Lock() - defer m.mu.Unlock() - return m.store.BestIndex(height) +func (m *Manager) BestIndex(height uint64) (index types.ChainIndex, ok bool) { + m.mu.RLock() + defer m.mu.RUnlock() + m.viewStore(func(s Store) { + index, ok = s.BestIndex(height) + }) + return } // MinReorgIndex returns the index on the best chain below which the manager // cannot perform a reorg. -func (m *Manager) MinReorgIndex() types.ChainIndex { - m.mu.Lock() - defer m.mu.Unlock() - index := m.tipState.Index - for index.Height > 0 { - prevIndex, ok := m.store.BestIndex(index.Height - 1) - _, _, ok2 := m.store.Block(prevIndex.ID) - if !ok || !ok2 { - break +func (m *Manager) MinReorgIndex() (index types.ChainIndex) { + m.mu.RLock() + defer m.mu.RUnlock() + m.viewStore(func(s Store) { + index = m.tipState.Index + for index.Height > 0 { + prevIndex, ok := s.BestIndex(index.Height - 1) + _, _, ok2 := s.Block(prevIndex.ID) + if !ok || !ok2 { + break + } + index = prevIndex } - index = prevIndex - } - return index + }) + return } // History returns a set of block IDs that span the best chain, beginning with // the 10 most-recent blocks, and subsequently spaced exponentionally farther // apart until reaching the genesis block. -func (m *Manager) History() ([32]types.BlockID, error) { - m.mu.Lock() - defer m.mu.Unlock() +func (m *Manager) History() (history [32]types.BlockID, _ error) { + m.mu.RLock() + defer m.mu.RUnlock() tipHeight := m.tipState.Index.Height histHeight := func(i int) uint64 { @@ -171,37 +204,46 @@ func (m *Manager) History() ([32]types.BlockID, error) { } return tipHeight - offset } - var history [32]types.BlockID - for i := range history { - index, ok := m.store.BestIndex(histHeight(i)) - if !ok { - break + m.viewStore(func(s Store) { + for i := range history { + index, ok := s.BestIndex(histHeight(i)) + if !ok { + break + } + history[i] = index.ID } - history[i] = index.ID - } + }) return history, nil } // Headers returns up to max consecutive headers starting from supplied index, // which must be on the best chain. It also returns the number of headers // between the end of the returned slice and the current tip. -func (m *Manager) Headers(index types.ChainIndex, maxHeaders uint64) ([]types.BlockHeader, uint64, error) { - m.mu.Lock() - defer m.mu.Unlock() - if bestIndex, ok := m.store.BestIndex(index.Height); !ok || bestIndex != index { - return nil, 0, fmt.Errorf("index %v is not on our best chain", index) - } - maxHeaders = min(maxHeaders, m.tipState.Index.Height-index.Height) - headers := make([]types.BlockHeader, maxHeaders) - for i := range headers { - index, _ := m.store.BestIndex(index.Height + uint64(i) + 1) - bh, ok := m.store.Header(index.ID) - if !ok { - return nil, 0, fmt.Errorf("missing block header %v", index) +func (m *Manager) Headers(index types.ChainIndex, maxHeaders uint64) (headers []types.BlockHeader, remaining uint64, err error) { + m.mu.RLock() + defer m.mu.RUnlock() + m.viewStore(func(s Store) { + if bestIndex, ok := s.BestIndex(index.Height); !ok || bestIndex != index { + err = fmt.Errorf("index %v is not on our best chain", index) + return + } + maxHeaders = min(maxHeaders, m.tipState.Index.Height-index.Height) + headers = make([]types.BlockHeader, maxHeaders) + for i := range headers { + next, _ := s.BestIndex(index.Height + uint64(i) + 1) + bh, ok := s.Header(next.ID) + if !ok { + err = fmt.Errorf("missing block header %v", next) + return + } + headers[i] = bh } - headers[i] = bh + remaining = m.tipState.Index.Height - (index.Height + maxHeaders) + }) + if err != nil { + return nil, 0, err } - return headers, m.tipState.Index.Height - (index.Height + maxHeaders), nil + return } // BlocksForHistory returns up to maxBlocks consecutive blocks from the best @@ -209,34 +251,42 @@ func (m *Manager) Headers(index types.ChainIndex, maxHeaders uint64) ([]types.Bl // is present in the best chain (or, if no match is found, genesis). It also // returns the number of blocks between the end of the returned slice and the // current tip. -func (m *Manager) BlocksForHistory(history []types.BlockID, maxBlocks uint64) ([]types.Block, uint64, error) { - m.mu.Lock() - defer m.mu.Unlock() - var attachHeight uint64 - for _, id := range history { - if cs, ok := m.store.State(id); !ok { - continue - } else if index, ok := m.store.BestIndex(cs.Index.Height); ok && index == cs.Index { - attachHeight = cs.Index.Height - break +func (m *Manager) BlocksForHistory(history []types.BlockID, maxBlocks uint64) (blocks []types.Block, remaining uint64, err error) { + m.mu.RLock() + defer m.mu.RUnlock() + m.viewStore(func(s Store) { + var attachHeight uint64 + for _, id := range history { + if cs, ok := s.State(id); !ok { + continue + } else if index, ok := s.BestIndex(cs.Index.Height); ok && index == cs.Index { + attachHeight = cs.Index.Height + break + } } - } - if maxBlocks > m.tipState.Index.Height-attachHeight { - maxBlocks = m.tipState.Index.Height - attachHeight - } - blocks := make([]types.Block, maxBlocks) - for i := range blocks { - index, ok := m.store.BestIndex(attachHeight + uint64(i) + 1) - if !ok { - return nil, 0, fmt.Errorf("unknown block at height %v", attachHeight+uint64(i)+1) + if maxBlocks > m.tipState.Index.Height-attachHeight { + maxBlocks = m.tipState.Index.Height - attachHeight } - b, _, ok := m.store.Block(index.ID) - if !ok { - return nil, 0, fmt.Errorf("missing block %v", index) + blocks = make([]types.Block, maxBlocks) + for i := range blocks { + index, ok := s.BestIndex(attachHeight + uint64(i) + 1) + if !ok { + err = fmt.Errorf("unknown block at height %v", attachHeight+uint64(i)+1) + return + } + b, _, ok := s.Block(index.ID) + if !ok { + err = fmt.Errorf("missing block %v", index) + return + } + blocks[i] = b } - blocks[i] = b + remaining = m.tipState.Index.Height - (attachHeight + maxBlocks) + }) + if err != nil { + return nil, 0, err } - return blocks, m.tipState.Index.Height - (attachHeight + maxBlocks), nil + return } // AddBlocks ingests a chain of blocks. If the blocks are valid, the chain they @@ -244,6 +294,7 @@ func (m *Manager) BlocksForHistory(history []types.BlockID, maxBlocks uint64) ([ func (m *Manager) AddBlocks(blocks []types.Block) error { m.mu.Lock() defer m.mu.Unlock() + defer m.flushStore() if len(blocks) == 0 { return nil } @@ -297,6 +348,7 @@ func (m *Manager) AddBlocks(blocks []types.Block) error { for _, fn := range m.onPool { fns = append(fns, fn) } + m.flushStore() m.mu.Unlock() for _, fn := range fns { fn() @@ -312,6 +364,7 @@ func (m *Manager) AddBlocks(blocks []types.Block) error { func (m *Manager) AddValidatedV2Blocks(blocks []types.Block, states []consensus.State) error { m.mu.Lock() defer m.mu.Unlock() + defer m.flushStore() if len(blocks) == 0 { return nil } else if len(states) != len(blocks) { @@ -348,6 +401,7 @@ func (m *Manager) AddValidatedV2Blocks(blocks []types.Block, states []consensus. for _, fn := range m.onPool { fns = append(fns, fn) } + m.flushStore() m.mu.Unlock() for _, fn := range fns { fn() @@ -533,6 +587,7 @@ func (m *Manager) reorgTo(index types.ChainIndex) error { func (m *Manager) PruneBlocks(height uint64) { m.mu.Lock() defer m.mu.Unlock() + defer m.flushStore() for h := height; h > 0; h-- { index, ok := m.store.BestIndex(h - 1) @@ -548,45 +603,55 @@ func (m *Manager) PruneBlocks(height uint64) { // UpdatesSince returns at most max updates on the path between index and the // Manager's current tip. func (m *Manager) UpdatesSince(index types.ChainIndex, maxBlocks int) (rus []RevertUpdate, aus []ApplyUpdate, err error) { - m.mu.Lock() - defer m.mu.Unlock() - onBestChain := func(index types.ChainIndex) bool { - bi, _ := m.store.BestIndex(index.Height) - return bi.ID == index.ID || index == types.ChainIndex{} - } - - for index != m.tipState.Index && len(rus)+len(aus) < maxBlocks { - // revert until we are on the best chain, then apply - if !onBestChain(index) { - b, bs, cs, ok := blockAndParent(m.store, index.ID) - if !ok { - return nil, nil, fmt.Errorf("%w %v", ErrMissingBlock, index) - } else if bs == nil { - return nil, nil, fmt.Errorf("missing supplement for block %v", index) - } - cru := consensus.RevertBlock(cs, b, *bs) - rus = append(rus, RevertUpdate{cru, b, cs}) - index = cs.Index - } else { - // special case: if index is uninitialized, we're starting from genesis - if index == (types.ChainIndex{}) { - index, _ = m.store.BestIndex(0) + m.mu.RLock() + defer m.mu.RUnlock() + m.viewStore(func(s Store) { + onBestChain := func(index types.ChainIndex) bool { + bi, _ := s.BestIndex(index.Height) + return bi.ID == index.ID || index == types.ChainIndex{} + } + + for index != m.tipState.Index && len(rus)+len(aus) < maxBlocks { + // revert until we are on the best chain, then apply + if !onBestChain(index) { + b, bs, cs, ok := blockAndParent(s, index.ID) + if !ok { + err = fmt.Errorf("%w %v", ErrMissingBlock, index) + return + } else if bs == nil { + err = fmt.Errorf("missing supplement for block %v", index) + return + } + cru := consensus.RevertBlock(cs, b, *bs) + rus = append(rus, RevertUpdate{cru, b, cs}) + index = cs.Index } else { - index, _ = m.store.BestIndex(index.Height + 1) - } - b, bs, cs, ok := blockAndParent(m.store, index.ID) - if !ok { - return nil, nil, fmt.Errorf("%w %v", ErrMissingBlock, index) - } else if bs == nil { - return nil, nil, fmt.Errorf("missing supplement for block %v", index) - } - ancestorTimestamp, ok := m.store.AncestorTimestamp(b.ParentID) - if !ok && index.Height != 0 { - return nil, nil, fmt.Errorf("missing ancestor timestamp for block %v", b.ParentID) + // special case: if index is uninitialized, we're starting from genesis + if index == (types.ChainIndex{}) { + index, _ = s.BestIndex(0) + } else { + index, _ = s.BestIndex(index.Height + 1) + } + b, bs, cs, ok := blockAndParent(s, index.ID) + if !ok { + err = fmt.Errorf("%w %v", ErrMissingBlock, index) + return + } else if bs == nil { + err = fmt.Errorf("missing supplement for block %v", index) + return + } + ancestorTimestamp, ok := s.AncestorTimestamp(b.ParentID) + if !ok && index.Height != 0 { + err = fmt.Errorf("missing ancestor timestamp for block %v", b.ParentID) + return + } + cs, cau := consensus.ApplyBlock(cs, b, *bs, ancestorTimestamp) + aus = append(aus, ApplyUpdate{cau, b, cs}) } - cs, cau := consensus.ApplyBlock(cs, b, *bs, ancestorTimestamp) - aus = append(aus, ApplyUpdate{cau, b, cs}) } + }) + if err != nil { + return nil, nil, err } return } @@ -1455,5 +1520,6 @@ func NewManager(store Store, cs consensus.State, opts ...ManagerOption) *Manager for _, opt := range opts { opt(m) } + return m } diff --git a/db.go b/db.go index ff0eb359..875979a6 100644 --- a/db.go +++ b/db.go @@ -78,6 +78,39 @@ func (db *BoltChainDB) Cancel() { db.tx = nil } +// View implements chain.DB. It runs fn against a read-only bbolt transaction, +// which can execute concurrently with other View calls and with the open +// writer transaction. Writes performed via the DB passed to fn will fail. +func (db *BoltChainDB) View(fn func(chain.DB) error) error { + return db.db.View(func(tx *bbolt.Tx) error { + return fn(&boltViewDB{tx: tx}) + }) +} + +// boltViewDB is a read-only chain.DB backed by a bbolt read transaction. +type boltViewDB struct { + tx *bbolt.Tx +} + +func (db *boltViewDB) Bucket(name []byte) chain.DBBucket { + b := db.tx.Bucket(name) + if b == nil { + return nil + } + return boltBucket{b} +} + +func (db *boltViewDB) CreateBucket(name []byte) (chain.DBBucket, error) { + return nil, bbolt.ErrTxNotWritable +} + +func (db *boltViewDB) Flush() error { return nil } +func (db *boltViewDB) Cancel() {} + +func (db *boltViewDB) View(fn func(chain.DB) error) error { + return fn(db) +} + // Close closes the BoltDB database. func (db *BoltChainDB) Close() error { db.Flush() From db7a3a340ec9c184b62a2fef4f09f908c6d06ac5 Mon Sep 17 00:00:00 2001 From: Alright Date: Tue, 9 Jun 2026 08:21:00 -0400 Subject: [PATCH 2/9] golangci-lint fix --- db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db.go b/db.go index 875979a6..804958fd 100644 --- a/db.go +++ b/db.go @@ -100,7 +100,7 @@ func (db *boltViewDB) Bucket(name []byte) chain.DBBucket { return boltBucket{b} } -func (db *boltViewDB) CreateBucket(name []byte) (chain.DBBucket, error) { +func (db *boltViewDB) CreateBucket(_ []byte) (chain.DBBucket, error) { return nil, bbolt.ErrTxNotWritable } From 21a3ef33bc0edb5aee42ff21bcd95e3e443e3c37 Mon Sep 17 00:00:00 2001 From: Alright Date: Tue, 9 Jun 2026 08:25:45 -0400 Subject: [PATCH 3/9] add changeset --- ...hain_manager_mutex_to_rwmutex_to_enable_parallel_reads.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/change_chain_manager_mutex_to_rwmutex_to_enable_parallel_reads.md diff --git a/.changeset/change_chain_manager_mutex_to_rwmutex_to_enable_parallel_reads.md b/.changeset/change_chain_manager_mutex_to_rwmutex_to_enable_parallel_reads.md new file mode 100644 index 00000000..f047e0bb --- /dev/null +++ b/.changeset/change_chain_manager_mutex_to_rwmutex_to_enable_parallel_reads.md @@ -0,0 +1,5 @@ +--- +default: patch +--- + +# Change Chain Manager Mutex to RWMutex to enable parallel reads From f0f540089470b1dfb12e55e1cb408e1349c38556 Mon Sep 17 00:00:00 2001 From: Alright Date: Tue, 9 Jun 2026 08:41:41 -0400 Subject: [PATCH 4/9] whitespace --- chain/manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/chain/manager.go b/chain/manager.go index 433fcad3..656d5af4 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -1520,6 +1520,5 @@ func NewManager(store Store, cs consensus.State, opts ...ManagerOption) *Manager for _, opt := range opts { opt(m) } - return m } From 6cdb24311bdb6e5a4b2d1878ed625598219b44b0 Mon Sep 17 00:00:00 2001 From: Alright Date: Tue, 9 Jun 2026 08:51:14 -0400 Subject: [PATCH 5/9] add writeUnlock helper to pair store flush with lock release --- chain/manager.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/chain/manager.go b/chain/manager.go index 656d5af4..f3174812 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -106,15 +106,25 @@ type Manager struct { mu sync.RWMutex } -// flushStore commits any pending store writes. It must be called before -// releasing m.mu in writer paths so that concurrent View readers observe a -// consistent snapshot. +// flushStore commits any pending store writes. Writer paths must not release +// m.mu without flushing first, or concurrent View readers would observe a +// snapshot older than m.tipState; use writeUnlock to release the write lock so +// the two cannot drift apart. func (m *Manager) flushStore() { if err := m.store.Flush(); err != nil { panic(err) } } +// writeUnlock flushes pending store writes, then releases the write lock. It +// must be used (never m.mu.Unlock directly) to release a lock taken with +// m.mu.Lock in a store-mutating path, so that flush-before-unlock cannot be +// forgotten. Pool-only writers that don't touch the store use m.mu.Unlock. +func (m *Manager) writeUnlock() { + m.flushStore() + m.mu.Unlock() +} + // viewStore runs fn against a read-only snapshot of the store. It panics on // any tx-level error, consistent with the rest of Manager's store error // handling. @@ -293,8 +303,7 @@ func (m *Manager) BlocksForHistory(history []types.BlockID, maxBlocks uint64) (b // belong to may become the new best chain, triggering a reorg. func (m *Manager) AddBlocks(blocks []types.Block) error { m.mu.Lock() - defer m.mu.Unlock() - defer m.flushStore() + defer m.writeUnlock() if len(blocks) == 0 { return nil } @@ -348,8 +357,7 @@ func (m *Manager) AddBlocks(blocks []types.Block) error { for _, fn := range m.onPool { fns = append(fns, fn) } - m.flushStore() - m.mu.Unlock() + m.writeUnlock() for _, fn := range fns { fn() } @@ -363,8 +371,7 @@ func (m *Manager) AddBlocks(blocks []types.Block) error { // sufficient work, it may become the new best chain, triggering a reorg. func (m *Manager) AddValidatedV2Blocks(blocks []types.Block, states []consensus.State) error { m.mu.Lock() - defer m.mu.Unlock() - defer m.flushStore() + defer m.writeUnlock() if len(blocks) == 0 { return nil } else if len(states) != len(blocks) { @@ -401,8 +408,7 @@ func (m *Manager) AddValidatedV2Blocks(blocks []types.Block, states []consensus. for _, fn := range m.onPool { fns = append(fns, fn) } - m.flushStore() - m.mu.Unlock() + m.writeUnlock() for _, fn := range fns { fn() } @@ -586,8 +592,7 @@ func (m *Manager) reorgTo(index types.ChainIndex) error { // a large backlog. func (m *Manager) PruneBlocks(height uint64) { m.mu.Lock() - defer m.mu.Unlock() - defer m.flushStore() + defer m.writeUnlock() for h := height; h > 0; h-- { index, ok := m.store.BestIndex(h - 1) From bcad357ffb236d74fbc8cffac1c102e16d101c1d Mon Sep 17 00:00:00 2001 From: Alright Date: Tue, 16 Jun 2026 08:40:26 -0400 Subject: [PATCH 6/9] add ReadonlyStore --- chain/db.go | 103 ++++++++++++++---- chain/manager.go | 277 +++++++++++++++++++++++------------------------ db.go | 38 +++---- 3 files changed, 229 insertions(+), 189 deletions(-) diff --git a/chain/db.go b/chain/db.go index 7a844b1d..dbd2f443 100644 --- a/chain/db.go +++ b/chain/db.go @@ -66,15 +66,33 @@ type DB interface { Flush() error Cancel() - // View calls fn with a read-only DB. Implementations must guarantee - // that the DB passed to fn observes a consistent snapshot for the - // duration of fn, and that fn may run concurrently with other View - // calls. Buckets obtained via the DB inside fn must not be used - // after fn returns. Writes via the DB inside fn are not durable - // and may panic. - View(fn func(DB) error) error + // Snapshot returns a read-only, point-in-time view of the database. + // Implementations must guarantee that the returned ReadonlyDB observes a + // consistent snapshot and may be read concurrently with writes to the DB + // and with other snapshots. Callers must Close it when finished. + Snapshot() ReadonlyDB } +// A ReadonlyDB is a read-only, point-in-time view of a DB. Because it exposes +// no mutating methods, the underlying database cannot be modified through it. +type ReadonlyDB interface { + Bucket(name []byte) DBBucket + // Close releases the resources held by the snapshot. + Close() error +} + +// noSnapshotDB is a ReadonlyDB for databases that do not support true +// snapshots. Reads observe live state, so callers must exclude concurrent +// writes themselves; Close is a no-op. +type noSnapshotDB struct { + db interface { + Bucket(name []byte) DBBucket + } +} + +func (s noSnapshotDB) Bucket(name []byte) DBBucket { return s.db.Bucket(name) } +func (noSnapshotDB) Close() error { return nil } + // A DBBucket is a set of key-value pairs. type DBBucket interface { Get(key []byte) []byte @@ -113,10 +131,12 @@ func (db *MemDB) Flush() error { return nil } -// View implements DB. MemDB has no MVCC; callers must ensure that no writes -// occur concurrently with the View call. -func (db *MemDB) View(fn func(DB) error) error { - return fn(db) +// Snapshot implements DB. MemDB has no internal locking to prevent a write +// through db racing a read from the returned snapshot, so it must only be used +// behind a Manager, which guards snapshot reads against writes with its +// RWMutex. +func (db *MemDB) Snapshot() ReadonlyDB { + return noSnapshotDB{db} } // Cancel implements DB. @@ -350,10 +370,12 @@ func (db *CacheDB) Cancel() { db.db.Cancel() } -// View implements DB. CacheDB has no MVCC; callers must ensure that no writes -// occur concurrently with the View call. -func (db *CacheDB) View(fn func(DB) error) error { - return fn(db) +// Snapshot implements DB. CacheDB has no internal locking to prevent a write +// through db racing a read from the returned snapshot, so it must only be used +// behind a Manager, which guards snapshot reads against writes with its +// RWMutex. +func (db *CacheDB) Snapshot() ReadonlyDB { + return noSnapshotDB{db} } // NewCacheDB returns a new CacheDB that wraps the given DB. @@ -430,9 +452,18 @@ var ( keyHeight = []byte("Height") ) +// rwDB is the subset of DB that DBStore accesses directly. A read-only +// snapshot can satisfy it (see roDB), letting a snapshot reuse DBStore's +// accessors without exposing any write methods. +type rwDB interface { + Bucket(name []byte) DBBucket + Flush() error + Snapshot() ReadonlyDB +} + // DBStore implements Store using a key-value database. type DBStore struct { - db DB + db rwDB n *consensus.Network // for getState enc types.Encoder @@ -968,14 +999,38 @@ func (db *DBStore) Flush() error { return err } -// View implements Store. It calls fn with a read-only Store backed by a -// consistent snapshot of the underlying DB. The Store passed to fn must not be -// used after fn returns. Writes performed via the Store will fail. -func (db *DBStore) View(fn func(Store)) error { - return db.db.View(func(rdb DB) error { - fn(&DBStore{db: rdb, n: db.n}) - return nil - }) +// roDB adapts a ReadonlyDB to the rwDB interface so that a read-only DBStore +// can reuse DBStore's accessors. The snapshot is never written or +// re-snapshotted, so Flush and Snapshot are inert. +type roDB struct{ ReadonlyDB } + +func (roDB) Flush() error { return nil } +func (r roDB) Snapshot() ReadonlyDB { return r.ReadonlyDB } + +// readonlyStore is a read-only Store backed by a database snapshot. It embeds a +// *DBStore for its read accessors but is only ever exposed as a ReadonlyStore, +// so its inherited write methods are unreachable. +type readonlyStore struct { + *DBStore + rdb ReadonlyDB +} + +// Close releases the underlying snapshot. +func (s *readonlyStore) Close() { + if err := s.rdb.Close(); err != nil { + panic(err) + } +} + +// Snapshot implements Store. It returns a read-only view of the store backed by +// a consistent snapshot of the underlying DB. The returned ReadonlyStore must +// be closed when finished and may be read concurrently with other snapshots. +func (db *DBStore) Snapshot() ReadonlyStore { + rdb := db.db.Snapshot() + return &readonlyStore{ + DBStore: &DBStore{db: roDB{rdb}, n: db.n}, + rdb: rdb, + } } // NewDBStore creates a new DBStore using the provided database. The tip state diff --git a/chain/manager.go b/chain/manager.go index 6eae21e6..7aaf47b5 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -67,15 +67,34 @@ type Store interface { RevertBlock(s consensus.State, cru consensus.RevertUpdate) Flush() error - // View calls fn with a read-only Store backed by a consistent snapshot - // of the underlying storage. The Store passed to fn must not be used - // after fn returns and may execute concurrently with other View calls. - View(fn func(Store)) error + // Snapshot returns a read-only, point-in-time view of the Store. It may be + // read concurrently with other snapshots and must be closed when finished. + Snapshot() ReadonlyStore +} + +// A ReadonlyStore is a read-only, point-in-time view of a Store. It exposes +// only the Store's read methods, so the underlying storage cannot be modified +// through it. +type ReadonlyStore interface { + BestIndex(height uint64) (types.ChainIndex, bool) + Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) + Header(id types.BlockID) (types.BlockHeader, bool) + State(id types.BlockID) (consensus.State, bool) + AncestorTimestamp(id types.BlockID) (time.Time, bool) + // Close releases the snapshot. + Close() +} + +// blockReader provides the reads needed by blockAndParent. Both Store and +// ReadonlyStore satisfy it. +type blockReader interface { + Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) + State(id types.BlockID) (consensus.State, bool) } // blockAndParent returns the block with the specified ID, along with its parent // state. -func blockAndParent(s Store, id types.BlockID) (types.Block, *consensus.V1BlockSupplement, consensus.State, bool) { +func blockAndParent(s blockReader, id types.BlockID) (types.Block, *consensus.V1BlockSupplement, consensus.State, bool) { b, bs, ok := s.Block(id) cs, ok2 := s.State(b.ParentID) return b, bs, cs, ok && ok2 @@ -126,15 +145,6 @@ func (m *Manager) writeUnlock() { m.mu.Unlock() } -// viewStore runs fn against a read-only snapshot of the store. It panics on -// any tx-level error, consistent with the rest of Manager's store error -// handling. -func (m *Manager) viewStore(fn func(Store)) { - if err := m.store.View(fn); err != nil { - panic(err) - } -} - // TipState returns the consensus state for the current tip. func (m *Manager) TipState() consensus.State { m.mu.RLock() @@ -148,61 +158,61 @@ func (m *Manager) Tip() types.ChainIndex { } // Block returns the block with the specified ID. -func (m *Manager) Block(id types.BlockID) (b types.Block, ok bool) { +func (m *Manager) Block(id types.BlockID) (types.Block, bool) { m.mu.RLock() defer m.mu.RUnlock() - m.viewStore(func(s Store) { - b, _, ok = s.Block(id) - }) - return + snap := m.store.Snapshot() + defer snap.Close() + b, _, ok := snap.Block(id) + return b, ok } // State returns the state with the specified ID. -func (m *Manager) State(id types.BlockID) (cs consensus.State, ok bool) { +func (m *Manager) State(id types.BlockID) (consensus.State, bool) { m.mu.RLock() defer m.mu.RUnlock() - m.viewStore(func(s Store) { - cs, ok = s.State(id) - }) - return + snap := m.store.Snapshot() + defer snap.Close() + return snap.State(id) } // BestIndex returns the index of the block at the specified height within the // best chain. -func (m *Manager) BestIndex(height uint64) (index types.ChainIndex, ok bool) { +func (m *Manager) BestIndex(height uint64) (types.ChainIndex, bool) { m.mu.RLock() defer m.mu.RUnlock() - m.viewStore(func(s Store) { - index, ok = s.BestIndex(height) - }) - return + snap := m.store.Snapshot() + defer snap.Close() + return snap.BestIndex(height) } // MinReorgIndex returns the index on the best chain below which the manager // cannot perform a reorg. -func (m *Manager) MinReorgIndex() (index types.ChainIndex) { +func (m *Manager) MinReorgIndex() types.ChainIndex { m.mu.RLock() defer m.mu.RUnlock() - m.viewStore(func(s Store) { - index = m.tipState.Index - for index.Height > 0 { - prevIndex, ok := s.BestIndex(index.Height - 1) - _, _, ok2 := s.Block(prevIndex.ID) - if !ok || !ok2 { - break - } - index = prevIndex + snap := m.store.Snapshot() + defer snap.Close() + index := m.tipState.Index + for index.Height > 0 { + prevIndex, ok := snap.BestIndex(index.Height - 1) + _, _, ok2 := snap.Block(prevIndex.ID) + if !ok || !ok2 { + break } - }) - return + index = prevIndex + } + return index } // History returns a set of block IDs that span the best chain, beginning with // the 10 most-recent blocks, and subsequently spaced exponentionally farther // apart until reaching the genesis block. -func (m *Manager) History() (history [32]types.BlockID, _ error) { +func (m *Manager) History() ([32]types.BlockID, error) { m.mu.RLock() defer m.mu.RUnlock() + snap := m.store.Snapshot() + defer snap.Close() tipHeight := m.tipState.Index.Height histHeight := func(i int) uint64 { @@ -215,46 +225,39 @@ func (m *Manager) History() (history [32]types.BlockID, _ error) { } return tipHeight - offset } - m.viewStore(func(s Store) { - for i := range history { - index, ok := s.BestIndex(histHeight(i)) - if !ok { - break - } - history[i] = index.ID + var history [32]types.BlockID + for i := range history { + index, ok := snap.BestIndex(histHeight(i)) + if !ok { + break } - }) + history[i] = index.ID + } return history, nil } // Headers returns up to max consecutive headers starting from supplied index, // which must be on the best chain. It also returns the number of headers // between the end of the returned slice and the current tip. -func (m *Manager) Headers(index types.ChainIndex, maxHeaders uint64) (headers []types.BlockHeader, remaining uint64, err error) { +func (m *Manager) Headers(index types.ChainIndex, maxHeaders uint64) ([]types.BlockHeader, uint64, error) { m.mu.RLock() defer m.mu.RUnlock() - m.viewStore(func(s Store) { - if bestIndex, ok := s.BestIndex(index.Height); !ok || bestIndex != index { - err = fmt.Errorf("index %v is not on our best chain", index) - return - } - maxHeaders = min(maxHeaders, m.tipState.Index.Height-index.Height) - headers = make([]types.BlockHeader, maxHeaders) - for i := range headers { - next, _ := s.BestIndex(index.Height + uint64(i) + 1) - bh, ok := s.Header(next.ID) - if !ok { - err = fmt.Errorf("missing block header %v", next) - return - } - headers[i] = bh + snap := m.store.Snapshot() + defer snap.Close() + if bestIndex, ok := snap.BestIndex(index.Height); !ok || bestIndex != index { + return nil, 0, fmt.Errorf("index %v is not on our best chain", index) + } + maxHeaders = min(maxHeaders, m.tipState.Index.Height-index.Height) + headers := make([]types.BlockHeader, maxHeaders) + for i := range headers { + next, _ := snap.BestIndex(index.Height + uint64(i) + 1) + bh, ok := snap.Header(next.ID) + if !ok { + return nil, 0, fmt.Errorf("missing block header %v", next) } - remaining = m.tipState.Index.Height - (index.Height + maxHeaders) - }) - if err != nil { - return nil, 0, err + headers[i] = bh } - return + return headers, m.tipState.Index.Height - (index.Height + maxHeaders), nil } // BlocksForHistory returns up to maxBlocks consecutive blocks from the best @@ -262,42 +265,36 @@ func (m *Manager) Headers(index types.ChainIndex, maxHeaders uint64) (headers [] // is present in the best chain (or, if no match is found, genesis). It also // returns the number of blocks between the end of the returned slice and the // current tip. -func (m *Manager) BlocksForHistory(history []types.BlockID, maxBlocks uint64) (blocks []types.Block, remaining uint64, err error) { +func (m *Manager) BlocksForHistory(history []types.BlockID, maxBlocks uint64) ([]types.Block, uint64, error) { m.mu.RLock() defer m.mu.RUnlock() - m.viewStore(func(s Store) { - var attachHeight uint64 - for _, id := range history { - if cs, ok := s.State(id); !ok { - continue - } else if index, ok := s.BestIndex(cs.Index.Height); ok && index == cs.Index { - attachHeight = cs.Index.Height - break - } + snap := m.store.Snapshot() + defer snap.Close() + var attachHeight uint64 + for _, id := range history { + if cs, ok := snap.State(id); !ok { + continue + } else if index, ok := snap.BestIndex(cs.Index.Height); ok && index == cs.Index { + attachHeight = cs.Index.Height + break } - if maxBlocks > m.tipState.Index.Height-attachHeight { - maxBlocks = m.tipState.Index.Height - attachHeight + } + if maxBlocks > m.tipState.Index.Height-attachHeight { + maxBlocks = m.tipState.Index.Height - attachHeight + } + blocks := make([]types.Block, maxBlocks) + for i := range blocks { + index, ok := snap.BestIndex(attachHeight + uint64(i) + 1) + if !ok { + return nil, 0, fmt.Errorf("unknown block at height %v", attachHeight+uint64(i)+1) } - blocks = make([]types.Block, maxBlocks) - for i := range blocks { - index, ok := s.BestIndex(attachHeight + uint64(i) + 1) - if !ok { - err = fmt.Errorf("unknown block at height %v", attachHeight+uint64(i)+1) - return - } - b, _, ok := s.Block(index.ID) - if !ok { - err = fmt.Errorf("missing block %v", index) - return - } - blocks[i] = b + b, _, ok := snap.Block(index.ID) + if !ok { + return nil, 0, fmt.Errorf("missing block %v", index) } - remaining = m.tipState.Index.Height - (attachHeight + maxBlocks) - }) - if err != nil { - return nil, 0, err + blocks[i] = b } - return + return blocks, m.tipState.Index.Height - (attachHeight + maxBlocks), nil } // AddBlocks ingests a chain of blocks. If the blocks are valid, the chain they @@ -615,53 +612,45 @@ func (m *Manager) PruneBlocks(height uint64) { func (m *Manager) UpdatesSince(index types.ChainIndex, maxBlocks int) (rus []RevertUpdate, aus []ApplyUpdate, err error) { m.mu.RLock() defer m.mu.RUnlock() - m.viewStore(func(s Store) { - onBestChain := func(index types.ChainIndex) bool { - bi, _ := s.BestIndex(index.Height) - return bi.ID == index.ID || index == types.ChainIndex{} - } + snap := m.store.Snapshot() + defer snap.Close() + onBestChain := func(index types.ChainIndex) bool { + bi, _ := snap.BestIndex(index.Height) + return bi.ID == index.ID || index == types.ChainIndex{} + } - for index != m.tipState.Index && len(rus)+len(aus) < maxBlocks { - // revert until we are on the best chain, then apply - if !onBestChain(index) { - b, bs, cs, ok := blockAndParent(s, index.ID) - if !ok { - err = fmt.Errorf("%w %v", ErrMissingBlock, index) - return - } else if bs == nil { - err = fmt.Errorf("missing supplement for block %v", index) - return - } - cru := consensus.RevertBlock(cs, b, *bs) - rus = append(rus, RevertUpdate{cru, b, cs}) - index = cs.Index + for index != m.tipState.Index && len(rus)+len(aus) < maxBlocks { + // revert until we are on the best chain, then apply + if !onBestChain(index) { + b, bs, cs, ok := blockAndParent(snap, index.ID) + if !ok { + return nil, nil, fmt.Errorf("%w %v", ErrMissingBlock, index) + } else if bs == nil { + return nil, nil, fmt.Errorf("missing supplement for block %v", index) + } + cru := consensus.RevertBlock(cs, b, *bs) + rus = append(rus, RevertUpdate{cru, b, cs}) + index = cs.Index + } else { + // special case: if index is uninitialized, we're starting from genesis + if index == (types.ChainIndex{}) { + index, _ = snap.BestIndex(0) } else { - // special case: if index is uninitialized, we're starting from genesis - if index == (types.ChainIndex{}) { - index, _ = s.BestIndex(0) - } else { - index, _ = s.BestIndex(index.Height + 1) - } - b, bs, cs, ok := blockAndParent(s, index.ID) - if !ok { - err = fmt.Errorf("%w %v", ErrMissingBlock, index) - return - } else if bs == nil { - err = fmt.Errorf("missing supplement for block %v", index) - return - } - ancestorTimestamp, ok := s.AncestorTimestamp(b.ParentID) - if !ok && index.Height != 0 { - err = fmt.Errorf("missing ancestor timestamp for block %v", b.ParentID) - return - } - cs, cau := consensus.ApplyBlock(cs, b, *bs, ancestorTimestamp) - aus = append(aus, ApplyUpdate{cau, b, cs}) + index, _ = snap.BestIndex(index.Height + 1) } + b, bs, cs, ok := blockAndParent(snap, index.ID) + if !ok { + return nil, nil, fmt.Errorf("%w %v", ErrMissingBlock, index) + } else if bs == nil { + return nil, nil, fmt.Errorf("missing supplement for block %v", index) + } + ancestorTimestamp, ok := snap.AncestorTimestamp(b.ParentID) + if !ok && index.Height != 0 { + return nil, nil, fmt.Errorf("missing ancestor timestamp for block %v", b.ParentID) + } + cs, cau := consensus.ApplyBlock(cs, b, *bs, ancestorTimestamp) + aus = append(aus, ApplyUpdate{cau, b, cs}) } - }) - if err != nil { - return nil, nil, err } return } diff --git a/db.go b/db.go index 804958fd..7896b962 100644 --- a/db.go +++ b/db.go @@ -78,38 +78,34 @@ func (db *BoltChainDB) Cancel() { db.tx = nil } -// View implements chain.DB. It runs fn against a read-only bbolt transaction, -// which can execute concurrently with other View calls and with the open -// writer transaction. Writes performed via the DB passed to fn will fail. -func (db *BoltChainDB) View(fn func(chain.DB) error) error { - return db.db.View(func(tx *bbolt.Tx) error { - return fn(&boltViewDB{tx: tx}) - }) +// Snapshot implements chain.DB. It begins a read-only bbolt transaction, which +// observes a consistent snapshot and can execute concurrently with other +// snapshots and with the open writer transaction. +func (db *BoltChainDB) Snapshot() chain.ReadonlyDB { + tx, err := db.db.Begin(false) + if err != nil { + panic(err) + } + return &boltSnapshot{tx: tx} } -// boltViewDB is a read-only chain.DB backed by a bbolt read transaction. -type boltViewDB struct { +// boltSnapshot is a read-only chain.ReadonlyDB backed by a bbolt read +// transaction. +type boltSnapshot struct { tx *bbolt.Tx } -func (db *boltViewDB) Bucket(name []byte) chain.DBBucket { - b := db.tx.Bucket(name) +// Bucket implements chain.ReadonlyDB. +func (s *boltSnapshot) Bucket(name []byte) chain.DBBucket { + b := s.tx.Bucket(name) if b == nil { return nil } return boltBucket{b} } -func (db *boltViewDB) CreateBucket(_ []byte) (chain.DBBucket, error) { - return nil, bbolt.ErrTxNotWritable -} - -func (db *boltViewDB) Flush() error { return nil } -func (db *boltViewDB) Cancel() {} - -func (db *boltViewDB) View(fn func(chain.DB) error) error { - return fn(db) -} +// Close implements chain.ReadonlyDB, ending the read transaction. +func (s *boltSnapshot) Close() error { return s.tx.Rollback() } // Close closes the BoltDB database. func (db *BoltChainDB) Close() error { From 1661443b490f9bd5432c3b02d9098350c4f85119 Mon Sep 17 00:00:00 2001 From: Alright Date: Tue, 16 Jun 2026 09:15:53 -0400 Subject: [PATCH 7/9] add concurrency_test.go --- chain/concurrency_test.go | 223 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 chain/concurrency_test.go diff --git a/chain/concurrency_test.go b/chain/concurrency_test.go new file mode 100644 index 00000000..f3b87790 --- /dev/null +++ b/chain/concurrency_test.go @@ -0,0 +1,223 @@ +package chain_test + +import ( + "fmt" + "path/filepath" + "sync" + "testing" + "time" + + "go.sia.tech/core/types" + "go.sia.tech/coreutils" + "go.sia.tech/coreutils/chain" + "go.sia.tech/coreutils/testutil" +) + +// TestManagerConcurrentReadsWrites hammers the Manager's read methods from many +// goroutines while a writer continuously appends blocks and periodically forces +// reorgs. It exercises the RWMutex + per-reader snapshot design and, in +// particular, the flush-before-unlock invariant: once a tip is observable, the +// block and state behind it must be committed, so any later snapshot can read +// them. If a writer ever released the lock without flushing (m.mu.Unlock +// instead of m.writeUnlock), a reader could observe an advanced tip whose block +// is missing from the snapshot it reads, and the assertions below would fail. +// +// It runs against both store backends. The bbolt backend is the one that +// actually exercises the invariant: its snapshots are isolated read +// transactions that do not observe uncommitted writes, whereas MemDB has no +// MVCC and reads pending writes regardless of flushing. Run with -race to also +// catch any cross-goroutine sharing of store state. +func TestManagerConcurrentReadsWrites(t *testing.T) { + for _, tc := range []struct { + name string + makeDB func(testing.TB) chain.DB + }{ + {"MemDB", func(testing.TB) chain.DB { return chain.NewMemDB() }}, + {"BoltDB", func(tb testing.TB) chain.DB { + db, err := coreutils.OpenBoltChainDB(filepath.Join(tb.TempDir(), "consensus.db")) + if err != nil { + tb.Fatal(err) + } + tb.Cleanup(func() { db.Close() }) + return db + }}, + } { + t.Run(tc.name, func(t *testing.T) { + runConcurrentReadsWrites(t, tc.makeDB(t)) + }) + } +} + +func runConcurrentReadsWrites(t *testing.T, db chain.DB) { + n, genesis := testutil.V2Network() + store, tipState, err := chain.NewDBStore(db, n, genesis, nil) + if err != nil { + t.Fatal(err) + } + cm := chain.NewManager(store, tipState) + + // give readers some history to traverse + for i := 0; i < 20; i++ { + b, ok := coreutils.MineBlock(cm, types.VoidAddress, time.Second) + if !ok { + t.Fatal("PoW failed") + } else if err := cm.AddBlocks([]types.Block{b}); err != nil { + t.Fatal(err) + } + } + + const ( + totalBlocks = 200 + readers = 8 + reorgInterval = 20 // force a reorg roughly every N writer iterations + reorgDepth = 3 // blocks reverted per forced reorg + ) + + done := make(chan struct{}) + var doneOnce sync.Once + closeDone := func() { doneOnce.Do(func() { close(done) }) } + + var failOnce sync.Once + var failure error + fail := func(e error) { + failOnce.Do(func() { failure = e }) + closeDone() + } + + // forkReorg builds a competing chain reorgDepth+2 blocks long from an + // ancestor reorgDepth back and submits it, forcing the Manager to revert + // and reapply. The fork is mined on an independent store, so building it + // never touches cm's store; only the final AddBlocks does. + forkReorg := func() bool { + tip := cm.Tip() + if tip.Height <= reorgDepth { + return false + } + aidx, ok := cm.BestIndex(tip.Height - reorgDepth) + if !ok { + return false + } + ab, ok := cm.Block(aidx.ID) + if !ok { + return false + } + parentState, ok := cm.State(ab.ParentID) + if !ok { + return false + } + fdb, ftip, err := chain.NewDBStoreAtCheckpoint(chain.NewMemDB(), parentState, ab, nil) + if err != nil { + fail(fmt.Errorf("fork: checkpoint init: %w", err)) + return false + } + fcm := chain.NewManager(fdb, ftip) + fork := make([]types.Block, 0, reorgDepth+2) + for j := 0; j < reorgDepth+2; j++ { + b, ok := coreutils.MineBlock(fcm, types.VoidAddress, 5*time.Second) + if !ok { + fail(fmt.Errorf("fork: PoW failed")) + return false + } else if err := fcm.AddBlocks([]types.Block{b}); err != nil { + fail(fmt.Errorf("fork: AddBlocks: %w", err)) + return false + } + fork = append(fork, b) + } + // the fork is strictly heavier (cm cannot advance while this single + // writer builds it), so this triggers a reorg. + if err := cm.AddBlocks(fork); err != nil { + fail(fmt.Errorf("writer: reorg AddBlocks: %w", err)) + return false + } + return true + } + + var wg sync.WaitGroup + + // writer: extend the chain, periodically forcing a reorg. + wg.Add(1) + go func() { + defer wg.Done() + defer closeDone() + for i := 0; i < totalBlocks; i++ { + if i > 0 && i%reorgInterval == 0 { + if forkReorg() { + continue + } + select { + case <-done: + return // forkReorg failed + default: + } + } + b, ok := coreutils.MineBlock(cm, types.VoidAddress, 5*time.Second) + if !ok { + fail(fmt.Errorf("writer: PoW failed at block %d", i)) + return + } else if err := cm.AddBlocks([]types.Block{b}); err != nil { + fail(fmt.Errorf("writer: AddBlocks failed at block %d: %w", i, err)) + return + } + } + }() + + genesisIndex := types.ChainIndex{Height: 0, ID: genesis.ID()} + for r := 0; r < readers; r++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + } + + // The core invariant, which survives reorgs: whatever tip the + // Manager reports, that tip's block and state must be committed, + // so a fresh snapshot can read them. This is exactly what + // flush-before-unlock guarantees. (We can't assert + // BestIndex(tip.Height) == tip here, because a reorg may land + // between observing the tip and the lookup.) + tip := cm.Tip() + if b, ok := cm.Block(tip.ID); !ok { + fail(fmt.Errorf("observed tip %v but its block is absent from the read snapshot", tip)) + return + } else if b.ID() != tip.ID { + fail(fmt.Errorf("Block(%v) returned block %v", tip.ID, b.ID())) + return + } + if _, ok := cm.State(tip.ID); !ok { + fail(fmt.Errorf("observed tip %v but its state is absent from the read snapshot", tip)) + return + } + + // multi-lookup reads (each runs in a single snapshot): must + // remain internally consistent and never error on a live best + // chain, even while reorgs are happening. + hist, err := cm.History() + if err != nil { + fail(fmt.Errorf("History: %w", err)) + return + } + if _, _, err := cm.BlocksForHistory(hist[:], 10); err != nil { + fail(fmt.Errorf("BlocksForHistory: %w", err)) + return + } + if _, _, err := cm.Headers(genesisIndex, 100); err != nil { + fail(fmt.Errorf("Headers: %w", err)) + return + } + if _, _, err := cm.UpdatesSince(types.ChainIndex{}, 10); err != nil { + fail(fmt.Errorf("UpdatesSince: %w", err)) + return + } + } + }() + } + + wg.Wait() + if failure != nil { + t.Fatal(failure) + } +} From 94d628eccc29add6575554d2666e533a0d691b9a Mon Sep 17 00:00:00 2001 From: Alright Date: Tue, 16 Jun 2026 10:54:26 -0400 Subject: [PATCH 8/9] Remove panic --- chain/db.go | 6 ++---- chain/manager.go | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/chain/db.go b/chain/db.go index dbd2f443..37d67f55 100644 --- a/chain/db.go +++ b/chain/db.go @@ -1016,10 +1016,8 @@ type readonlyStore struct { } // Close releases the underlying snapshot. -func (s *readonlyStore) Close() { - if err := s.rdb.Close(); err != nil { - panic(err) - } +func (s *readonlyStore) Close() error { + return s.rdb.Close() } // Snapshot implements Store. It returns a read-only view of the store backed by diff --git a/chain/manager.go b/chain/manager.go index 7aaf47b5..1ac2cb9f 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -81,8 +81,8 @@ type ReadonlyStore interface { Header(id types.BlockID) (types.BlockHeader, bool) State(id types.BlockID) (consensus.State, bool) AncestorTimestamp(id types.BlockID) (time.Time, bool) - // Close releases the snapshot. - Close() + // Close releases the snapshot. Callers may ignore the returned error. + Close() error } // blockReader provides the reads needed by blockAndParent. Both Store and From 38d9717ab9096366dcd62dba73de187dc3f64da3 Mon Sep 17 00:00:00 2001 From: Alright Date: Tue, 16 Jun 2026 13:52:56 -0400 Subject: [PATCH 9/9] add ReadonlyDBBucket --- chain/db.go | 47 ++++++++++++++++++++++++++++++++++------------- db.go | 2 +- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/chain/db.go b/chain/db.go index 37d67f55..79a7fa72 100644 --- a/chain/db.go +++ b/chain/db.go @@ -73,10 +73,10 @@ type DB interface { Snapshot() ReadonlyDB } -// A ReadonlyDB is a read-only, point-in-time view of a DB. Because it exposes -// no mutating methods, the underlying database cannot be modified through it. +// A ReadonlyDB is a read-only, point-in-time view of a DB. It exposes +// no mutating methods. The underlying database cannot be modified through it. type ReadonlyDB interface { - Bucket(name []byte) DBBucket + Bucket(name []byte) ReadonlyDBBucket // Close releases the resources held by the snapshot. Close() error } @@ -90,15 +90,21 @@ type noSnapshotDB struct { } } -func (s noSnapshotDB) Bucket(name []byte) DBBucket { return s.db.Bucket(name) } -func (noSnapshotDB) Close() error { return nil } +func (s noSnapshotDB) Bucket(name []byte) ReadonlyDBBucket { return s.db.Bucket(name) } +func (noSnapshotDB) Close() error { return nil } + +// A ReadonlyDBBucket is a read-only view of a DBBucket; it exposes no mutating +// methods, so a bucket obtained from a ReadonlyDB cannot be written to. +type ReadonlyDBBucket interface { + Get(key []byte) []byte + Iter() iter.Seq2[[]byte, []byte] +} // A DBBucket is a set of key-value pairs. type DBBucket interface { - Get(key []byte) []byte + ReadonlyDBBucket Put(key, value []byte) error Delete(key []byte) error - Iter() iter.Seq2[[]byte, []byte] } // MemDB implements DB with an in-memory map. @@ -395,10 +401,17 @@ func check(err error) { // dbBucket is a helper type for implementing Store. type dbBucket struct { - b DBBucket + b ReadonlyDBBucket db *DBStore } +// writable returns the bucket as a DBBucket for mutation. It is only reached +// from write paths, which only run on a writable DBStore; such a store is +// always backed by a full DB, so its buckets always implement DBBucket. +func (b *dbBucket) writable() DBBucket { + return b.b.(DBBucket) +} + func (b *dbBucket) getRaw(key []byte) []byte { if b.b == nil { return nil @@ -421,7 +434,7 @@ func (b *dbBucket) get(key []byte, v types.DecoderFrom) bool { } func (b *dbBucket) putRaw(key, value []byte) { - check(b.b.Put(key, value)) + check(b.writable().Put(key, value)) b.db.unflushed += len(value) } @@ -434,7 +447,7 @@ func (b *dbBucket) put(key []byte, v types.EncoderTo) { } func (b *dbBucket) delete(key []byte) { - check(b.b.Delete(key)) + check(b.writable().Delete(key)) b.db.unflushed += len(key) } @@ -456,11 +469,19 @@ var ( // snapshot can satisfy it (see roDB), letting a snapshot reuse DBStore's // accessors without exposing any write methods. type rwDB interface { - Bucket(name []byte) DBBucket + Bucket(name []byte) ReadonlyDBBucket Flush() error Snapshot() ReadonlyDB } +// rwView adapts a full DB to the rwDB interface, narrowing Bucket's return type +// to ReadonlyDBBucket so that DBStore's shared read path cannot mutate through +// it. Write paths recover the full DBBucket via dbBucket.writable, which is +// sound because a writable DBStore is always backed by a full DB. +type rwView struct{ DB } + +func (v rwView) Bucket(name []byte) ReadonlyDBBucket { return v.DB.Bucket(name) } + // DBStore implements Store using a key-value database. type DBStore struct { db rwDB @@ -1053,7 +1074,7 @@ func NewDBStore(db DB, n *consensus.Network, genesisBlock types.Block, logger Mi } dbs := &DBStore{ - db: db, + db: rwView{db}, n: n, } @@ -1129,7 +1150,7 @@ func NewDBStoreAtCheckpoint(db DB, cs consensus.State, b types.Block, logger Mig } dbs := &DBStore{ - db: db, + db: rwView{db}, n: cs.Network, } diff --git a/db.go b/db.go index 7896b962..518b2d10 100644 --- a/db.go +++ b/db.go @@ -96,7 +96,7 @@ type boltSnapshot struct { } // Bucket implements chain.ReadonlyDB. -func (s *boltSnapshot) Bucket(name []byte) chain.DBBucket { +func (s *boltSnapshot) Bucket(name []byte) chain.ReadonlyDBBucket { b := s.tx.Bucket(name) if b == nil { return nil