Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 131 additions & 68 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
pp_pkg_tsdb "github.com/prometheus/prometheus/pp-pkg/tsdb" // PP_CHANGES.md: rebuild on cpp

pp_storage "github.com/prometheus/prometheus/pp/go/storage" // PP_CHANGES.md: rebuild on cpp
block "github.com/prometheus/prometheus/pp/go/storage/block" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/catalog" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/head/head" // PP_CHANGES.md: rebuild on cpp
"github.com/prometheus/prometheus/pp/go/storage/querier" // PP_CHANGES.md: rebuild on cpp
Expand Down Expand Up @@ -817,23 +818,69 @@ func main() {

// PP_CHANGES.md: rebuild on cpp end

var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
localStorage := &readyStorage{stats: tsdb.NewDBStats()}
scraper := &readyScrapeManager{}

// PP_CHANGES.md: rebuild on cpp start
remoteRead = pp_pkg_remote.NewRemoteRead(
log.With(logger, "component", "remote"),
localStorage.StartTime,
)
fanoutStorage = storage.NewFanout(
logger,
adapter,
localStorage,
remoteRead,
// PP_CHANGES.md: rebuild on cpp start
// In server mode the persisted blocks are managed by block.Manager (read +
// retention) and block.Compactor (compaction) instead of a tsdb.DB. The
// block.Manager is wired into the fanout via a querier-only storage.Storage
// adapter; localStorage stays an empty stub. In agent mode the secondary is
// still localStorage (agent.DB set later).
var (
blockManager *block.Manager
blockCompactor *block.Compactor
compactCancel context.CancelFunc
persistedStorage storage.Storage = localStorage
startTimeFn func() (int64, error) = localStorage.StartTime
)
if !agentMode {
retentionMs := int64(time.Duration(cfg.tsdb.RetentionDuration) / time.Millisecond)
blocksToDelete := pp_pkg_tsdb.NewBlocksToDelete(
retentionMs,
int64(cfg.tsdb.MaxBytes),
pp_pkg_tsdb.CatalogHeadsExtraSize(dataDir, headCatalog),
prometheus.DefaultRegisterer,
)
// PP_CHANGES.md: rebuild on cpp end
blockManager = block.NewManager(localStoragePath, &block.Options{
RetentionDuration: retentionMs,
CorruptedRetentionDuration: time.Duration(cfg.tsdb.CorruptedRetentionDuration),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blocksToDelete, log.With(logger, "component", "blockmanager"), prometheus.DefaultRegisterer)

var compactCtx context.Context
compactCtx, compactCancel = context.WithCancel(context.Background())
blockCompactor, err = block.NewCompactor(compactCtx, localStoragePath, &block.CompactorOptions{
MinBlockDuration: int64(time.Duration(cfg.tsdb.MinBlockDuration) / time.Millisecond),
MaxBlockChunkSegmentSize: int64(cfg.tsdb.MaxBlockChunkSegmentSize),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blockManager, log.With(logger, "component", "blockcompactor"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to create block compactor", "err", err)
os.Exit(1)
}

bs := &blockStorage{m: blockManager, onClose: func() error {
blockCompactor.Close()
blockManager.Close()
compactCancel()
return nil
}}
persistedStorage = bs
startTimeFn = bs.StartTime
}

remoteRead := pp_pkg_remote.NewRemoteRead(
log.With(logger, "component", "remote"),
startTimeFn,
)
fanoutStorage := storage.NewFanout(
logger,
adapter,
persistedStorage,
remoteRead,
)
// PP_CHANGES.md: rebuild on cpp end

var (
ctxWeb, cancelWeb = context.WithCancel(context.Background())
Expand Down Expand Up @@ -1371,13 +1418,15 @@ func main() {
)
}
if !agentMode {
// TSDB.
opts := cfg.tsdb.ToTSDBOptions()
opts.StripeSize = 1 // PP_CHANGES.md: rebuild on cpp
// Persisted block storage (block.Manager + block.Compactor). The tsdb.DB
// is disabled in server mode; persisted blocks are read via block.Manager
// and compacted by block.Compactor, both started above. This actor only
// signals readiness and tears the storage down on shutdown.
// PP_CHANGES.md: rebuild on cpp
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "Starting TSDB ...")
level.Info(logger).Log("msg", "Starting persisted block storage ...")
if cfg.tsdb.WALSegmentSize != 0 {
if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
Expand All @@ -1389,39 +1438,28 @@ func main() {
}
}

db, err := openDBWithMetrics(localStoragePath, logger, prometheus.DefaultRegisterer, &opts, localStorage.getStats())
if err != nil {
return fmt.Errorf("opening storage failed: %w", err)
}

tsdb.DBSetBlocksToDelete(db, pp_pkg_tsdb.PPBlocksToDelete(db, dataDir, headCatalog))
switch fsType := prom_runtime.Statfs(localStoragePath); fsType {
case "NFS_SUPER_MAGIC":
level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
default:
level.Info(logger).Log("fs_type", fsType)
}

level.Info(logger).Log("msg", "TSDB started")
level.Debug(logger).Log("msg", "TSDB options",
level.Info(logger).Log("msg", "Persisted block storage started")
level.Debug(logger).Log("msg", "Block storage options",
"MinBlockDuration", cfg.tsdb.MinBlockDuration,
"MaxBlockDuration", cfg.tsdb.MaxBlockDuration,
"MaxBytes", cfg.tsdb.MaxBytes,
"NoLockfile", cfg.tsdb.NoLockfile,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"CorruptedRetentionDuration", cfg.tsdb.CorruptedRetentionDuration,
"WALSegmentSize", cfg.tsdb.WALSegmentSize,
"WALCompression", cfg.tsdb.WALCompression,
"EnableOverlappingCompaction", cfg.tsdb.EnableOverlappingCompaction,
)

startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
// db.SetWriteNotified(remoteStorage) // PP_CHANGES.md: rebuild on cpp
close(dbOpen)
<-cancel
return nil
},
func(err error) {
// Closes adapter (head) + blockStorage (block.Manager + block.Compactor) + remoteRead.
if err := fanoutStorage.Close(); err != nil {
level.Error(logger).Log("msg", "Error stopping storage", "err", err)
}
Expand Down Expand Up @@ -1598,41 +1636,6 @@ func main() {
level.Info(logger).Log("msg", "See you next time!")
}

// openDBWithMetrics opens the TSDB located in dir and registers gauges on reg
// that expose the lowest stored timestamp and the head block's time bounds.
//
// The logger is tagged with component=tsdb before being handed to tsdb.Open;
// reg, opts and stats are passed through unchanged. If tsdb.Open fails, its
// error is returned and no gauges are registered. reg.MustRegister panics if
// registering the gauges fails.
func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options, stats *tsdb.DBStats) (*tsdb.DB, error) {
	db, err := tsdb.Open(
		dir,
		log.With(logger, "component", "tsdb"),
		reg,
		opts,
		stats,
	)
	if err != nil {
		return nil, err
	}

	reg.MustRegister(
		prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Name: "prometheus_tsdb_lowest_timestamp_seconds",
			Help: "Lowest timestamp value stored in the database.",
		}, func() float64 {
			// Work on a single snapshot of the block list. Calling
			// db.Blocks() again after the length check could observe a
			// shorter (even empty) list and panic on the [0] index.
			bb := db.Blocks()
			if len(bb) == 0 {
				// No persisted blocks: fall back to the head's lower bound.
				return float64(db.Head().MinTime() / 1000)
			}
			return float64(bb[0].Meta().MinTime / 1000)
		}),
		prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Name: "prometheus_tsdb_head_min_time_seconds",
			Help: "Minimum time bound of the head block.",
		}, func() float64 { return float64(db.Head().MinTime() / 1000) }),
		prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Name: "prometheus_tsdb_head_max_time_seconds",
			Help: "Maximum timestamp of the head block.",
		}, func() float64 { return float64(db.Head().MaxTime() / 1000) }),
	)

	return db, nil
}

