From 607777952214e2dfc3fac8c617de2f53c760e287 Mon Sep 17 00:00:00 2001 From: Vladimir Kavlakan Date: Mon, 8 Jun 2026 15:56:21 +0200 Subject: [PATCH 1/4] Add block.Manager with reload, retention and queryable support Port reloadBlocks into a standalone pp/go/storage/block.Manager that reloads persisted blocks, applies retention via an injected tsdb.BlocksToDeleteFunc, and implements storage.Queryable/ChunkQueryable. Refactor pp-pkg/tsdb to a DB-free NewBlocksToDelete constructor that owns its retention metrics and limit gauges, and expose CatalogHeadsSize / CatalogHeadsExtraSize helpers. Add a tsdb.OpenBlocks wrapper. Co-authored-by: Cursor --- pp-pkg/tsdb/db.go | 134 +++++++++--- pp/go/storage/block/manager.go | 378 +++++++++++++++++++++++++++++++++ tsdb/external.go | 29 +-- 3 files changed, 496 insertions(+), 45 deletions(-) create mode 100644 pp/go/storage/block/manager.go diff --git a/pp-pkg/tsdb/db.go b/pp-pkg/tsdb/db.go index 8d156c12ed..c0a88f2674 100644 --- a/pp-pkg/tsdb/db.go +++ b/pp-pkg/tsdb/db.go @@ -3,24 +3,87 @@ package tsdb import ( "path/filepath" "slices" + "time" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pp/go/storage/catalog" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/fileutil" ) -// PPBlocksToDelete returns a filter which decides time based and size based -// retention from the options of the db. -// This is copy of tsdb.DefaultBlocksToDelete function with modifications such as calculation prompp heads size. -func PPBlocksToDelete(db *tsdb.DB, dir string, catalog *catalog.Catalog) tsdb.BlocksToDeleteFunc { +// Metrics holds the retention constraints and counters for a BlocksToDeleteFunc +// that owns no *tsdb.DB. They mirror the corresponding tsdb dbMetrics so the +// DB-free path reports the same series. +type Metrics struct { + timeRetentions prometheus.Counter + sizeRetentions prometheus.Counter + maxBytes prometheus.Gauge + retentionDuration prometheus.Gauge +} + +// NewMetrics creates the retention metrics and registers them when r is not nil. +func NewMetrics(r prometheus.Registerer) *Metrics { + m := &Metrics{ + timeRetentions: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_time_retentions_total", + Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.", + }), + sizeRetentions: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_size_retentions_total", + Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", + }), + maxBytes: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_retention_limit_bytes", + Help: "Max number of bytes to be retained in the tsdb blocks, configured 0 means disabled", + }), + retentionDuration: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_retention_limit_seconds", + Help: "How long to retain samples in storage.", + }), + } + + if r != nil { + r.MustRegister(m.timeRetentions, m.sizeRetentions, m.maxBytes, m.retentionDuration) + } + + return m +} + +// NewBlocksToDelete returns a filter which decides time based and size based +// retention. It does not depend on a *tsdb.DB, so it can be used by components +// that own no DB instance. It creates and registers its own retention metrics +// (counters + limit gauges) via r and reports the configured constraints. +// +// extraSize reports bytes used outside of the given blocks (e.g. heads + catalog) +// and may be nil. +func NewBlocksToDelete( + retentionDuration, maxBytes int64, + extraSize func() int64, + r prometheus.Registerer, +) tsdb.BlocksToDeleteFunc { + m := NewMetrics(r) + + // Report the configured retention constraints, mirroring tsdb.DB.open. + limitBytes := maxBytes + if limitBytes < 0 { + limitBytes = 0 + } + m.maxBytes.Set(float64(limitBytes)) + m.retentionDuration.Set((time.Duration(retentionDuration) * time.Millisecond).Seconds()) + return func(blocks []*tsdb.Block) map[ulid.ULID]struct{} { - return deletableBlocks(db, dir, catalog, blocks) + return deletableBlocks(retentionDuration, maxBytes, extraSize, m.timeRetentions, m.sizeRetentions, blocks) } } // deletableBlocks returns all currently loaded blocks past retention policy or already compacted into a new block. -func deletableBlocks(db *tsdb.DB, dir string, catalog *catalog.Catalog, blocks []*tsdb.Block) map[ulid.ULID]struct{} { +func deletableBlocks( + retentionDuration, maxBytes int64, + extraSize func() int64, + timeRetentions, sizeRetentions prometheus.Counter, + blocks []*tsdb.Block, +) map[ulid.ULID]struct{} { deletable := make(map[ulid.ULID]struct{}) // Sort the blocks by time - newest to oldest (largest to smallest timestamp). @@ -42,22 +105,25 @@ func deletableBlocks(db *tsdb.DB, dir string, catalog *catalog.Catalog, blocks [ } } - for ulid := range BeyondTimeRetention(db, blocks) { + for ulid := range BeyondTimeRetention(retentionDuration, timeRetentions, blocks) { deletable[ulid] = struct{}{} } - for ulid := range BeyondSizeRetention(db, dir, catalog, blocks) { + for ulid := range BeyondSizeRetention(maxBytes, extraSize, sizeRetentions, blocks) { deletable[ulid] = struct{}{} } return deletable } -// BeyondTimeRetention returns those blocks which are beyond the time retention -// set in the db options. -func BeyondTimeRetention(db *tsdb.DB, blocks []*tsdb.Block) (deletable map[ulid.ULID]struct{}) { +// BeyondTimeRetention returns those blocks which are beyond the time retention. +func BeyondTimeRetention( + retentionDuration int64, + timeRetentions prometheus.Counter, + blocks []*tsdb.Block, +) (deletable map[ulid.ULID]struct{}) { // Time retention is disabled or no blocks to work with. - if len(blocks) == 0 || tsdb.DBOpts(db).RetentionDuration == 0 { + if len(blocks) == 0 || retentionDuration == 0 { return } @@ -65,45 +131,65 @@ func BeyondTimeRetention(db *tsdb.DB, blocks []*tsdb.Block) (deletable map[ulid. for i, block := range blocks { // The difference between the first block and this block is greater than or equal to // the retention period so any blocks after that are added as deletable. - if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime >= tsdb.DBOpts(db).RetentionDuration { + if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime >= retentionDuration { for _, b := range blocks[i:] { deletable[b.Meta().ULID] = struct{}{} } - tsdb.DBTimeRetentionCount(db).Inc() + if timeRetentions != nil { + timeRetentions.Inc() + } break } } return deletable } -// BeyondSizeRetention returns those blocks which are beyond the size retention -// set in the db options. -func BeyondSizeRetention(db *tsdb.DB, dir string, catalog *catalog.Catalog, blocks []*tsdb.Block) (deletable map[ulid.ULID]struct{}) { +// BeyondSizeRetention returns those blocks which are beyond the size retention. +func BeyondSizeRetention( + maxBytes int64, + extraSize func() int64, + sizeRetentions prometheus.Counter, + blocks []*tsdb.Block, +) (deletable map[ulid.ULID]struct{}) { // Size retention is disabled or no blocks to work with. - if len(blocks) == 0 || tsdb.DBOpts(db).MaxBytes <= 0 { + if len(blocks) == 0 || maxBytes <= 0 { return } deletable = make(map[ulid.ULID]struct{}) - // Initializing size counter with catalog size - blocksSize := catalogHeadsSize(dir, catalog) - blocksSize += db.Head().Size() + // Initializing size counter with the injected extra size (heads + catalog). + var blocksSize int64 + if extraSize != nil { + blocksSize = extraSize() + } for i, block := range blocks { blocksSize += block.Size() - if blocksSize > tsdb.DBOpts(db).MaxBytes { + if blocksSize > maxBytes { // Add this and all following blocks for deletion. for _, b := range blocks[i:] { deletable[b.Meta().ULID] = struct{}{} } - tsdb.DBSizeRetentionCount(db).Inc() + if sizeRetentions != nil { + sizeRetentions.Inc() + } break } } return deletable } -func catalogHeadsSize(dir string, catalog *catalog.Catalog) (catalogSize int64) { +// CatalogHeadsExtraSize adapts [CatalogHeadsSize] into an extraSize function +// (func() int64) suitable for passing to [NewBlocksToDelete]. +func CatalogHeadsExtraSize(dir string, catalog *catalog.Catalog) func() int64 { + return func() int64 { + return CatalogHeadsSize(dir, catalog) + } +} + +// CatalogHeadsSize returns the on-disk size of the catalog and all of its heads. +// It is useful to build the extraSize function passed to NewBlocksToDelete. +func CatalogHeadsSize(dir string, catalog *catalog.Catalog) (catalogSize int64) { catalogSize += catalog.OnDiskSize() heads := catalog.List(nil, nil) for _, h := range heads { diff --git a/pp/go/storage/block/manager.go b/pp/go/storage/block/manager.go new file mode 100644 index 0000000000..9f602fb3d8 --- /dev/null +++ b/pp/go/storage/block/manager.go @@ -0,0 +1,378 @@ +package block + +import ( + "fmt" + "os" + "path/filepath" + "slices" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/fileutil" +) + +var ( + _ storage.Queryable = (*Manager)(nil) + _ storage.ChunkQueryable = (*Manager)(nil) +) + +const ( + tmpForDeletionBlockDirSuffix = ".tmp-for-deletion" + reloadBlocksInterval = time.Minute +) + +// Options configures block reload, mirroring the relevant tsdb.Options fields. +type Options struct { + // RetentionDuration is the time retention in milliseconds, used for the corrupted-block outdated check. + RetentionDuration int64 + // CorruptedRetentionDuration is the duration of the retention for corrupted blocks. + CorruptedRetentionDuration time.Duration + // EnableOverlappingCompaction enables warning about overlapping blocks on reload. + EnableOverlappingCompaction bool +} + +// Manager reloads and applies retention to persisted blocks on disk. +type Manager struct { + dir string + opts *Options + blocksToDelete tsdb.BlocksToDeleteFunc + logger log.Logger + chunkPool chunkenc.Pool + metrics *metrics + + mtx sync.RWMutex + blocks []*tsdb.Block + + stopc chan struct{} + stoppedc chan struct{} + stopOnce sync.Once +} + +// NewManager init new [Manager] and starts its periodic reload loop. +// +// blocksToDelete is the retention filter (e.g. built via pp-pkg/tsdb.NewBlocksToDelete); +// it may be nil, in which case no blocks are deleted by retention. +func NewManager( + dir string, + opts *Options, + blocksToDelete tsdb.BlocksToDeleteFunc, + logger log.Logger, + r prometheus.Registerer, +) *Manager { + if opts == nil { + opts = &Options{} + } + if logger == nil { + logger = log.NewNopLogger() + } + + m := &Manager{ + dir: dir, + opts: opts, + blocksToDelete: blocksToDelete, + logger: logger, + chunkPool: chunkenc.NewPool(), + metrics: newMetrics(r), + stopc: make(chan struct{}), + stoppedc: make(chan struct{}), + } + go m.loop() + return m +} + +func (m *Manager) loop() { + defer func() { + close(m.stoppedc) + }() + + ticker := time.NewTicker(reloadBlocksInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := m.reloadBlocks(); err != nil { + level.Error(m.logger).Log("msg", "periodic reload blocks failed", "err", err) + } + + case <-m.stopc: + return + } + } +} + +// Close stops the reload loop and waits for it to finish. +func (m *Manager) Close() { + m.stopOnce.Do(func() { + close(m.stopc) + }) + <-m.stoppedc +} + +// Querier returns a new querier over the persisted blocks overlapping the time +// range [mint, maxt]. It implements [storage.Queryable]. +func (m *Manager) Querier(mint, maxt int64) (_ storage.Querier, err error) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + blockQueriers := make([]storage.Querier, 0, len(m.blocks)) + defer func() { + if err != nil { + // If we fail, all previously opened queriers must be closed. + for _, q := range blockQueriers { + _ = q.Close() + } + } + }() + + for _, b := range m.blocks { + if !b.OverlapsClosedInterval(mint, maxt) { + continue + } + q, err := tsdb.NewBlockQuerier(b, mint, maxt) + if err != nil { + return nil, fmt.Errorf("open querier for block %s: %w", b, err) + } + blockQueriers = append(blockQueriers, q) + } + + return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil +} + +// ChunkQuerier returns a new chunk querier over the persisted blocks overlapping +// the time range [mint, maxt]. It implements [storage.ChunkQueryable]. +func (m *Manager) ChunkQuerier(mint, maxt int64) (_ storage.ChunkQuerier, err error) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + blockQueriers := make([]storage.ChunkQuerier, 0, len(m.blocks)) + defer func() { + if err != nil { + // If we fail, all previously opened queriers must be closed. + for _, q := range blockQueriers { + _ = q.Close() + } + } + }() + + for _, b := range m.blocks { + if !b.OverlapsClosedInterval(mint, maxt) { + continue + } + q, err := tsdb.NewBlockChunkQuerier(b, mint, maxt) + if err != nil { + return nil, fmt.Errorf("open chunk querier for block %s: %w", b, err) + } + blockQueriers = append(blockQueriers, q) + } + + return storage.NewMergeChunkQuerier( + blockQueriers, + nil, + storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge), + ), nil +} + +// reloadBlocks reloads blocks from disk and deletes the ones past retention. +// +//revive:disable-next-line:cyclomatic // ported from tsdb.DB.reloadBlocks. +func (m *Manager) reloadBlocks() (err error) { + defer func() { + if err != nil { + m.metrics.reloadsFailed.Inc() + } + m.metrics.reloads.Inc() + }() + + m.mtx.Lock() + defer m.mtx.Unlock() + + loadable, corrupted, err := tsdb.OpenBlocks(m.logger, m.dir, m.blocks, m.chunkPool) + if err != nil { + return err + } + + var deletableULIDs map[ulid.ULID]struct{} + if m.blocksToDelete != nil { + deletableULIDs = m.blocksToDelete(loadable) + } + deletable := make(map[ulid.ULID]*tsdb.Block, len(deletableULIDs)) + + // Mark all parents of loaded blocks as deletable (no matter if they exists). This makes it resilient against the process + // crashing towards the end of a compaction but before deletions. By doing that, we can pick up the deletion where it left off during a crash. + for _, block := range loadable { + if _, ok := deletableULIDs[block.Meta().ULID]; ok { + deletable[block.Meta().ULID] = block + } + for _, b := range block.Meta().Compaction.Parents { + if _, ok := corrupted[b.ULID]; ok { + delete(corrupted, b.ULID) + level.Warn(m.logger).Log("msg", "Found corrupted block, but replaced by compacted one so it's safe to delete. This should not happen with atomic deletes.", "block", b.ULID) + } + deletable[b.ULID] = nil + } + } + + m.metrics.corruptedBlocks.Set(float64(len(corrupted))) + for uid, cerr := range corrupted { + // check if the block is outdated, if it is, delete the block. + isOutdated := m.isOutdatedBlock( + uid, + min( + time.Duration(m.opts.RetentionDuration)*time.Millisecond, + m.opts.CorruptedRetentionDuration, + ), + ) + + if isOutdated { + deletable[uid] = nil + } + + level.Warn(m.logger).Log( + "msg", "corrupted block", + "ulid", uid.String(), + "err", cerr, + "isOutdated", isOutdated, + ) + } + + var ( + toLoad []*tsdb.Block + blocksSize int64 + ) + // All deletable blocks should be unloaded. + // NOTE: We need to loop through loadable one more time as there might be loadable ready to be removed (replaced by compacted block). + for _, block := range loadable { + if _, ok := deletable[block.Meta().ULID]; ok { + deletable[block.Meta().ULID] = block + continue + } + + toLoad = append(toLoad, block) + blocksSize += block.Size() + } + m.metrics.blocksBytes.Set(float64(blocksSize)) + + slices.SortFunc(toLoad, func(a, b *tsdb.Block) int { + switch { + case a.Meta().MinTime < b.Meta().MinTime: + return -1 + case a.Meta().MinTime > b.Meta().MinTime: + return 1 + default: + return 0 + } + }) + + // Swap new blocks first for subsequently created readers to be seen. + oldBlocks := m.blocks + m.blocks = toLoad + + // Only check overlapping blocks when overlapping compaction is enabled. + if m.opts.EnableOverlappingCompaction { + blockMetas := make([]tsdb.BlockMeta, 0, len(toLoad)) + for _, b := range toLoad { + blockMetas = append(blockMetas, b.Meta()) + } + if overlaps := tsdb.OverlappingBlocks(blockMetas); len(overlaps) > 0 { + level.Warn(m.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) + } + } + + // Append blocks to old, deletable blocks, so we can close them. + for _, b := range oldBlocks { + if _, ok := deletable[b.Meta().ULID]; ok { + deletable[b.Meta().ULID] = b + } + } + if err := m.deleteBlocks(deletable); err != nil { + return fmt.Errorf("delete %v blocks: %w", len(deletable), err) + } + return nil +} + +func (m *Manager) deleteBlocks(blocks map[ulid.ULID]*tsdb.Block) error { + for uid, block := range blocks { + if block != nil { + if err := block.Close(); err != nil { + level.Warn(m.logger).Log("msg", "Closing block failed", "err", err, "block", uid) + } + } + + toDelete := filepath.Join(m.dir, uid.String()) + switch _, err := os.Stat(toDelete); { + case os.IsNotExist(err): + // Noop. + continue + case err != nil: + return fmt.Errorf("stat dir %v: %w", toDelete, err) + } + + // Replace atomically to avoid partial block when process would crash during deletion. + tmpToDelete := filepath.Join(m.dir, fmt.Sprintf("%s%s", uid, tmpForDeletionBlockDirSuffix)) + if err := fileutil.Replace(toDelete, tmpToDelete); err != nil { + return fmt.Errorf("replace of obsolete block for deletion %s: %w", uid, err) + } + if err := os.RemoveAll(tmpToDelete); err != nil { + return fmt.Errorf("delete obsolete block %s: %w", uid, err) + } + level.Info(m.logger).Log("msg", "Deleting obsolete block", "block", uid) + } + + return nil +} + +func (m *Manager) isOutdatedBlock(id ulid.ULID, retentionDuration time.Duration) bool { + return id.Time() < uint64(time.Now().Add(-retentionDuration).UnixMilli()) +} + +// +// metrics +// + +type metrics struct { + reloads prometheus.Counter + reloadsFailed prometheus.Counter + corruptedBlocks prometheus.Gauge + blocksBytes prometheus.Gauge +} + +func newMetrics(r prometheus.Registerer) *metrics { + m := &metrics{ + reloads: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_reloads_total", + Help: "Number of times the database reloaded block data from disk.", + }), + reloadsFailed: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_reloads_failures_total", + Help: "Number of times the database failed to reloadBlocks block data from disk.", + }), + corruptedBlocks: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_corrupted_blocks", + Help: "The number of corrupted blocks.", + }), + blocksBytes: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_storage_blocks_bytes", + Help: "The number of bytes that are currently used for local storage by all blocks.", + }), + } + + if r != nil { + r.MustRegister( + m.reloads, + m.reloadsFailed, + m.corruptedBlocks, + m.blocksBytes, + ) + } + + return m +} diff --git a/tsdb/external.go b/tsdb/external.go index 239c49e115..68002520ec 100644 --- a/tsdb/external.go +++ b/tsdb/external.go @@ -1,26 +1,13 @@ package tsdb -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/go-kit/log" + "github.com/oklog/ulid" -// DBOpts exposes db.opts (usage: pp-pkg/tsdb). -func DBOpts(db *DB) *Options { - return db.opts -} - -// DBTimeRetentionCount exposes time retention metric (usage: pp-pkg/tsdb). -func DBTimeRetentionCount(db *DB) prometheus.Counter { - return db.metrics.timeRetentionCount -} - -// DBSizeRetentionCount exposes size retention metric (usage: pp-pkg/tsdb). -func DBSizeRetentionCount(db *DB) prometheus.Counter { - return db.metrics.sizeRetentionCount -} - -// DBSetBlocksToDelete safely injects blocksToDelete closure (usage cmd/prometheus/main.go). -func DBSetBlocksToDelete(db *DB, blocksToDelete BlocksToDeleteFunc) { - db.cmtx.Lock() - defer db.cmtx.Unlock() + "github.com/prometheus/prometheus/tsdb/chunkenc" +) - db.blocksToDelete = blocksToDelete +// OpenBlocks loads all blocks from dir, reusing already-loaded ones (usage: pp/go/storage/block). +func OpenBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) ([]*Block, map[ulid.ULID]error, error) { + return openBlocks(l, dir, loaded, chunkPool) } From e23c29632b5fc184bb23755c574857a6bec9eb62 Mon Sep 17 00:00:00 2001 From: Vladimir Kavlakan Date: Mon, 8 Jun 2026 16:36:50 +0200 Subject: [PATCH 2/4] Implement Blocks method in block.Manager to return currently loaded blocks This update adds the Blocks method to the Manager struct, which provides a snapshot of the currently loaded blocks, implementing the BlockSource interface. The method ensures thread-safe access to the blocks using read locks. --- pp/go/storage/block/compactor.go | 190 +++++++++++++++++++++++++++++++ pp/go/storage/block/manager.go | 9 ++ 2 files changed, 199 insertions(+) create mode 100644 pp/go/storage/block/compactor.go diff --git a/pp/go/storage/block/compactor.go b/pp/go/storage/block/compactor.go new file mode 100644 index 0000000000..fae30ee015 --- /dev/null +++ b/pp/go/storage/block/compactor.go @@ -0,0 +1,190 @@ +package block + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +const compactionInterval = time.Minute + +// CompactorOptions configures the persisted-blocks compactor. +type CompactorOptions struct { + // MinBlockDuration is the smallest block range, used to derive the + // exponential compaction ranges. If zero, tsdb.DefaultBlockDuration is used. + MinBlockDuration int64 + // MaxBlockChunkSegmentSize is the max block chunk segment size. + MaxBlockChunkSegmentSize int64 + // EnableOverlappingCompaction enables compaction of overlapping blocks. + EnableOverlappingCompaction bool + // CompactionInterval is the period of the background compaction loop. + // If zero, compactionInterval is used. + CompactionInterval time.Duration +} + +// BlockSource provides the compactor with the currently loaded blocks. +// It is implemented by Manager. +type BlockSource interface { + // Blocks returns a snapshot of the currently loaded blocks (the open + // argument for Compact). + Blocks() []*tsdb.Block +} + +// Compactor periodically compacts persisted on-disk blocks. It does not reload +// blocks itself: the new block is loaded and the compacted parents are deleted +// by the periodic reload loop of the block source (e.g. Manager). +type Compactor struct { + dir string + compactor tsdb.Compactor + source BlockSource + interval time.Duration + logger log.Logger + metrics *compactorMetrics + + stopc chan struct{} + stoppedc chan struct{} + stopOnce sync.Once +} + +// NewCompactor builds a LeveledCompactor from opts and starts the background +// compaction loop. +func NewCompactor( + ctx context.Context, + dir string, + opts *CompactorOptions, + source BlockSource, + logger log.Logger, + r prometheus.Registerer, +) (*Compactor, error) { + if opts == nil { + opts = &CompactorOptions{} + } + if logger == nil { + logger = log.NewNopLogger() + } + + minBlockDuration := opts.MinBlockDuration + if minBlockDuration <= 0 { + minBlockDuration = tsdb.DefaultBlockDuration + } + interval := opts.CompactionInterval + if interval <= 0 { + interval = compactionInterval + } + + rngs := tsdb.ExponentialBlockRanges(minBlockDuration, 10, 3) + leveled, err := tsdb.NewLeveledCompactorWithOptions(ctx, r, logger, rngs, chunkenc.NewPool(), tsdb.LeveledCompactorOptions{ + MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, + EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + }) + if err != nil { + return nil, fmt.Errorf("create compactor: %w", err) + } + + c := &Compactor{ + dir: dir, + compactor: leveled, + source: source, + interval: interval, + logger: logger, + metrics: newCompactorMetrics(r), + stopc: make(chan struct{}), + stoppedc: make(chan struct{}), + } + go c.loop() + return c, nil +} + +func (c *Compactor) loop() { + defer func() { + close(c.stoppedc) + }() + + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.metrics.compactionsTriggered.Inc() + if err := c.compactBlocks(); err != nil { + c.metrics.compactionsFailed.Inc() + level.Error(c.logger).Log("msg", "compaction failed", "err", err) + } + + case <-c.stopc: + return + } + } +} + +// Close stops the compaction loop and waits for it to finish. +func (c *Compactor) Close() { + c.stopOnce.Do(func() { + close(c.stopc) + }) + <-c.stoppedc +} + +// compactBlocks compacts at most one planned group of eligible on-disk blocks. +// It does not reload blocks: the periodic reload loop of the block source loads +// the new block and deletes the compacted parents. +func (c *Compactor) compactBlocks() error { + plan, err := c.compactor.Plan(c.dir) + if err != nil { + return fmt.Errorf("plan compaction: %w", err) + } + if len(plan) == 0 { + return nil + } + + select { + case <-c.stopc: + return nil + default: + } + + if _, err := c.compactor.Compact(c.dir, plan, c.source.Blocks()); err != nil { + return fmt.Errorf("compact %s: %w", plan, err) + } + return nil +} + +// +// metrics +// + +type compactorMetrics struct { + compactionsTriggered prometheus.Counter + compactionsFailed prometheus.Counter +} + +func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { + m := &compactorMetrics{ + compactionsTriggered: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_compactions_triggered_total", + Help: "Total number of triggered compactions for the partition.", + }), + compactionsFailed: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_compactions_failed_total", + Help: "Total number of compactions that failed for the partition.", + }), + } + + if r != nil { + r.MustRegister( + m.compactionsTriggered, + m.compactionsFailed, + ) + } + + return m +} diff --git a/pp/go/storage/block/manager.go b/pp/go/storage/block/manager.go index 9f602fb3d8..7f72328a11 100644 --- a/pp/go/storage/block/manager.go +++ b/pp/go/storage/block/manager.go @@ -22,6 +22,7 @@ import ( var ( _ storage.Queryable = (*Manager)(nil) _ storage.ChunkQueryable = (*Manager)(nil) + _ BlockSource = (*Manager)(nil) ) const ( @@ -181,6 +182,14 @@ func (m *Manager) ChunkQuerier(mint, maxt int64) (_ storage.ChunkQuerier, err er ), nil } +// Blocks returns a snapshot of the currently loaded blocks. It implements +// [BlockSource]. +func (m *Manager) Blocks() []*tsdb.Block { + m.mtx.RLock() + defer m.mtx.RUnlock() + return slices.Clone(m.blocks) +} + // reloadBlocks reloads blocks from disk and deletes the ones past retention. // //revive:disable-next-line:cyclomatic // ported from tsdb.DB.reloadBlocks. From 8914619183355be2e1af29296feb6c71b6853b45 Mon Sep 17 00:00:00 2001 From: Vladimir Kavlakan Date: Mon, 8 Jun 2026 17:10:04 +0200 Subject: [PATCH 3/4] Wire block.Manager and block.Compactor into main, disable tsdb In server mode, stop opening tsdb.DB and instead run block.Manager (persisted block reads + retention) and block.Compactor (compaction). block.Manager is plugged into the fanout via a querier-only storage.Storage adapter; localStorage stays an empty stub. Replace the TSDB run-group actor with a lifecycle actor and drop the dead openDBWithMetrics and its obsolete TestTimeMetrics. Co-authored-by: Cursor --- cmd/prometheus/main.go | 210 ++++++++++++++++++++++-------------- cmd/prometheus/main_test.go | 65 ----------- 2 files changed, 131 insertions(+), 144 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d7f6e5ca0e..bcfc8de07a 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -69,6 +69,7 @@ import ( pp_pkg_tsdb "github.com/prometheus/prometheus/pp-pkg/tsdb" // PP_CHANGES.md: rebuild on cpp pp_storage "github.com/prometheus/prometheus/pp/go/storage" // PP_CHANGES.md: rebuild on cpp + block "github.com/prometheus/prometheus/pp/go/storage/block" // PP_CHANGES.md: rebuild on cpp "github.com/prometheus/prometheus/pp/go/storage/catalog" // PP_CHANGES.md: rebuild on cpp "github.com/prometheus/prometheus/pp/go/storage/head/head" // PP_CHANGES.md: rebuild on cpp "github.com/prometheus/prometheus/pp/go/storage/querier" // PP_CHANGES.md: rebuild on cpp @@ -817,23 +818,69 @@ func main() { // PP_CHANGES.md: rebuild on cpp end - var ( - localStorage = &readyStorage{stats: tsdb.NewDBStats()} - scraper = &readyScrapeManager{} + localStorage := &readyStorage{stats: tsdb.NewDBStats()} + scraper := &readyScrapeManager{} - // PP_CHANGES.md: rebuild on cpp start - remoteRead = pp_pkg_remote.NewRemoteRead( - log.With(logger, "component", "remote"), - localStorage.StartTime, - ) - fanoutStorage = storage.NewFanout( - logger, - adapter, - localStorage, - remoteRead, + // PP_CHANGES.md: rebuild on cpp start + // In server mode the persisted blocks are managed by block.Manager (read + + // retention) and block.Compactor (compaction) instead of a tsdb.DB. The + // block.Manager is wired into the fanout via a querier-only storage.Storage + // adapter; localStorage stays an empty stub. In agent mode the secondary is + // still localStorage (agent.DB set later). + var ( + blockManager *block.Manager + blockCompactor *block.Compactor + compactCancel context.CancelFunc + persistedStorage storage.Storage = localStorage + startTimeFn func() (int64, error) = localStorage.StartTime + ) + if !agentMode { + retentionMs := int64(time.Duration(cfg.tsdb.RetentionDuration) / time.Millisecond) + blocksToDelete := pp_pkg_tsdb.NewBlocksToDelete( + retentionMs, + int64(cfg.tsdb.MaxBytes), + pp_pkg_tsdb.CatalogHeadsExtraSize(dataDir, headCatalog), + prometheus.DefaultRegisterer, ) - // PP_CHANGES.md: rebuild on cpp end + blockManager = block.NewManager(localStoragePath, &block.Options{ + RetentionDuration: retentionMs, + CorruptedRetentionDuration: time.Duration(cfg.tsdb.CorruptedRetentionDuration), + EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction, + }, blocksToDelete, log.With(logger, "component", "blockmanager"), prometheus.DefaultRegisterer) + + var compactCtx context.Context + compactCtx, compactCancel = context.WithCancel(context.Background()) + blockCompactor, err = block.NewCompactor(compactCtx, localStoragePath, &block.CompactorOptions{ + MinBlockDuration: int64(time.Duration(cfg.tsdb.MinBlockDuration) / time.Millisecond), + MaxBlockChunkSegmentSize: int64(cfg.tsdb.MaxBlockChunkSegmentSize), + EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction, + }, blockManager, log.With(logger, "component", "blockcompactor"), prometheus.DefaultRegisterer) + if err != nil { + level.Error(logger).Log("msg", "failed to create block compactor", "err", err) + os.Exit(1) + } + + bs := &blockStorage{m: blockManager, onClose: func() error { + blockCompactor.Close() + blockManager.Close() + compactCancel() + return nil + }} + persistedStorage = bs + startTimeFn = bs.StartTime + } + + remoteRead := pp_pkg_remote.NewRemoteRead( + log.With(logger, "component", "remote"), + startTimeFn, + ) + fanoutStorage := storage.NewFanout( + logger, + adapter, + persistedStorage, + remoteRead, ) + // PP_CHANGES.md: rebuild on cpp end var ( ctxWeb, cancelWeb = context.WithCancel(context.Background()) @@ -1371,30 +1418,15 @@ func main() { ) } if !agentMode { - // TSDB. - opts := cfg.tsdb.ToTSDBOptions() - opts.StripeSize = 1 // PP_CHANGES.md: rebuild on cpp + // Persisted block storage (block.Manager + block.Compactor). The tsdb.DB + // is disabled in server mode; persisted blocks are read via block.Manager + // and compacted by block.Compactor, both started above. This actor only + // signals readiness and tears the storage down on shutdown. + // PP_CHANGES.md: rebuild on cpp cancel := make(chan struct{}) g.Add( func() error { - level.Info(logger).Log("msg", "Starting TSDB ...") - if cfg.tsdb.WALSegmentSize != 0 { - if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 { - return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB") - } - } - if cfg.tsdb.MaxBlockChunkSegmentSize != 0 { - if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 { - return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB") - } - } - - db, err := openDBWithMetrics(localStoragePath, logger, prometheus.DefaultRegisterer, &opts, localStorage.getStats()) - if err != nil { - return fmt.Errorf("opening storage failed: %w", err) - } - - tsdb.DBSetBlocksToDelete(db, pp_pkg_tsdb.PPBlocksToDelete(db, dataDir, headCatalog)) + level.Info(logger).Log("msg", "Starting persisted block storage ...") switch fsType := prom_runtime.Statfs(localStoragePath); fsType { case "NFS_SUPER_MAGIC": level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.") @@ -1402,26 +1434,21 @@ func main() { level.Info(logger).Log("fs_type", fsType) } - level.Info(logger).Log("msg", "TSDB started") - level.Debug(logger).Log("msg", "TSDB options", + level.Info(logger).Log("msg", "Persisted block storage started") + level.Debug(logger).Log("msg", "Block storage options", "MinBlockDuration", cfg.tsdb.MinBlockDuration, - "MaxBlockDuration", cfg.tsdb.MaxBlockDuration, "MaxBytes", cfg.tsdb.MaxBytes, - "NoLockfile", cfg.tsdb.NoLockfile, "RetentionDuration", cfg.tsdb.RetentionDuration, "CorruptedRetentionDuration", cfg.tsdb.CorruptedRetentionDuration, - "WALSegmentSize", cfg.tsdb.WALSegmentSize, - "WALCompression", cfg.tsdb.WALCompression, + "EnableOverlappingCompaction", cfg.tsdb.EnableOverlappingCompaction, ) - startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) - localStorage.Set(db, startTimeMargin) - // db.SetWriteNotified(remoteStorage) // PP_CHANGES.md: rebuild on cpp close(dbOpen) <-cancel return nil }, func(err error) { + // Closes adapter (head) + blockStorage (block.Manager + block.Compactor) + remoteRead. if err := fanoutStorage.Close(); err != nil { level.Error(logger).Log("msg", "Error stopping storage", "err", err) } @@ -1598,41 +1625,6 @@ func main() { level.Info(logger).Log("msg", "See you next time!") } -func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options, stats *tsdb.DBStats) (*tsdb.DB, error) { - db, err := tsdb.Open( - dir, - log.With(logger, "component", "tsdb"), - reg, - opts, - stats, - ) - if err != nil { - return nil, err - } - - reg.MustRegister( - prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_lowest_timestamp_seconds", - Help: "Lowest timestamp value stored in the database.", - }, func() float64 { - bb := db.Blocks() - if len(bb) == 0 { - return float64(db.Head().MinTime() / 1000) - } - return float64(db.Blocks()[0].Meta().MinTime / 1000) - }), prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_min_time_seconds", - Help: "Minimum time bound of the head block.", - }, func() float64 { return float64(db.Head().MinTime() / 1000) }), - prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_max_time_seconds", - Help: "Maximum timestamp of the head block.", - }, func() float64 { return float64(db.Head().MaxTime() / 1000) }), - ) - - return db, nil -} - type safePromQLNoStepSubqueryInterval struct { value atomic.Int64 } @@ -1756,6 +1748,66 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) { return eu, nil } +// blockStorage adapts a read-only block.Manager (persisted blocks) to +// storage.Storage so it can be used as a fanout secondary. Appends are dropped: +// the head adapter is the fanout primary that stores samples. +// PP_CHANGES.md: rebuild on cpp +type blockStorage struct { + m *block.Manager + onClose func() error +} + +func (b *blockStorage) Querier(mint, maxt int64) (storage.Querier, error) { + return b.m.Querier(mint, maxt) +} + +func (b *blockStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { + return b.m.ChunkQuerier(mint, maxt) +} + +func (b *blockStorage) Appender(context.Context) storage.Appender { return noopAppender{} } + +func (b *blockStorage) Close() error { return b.onClose() } + +// StartTime returns the oldest timestamp stored in the persisted blocks. +func (b *blockStorage) StartTime() (int64, error) { + // Manager keeps blocks sorted ascending by MinTime. + if blocks := b.m.Blocks(); len(blocks) > 0 { + return blocks[0].Meta().MinTime, nil + } + return math.MaxInt64, nil +} + +// noopAppender silently drops samples and reports success, so that the fanout +// appender (which appends to every secondary) does not fail on the read-only +// blockStorage secondary. +// PP_CHANGES.md: rebuild on cpp +type noopAppender struct{} + +func (noopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) { + return 0, nil +} + +func (noopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) { + return 0, nil +} + +func (noopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, nil +} + +func (noopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { + return 0, nil +} + +func (noopAppender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) { + return 0, nil +} + +func (noopAppender) Commit() error { return nil } + +func (noopAppender) Rollback() error { return nil } + // readyStorage implements the Storage interface while allowing to set the actual // storage at a later point in time. type readyStorage struct { diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index 787edf1c81..31bfb4a404 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "io" - "math" "os" "os/exec" "path/filepath" @@ -31,9 +30,6 @@ import ( "time" "github.com/alecthomas/kingpin/v2" - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/config" @@ -286,67 +282,6 @@ func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) { } } -func TestTimeMetrics(t *testing.T) { - tmpDir := t.TempDir() - - reg := prometheus.NewRegistry() - db, err := openDBWithMetrics(tmpDir, log.NewNopLogger(), reg, nil, nil) - require.NoError(t, err) - defer func() { - require.NoError(t, db.Close()) - }() - - // Check initial values. - require.Equal(t, map[string]float64{ - "prometheus_tsdb_lowest_timestamp_seconds": float64(math.MaxInt64) / 1000, - "prometheus_tsdb_head_min_time_seconds": float64(math.MaxInt64) / 1000, - "prometheus_tsdb_head_max_time_seconds": float64(math.MinInt64) / 1000, - }, getCurrentGaugeValuesFor(t, reg, - "prometheus_tsdb_lowest_timestamp_seconds", - "prometheus_tsdb_head_min_time_seconds", - "prometheus_tsdb_head_max_time_seconds", - )) - - app := db.Appender(context.Background()) - _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1) - require.NoError(t, err) - _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1) - require.NoError(t, err) - _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 3000, 1) - require.NoError(t, err) - require.NoError(t, app.Commit()) - - require.Equal(t, map[string]float64{ - "prometheus_tsdb_lowest_timestamp_seconds": 1.0, - "prometheus_tsdb_head_min_time_seconds": 1.0, - "prometheus_tsdb_head_max_time_seconds": 3.0, - }, getCurrentGaugeValuesFor(t, reg, - "prometheus_tsdb_lowest_timestamp_seconds", - "prometheus_tsdb_head_min_time_seconds", - "prometheus_tsdb_head_max_time_seconds", - )) -} - -func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames ...string) map[string]float64 { - f, err := reg.Gather() - require.NoError(t, err) - - res := make(map[string]float64, len(metricNames)) - for _, g := range f { - for _, m := range metricNames { - if g.GetName() != m { - continue - } - - require.Len(t, g.GetMetric(), 1) - _, ok := res[m] - require.False(t, ok, "expected only one metric family for", m) - res[m] = *g.GetMetric()[0].GetGauge().Value - } - } - return res -} - func TestAgentSuccessfulStartup(t *testing.T) { prom := exec.Command(promPath, "-test.main", "--enable-feature=agent", "--web.listen-address=0.0.0.0:0", "--config.file="+agentConfig) require.NoError(t, prom.Start()) From da78aa5948b9179662b4f8771c2463b432b123c5 Mon Sep 17 00:00:00 2001 From: Vladimir Kavlakan Date: Thu, 11 Jun 2026 06:49:53 +0200 Subject: [PATCH 4/4] review fix --- cmd/prometheus/main.go | 11 +++++++++++ pp/go/storage/block/manager.go | 15 +++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index bcfc8de07a..3031ceb745 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1427,6 +1427,17 @@ func main() { g.Add( func() error { level.Info(logger).Log("msg", "Starting persisted block storage ...") + if cfg.tsdb.WALSegmentSize != 0 { + if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 { + return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB") + } + } + if cfg.tsdb.MaxBlockChunkSegmentSize != 0 { + if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 { + return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB") + } + } + switch fsType := prom_runtime.Statfs(localStoragePath); fsType { case "NFS_SUPER_MAGIC": level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.") diff --git a/pp/go/storage/block/manager.go b/pp/go/storage/block/manager.go index 7f72328a11..e2720980f8 100644 --- a/pp/go/storage/block/manager.go +++ b/pp/go/storage/block/manager.go @@ -85,6 +85,12 @@ func NewManager( stopc: make(chan struct{}), stoppedc: make(chan struct{}), } + + if err := m.reloadBlocks(); err != nil { + level.Error(logger).Log("msg", "initial reload blocks failed", "err", err) + } + + level.Info(logger).Log("msg", "Block manager started", "dir", dir) go m.loop() return m } @@ -116,6 +122,15 @@ func (m *Manager) Close() { close(m.stopc) }) <-m.stoppedc + level.Info(m.logger).Log("msg", "Block manager closed") + m.mtx.Lock() + defer m.mtx.Unlock() + for _, b := range m.blocks { + if err := b.Close(); err != nil { + level.Warn(m.logger).Log("msg", "Closing block failed", "err", err, "block", b.Meta().ULID) + } + } + m.blocks = nil } // Querier returns a new querier over the persisted blocks overlapping the time