// safePromQLNoStepSubqueryInterval wraps an atomic.Int64 so that the stored
// value can be read and written atomically.
// NOTE(review): judging by the name, this holds the PromQL subquery interval
// used when a query gives no step — confirm against the methods defined on
// this type, which are not visible here.
type safePromQLNoStepSubqueryInterval struct {
	value atomic.Int64
}
Expand Down Expand Up @@ -1756,6 +1759,66 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) {
return eu, nil
}

// blockStorage adapts a read-only block.Manager (persisted blocks) to
// storage.Storage so it can be used as a fanout secondary. Appends are dropped:
// the head adapter is the fanout primary that stores samples.
// PP_CHANGES.md: rebuild on cpp
type blockStorage struct {
	// m answers queries and supplies the block list used by StartTime.
	m *block.Manager
	// onClose is run by Close. It may be nil, in which case Close is a no-op.
	onClose func() error
}

// Compile-time check that *blockStorage satisfies storage.Storage.
var _ storage.Storage = (*blockStorage)(nil)

// Querier returns the block manager's querier for the range [mint, maxt].
func (b *blockStorage) Querier(mint, maxt int64) (storage.Querier, error) {
	return b.m.Querier(mint, maxt)
}

// ChunkQuerier returns the block manager's chunk querier for the range
// [mint, maxt].
func (b *blockStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
	return b.m.ChunkQuerier(mint, maxt)
}

// Appender returns an appender that discards everything written to it.
func (b *blockStorage) Appender(context.Context) storage.Appender { return noopAppender{} }

// Close runs the onClose hook and returns its error. A blockStorage built
// without a hook closes successfully instead of panicking on a nil func.
func (b *blockStorage) Close() error {
	if b.onClose == nil {
		return nil
	}
	return b.onClose()
}

// StartTime returns the oldest timestamp stored in the persisted blocks, or
// math.MaxInt64 when there are no blocks.
func (b *blockStorage) StartTime() (int64, error) {
	// Manager keeps blocks sorted ascending by MinTime.
	if blocks := b.m.Blocks(); len(blocks) > 0 {
		return blocks[0].Meta().MinTime, nil
	}
	return math.MaxInt64, nil
}

// noopAppender is an appender whose every method discards its arguments and
// reports success. It exists so that the fanout appender, which appends to
// every secondary, does not fail on the read-only blockStorage secondary.
// PP_CHANGES.md: rebuild on cpp
type noopAppender struct{}

// Append drops the sample and returns a zero series reference.
func (noopAppender) Append(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) {
	return storage.SeriesRef(0), nil
}

// AppendExemplar drops the exemplar and returns a zero series reference.
func (noopAppender) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) {
	return storage.SeriesRef(0), nil
}

// AppendHistogram drops the histogram and returns a zero series reference.
func (noopAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
	return storage.SeriesRef(0), nil
}

// UpdateMetadata drops the metadata and returns a zero series reference.
func (noopAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
	return storage.SeriesRef(0), nil
}

// AppendCTZeroSample drops the sample and returns a zero series reference.
func (noopAppender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) {
	return storage.SeriesRef(0), nil
}

// Commit has nothing to flush and always succeeds.
func (noopAppender) Commit() error {
	return nil
}

// Rollback has nothing to undo and always succeeds.
func (noopAppender) Rollback() error {
	return nil
}

// readyStorage implements the Storage interface while allowing to set the actual
// storage at a later point in time.
type readyStorage struct {
Expand Down
65 changes: 0 additions & 65 deletions cmd/prometheus/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"os/exec"
"path/filepath"
Expand All @@ -31,9 +30,6 @@ import (
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/config"
Expand Down Expand Up @@ -286,67 +282,6 @@ func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) {
}
}

// TestTimeMetrics verifies the gauges registered by openDBWithMetrics: first
// on a freshly opened, empty database, then after committing three samples at
// timestamps 1000, 2000 and 3000 ms.
func TestTimeMetrics(t *testing.T) {
	const (
		lowestTimestamp = "prometheus_tsdb_lowest_timestamp_seconds"
		headMinTime     = "prometheus_tsdb_head_min_time_seconds"
		headMaxTime     = "prometheus_tsdb_head_max_time_seconds"
	)

	dir := t.TempDir()
	registry := prometheus.NewRegistry()

	db, err := openDBWithMetrics(dir, log.NewNopLogger(), registry, nil, nil)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, db.Close())
	}()

	// currentGauges reads the three gauges under test from the registry.
	currentGauges := func() map[string]float64 {
		return getCurrentGaugeValuesFor(t, registry, lowestTimestamp, headMinTime, headMaxTime)
	}

	// Check initial values.
	wantEmpty := map[string]float64{
		lowestTimestamp: float64(math.MaxInt64) / 1000,
		headMinTime:     float64(math.MaxInt64) / 1000,
		headMaxTime:     float64(math.MinInt64) / 1000,
	}
	require.Equal(t, wantEmpty, currentGauges())

	appender := db.Appender(context.Background())
	for _, ts := range []int64{1000, 2000, 3000} {
		_, err = appender.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), ts, 1)
		require.NoError(t, err)
	}
	require.NoError(t, appender.Commit())

	wantFilled := map[string]float64{
		lowestTimestamp: 1.0,
		headMinTime:     1.0,
		headMaxTime:     3.0,
	}
	require.Equal(t, wantFilled, currentGauges())
}

// getCurrentGaugeValuesFor gathers all metric families from reg and returns
// the current gauge value of every family whose name appears in metricNames,
// keyed by that name. Names with no gathered family are absent from the result.
//
// The test is failed immediately if gathering returns an error, if a matching
// family does not contain exactly one metric, if that metric is not a gauge,
// or if the same name is matched more than once.
func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames ...string) map[string]float64 {
	t.Helper()

	families, err := reg.Gather()
	require.NoError(t, err)

	res := make(map[string]float64, len(metricNames))
	for _, family := range families {
		for _, name := range metricNames {
			if family.GetName() != name {
				continue
			}

			require.Len(t, family.GetMetric(), 1)
			_, seen := res[name]
			// The message needs a verb: testify formats the extra arguments
			// into it, and without one the name shows up as %!(EXTRA ...).
			require.False(t, seen, "expected only one metric family for %s", name)

			// Fail with a clear message instead of dereferencing a nil gauge.
			gauge := family.GetMetric()[0].GetGauge()
			require.NotNil(t, gauge, "metric %s is not a gauge", name)
			res[name] = gauge.GetValue()
		}
	}
	return res
}

func TestAgentSuccessfulStartup(t *testing.T) {
prom := exec.Command(promPath, "-test.main", "--enable-feature=agent", "--web.listen-address=0.0.0.0:0", "--config.file="+agentConfig)
require.NoError(t, prom.Start())
Expand Down
Loading
Loading