diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4330703400..9238c6bc37 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -2240,7 +2240,7 @@ func readPromPPFeatures(logger log.Logger) { remotewriter.DefaultSampleAgeLimit = defaultSampleAgeLimit case "select_func_optimization": - if err := querier.SetSelectFuncOptimize(strings.TrimSpace(fvalue)); err != nil { + if err := selectFuncOptimization(strings.TrimSpace(fvalue)); err != nil { level.Error(logger).Log( "msg", "[FEATURE] Error parsing select_func_optimization value", "err", err, @@ -2255,3 +2255,15 @@ func readPromPPFeatures(logger log.Logger) { } } } + +// selectFuncOptimization sets the select function optimization. +func selectFuncOptimization(fvalue string) error { + for opt := range strings.SplitSeq(fvalue, "|") { + if err := querier.SetSelectFuncOptimize(opt); err != nil { + querier.SetDefaultOptimizeType() // reset to default if error + return err + } + } + + return nil +} diff --git a/pp-pkg/storage/adapter.go b/pp-pkg/storage/adapter.go index 81d7041458..cd0d471ca5 100644 --- a/pp-pkg/storage/adapter.go +++ b/pp-pkg/storage/adapter.go @@ -36,6 +36,7 @@ type Adapter struct { hashdexLimits atomic.Value // stores cppbridge.WALHashdexLimits transparentState *cppbridge.StateV2 mergeOutOfOrderChunks func() + scrapeInterval atomic.Int64 // stat activeQuerierMetrics *querier.Metrics @@ -132,7 +133,10 @@ func (ar *Adapter) AppendSnappyProtobuf( state *cppbridge.StateV2, commitToWal bool, ) error { - hx, err := cppbridge.NewWALSnappyProtobufHashdex(compressedData.Bytes(), ar.hashdexLimits.Load().(cppbridge.WALHashdexLimits)) + hx, err := cppbridge.NewWALSnappyProtobufHashdex( + compressedData.Bytes(), + ar.hashdexLimits.Load().(cppbridge.WALHashdexLimits), + ) compressedData.Destroy() if err != nil { return err @@ -247,11 +251,12 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) // Label limit fields follow the same 0 = no limit semantics as scrape config. func (ar *Adapter) ApplyConfig(cfg *config.Config) error { limits := cppbridge.WALHashdexLimits{ - MaxLabelNamesPerTimeseries: uint32(cfg.GlobalConfig.LabelLimit), - MaxLabelNameLength: uint32(cfg.GlobalConfig.LabelNameLengthLimit), - MaxLabelValueLength: uint32(cfg.GlobalConfig.LabelValueLengthLimit), + MaxLabelNamesPerTimeseries: uint32(cfg.GlobalConfig.LabelLimit), // #nosec G115 // no overflow + MaxLabelNameLength: uint32(cfg.GlobalConfig.LabelNameLengthLimit), // #nosec G115 // no overflow + MaxLabelValueLength: uint32(cfg.GlobalConfig.LabelValueLengthLimit), // #nosec G115 // no overflow } ar.hashdexLimits.Store(limits) + ar.scrapeInterval.Store(int64(cfg.GlobalConfig.ScrapeInterval)) return nil } @@ -269,7 +274,7 @@ func (ar *Adapter) HeadQuerier(mint, maxt int64) (storage.Querier, error) { querier.NewNoOpShardedDeduplicator, mint, maxt, - nil, + ar.scrapeInterval.Load(), ar.activeQuerierMetrics, ), nil } @@ -296,7 +301,14 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) { ahead := ar.proxy.Get() queriers = append( queriers, - querier.NewQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.activeQuerierMetrics), + querier.NewQuerier( + ahead, + querier.NewNoOpShardedDeduplicator, + mint, + maxt, + ar.scrapeInterval.Load(), + ar.activeQuerierMetrics, + ), ) for _, head := range ar.proxy.Heads() { @@ -316,7 +328,7 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) { querier.NewNoOpShardedDeduplicator, mint, maxt, - nil, + ar.scrapeInterval.Load(), ar.storageQuerierMetrics, ), ) diff --git a/pp-pkg/storage/batch_storage.go b/pp-pkg/storage/batch_storage.go index 5794fce1b4..4099c653ee 100644 --- a/pp-pkg/storage/batch_storage.go +++ b/pp-pkg/storage/batch_storage.go @@ -84,11 +84,23 @@ func (bs *BatchStorage) Commit(ctx context.Context) error { // CommitWithState adds aggregated series from [pp_storage.TransactionHead] to the [Head] with [cppbridge.StateV2]. func (bs *BatchStorage) CommitWithState(ctx context.Context, state *cppbridge.StateV2) error { s := bs.transactionHead.Shards()[0] - _, err := bs.adapter.AppendGoHeadHashdex(ctx, cppbridge.NewGoHeadHashdex(s.LSS().Target(), s.DataStorage().Raw()), state, false) + _, err := bs.adapter.AppendGoHeadHashdex( + ctx, + cppbridge.NewGoHeadHashdex(s.LSS().Target(), s.DataStorage().Raw()), + state, + false, + ) return err } // Querier calls f() with the given parameters. Returns a [querier.Querier]. func (bs *BatchStorage) Querier(mint, maxt int64) (storage.Querier, error) { - return querier.NewQuerier(bs.transactionHead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, nil), nil + return querier.NewQuerier( + bs.transactionHead, + querier.NewNoOpShardedDeduplicator, + mint, + maxt, + bs.adapter.scrapeInterval.Load(), + nil, + ), nil } diff --git a/pp/entrypoint/series_data_data_storage.cpp b/pp/entrypoint/series_data_data_storage.cpp index 5cb6a83b60..3b6a44eba7 100644 --- a/pp/entrypoint/series_data_data_storage.cpp +++ b/pp/entrypoint/series_data_data_storage.cpp @@ -219,27 +219,31 @@ extern "C" void prompp_series_data_data_storage_query_final(void* args) { } } -extern "C" void prompp_series_data_data_storage_query_first_timestamps(void* args, void* res) { +extern "C" void prompp_series_data_data_storage_query_first_timestamps(void* args) { using PromPP::Primitives::Timestamp; using series_data::Decoder; struct Arguments { DataStoragePtr data_storage; + Timestamp not_found_timestamp_value; SliceView series_ids; - }; - - struct Result { Slice timestamps; }; const auto in = static_cast(args); - const auto out = static_cast(res); - assert(in->series_ids.size() == out->timestamps.size()); + assert(in->series_ids.size() == in->timestamps.size()); const auto& storage = *in->data_storage; - std::ranges::transform(in->series_ids, out->timestamps.begin(), - [&storage](uint32_t series_id) { return Decoder::get_series_min_timestamp(storage, series_id); }); + std::ranges::transform(in->series_ids, in->timestamps.begin(), [&storage, in](uint32_t series_id) { + if (storage.open_chunks.size() > series_id) [[likely]] { + if (!storage.open_chunks[series_id].is_empty()) [[likely]] { + return Decoder::get_series_min_timestamp(storage, series_id); + } + } + + return in->not_found_timestamp_value; + }); } extern "C" void prompp_series_data_data_storage_allocated_memory(void* args, void* res) { @@ -497,4 +501,4 @@ extern "C" void prompp_series_data_data_storage_loader_dtor(void* args) { }; static_cast(args)->~Arguments(); -} \ No newline at end of file +} diff --git a/pp/entrypoint/series_data_data_storage.h b/pp/entrypoint/series_data_data_storage.h index 6706ec19a5..a00c75bc5c 100644 --- a/pp/entrypoint/series_data_data_storage.h +++ b/pp/entrypoint/series_data_data_storage.h @@ -144,14 +144,13 @@ void prompp_series_data_data_storage_instant_query(void* args, void* res); * @brief Get the first sample timestamp per series * * @param args { - * dataStorage uintptr // pointer to constructed data storage - * seriesIds []uint32 // series ids - * } - * @param res { - * timestamps []int64 // same length as seriesIds; filled from storage + * dataStorage uintptr // pointer to constructed data storage + * notFoundTimestampValue int64 // timestamp value to return if series is not found in storage + * seriesIds []uint32 // series ids + * timestamps []int64 // same length as seriesIds; filled from storage * } */ -void prompp_series_data_data_storage_query_first_timestamps(void* args, void* res); +void prompp_series_data_data_storage_query_first_timestamps(void* args); /** * @brief finishes all Queriers after data load. diff --git a/pp/go/cppbridge/data_storage.go b/pp/go/cppbridge/data_storage.go index 159e901f66..0193c77925 100644 --- a/pp/go/cppbridge/data_storage.go +++ b/pp/go/cppbridge/data_storage.go @@ -121,8 +121,8 @@ func (ds *DataStorage) InstantQuery(targetTimestamp int64, labelSetIDs []uint32, } // QueryFirstTimestamps fills timestamps with the first sample timestamp (Prometheus ms) for each series in seriesIDs -func (ds *DataStorage) QueryFirstTimestamps(seriesIDs []uint32, timestamps []int64) { - seriesDataDataStorageQueryFirstTimestamps(ds.dataStorage, seriesIDs, timestamps) +func (ds *DataStorage) QueryFirstTimestamps(seriesIDs []uint32, timestamps []int64, notFoundTimestampValue int64) { + seriesDataDataStorageQueryFirstTimestamps(ds.dataStorage, notFoundTimestampValue, seriesIDs, timestamps) runtime.KeepAlive(ds) } diff --git a/pp/go/cppbridge/entrypoint.go b/pp/go/cppbridge/entrypoint.go index ca91b042e4..ab82d4b71c 100644 --- a/pp/go/cppbridge/entrypoint.go +++ b/pp/go/cppbridge/entrypoint.go @@ -1573,7 +1573,7 @@ func primitivesLSSQueryLabelValues(lss uintptr, label_name string, matchers []mo return res.status, res.values } -func primitivesLSSGetLabelNameIDs(lss uintptr, names []string) []uint32 { +func primitivesLSSGetLabelNameIDs(lss uintptr, names []string, nameIDs []uint32) { args := struct { lss uintptr names []string @@ -1581,7 +1581,7 @@ func primitivesLSSGetLabelNameIDs(lss uintptr, names []string) []uint32 { res := struct { outIDs []uint32 - }{make([]uint32, len(names))} + }{nameIDs} testGC() fastcgo.UnsafeCall2( @@ -1589,8 +1589,6 @@ func primitivesLSSGetLabelNameIDs(lss uintptr, names []string) []uint32 { uintptr(unsafe.Pointer(&args)), uintptr(unsafe.Pointer(&res)), ) - - return res.outIDs } func primitivesLSSCreateSnapshotLSS(lss uintptr) uintptr { @@ -2145,20 +2143,23 @@ func seriesDataDataStorageInstantQuery(dataStorage uintptr, labelSetIDs []uint32 return res } -func seriesDataDataStorageQueryFirstTimestamps(dataStorage uintptr, seriesIDs []uint32, timestamps []int64) { +func seriesDataDataStorageQueryFirstTimestamps( + dataStorage uintptr, + notFoundTimestampValue int64, + seriesIDs []uint32, + timestamps []int64, +) { args := struct { - dataStorage uintptr - seriesIDs []uint32 - }{dataStorage, seriesIDs} - res := struct { - timestamps []int64 - }{timestamps} + dataStorage uintptr + notFoundTimestampValue int64 + seriesIDs []uint32 + timestamps []int64 + }{dataStorage, notFoundTimestampValue, seriesIDs, timestamps} testGC() - fastcgo.UnsafeCall2( + fastcgo.UnsafeCall1( C.prompp_series_data_data_storage_query_first_timestamps, uintptr(unsafe.Pointer(&args)), - uintptr(unsafe.Pointer(&res)), ) } diff --git a/pp/go/cppbridge/entrypoint.h b/pp/go/cppbridge/entrypoint.h index 3e38373bab..167cccd88e 100755 --- a/pp/go/cppbridge/entrypoint.h +++ b/pp/go/cppbridge/entrypoint.h @@ -1507,14 +1507,13 @@ void prompp_series_data_data_storage_instant_query(void* args, void* res); * @brief Get the first sample timestamp per series * * @param args { - * dataStorage uintptr // pointer to constructed data storage - * seriesIds []uint32 // series ids - * } - * @param res { - * timestamps []int64 // same length as seriesIds; filled from storage + * dataStorage uintptr // pointer to constructed data storage + * notFoundTimestampValue int64 // timestamp value to return if series is not found in storage + * seriesIds []uint32 // series ids + * timestamps []int64 // same length as seriesIds; filled from storage * } */ -void prompp_series_data_data_storage_query_first_timestamps(void* args, void* res); +void prompp_series_data_data_storage_query_first_timestamps(void* args); /** * @brief finishes all Queriers after data load. diff --git a/pp/go/cppbridge/head_test.go b/pp/go/cppbridge/head_test.go index 02628176bf..b824b6173b 100644 --- a/pp/go/cppbridge/head_test.go +++ b/pp/go/cppbridge/head_test.go @@ -7,7 +7,6 @@ import ( "github.com/prometheus/prometheus/pp/go/storage/querier" "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/pp/go/cppbridge" "github.com/prometheus/prometheus/pp/go/model" @@ -183,7 +182,7 @@ func (s *HeadSuite) TestInstantQuery() { // Arrange dataStorage := cppbridge.NewDataStorage() encoder := cppbridge.NewHeadEncoderWithDataStorage(dataStorage) - var series = []struct { + series := []struct { SeriesID uint32 cppbridge.Sample }{ @@ -213,19 +212,19 @@ func (s *HeadSuite) TestInstantQuery() { result := dataStorage.InstantQuery(targetTimestamp, seriesIDs, uintptr(unsafe.Pointer(unsafe.SliceData(instantSeries)))) // Assert - require.Equal(s.T(), cppbridge.DataStorageQueryStatusSuccess, result.Status) + s.Require().Equal(cppbridge.DataStorageQueryStatusSuccess, result.Status) s.Equal(defaultTimestamp, instantSeries[0].Timestamp) - s.Equal(series[2].Sample, cppbridge.Sample{Timestamp: instantSeries[1].Timestamp, Value: instantSeries[1].Value}) - s.Equal(series[5].Sample, cppbridge.Sample{Timestamp: instantSeries[2].Timestamp, Value: instantSeries[2].Value}) - s.Equal(series[6].Sample, cppbridge.Sample{Timestamp: instantSeries[3].Timestamp, Value: instantSeries[3].Value}) + s.Equal(cppbridge.Sample{Timestamp: instantSeries[1].Timestamp, Value: instantSeries[1].Value}, series[2].Sample) + s.Equal(cppbridge.Sample{Timestamp: instantSeries[2].Timestamp, Value: instantSeries[2].Value}, series[5].Sample) + s.Equal(cppbridge.Sample{Timestamp: instantSeries[3].Timestamp, Value: instantSeries[3].Value}, series[6].Sample) } func (s *HeadSuite) TestQueryFirstTimestampsWithEmptySeriesIds() { // Arrange // Act - s.dataStorage.QueryFirstTimestamps(nil, nil) + s.dataStorage.QueryFirstTimestamps(nil, nil, 0) // Assert } @@ -242,7 +241,7 @@ func (s *HeadSuite) TestQueryFirstTimestamps() { // Act timestamps := make([]int64, 2) - s.dataStorage.QueryFirstTimestamps([]uint32{1, 0}, timestamps) + s.dataStorage.QueryFirstTimestamps([]uint32{1, 0}, timestamps, 0) // Assert s.Equal([]int64{2, 5}, timestamps) @@ -259,12 +258,41 @@ func (s *HeadSuite) TestQueryFirstTimestampsInFinalizedChunk() { // Act timestamps := make([]int64, 1) - s.dataStorage.QueryFirstTimestamps([]uint32{0}, timestamps) + s.dataStorage.QueryFirstTimestamps([]uint32{0}, timestamps, 0) // Assert s.Equal([]int64{5}, timestamps) } +func (s *HeadSuite) TestQueryFirstTimestampsRotatedLSS() { + // Arrange + s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("job", "1").Build()) + s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("job", "2").Build()) + + // Act + timestamps := make([]int64, 2) + s.dataStorage.QueryFirstTimestamps([]uint32{1, 0}, timestamps, -1) + + // Assert + s.Equal([]int64{-1, -1}, timestamps) +} + +func (s *HeadSuite) TestQueryFirstTimestampsRotatedLSSWithEmptySeries() { + // Arrange + s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("job", "1").Build()) + s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("job", "2").Build()) + + s.encoder.Encode(1, 5, 1.0) + s.encoder.Encode(1, 9, 1.0) + + // Act + timestamps := make([]int64, 2) + s.dataStorage.QueryFirstTimestamps([]uint32{1, 0}, timestamps, -1) + + // Assert + s.Equal([]int64{5, -1}, timestamps) +} + type DataStorageSerializedDataMultiSeriesIteratorSuite struct { suite.Suite lss *cppbridge.LabelSetStorage diff --git a/pp/go/cppbridge/lss_snapshot.go b/pp/go/cppbridge/lss_snapshot.go index 70e85805a0..9ab1b716df 100644 --- a/pp/go/cppbridge/lss_snapshot.go +++ b/pp/go/cppbridge/lss_snapshot.go @@ -77,12 +77,13 @@ func (lss *LabelSetSnapshot) Query(selector uintptr) *LSSQueryResult { return result } +// SeriesGroups group series by label names. type SeriesGroups struct { Groups [][]uint32 } // GroupSeriesByLabelNames group series by label names -func (lss *LabelSetSnapshot) GroupSeriesByLabelNames(seriesIDs []uint32, labelNameIDs []uint32) *SeriesGroups { +func (lss *LabelSetSnapshot) GroupSeriesByLabelNames(seriesIDs, labelNameIDs []uint32) *SeriesGroups { result := &SeriesGroups{ Groups: primitivesGroupSeriesByLabelNames(lss.pointer, seriesIDs, labelNameIDs), } diff --git a/pp/go/cppbridge/lss_snapshot_test.go b/pp/go/cppbridge/lss_snapshot_test.go index 6aecd33563..360366ab6b 100644 --- a/pp/go/cppbridge/lss_snapshot_test.go +++ b/pp/go/cppbridge/lss_snapshot_test.go @@ -52,13 +52,15 @@ func (s *LabelSetSnapshotSuite) TestGroupSeriesByLabelNames_ByJob() { idA0 := s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("__name__", "m").Set("job", "a").Set("instance", "i0").Build()).LabelSetID idA1 := s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("__name__", "m").Set("job", "a").Set("instance", "i1").Build()).LabelSetID idB := s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("__name__", "m").Set("job", "b").Set("instance", "i2").Build()).LabelSetID + names := []string{"job"} + nameIDs := make([]uint32, len(names)) - jobId := s.lss.GetLabelNameIDs([]string{"job"}) + s.lss.LabelNameToIDs(names, nameIDs) snap := s.lss.CreateLabelSetSnapshot() // Act - groupedSeries := snap.GroupSeriesByLabelNames([]uint32{idA0, idA1, idB}, jobId) + groupedSeries := snap.GroupSeriesByLabelNames([]uint32{idA0, idA1, idB}, nameIDs) // Assert s.Equal([][]uint32{{idA0, idA1}, {idB}}, groupedSeries.Groups) @@ -69,10 +71,12 @@ func (s *LabelSetSnapshotSuite) TestGroupSeriesByLabelNames_ByJobAndInstance() { idSame0 := s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("__name__", "m1").Set("job", "a").Set("instance", "i0").Build()).LabelSetID idOther := s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("__name__", "m2").Set("job", "a").Set("instance", "i1").Build()).LabelSetID idSame1 := s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("__name__", "m3").Set("job", "a").Set("instance", "i0").Build()).LabelSetID + names := []string{"job", "instance"} + nameIDs := make([]uint32, len(names)) - ids := s.lss.GetLabelNameIDs([]string{"job", "instance"}) - jobID := ids[0] - instanceID := ids[1] + s.lss.LabelNameToIDs(names, nameIDs) + jobID := nameIDs[0] + instanceID := nameIDs[1] snap := s.lss.CreateLabelSetSnapshot() diff --git a/pp/go/cppbridge/primitives_lss.go b/pp/go/cppbridge/primitives_lss.go index 745bef62d9..a9373ea57d 100644 --- a/pp/go/cppbridge/primitives_lss.go +++ b/pp/go/cppbridge/primitives_lss.go @@ -1,6 +1,7 @@ package cppbridge import ( + "fmt" "runtime" "unsafe" @@ -124,11 +125,14 @@ func (lss *LabelSetStorage) QueryLabelValues( return result } -// GetLabelNameIDs - returns label name ids -func (lss *LabelSetStorage) GetLabelNameIDs(names []string) []uint32 { - out := primitivesLSSGetLabelNameIDs(lss.pointer, names) +// LabelNameToIDs get label name ids from lss. +func (lss *LabelSetStorage) LabelNameToIDs(names []string, namesIDs []uint32) { + if len(names) != len(namesIDs) { + panic(fmt.Sprintf("names and namesIDs must have the same length: %d != %d", len(names), len(namesIDs))) + } + + primitivesLSSGetLabelNameIDs(lss.pointer, names, namesIDs) runtime.KeepAlive(lss) - return out } // GetLabelSets - returns copy of lss data. diff --git a/pp/go/cppbridge/primitives_lss_test.go b/pp/go/cppbridge/primitives_lss_test.go index feef518127..ae484f99c7 100644 --- a/pp/go/cppbridge/primitives_lss_test.go +++ b/pp/go/cppbridge/primitives_lss_test.go @@ -362,12 +362,14 @@ func (s *QueryableLSSSuite) TestQueryLabelValues() { func (s *QueryableLSSSuite) TestGetLabelNameIDs() { // Arrange + names := []string{"lol", "foo", "nope", "lol"} + nameIDs := make([]uint32, len(names)) // Act - out := s.lss.GetLabelNameIDs([]string{"lol", "foo", "nope", "lol"}) + s.lss.LabelNameToIDs(names, nameIDs) // Assert - s.Equal([]uint32{0, 3, math.MaxUint32, 0}, out) + s.Equal([]uint32{0, 3, math.MaxUint32, 0}, nameIDs) } func (s *QueryableLSSSuite) testQueryLabelValuesImpl(testCase queryLabelValuesCase) { diff --git a/pp/go/storage/appender/appender_test.go b/pp/go/storage/appender/appender_test.go index 5e7bfbade0..90ecdf94aa 100644 --- a/pp/go/storage/appender/appender_test.go +++ b/pp/go/storage/appender/appender_test.go @@ -143,7 +143,7 @@ func (s *AppenderSuite) TestDropInvalidSeries() { // Act stats, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("name", "metric1").Build(), Timestamp: 1, @@ -165,7 +165,7 @@ func (s *AppenderSuite) TestAppendMultipleSamplesInOneSeries() { // Act stats, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -213,7 +213,7 @@ func (s *AppenderSuite) TestSeriesPerShardTransfer() { // Act stats, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -261,7 +261,7 @@ func (s *AppenderSuite) TestShardedRelabeledSeriesFullNotEmpty() { // Act stats, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -330,7 +330,7 @@ func (s *AppenderSuite) TestTrackStaleness() { // Act stats, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -377,7 +377,7 @@ func (s *AppenderSuite) TestTrackStalenessWithoutHonorTimestamps() { // Act stats, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -420,7 +420,7 @@ func (s *AppenderSuite) TestWithoutCommitToWal() { // Act _, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -442,7 +442,7 @@ func (s *AppenderSuite) TestWithCommitToWal() { // Act _, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -464,7 +464,7 @@ func (s *AppenderSuite) TestWithCommitToWalByLimitExhausted() { // Act _, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -496,7 +496,7 @@ func (s *AppenderSuite) TestWithCommitToWalByLimitExhausted() { // Act _, _, _ = s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, @@ -508,7 +508,7 @@ func (s *AppenderSuite) TestWithCommitToWalByLimitExhausted() { _, stats, err := s.appender.Append( context.Background(), - storagetest.NewIncomingData(&s.Suite, []model.TimeSeries{ + storagetest.NewIncomingData(s.Require().NoError, []model.TimeSeries{ { LabelSet: model.NewLabelSetBuilder().Set("__name__", "metric1").Build(), Timestamp: 1, diff --git a/pp/go/storage/block/writer_test.go b/pp/go/storage/block/writer_test.go index ce1a070921..76524792fe 100644 --- a/pp/go/storage/block/writer_test.go +++ b/pp/go/storage/block/writer_test.go @@ -111,7 +111,7 @@ func (s *WriterSuite) shard() *shard.Shard { func (s *WriterSuite) fillHead() { ts := time.UnixMilli(1753805651969) - storagetest.MustAppendTimeSeries(&s.Suite, s.head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, s.head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ @@ -283,7 +283,7 @@ func (s *WriterSuite) TestWriteWithDataUnloadingInBatches() { func (s *WriterSuite) TestSkipEmptyBlock() { // Arrange - storagetest.MustAppendTimeSeries(&s.Suite, s.head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, s.head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ diff --git a/pp/go/storage/head/head/head.go b/pp/go/storage/head/head/head.go index dbadb80ce5..57fe78d60f 100644 --- a/pp/go/storage/head/head/head.go +++ b/pp/go/storage/head/head/head.go @@ -66,8 +66,6 @@ type Head[TShard Shard, TGShard Shard] struct { // for tasks metrics tasksCreated *prometheus.CounterVec tasksDone *prometheus.CounterVec - tasksLive *prometheus.CounterVec - tasksExecute *prometheus.CounterVec // pools for reusable objects headPool *poolprovider.HeadPool[TGShard] @@ -89,9 +87,9 @@ func NewHead[TShard Shard, TGShard Shard]( concurrency := calculateHeadConcurrency(numberOfShards) // current head workers concurrency for shardID := range numberOfShards { // append and query can create 2 tasks per request, so minimal length of channel is - // cap(querySemaphore)*2+cap(appendSemaphore)*2 = 2*concurrency*2+2*concurrency*2 = 8*concurrency - // add extra slots to channel for safety = x9 for back pressure - taskChs[shardID] = make(chan *task.Generic[TGShard], 9*concurrency) + // cap(querySemaphore)*4+cap(appendSemaphore)*2 = 2*concurrency*4+2*concurrency*2 = 12*concurrency + // add extra slots to channel for safety = x13 for back pressure + taskChs[shardID] = make(chan *task.Generic[TGShard], 13*concurrency) } factory := util.NewUnconflictRegisterer(registerer) diff --git a/pp/go/storage/head/poolprovider/poolprovider.go b/pp/go/storage/head/poolprovider/poolprovider.go index b913fc39fa..e98186d9e9 100644 --- a/pp/go/storage/head/poolprovider/poolprovider.go +++ b/pp/go/storage/head/poolprovider/poolprovider.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/prometheus/pp/go/cppbridge" "github.com/prometheus/prometheus/pp/go/storage/head/task" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/pool" "github.com/prometheus/prometheus/util/zeropool" ) @@ -33,16 +34,22 @@ type HeadPool[TGShard Shard] struct { shardedInnerSeriesPool sync.Pool statsPool zeropool.Pool[[]cppbridge.RelabelerStats] // use in querier - snapshotsPool zeropool.Pool[[]*cppbridge.LabelSetSnapshot] - lssQueryResultsPool zeropool.Pool[[]*cppbridge.LSSQueryResult] - selectorsPool zeropool.Pool[[]uintptr] - seriesSetPool zeropool.Pool[[]storage.SeriesSet] - chunkSeriesSetPool zeropool.Pool[[]storage.ChunkSeriesSet] - serializedDataPool zeropool.Pool[[]*cppbridge.DataStorageSerializedData] - errorsPool zeropool.Pool[[]error] + snapshotsPool zeropool.Pool[[]*cppbridge.LabelSetSnapshot] + lssQueryResultsPool zeropool.Pool[[]*cppbridge.LSSQueryResult] + selectorsPool zeropool.Pool[[]uintptr] + seriesSetPool zeropool.Pool[[]storage.SeriesSet] + chunkSeriesSetPool zeropool.Pool[[]storage.ChunkSeriesSet] + serializedDataPool zeropool.Pool[[]*cppbridge.DataStorageSerializedData] + errorsPool zeropool.Pool[[]error] + sliceOfTimestampsPool zeropool.Pool[[][]int64] + timestampsPool pool.SlicePool[int64] + seriesGroupsPool zeropool.Pool[[]*cppbridge.SeriesGroups] + nameIDsPool pool.SlicePool[uint32] } // NewHeadPool init new [HeadPool], pools for reusable objects. +// +//revive:disable-next-line:function-length // this is constructor. func NewHeadPool[TGShard Shard](numberOfShards uint16) *HeadPool[TGShard] { return &HeadPool[TGShard]{ // used to reuse tasks @@ -92,6 +99,14 @@ func NewHeadPool[TGShard Shard](numberOfShards uint16) *HeadPool[TGShard] { errorsPool: zeropool.New(func() []error { return make([]error, numberOfShards) }), + sliceOfTimestampsPool: zeropool.New(func() [][]int64 { + return make([][]int64, numberOfShards) + }), + timestampsPool: pool.NewSlicePool[int64]([]int{2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}), + seriesGroupsPool: zeropool.New(func() []*cppbridge.SeriesGroups { + return make([]*cppbridge.SeriesGroups, numberOfShards) + }), + nameIDsPool: pool.NewSlicePool[uint32]([]int{0, 1, 2, 3, 5}), } } @@ -225,3 +240,47 @@ func (hp *HeadPool[TGShard]) PutErrors(errs []error) { clear(errs) hp.errorsPool.Put(errs) } + +// GetSliceOfTimestamps gets a slice of []int64 from the pool. +func (hp *HeadPool[TGShard]) GetSliceOfTimestamps() [][]int64 { + return hp.sliceOfTimestampsPool.Get() +} + +// PutSliceOfTimestamps adds slice of []int64 to the pool after resetting it. +func (hp *HeadPool[TGShard]) PutSliceOfTimestamps(ts [][]int64) { + clear(ts) + hp.sliceOfTimestampsPool.Put(ts) +} + +// GetTimestamps gets a slice of [int64] from the pool. +func (hp *HeadPool[TGShard]) GetTimestamps(size int) []int64 { + return hp.timestampsPool.Get(size) +} + +// PutTimestamps adds slice of [int64] to the pool after resetting it. +func (hp *HeadPool[TGShard]) PutTimestamps(ts []int64) { + clear(ts) + hp.timestampsPool.Put(ts) +} + +// GetSeriesGroups gets a slice of [cppbridge.SeriesGroups] from the pool. +func (hp *HeadPool[TGShard]) GetSeriesGroups() []*cppbridge.SeriesGroups { + return hp.seriesGroupsPool.Get() +} + +// PutSeriesGroups adds slice of [cppbridge.SeriesGroups] to the pool after resetting it. +func (hp *HeadPool[TGShard]) PutSeriesGroups(groups []*cppbridge.SeriesGroups) { + clear(groups) + hp.seriesGroupsPool.Put(groups) +} + +// GetNameIDs gets a slice of [uint32] from the pool. +func (hp *HeadPool[TGShard]) GetNameIDs(size int) []uint32 { + return hp.nameIDsPool.Get(size) +} + +// PutNameIDs adds slice of [uint32] to the pool after resetting it. +func (hp *HeadPool[TGShard]) PutNameIDs(nameIDs []uint32) { + clear(nameIDs) + hp.nameIDsPool.Put(nameIDs) +} diff --git a/pp/go/storage/head/services/persistener_test.go b/pp/go/storage/head/services/persistener_test.go index a1fc2b1d1e..306d5c780c 100644 --- a/pp/go/storage/head/services/persistener_test.go +++ b/pp/go/storage/head/services/persistener_test.go @@ -147,7 +147,7 @@ func (s *PersistenerSuite) TestNoPersistWritableHead() { func (s *PersistenerSuite) TestNoPersistPersistedHead() { // Arrange head := s.mustCreateHead() - storagetest.MustAppendTimeSeries(&s.Suite, head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ @@ -172,7 +172,7 @@ func (s *PersistenerSuite) TestNoPersistPersistedHead() { func (s *PersistenerSuite) TestOutdatedPersistedHead() { // Arrange head := s.mustCreateHead() - storagetest.MustAppendTimeSeries(&s.Suite, head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ @@ -199,7 +199,7 @@ func (s *PersistenerSuite) TestOutdatedHead() { s.clock.Advance(tsdbRetentionPeriod) head := s.mustCreateHead() - storagetest.MustAppendTimeSeries(&s.Suite, head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ @@ -231,7 +231,7 @@ func (s *PersistenerSuite) TestPersistHeadSuccess() { } head := s.mustCreateHead() - storagetest.MustAppendTimeSeries(&s.Suite, head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ @@ -271,7 +271,7 @@ func (s *PersistenerSuite) TestPersistHeadErrorOnBlockWriterForSecondShard() { } head := s.mustCreateHead() - storagetest.MustAppendTimeSeries(&s.Suite, head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ @@ -341,7 +341,7 @@ func (s *PersistenerServiceSuite) TestRemoveOutdatedHeadFromKeeper() { // Arrange s.clock.Advance(tsdbRetentionPeriod) head := s.mustCreateHead() - storagetest.MustAppendTimeSeries(&s.Suite, head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ @@ -364,7 +364,7 @@ func (s *PersistenerServiceSuite) TestRemoveOutdatedHeadFromKeeper() { func (s *PersistenerServiceSuite) TestLoadHeadsInKeeper() { // Arrange head := s.mustCreateHead() - storagetest.MustAppendTimeSeries(&s.Suite, head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ @@ -387,7 +387,7 @@ func (s *PersistenerServiceSuite) TestLoadHeadsInKeeper() { func (s *PersistenerServiceSuite) TestHeadAlreadyExistsInKeeper() { // Arrange head := s.mustCreateHead() - storagetest.MustAppendTimeSeries(&s.Suite, head, []storagetest.TimeSeries{ + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, []storagetest.TimeSeries{ { Labels: labels.FromStrings("__name__", "value1"), Samples: []cppbridge.Sample{ diff --git a/pp/go/storage/head/shard/data_storage.go b/pp/go/storage/head/shard/data_storage.go index ddc6818449..bf1bf706dd 100644 --- a/pp/go/storage/head/shard/data_storage.go +++ b/pp/go/storage/head/shard/data_storage.go @@ -90,9 +90,9 @@ func (ds *DataStorage) QueryFinal(queriers []uintptr) { } // QueryFirstTimestamps fills timestamps with the first sample timestamp (Prometheus ms) for each series in seriesIDs. -func (ds *DataStorage) QueryFirstTimestamps(ids []uint32, timestamps []int64) { +func (ds *DataStorage) QueryFirstTimestamps(ids []uint32, timestamps []int64, notFoundTimestampValue int64) { ds.locker.RLock() - ds.dataStorage.QueryFirstTimestamps(ids, timestamps) + ds.dataStorage.QueryFirstTimestamps(ids, timestamps, notFoundTimestampValue) ds.locker.RUnlock() } diff --git a/pp/go/storage/head/shard/lss.go b/pp/go/storage/head/shard/lss.go index d5ecbbf735..54b9dbabd3 100644 --- a/pp/go/storage/head/shard/lss.go +++ b/pp/go/storage/head/shard/lss.go @@ -46,11 +46,27 @@ func (l *LSS) CopyAddedSeriesTo(destination *LSS) { destination.dstSrcLsIdsMapping = snapshot.CopyAddedSeries(bitsetSeries, destination.target) } +// GroupSeriesByLabelNames group series by label names. +func (l *LSS) GroupSeriesByLabelNames(seriesIDs, labelNameIDs []uint32) *cppbridge.SeriesGroups { + l.locker.RLock() + snapshot := l.getSnapshot() + l.locker.RUnlock() + + return snapshot.GroupSeriesByLabelNames(seriesIDs, labelNameIDs) +} + // Input returns input lss. func (l *LSS) Input() *cppbridge.LabelSetStorage { return l.input } +// LabelNameToIDs get label name ids from lss. +func (l *LSS) LabelNameToIDs(names []string, namesIDs []uint32) { + l.locker.RLock() + l.target.LabelNameToIDs(names, namesIDs) + l.locker.RUnlock() +} + // QueryLabelNames add to dedup all the unique label names present in lss in sorted order. func (l *LSS) QueryLabelNames( shardID uint16, diff --git a/pp/go/storage/head/transactionhead/head.go b/pp/go/storage/head/transactionhead/head.go index e0d9f7aa87..da30aebab4 100644 --- a/pp/go/storage/head/transactionhead/head.go +++ b/pp/go/storage/head/transactionhead/head.go @@ -64,7 +64,7 @@ func NewHead[TShard Shard, TGShard Shard]( } // AcquireQuery implementation of the working [Head], no blocking. -func (*Head[TShard, TGShard]) AcquireQuery(ctx context.Context) (func(), error) { +func (*Head[TShard, TGShard]) AcquireQuery(context.Context) (func(), error) { return noopRelease, nil } @@ -98,6 +98,11 @@ func (*Head[TShard, TGShard]) Generation() uint64 { return 0 } +// ID returns the [Head] ID. +func (h *Head[TShard, TGShard]) ID() string { + return h.id +} + // NumberOfShards returns current number of shards in to [Head]. func (*Head[TShard, TGShard]) NumberOfShards() uint16 { return 1 diff --git a/pp/go/storage/loader_test.go b/pp/go/storage/loader_test.go index a69ea9cc5f..2c526e09d5 100644 --- a/pp/go/storage/loader_test.go +++ b/pp/go/storage/loader_test.go @@ -133,7 +133,7 @@ func (s *HeadLoadSuite) lockFileForCreation(fileName string) { } func (s *HeadLoadSuite) appendTimeSeries(head *storage.Head, timeSeries []storagetest.TimeSeries) { - storagetest.MustAppendTimeSeries(&s.Suite, head, timeSeries) + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, head, timeSeries) } func (*HeadLoadSuite) shards(head *storage.Head) (result []*shard.Shard) { diff --git a/pp/go/storage/querier/agg_series_set.go b/pp/go/storage/querier/agg_series_set.go new file mode 100644 index 0000000000..df4c5d7003 --- /dev/null +++ b/pp/go/storage/querier/agg_series_set.go @@ -0,0 +1,374 @@ +package querier + +import ( + "errors" + "fmt" + "runtime" + "slices" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/pp/go/cppbridge" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/annotations" + "github.com/prometheus/prometheus/util/pool" +) + +// +// AggSeriesSet +// + +// AggSeriesSet contains a set of aggregated series. +// If grouping is empty, it will return series with labels "__head__shard_id". +// If grouping is not empty, it will return series with "__head__shard_id" and the grouping labels. +type AggSeriesSet struct { + serializedData *cppbridge.DataStorageSerializedData + labelSetSnapshot *cppbridge.LabelSetSnapshot + seriesGroups *cppbridge.SeriesGroups + mint, maxt int64 + grouping []string + headID string + shardID uint16 + + series []AggSeries + nextGroupIndex int +} + +// NewAggSeriesSet initializes a new [AggSeriesSet]. +func NewAggSeriesSet( + serializedData *cppbridge.DataStorageSerializedData, + labelSetSnapshot *cppbridge.LabelSetSnapshot, + seriesGroups *cppbridge.SeriesGroups, + mint, maxt int64, + grouping []string, + headID string, + shardID uint16, +) *AggSeriesSet { + return &AggSeriesSet{ + serializedData: serializedData, + labelSetSnapshot: labelSetSnapshot, + seriesGroups: seriesGroups, + mint: mint, + maxt: maxt, + grouping: grouping, + headID: headID, + shardID: shardID, + series: make([]AggSeries, 0, len(seriesGroups.Groups)), + } +} + +// At returns the current series. +// [storage.SeriesSet] interface implementation. +func (ss *AggSeriesSet) At() storage.Series { + return &ss.series[len(ss.series)-1] +} + +// Err returns the error of the [AggSeriesSet] - always nil. +// [storage.SeriesSet] interface implementation. +func (*AggSeriesSet) Err() error { + return nil +} + +// Next advances the iterator by one and returns false if there are no more values. +// [storage.SeriesSet] interface implementation. +func (ss *AggSeriesSet) Next() bool { + if ss.serializedData == nil { + return false + } + + if ss.nextGroupIndex >= len(ss.seriesGroups.Groups) { + return false + } + + builder := builderPool.Get().(*labels.ScratchBuilder) + builder.Reset() + ss.series = append(ss.series, NewAggSeries( + aggLabelSetCtor( + builder, + ss.labelSetSnapshot, + ss.grouping, + ss.headID, + ss.seriesGroups.Groups[ss.nextGroupIndex][0], // 0 is the first series ID + ss.shardID, + ), + ss.serializedData, + ss.seriesGroups, + ss.nextGroupIndex, + ss.mint, + ss.maxt, + )) + builderPool.Put(builder) + ss.nextGroupIndex++ + + return true +} + +// Warnings returns the warnings of the [AggSeriesSet] - always nil. +// [storage.SeriesSet] interface implementation. +func (*AggSeriesSet) Warnings() annotations.Annotations { + return nil +} + +// +// AggSeries +// + +// AggSeries represents a time series with aggregated samples. +type AggSeries struct { + labelSet labels.Labels + serializedData *cppbridge.DataStorageSerializedData + seriesGroups *cppbridge.SeriesGroups + groupIndex int + mint, maxt int64 +} + +// NewAggSeries initializes a new [AggSeries]. +func NewAggSeries( + labelSet labels.Labels, + serializedData *cppbridge.DataStorageSerializedData, + seriesGroups *cppbridge.SeriesGroups, + groupIndex int, + mint, maxt int64, +) AggSeries { + return AggSeries{ + labelSet: labelSet, + serializedData: serializedData, + seriesGroups: seriesGroups, + groupIndex: groupIndex, + mint: mint, + maxt: maxt, + } +} + +// Iterator returns an iterator that iterates over the aggregated samples of the [AggSeries]. +// [storage.Series] interface implementation. +func (s *AggSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + chunkIterator, ok := it.(*AggChunkIterator) + if !ok { + return NewAggChunkIterator( + s.serializedData, + s.seriesGroups, + s.groupIndex, + s.mint, + s.maxt, + ) + } + + chunkIterator.reset(s.serializedData, s.seriesGroups, s.groupIndex, s.mint, s.maxt) + return chunkIterator +} + +// Labels returns the labels of the [AggSeries]. +// [storage.Series] interface implementation. +func (s *AggSeries) Labels() labels.Labels { + return s.labelSet +} + +// +// AggChunkIterator +// + +// AggChunkIterator iterates over the aggregated samples of a time series, that can only get the next value. +type AggChunkIterator struct { + serializedData *cppbridge.DataStorageSerializedData + seriesGroups *cppbridge.SeriesGroups + chunkIterator cppbridge.DataStorageSerializedDataMultiSeriesIterator + mint int64 + maxt int64 + isInitialized bool +} + +// NewAggChunkIterator initializes a new [AggChunkIterator]. +func NewAggChunkIterator( + serializedData *cppbridge.DataStorageSerializedData, + seriesGroups *cppbridge.SeriesGroups, + groupIndex int, + mint, maxt int64, +) *AggChunkIterator { + it := &AggChunkIterator{ + serializedData: serializedData, + seriesGroups: seriesGroups, + chunkIterator: cppbridge.NewDataStorageSerializedDataMultiSeriesIterator( + serializedData, + seriesGroups.Groups[groupIndex], + ), + mint: mint, + maxt: maxt, + } + + runtime.SetFinalizer(it, func(iter *AggChunkIterator) { + iter.chunkIterator.Close() + iter.serializedData = nil + iter.seriesGroups = nil + }) + + return it +} + +// At returns the current timestamp/value pair if the value is a float. +// [chunkenc.Iterator] interface implementation. +// +//nolint:gocritic // unnamedResult not need +func (it *AggChunkIterator) At() (int64, float64) { + return it.chunkIterator.Timestamp(), it.chunkIterator.Value() +} + +// AtFloatHistogram returns the current timestamp/value pair if the value is a histogram with floating-point counts. +// [chunkenc.Iterator] interface implementation. +func (*AggChunkIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return 0, nil +} + +// AtHistogram returns the current timestamp/value pair if the value is a histogram with integer counts. +// [chunkenc.Iterator] interface implementation. +func (*AggChunkIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { + return 0, nil +} + +// AtT returns the current timestamp. +// [chunkenc.Iterator] interface implementation. +func (it *AggChunkIterator) AtT() int64 { + return it.chunkIterator.Timestamp() +} + +// Err returns the current error - always nil. +// [chunkenc.Iterator] interface implementation. +func (*AggChunkIterator) Err() error { + return nil +} + +// Next advances the iterator by one and returns the type of the value. +// [chunkenc.Iterator] interface implementation. +func (it *AggChunkIterator) Next() chunkenc.ValueType { + if it.nextValue() == chunkenc.ValNone { + return chunkenc.ValNone + } + + if it.AtT() > it.maxt { + return chunkenc.ValNone + } + + return chunkenc.ValFloat +} + +// Seek advances the iterator forward to the first sample with a timestamp equal or greater than t. +// [chunkenc.Iterator] interface implementation. +func (it *AggChunkIterator) Seek(t int64) chunkenc.ValueType { + it.isInitialized = true + if t > it.AtT() { + return it.Next() + } + + if it.AtT() > it.maxt { + return chunkenc.ValNone + } + + return chunkenc.ValFloat +} + +// nextValue advances the iterator by one and returns the type of the value. +func (it *AggChunkIterator) nextValue() chunkenc.ValueType { + if !it.isInitialized { + if !it.chunkIterator.HasData() { + return chunkenc.ValNone + } + + it.isInitialized = true + return chunkenc.ValFloat + } + + it.chunkIterator.Next() + if !it.chunkIterator.HasData() { + return chunkenc.ValNone + } + + return chunkenc.ValFloat +} + +// reset resets the iterator to the beginning of the serialized data. +func (it *AggChunkIterator) reset( + serializedData *cppbridge.DataStorageSerializedData, + seriesGroups *cppbridge.SeriesGroups, + groupIndex int, + mint, maxt int64, +) { + it.serializedData = serializedData + it.seriesGroups = seriesGroups + it.mint = mint + it.maxt = maxt + it.isInitialized = false + it.chunkIterator.Reset(serializedData, seriesGroups.Groups[groupIndex]) +} + +// +// aggLabelSetCtor +// + +const ( + // labelHeadIDShardID is the label name for the head ID and shard ID. + labelHeadIDShardID = "__head__shard_id" +) + +var ( + // groupingPool is a pool of slices for sorted grouping. + groupingPool = pool.NewSlicePool[string]([]int{2, 3, 5}) + + // errGroupingLabelsIsEnough is the error returned when the grouping labels is enough. + errGroupingLabelsIsEnough = errors.New("grouping labels is enough") +) + +// aggLabelSetCtor constructs the label set for an aggregated series. +func aggLabelSetCtor( + sb *labels.ScratchBuilder, + snapshot *cppbridge.LabelSetSnapshot, + grouping []string, + headID string, + seriesID uint32, + shardID uint16, +) labels.Labels { + sb.Add(labelHeadIDShardID, fmt.Sprintf("%s__%d", headID, shardID)) + + if len(grouping) == 0 { + return sb.Labels() + } + + // grouping must be sorted + var sortedGrouping []string + if len(grouping) == 1 { + sortedGrouping = grouping + } else { + sortedGrouping = groupingPool.Get(len(grouping)) + defer groupingPool.Put(sortedGrouping) + + copy(sortedGrouping, grouping) + slices.Sort(sortedGrouping) + } + + i := 0 + _ = snapshot.RangeLabelSet(seriesID, func(l cppbridge.Label) error { + if i >= len(sortedGrouping) { + // fast exit if the grouping labels is enough + return errGroupingLabelsIsEnough + } + + if l.Name > sortedGrouping[i] { + i++ + + if i >= len(sortedGrouping) { + // fast exit if the grouping labels is enough + return errGroupingLabelsIsEnough + } + } + + if l.Name == sortedGrouping[i] { + sb.Add(l.Name, l.Value) + i++ + } + + return nil + }) + sb.Sort() + + return sb.Labels() +} diff --git a/pp/go/storage/querier/agg_series_set_test.go b/pp/go/storage/querier/agg_series_set_test.go new file mode 100644 index 0000000000..f0efa1ab52 --- /dev/null +++ b/pp/go/storage/querier/agg_series_set_test.go @@ -0,0 +1,416 @@ +package querier_test + +import ( + "math" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/pp/go/cppbridge" + "github.com/prometheus/prometheus/pp/go/model" + "github.com/prometheus/prometheus/pp/go/storage/head/shard" + "github.com/prometheus/prometheus/pp/go/storage/querier" + "github.com/prometheus/prometheus/pp/go/storage/storagetest" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type AggSeriesSetSuite struct { + suite.Suite + + timeSeries []storagetest.TimeSeries + lss *shard.LSS + ds *shard.DataStorage +} + +func TestAggSeriesSetSuite(t *testing.T) { + suite.Run(t, new(AggSeriesSetSuite)) +} + +func (s *AggSeriesSetSuite) SetupTest() { + s.lss = shard.NewLSS() + s.ds = shard.NewDataStorage() + + s.timeSeries = []storagetest.TimeSeries{ + { + Labels: labels.FromStrings("__name__", "metric", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: 1}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{{Timestamp: 11, Value: 3}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{{Timestamp: 12, Value: 5}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{{Timestamp: 13, Value: 7}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: 1}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{{Timestamp: 11, Value: 2}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{{Timestamp: 12, Value: 4}}, + }, + } +} + +func (s *AggSeriesSetSuite) query( + lss *shard.LSS, + ds *shard.DataStorage, + start, end, downsamplingMs int64, + hints *storage.SelectHints, + matchers ...model.LabelMatcher, +) storage.SeriesSet { + selector, snapshot, err := lss.QuerySelector(0, matchers) + s.Require().NoError(err) + + if selector == 0 || snapshot == nil { + return &querier.AggSeriesSet{} + } + + lssQueryResult := snapshot.Query(selector) + if lssQueryResult.Status() == cppbridge.LSSQueryStatusNoMatch { + return &querier.AggSeriesSet{} + } + + valueNotFoundTimestampValue := int64(0) + timestamps := make([]int64, lssQueryResult.Len()) + ds.QueryFirstTimestamps(lssQueryResult.IDs(), timestamps, 0) + + sNaNSS := querier.NewStaleNaNSeriesSet( + querier.NewStaleNaNSeriesSliceFromTimestamps(timestamps), + lssQueryResult, + snapshot, + valueNotFoundTimestampValue, + ) + + dsQueryResult := ds.Query(cppbridge.DataStorageQuery{ + StartTimestampMs: start, + EndTimestampMs: end, + LabelSetIDs: lssQueryResult.IDs(), + }, downsamplingMs, hints) + + nameIDs := make([]uint32, len(hints.Grouping)) + lss.LabelNameToIDs(hints.Grouping, nameIDs) + seriesGroups := lss.GroupSeriesByLabelNames(lssQueryResult.IDs(), nameIDs) + + s.Require().Equal(cppbridge.DataStorageQueryStatusSuccess, dsQueryResult.Status) + + aggSS := querier.NewAggSeriesSet( + dsQueryResult.SerializedData, + snapshot, + seriesGroups, + start, + end, + hints.Grouping, + "head_id", + 0, + ) + + return querier.NewMergeShardSeriesSet([]storage.SeriesSet{sNaNSS, aggSS}) +} + +func (s *AggSeriesSetSuite) TestQueryWithoutGrouping() { + // Arrange + matcher := model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + } + + var start int64 + var end int64 = 50 + hints := &storage.SelectHints{ + Start: 10, + End: 13, + Step: 1, + Func: "sum", + Range: 0, + } + + expected := []storagetest.TimeSeries{ + { + Labels: labels.FromStrings("__head__shard_id", "head_id__0"), + Samples: []cppbridge.Sample{}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: math.Float64frombits(value.StaleNaN)}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: math.Float64frombits(value.StaleNaN)}}, + }, + } + + storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) + + // Act + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, hints, matcher) + + // Assert + actual := storagetest.TimeSeriesFromSeriesSet(seriesSet, true) + s.Require().Equal(len(expected), len(actual)) + s.Require().Equal(expected[0].Labels, actual[0].Labels) + s.Require().Equal(expected[1].Labels, actual[1].Labels) + s.Require().Equal(expected[2].Labels, actual[2].Labels) +} + +func (s *AggSeriesSetSuite) TestQueryGrouping_OneGroupingLabel() { + // Arrange + matcher := model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + } + + var start int64 + var end int64 = 50 + hints := &storage.SelectHints{ + Start: 10, + End: 13, + Step: 1, + Func: "sum", + Range: 0, + Grouping: []string{"job"}, + } + + expected := []storagetest.TimeSeries{ + { + Labels: labels.FromStrings("__head__shard_id", "head_id__0", "job", "test"), + Samples: []cppbridge.Sample{}, + }, + { + Labels: labels.FromStrings("__head__shard_id", "head_id__0", "job", "test2"), + Samples: []cppbridge.Sample{}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: math.Float64frombits(value.StaleNaN)}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: math.Float64frombits(value.StaleNaN)}}, + }, + } + + storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) + + // Act + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, hints, matcher) + + // Assert + actual := storagetest.TimeSeriesFromSeriesSet(seriesSet, true) + s.Require().Equal(len(expected), len(actual)) + s.Require().Equal(expected[0].Labels, actual[0].Labels) + s.Require().Equal(expected[1].Labels, actual[1].Labels) + s.Require().Equal(expected[2].Labels, actual[2].Labels) + s.Require().Equal(expected[3].Labels, actual[3].Labels) +} + +func (s *AggSeriesSetSuite) TestQueryGrouping_TwoGroupingLabels() { + // Arrange + matcher := model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + } + + var start int64 + var end int64 = 50 + hints := &storage.SelectHints{ + Start: 10, + End: 13, + Step: 1, + Func: "sum", + Range: 0, + Grouping: []string{"instance", "job"}, + } + + expected := []storagetest.TimeSeries{ + { + Labels: labels.FromStrings("__head__shard_id", "head_id__0", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{}, + }, + { + Labels: labels.FromStrings("__head__shard_id", "head_id__0", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: math.Float64frombits(value.StaleNaN)}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: math.Float64frombits(value.StaleNaN)}}, + }, + } + + storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) + + // Act + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, hints, matcher) + + // Assert + actual := storagetest.TimeSeriesFromSeriesSet(seriesSet, true) + s.Require().Equal(len(expected), len(actual)) + s.Require().Equal(expected[0].Labels, actual[0].Labels) + s.Require().Equal(expected[1].Labels, actual[1].Labels) + s.Require().Equal(expected[2].Labels, actual[2].Labels) + s.Require().Equal(expected[3].Labels, actual[3].Labels) +} + +func (s *AggSeriesSetSuite) TestQueryGrouping_TwoGroupingLabels_WithMissingGroupingLabel() { + // Arrange + matcher := model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + } + + var start int64 + var end int64 = 50 + hints := &storage.SelectHints{ + Start: 10, + End: 13, + Step: 1, + Func: "sum", + Range: 0, + Grouping: []string{"job", "instance", "head"}, + } + + expected := []storagetest.TimeSeries{ + { + Labels: labels.FromStrings("__head__shard_id", "head_id__0", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{}, + }, + { + Labels: labels.FromStrings("__head__shard_id", "head_id__0", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test", "instance", "instance1"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: math.Float64frombits(value.StaleNaN)}}, + }, + { + Labels: labels.FromStrings("__name__", "metric", "job", "test2", "instance", "instance2"), + Samples: []cppbridge.Sample{{Timestamp: 10, Value: math.Float64frombits(value.StaleNaN)}}, + }, + } + + storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) + + // Act + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, hints, matcher) + + // Assert + actual := storagetest.TimeSeriesFromSeriesSet(seriesSet, true) + s.Require().Equal(len(expected), len(actual)) + s.Require().Equal(expected[0].Labels, actual[0].Labels) + s.Require().Equal(expected[1].Labels, actual[1].Labels) +} + +// +// TODO DELETE +// + +func TestAGGSS(t *testing.T) { + hints := &storage.SelectHints{ + Start: 0, + End: 6, + Step: 1, + Func: "sum", + Range: 1, + Grouping: []string{ + "shard_id", + "even_numbered", + "head_id", + }, + By: true, + } + + shardID := uint16(0) + head := makeHead(2, 10, 5) + + selector, snapshot, err := head.lsses[shardID].QuerySelector( + shardID, + []model.LabelMatcher{{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + }}, + ) + require.NoError(t, err) + + lssQueryResult := snapshot.Query(selector) + require.Equal(t, cppbridge.LSSQueryStatusMatch, lssQueryResult.Status()) + seriesIDs := lssQueryResult.IDs() + t.Log("seriesIDs", seriesIDs) + + nameIDs := make([]uint32, len(hints.Grouping)) + t.Log("nameIDs empty", nameIDs) + head.lsses[0].LabelNameToIDs(hints.Grouping, nameIDs) + t.Log("nameIDs filled", nameIDs) + + seriesGroups := head.lsses[shardID].GroupSeriesByLabelNames(seriesIDs, nameIDs) + t.Log("seriesGroups", seriesGroups.Groups) + + valueNotFoundTimestampValue := int64(0) + timestamps := make([]int64, lssQueryResult.Len()) + head.dss[shardID].QueryFirstTimestamps(lssQueryResult.IDs(), timestamps, 0) + t.Log("timestamps", timestamps) + + sNaNSS := querier.NewStaleNaNSeriesSet( + querier.NewStaleNaNSeriesSliceFromTimestamps(timestamps), + lssQueryResult, + snapshot, + valueNotFoundTimestampValue, + ) + + result := head.dss[shardID].Query( + cppbridge.DataStorageQuery{ + StartTimestampMs: hints.Start, + EndTimestampMs: hints.End, + LabelSetIDs: seriesIDs, + }, + cppbridge.NoDownsampling, + hints, + ) + + aggSS := querier.NewAggSeriesSet( + result.SerializedData, + snapshot, + seriesGroups, + hints.Start, + hints.End, + hints.Grouping, + "headID", + shardID, + ) + + ss := querier.NewMergeShardSeriesSet([]storage.SeriesSet{sNaNSS, aggSS}) + + var it chunkenc.Iterator + for ss.Next() { + s := ss.At() + t.Log("s.Labels()", s.Labels()) + it = s.Iterator(it) + t.Log("it.Next()", it.Next()) + t.Log(it.At()) + for it.Next() != chunkenc.ValNone { + ts, v := it.At() + t.Log("ts", ts, "v", v) + } + } +} diff --git a/pp/go/storage/querier/interface.go b/pp/go/storage/querier/interface.go index c822592ced..03b5d30390 100644 --- a/pp/go/storage/querier/interface.go +++ b/pp/go/storage/querier/interface.go @@ -56,6 +56,10 @@ type DataStorage interface { hints *storage.SelectHints, ) cppbridge.DataStorageQueryResult + // QueryFirstTimestamps fills timestamps with the first sample + // timestamp (Prometheus ms) for each series in seriesIDs. + QueryFirstTimestamps(ids []uint32, timestamps []int64, notFoundTimestampValue int64) + // WithRLock calls fn on raw [cppbridge.DataStorage] with read lock. WithRLock(fn func(ds *cppbridge.DataStorage) error) error } @@ -66,6 +70,12 @@ type DataStorage interface { // LSS the minimum required [LSS] implementation. type LSS interface { + // GroupSeriesByLabelNames group series by label names. + GroupSeriesByLabelNames(seriesIDs, labelNameIDs []uint32) *cppbridge.SeriesGroups + + // LabelNameToIDs get label name ids from lss. + LabelNameToIDs(names []string, namesIDs []uint32) + // QueryLabelNames returns all the unique label names present in lss in sorted order. QueryLabelNames( shardID uint16, @@ -134,6 +144,9 @@ type Head[ // EnqueueOnShard the task to be executed on head on specific shard. EnqueueOnShard(t TTask, shardID uint16) + // ID returns the [Head] ID. + ID() string + // NumberOfShards returns current number of shards in to [Head]. NumberOfShards() uint16 diff --git a/pp/go/storage/querier/merge_series_set.go b/pp/go/storage/querier/merge_series_set.go index 6fe49f7dc6..48695335a4 100644 --- a/pp/go/storage/querier/merge_series_set.go +++ b/pp/go/storage/querier/merge_series_set.go @@ -40,6 +40,38 @@ func NewMergeShardSeriesSet(sets []storage.SeriesSet) storage.SeriesSet { return s } +// NewMergeManyShardSeriesSets merges many [storage.SeriesSet] together from different shards. +// The implementation assumes only our shard-local sets are passed in: +// - always no nil sets; +// - always no Err()/Warnings() - because shards should not have any errors and warnings; +// - always sorted output sets; +// - always no identical label sets across shards. +func NewMergeManyShardSeriesSets(ssets ...[]storage.SeriesSet) storage.SeriesSet { + if len(ssets) == 0 { + return emptySeriesSet + } + + length := 0 + for _, sets := range ssets { + length += len(sets) + } + + s := &mergeShardSeriesSet{ + heap: make(seriesSetHeap, 0, length), + } + + for _, sets := range ssets { + for _, set := range sets { + // shard series don't have errors and not nil, so we can safely call Next + if set.Next() { + heap.Push(&s.heap, set) + } + } + } + + return s +} + // At returns the current [storage.Series], implement [storage.SeriesSet] interface. func (s *mergeShardSeriesSet) At() storage.Series { return s.currentSet.At() diff --git a/pp/go/storage/querier/merge_series_set_test.go b/pp/go/storage/querier/merge_series_set_test.go index a384025163..4e80f62948 100644 --- a/pp/go/storage/querier/merge_series_set_test.go +++ b/pp/go/storage/querier/merge_series_set_test.go @@ -57,6 +57,7 @@ func TestMergeShardSeriesSetSuite(t *testing.T) { suite.Run(t, new(MergeShardSeriesSetSuite)) } +//revive:disable-next-line:cognitive-complexity // this is a test. func (s *MergeShardSeriesSetSuite) TestMergeShardSeriesSetScenarios() { var start int64 matcher := model.LabelMatcher{ @@ -76,8 +77,24 @@ func (s *MergeShardSeriesSetSuite) TestMergeShardSeriesSetScenarios() { esets := make([]storage.SeriesSet, 0, bm.numShards) asets := make([]storage.SeriesSet, 0, bm.numShards) for i := 0; i < bm.numShards; i++ { - esets = append(esets, queryOpt(s.T(), head.lsses[i], head.dss[i], start, end, cppbridge.NoDownsampling, matcher)) - asets = append(asets, queryOpt(s.T(), head.lsses[i], head.dss[i], start, end, cppbridge.NoDownsampling, matcher)) + esets = append(esets, queryOpt( + s.T(), + head.lsses[i], + head.dss[i], + start, + end, + cppbridge.NoDownsampling, + matcher, + )) + asets = append(asets, queryOpt( + s.T(), + head.lsses[i], + head.dss[i], + start, + end, + cppbridge.NoDownsampling, + matcher, + )) } assertMergeShardSeriesSetsEqual(s, esets, asets) }) @@ -90,8 +107,24 @@ func (s *MergeShardSeriesSetSuite) TestMergeShardSeriesSetScenarios() { esets = append(esets, &querier.SeriesSet{}) asets = append(asets, &querier.SeriesSet{}) } - esets = append(esets, queryOpt(s.T(), head.lsses[i], head.dss[i], start, end, cppbridge.NoDownsampling, matcher)) - asets = append(asets, queryOpt(s.T(), head.lsses[i], head.dss[i], start, end, cppbridge.NoDownsampling, matcher)) + esets = append(esets, queryOpt( + s.T(), + head.lsses[i], + head.dss[i], + start, + end, + cppbridge.NoDownsampling, + matcher, + )) + asets = append(asets, queryOpt( + s.T(), + head.lsses[i], + head.dss[i], + start, + end, + cppbridge.NoDownsampling, + matcher, + )) } assertMergeShardSeriesSetsEqual(s, esets, asets) }) @@ -100,10 +133,26 @@ func (s *MergeShardSeriesSetSuite) TestMergeShardSeriesSetScenarios() { esets := make([]storage.SeriesSet, 0, bm.numShards) asets := make([]storage.SeriesSet, 0, bm.numShards) for i := 0; i < bm.numShards; i++ { - esets = append(esets, queryOpt(s.T(), head.lsses[i], head.dss[i], start, end, cppbridge.NoDownsampling, matcher)) + esets = append(esets, queryOpt( + s.T(), + head.lsses[i], + head.dss[i], + start, + end, + cppbridge.NoDownsampling, + matcher, + )) } for i := bm.numShards - 1; i >= 0; i-- { - asets = append(asets, queryOpt(s.T(), head.lsses[i], head.dss[i], start, end, cppbridge.NoDownsampling, matcher)) + asets = append(asets, queryOpt( + s.T(), + head.lsses[i], + head.dss[i], + start, + end, + cppbridge.NoDownsampling, + matcher, + )) } assertMergeShardSeriesSetsEqual(s, esets, asets) }) @@ -141,7 +190,15 @@ func BenchmarkMergeSeriesSet(b *testing.B) { b.StopTimer() seriesSets = seriesSets[:0] for i := 0; i < bm.numShards; i++ { - seriesSets = append(seriesSets, queryOpt(b, head.lsses[i], head.dss[i], start, end, cppbridge.NoDownsampling, matcher)) + seriesSets = append(seriesSets, queryOpt( + b, + head.lsses[i], + head.dss[i], + start, + end, + cppbridge.NoDownsampling, + matcher, + )) } b.StartTimer() @@ -185,15 +242,17 @@ func makeHead(numShards, numSeries, numSamples int) *testHead { func makeTimeSeries(numSeries, numSamples, shardID int) []storagetest.TimeSeries { timeSeries := make([]storagetest.TimeSeries, 0, numSeries) for j := range numSeries { + evenNumbered := j%2 == 0 ls := labels.FromStrings( "__name__", "metric", + "even_numbered", fmt.Sprintf("%t", evenNumbered), "foo", fmt.Sprintf("bar%d", j), "shard_id", fmt.Sprintf("id_%d", shardID), ) samples := make([]cppbridge.Sample, 0, numSamples) for k := range numSamples { - samples = append(samples, cppbridge.Sample{Timestamp: int64(k), Value: float64(k)}) + samples = append(samples, cppbridge.Sample{Timestamp: int64(k + 1), Value: float64(k)}) } timeSeries = append(timeSeries, storagetest.TimeSeries{Labels: ls, Samples: samples}) diff --git a/pp/go/storage/querier/querier.go b/pp/go/storage/querier/querier.go index d3aba0012f..5566278c4f 100644 --- a/pp/go/storage/querier/querier.go +++ b/pp/go/storage/querier/querier.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "sort" "time" "unsafe" @@ -28,13 +29,21 @@ const ( // lssLabelNamesQuerier name of task. lssLabelNamesQuerier = "lss_label_names_querier" + // lssGroupSeriesByLabelNames name of task. + lssGroupSeriesByLabelNames = "lss_group_series_by_label_names" + // dsQueryInstantQuerier name of task. dsQueryInstantQuerier = "data_storage_query_instant_querier" // dsQueryRangeQuerier name of task. dsQueryRangeQuerier = "data_storage_query_range_querier" + // dsQueryFirstTimestampsQuerier name of task. + dsQueryFirstTimestampsQuerier = "data_storage_query_first_timestamps_querier" - // DefaultInstantQueryValueNotFoundTimestampValue default value for not found timestamp value. + // DefaultInstantQueryValueNotFoundTimestampValue default value for not found timestamp value for instant query. DefaultInstantQueryValueNotFoundTimestampValue int64 = 0 + + // DefaultNotFoundTimestampValue default value for not found timestamp value. + DefaultNotFoundTimestampValue int64 = math.MinInt64 ) // @@ -65,6 +74,12 @@ const ( allOptimizeType queryOptimizeType = dropPointOptimizeType | newPointOptimizeType | crossSeriesOptimizeType ) +// DefaultCountOfSeriesToOptimize is the default count of series to optimize. +const DefaultCountOfSeriesToOptimize = 6 + +// defaultOptimizeType is the default option for selecting functions optimization. +var defaultOptimizeType = noneOptimizeType + // SetSelectFuncOptimize sets the select func optimization option by name. func SetSelectFuncOptimize(opt string) error { switch opt { @@ -73,15 +88,15 @@ func SetSelectFuncOptimize(opt string) error { return nil case "drop_point": - selectFuncOptimize = dropPointOptimizeType + selectFuncOptimize |= dropPointOptimizeType return nil case "new_point": - selectFuncOptimize = newPointOptimizeType + selectFuncOptimize |= newPointOptimizeType return nil case "cross": - selectFuncOptimize = crossSeriesOptimizeType + selectFuncOptimize |= crossSeriesOptimizeType return nil case "all": @@ -96,12 +111,20 @@ func SetSelectFuncOptimize(opt string) error { } } +// SetDefaultOptimizeType set the default option for selecting functions optimization. +func SetDefaultOptimizeType() { + selectFuncOptimize = defaultOptimizeType +} + // selectFuncOptimize is the option for selecting functions optimization. -var selectFuncOptimize = noneOptimizeType +var selectFuncOptimize = defaultOptimizeType // emptySelectHints is an empty select hints, it's used when no optimization is needed. var emptySelectHints = &storage.SelectHints{} +// emptySeriesSet is an empty series set. +var emptySeriesSet = &SeriesSet{} + // // Querier // @@ -116,9 +139,9 @@ type Querier[ ] struct { mint int64 maxt int64 + scrapeInterval int64 head THead deduplicatorCtor deduplicatorCtor - closer func() error metrics *Metrics queryOptimize queryOptimizeType } @@ -133,11 +156,18 @@ func NewQuerier[ ]( head THead, deduplicatorCtor deduplicatorCtor, - mint, maxt int64, - closer func() error, + mint, maxt, scrapeInterval int64, metrics *Metrics, ) *Querier[TTask, TDataStorage, TLSS, TShard, THead] { - return newQuerierWithSelectFuncOptimize(head, deduplicatorCtor, mint, maxt, closer, metrics, selectFuncOptimize) + return newQuerierWithSelectFuncOptimize( + head, + deduplicatorCtor, + mint, + maxt, + scrapeInterval, + metrics, + selectFuncOptimize, + ) } // NewQuerierWithOutSelectFuncOptimize init new [Querier] without select func optimization. @@ -150,8 +180,7 @@ func NewQuerierWithOutSelectFuncOptimize[ ]( head THead, deduplicatorCtor deduplicatorCtor, - mint, maxt int64, - closer func() error, + mint, maxt, scrapeInterval int64, metrics *Metrics, ) *Querier[TTask, TDataStorage, TLSS, TShard, THead] { return newQuerierWithSelectFuncOptimize( @@ -159,7 +188,7 @@ func NewQuerierWithOutSelectFuncOptimize[ deduplicatorCtor, mint, maxt, - closer, + scrapeInterval, metrics, selectFuncOptimize&dropPointOptimizeType, ) @@ -175,17 +204,16 @@ func newQuerierWithSelectFuncOptimize[ ]( head THead, deduplicatorCtor deduplicatorCtor, - mint, maxt int64, - closer func() error, + mint, maxt, scrapeInterval int64, metrics *Metrics, queryOptimize queryOptimizeType, ) *Querier[TTask, TDataStorage, TLSS, TShard, THead] { return &Querier[TTask, TDataStorage, TLSS, TShard, THead]{ mint: mint, maxt: maxt, + scrapeInterval: scrapeInterval, head: head, deduplicatorCtor: deduplicatorCtor, - closer: closer, metrics: metrics, queryOptimize: queryOptimize, } @@ -194,11 +222,7 @@ func newQuerierWithSelectFuncOptimize[ // Close [Querier] if need. // //revive:disable-next-line:confusing-naming // other type of querier. -func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) Close() error { - if q.closer != nil { - return q.closer() - } - +func (*Querier[TTask, TDataStorage, TLSS, TShard, THead]) Close() error { return nil } @@ -256,6 +280,7 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) Select( if q.mint == q.maxt { return q.selectInstant(ctx, sortSeries, hints, matchers...) } + return q.selectRange(ctx, sortSeries, hints, matchers...) } @@ -273,7 +298,7 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectInstant( release, err := q.head.AcquireQuery(ctx) if err != nil { if errors.Is(err, locker.ErrSemaphoreClosed) { - return &SeriesSet{} + return emptySeriesSet } logger.Warnf("[QUERIER]: select instant failed on the capture of the read lock query: %s", err) @@ -325,6 +350,7 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectInstant( lssQueryResult.IDs(), uintptr(unsafe.Pointer(unsafe.SliceData(instantSeries))), // #nosec G103 // it's meant to be that way ) + if result.Status == cppbridge.DataStorageQueryStatusNeedDataLoad { loadAndQueryWaiter.Add(s, result.Querier) } @@ -363,7 +389,7 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectRange( release, err := q.head.AcquireQuery(ctx) if err != nil { if errors.Is(err, locker.ErrSemaphoreClosed) { - return &SeriesSet{} + return emptySeriesSet } logger.Warnf("[QUERIER]: select range failed on the capture of the read lock query: %s", err) @@ -390,11 +416,26 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectRange( return storage.ErrSeriesSet(err) } - hints = SwitchFuncOptimize(hints, q.queryOptimize) + hints = SwitchFuncOptimize(hints, isPossibleToOptimize(lssQueryResults, hints, q.scrapeInterval), q.queryOptimize) shardedSerializedData := poolProvider.GetSerializedData() defer poolProvider.PutSerializedData(shardedSerializedData) queryDataStorage(dsQueryRangeQuerier, q.head, lssQueryResults, shardedSerializedData, q.mint, q.maxt, hints) + if isCrossSeriesFunc(hints) { + return q.makeAggSeriesSet(lssQueryResults, snapshots, shardedSerializedData, hints) + } + + return q.makeSeriesSet(lssQueryResults, snapshots, shardedSerializedData) +} + +// makeSeriesSet makes the series set. +func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) makeSeriesSet( + lssQueryResults []*cppbridge.LSSQueryResult, + snapshots []*cppbridge.LabelSetSnapshot, + shardedSerializedData []*cppbridge.DataStorageSerializedData, +) storage.SeriesSet { + poolProvider := q.head.PoolProvider() + seriesSets := poolProvider.GetSeriesSet() defer poolProvider.PutSeriesSet(seriesSets) for shardID, serializedData := range shardedSerializedData { @@ -408,14 +449,113 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectRange( ) continue } - seriesSets[shardID] = &SeriesSet{} + + seriesSets[shardID] = emptySeriesSet } return NewMergeShardSeriesSet(seriesSets) } +// makeAggSeriesSet queries the aggregated cross series set. +func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) makeAggSeriesSet( + lssQueryResults []*cppbridge.LSSQueryResult, + snapshots []*cppbridge.LabelSetSnapshot, + shardedSerializedData []*cppbridge.DataStorageSerializedData, + hints *storage.SelectHints, +) storage.SeriesSet { + poolProvider := q.head.PoolProvider() + timestamps := poolProvider.GetSliceOfTimestamps() + defer poolProvider.PutSliceOfTimestamps(timestamps) + for i := range timestamps { + if lssQueryResults[i] == nil { + continue + } + + timestamps[i] = poolProvider.GetTimestamps(lssQueryResults[i].Len()) + defer poolProvider.PutTimestamps(timestamps[i]) + } + + tds := q.head.CreateTask( + dsQueryFirstTimestampsQuerier, + func(shard TShard) error { + shardID := shard.ShardID() + res := lssQueryResults[shardID] + if res == nil { + return nil + } + + shard.DataStorage().QueryFirstTimestamps(res.IDs(), timestamps[shardID], DefaultNotFoundTimestampValue) + + return nil + }, + ) + q.head.Enqueue(tds) + + seriesGroups := poolProvider.GetSeriesGroups() + defer poolProvider.PutSeriesGroups(seriesGroups) + tlss := q.head.CreateTask( + lssGroupSeriesByLabelNames, + func(shard TShard) error { + shardID := shard.ShardID() + res := lssQueryResults[shardID] + if res == nil { + return nil + } + + nameIDs := poolProvider.GetNameIDs(len(hints.Grouping)) + shard.LSS().LabelNameToIDs(hints.Grouping, nameIDs) + seriesGroups[shardID] = shard.LSS().GroupSeriesByLabelNames(res.IDs(), nameIDs) + poolProvider.PutNameIDs(nameIDs) + + return nil + }, + ) + q.head.Enqueue(tlss) + + _ = tds.Wait() + _ = tlss.Wait() + q.head.PutTask(tds) + q.head.PutTask(tlss) + + seriesSets := poolProvider.GetSeriesSet() + defer poolProvider.PutSeriesSet(seriesSets) + sNaNSeriesSets := poolProvider.GetSeriesSet() + defer poolProvider.PutSeriesSet(sNaNSeriesSets) + for shardID, serializedData := range shardedSerializedData { + if serializedData == nil { + sNaNSeriesSets[shardID] = emptySeriesSet + seriesSets[shardID] = emptySeriesSet + continue + } + + sNaNSeriesSets[shardID] = NewStaleNaNSeriesSet( + NewStaleNaNSeriesSliceFromTimestamps(timestamps[shardID]), + lssQueryResults[shardID], + snapshots[shardID], + DefaultNotFoundTimestampValue, + ) + + seriesSets[shardID] = NewAggSeriesSet( + serializedData, + snapshots[shardID], + seriesGroups[shardID], + q.mint, + q.maxt, + hints.Grouping, + q.head.ID(), + uint16(shardID), // #nosec G115 // no overflow + ) + } + + return NewMergeManyShardSeriesSets(seriesSets, sNaNSeriesSets) +} + // SwitchFuncOptimize switch the function optimization hints. -func SwitchFuncOptimize(hints *storage.SelectHints, queryOptimize queryOptimizeType) *storage.SelectHints { +func SwitchFuncOptimize( + hints *storage.SelectHints, + possibleToOptimize func() bool, + queryOptimize queryOptimizeType, +) *storage.SelectHints { if hints == nil { return emptySelectHints } @@ -424,13 +564,46 @@ func SwitchFuncOptimize(hints *storage.SelectHints, queryOptimize queryOptimizeT return emptySelectHints } - if funcOptimizeMap[hints.Func]&queryOptimize != 0 && isNotWithpout(hints) { + if funcOptimizeMap[hints.Func]&queryOptimize != 0 && + isNotWithpout(hints) && + isAllowedGroupingForCrossSeriesFunc(hints.Grouping) && + possibleToOptimize() { return hints } return emptySelectHints } +// isPossibleToOptimize checks if the query possible to optimization. +func isPossibleToOptimize( + lssQueryResults []*cppbridge.LSSQueryResult, + hints *storage.SelectHints, + scrapeInterval int64, +) func() bool { + return func() bool { + countOfSeries := 0 + for _, lssQueryResult := range lssQueryResults { + if lssQueryResult == nil { + continue + } + + countOfSeries += lssQueryResult.Len() + } + + //revive:disable-next-line:add-constant // x2 scrape interval are required to enable optimization + if hints.Step*1e6 >= scrapeInterval*2 { + return true + } + + if hints.Step == 0 && isCrossSeriesFunc(hints) { + //revive:disable-next-line:add-constant // x3 we need to optimize the query for the crossseries function + return countOfSeries >= DefaultCountOfSeriesToOptimize*3 + } + + return countOfSeries >= DefaultCountOfSeriesToOptimize + } +} + // isNotWithpout checks if the hints is not without by. func isNotWithpout(hints *storage.SelectHints) bool { return hints.By || len(hints.Grouping) == 0 @@ -463,6 +636,26 @@ var funcOptimizeMap = func() map[string]queryOptimizeType { return functions }() +// isAllowedGroupingForCrossSeriesFunc checks if the series set is an cross series set. +func isAllowedGroupingForCrossSeriesFunc(grouping []string) bool { + for _, group := range grouping { + if group == labelHeadIDShardID { + logger.Infof( + "[QUERIER]: __head__shard_id is in the grouping, it will be ignored: %s", + group, + ) + return false + } + } + + return true +} + +// isCrossSeriesFunc checks if the function is a cross series function. +func isCrossSeriesFunc(hints *storage.SelectHints) bool { + return funcOptimizeMap[hints.Func]&crossSeriesOptimizeType == crossSeriesOptimizeType +} + // convertPrometheusMatchersToPPMatchers converts prometheus matchers to pp matchers. func convertPrometheusMatchersToPPMatchers(matchers ...*labels.Matcher) []model.LabelMatcher { promppMatchers := make([]model.LabelMatcher, len(matchers)) diff --git a/pp/go/storage/querier/querier_optimize_quick_test.go b/pp/go/storage/querier/querier_optimize_quick_test.go index 194cb7ec76..1b4ef4abf9 100644 --- a/pp/go/storage/querier/querier_optimize_quick_test.go +++ b/pp/go/storage/querier/querier_optimize_quick_test.go @@ -41,6 +41,27 @@ func (qp QueryParam) Generate(rd *rand.Rand, _ int) reflect.Value { return reflect.ValueOf(qp) } +// Format formats the query parameter to string. +func (qp QueryParam) Format(f fmt.State, _ rune) { + _, _ = fmt.Fprintf( + f, + "{start: %d, end: %d, step: %d}", + qp.Start.UnixMilli(), + qp.End.UnixMilli(), + qp.Step.Milliseconds(), + ) +} + +// String converts the query parameter to string. +func (qp *QueryParam) String() string { + return fmt.Sprintf( + "{start: %d, end: %d, step: %d}", + qp.Start.UnixMilli(), + qp.End.UnixMilli(), + qp.Step.Milliseconds(), + ) +} + // gen generates a random query parameter. func (qp *QueryParam) gen(rd *rand.Rand) { qp.Start = time.UnixMilli(defaultStartMs - defaultJitterMs + rd.Int63n(2*defaultJitterMs)) @@ -80,6 +101,27 @@ func (sqp SubQueryParams) Generate(rd *rand.Rand, _ int) reflect.Value { return reflect.ValueOf(sqp) } +// Format formats the subquery parameter to string. +func (sqp SubQueryParams) Format(f fmt.State, _ rune) { + _, _ = fmt.Fprintf( + f, + "{query_param: %s, subQueryStep: %d, subQueryRange: %d}", + sqp.QueryParam.String(), + sqp.SubQueryStep.Milliseconds(), + sqp.SubQueryRange.Milliseconds(), + ) +} + +// String converts the subquery parameter to string. +func (sqp *SubQueryParams) String() string { + return fmt.Sprintf( + "{query_param: %s, subQueryStep: %d, subQueryRange: %d}", + sqp.QueryParam.String(), + sqp.SubQueryStep.Milliseconds(), + sqp.SubQueryRange.Milliseconds(), + ) +} + // subGen generates a random subquery parameter. func (sqp *SubQueryParams) subGen(rd *rand.Rand) { sqp.gen(rd) @@ -178,7 +220,8 @@ func (oqp *OffsetQueryParams) offsetGen(rd *rand.Rand) { // type QuickQuerierOptimizeSuite struct { - QuerierOptimizeSuite + suite.Suite + querierOptimize quickQE *promql.Engine } @@ -188,12 +231,12 @@ func TestQuickQuerierOptimizeSuite(t *testing.T) { } func (s *QuickQuerierOptimizeSuite) SetupSuite() { - s.QuerierOptimizeSuite.SetupSuite() + s.querierOptimize.setup(s.T().Context(), s.T().TempDir(), s.Require().NoError) s.quickQE = promql.NewEngine(promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 500000, - Timeout: 10 * time.Second, + Timeout: 100 * time.Second, LookbackDelta: s.lookbackDelta, NoStepSubqueryIntervalFn: func(int64) int64 { return s.lookbackDelta.Milliseconds() }, EnableAtModifier: true, @@ -201,6 +244,10 @@ func (s *QuickQuerierOptimizeSuite) SetupSuite() { }) } +func (s *QuickQuerierOptimizeSuite) TearDownSuite() { + s.Suite.Require().NoError(s.querierOptimize.close()) +} + func (s *QuickQuerierOptimizeSuite) TestQueryRangeQuickQueryParam() { ctx := s.T().Context() @@ -222,7 +269,7 @@ func (s *QuickQuerierOptimizeSuite) TestQueryRangeQuickQueryParam() { s.Require().NoError(err) defer resOpt.qry.Close() - return s.Equal(res.res, resOpt.res) + return s.True(resultEqual(res.res, resOpt.res, query)) } s.Require().NoError(quick.Check(f, nil)) @@ -251,7 +298,7 @@ func (s *QuickQuerierOptimizeSuite) TestQueryRangeQuickSubQueryParams() { s.Require().NoError(err) defer resOpt.qry.Close() - return s.Equal(res.res, resOpt.res) + return s.True(resultEqual(res.res, resOpt.res, query)) } s.Require().NoError(quick.Check(f, nil)) @@ -280,7 +327,7 @@ func (s *QuickQuerierOptimizeSuite) TestQueryRangeQuickModifierQueryParams() { s.Require().NoError(err) defer resOpt.qry.Close() - return s.Equal(res.res, resOpt.res) + return s.True(resultEqual(res.res, resOpt.res, query)) } s.Require().NoError(quick.Check(f, nil)) @@ -309,7 +356,7 @@ func (s *QuickQuerierOptimizeSuite) TestQueryRangeQuickOffsetQueryParams() { s.Require().NoError(err) defer resOpt.qry.Close() - return s.Equal(res.res, resOpt.res) + return s.True(resultEqual(res.res, resOpt.res, query)) } s.Require().NoError(quick.Check(f, nil)) @@ -338,7 +385,7 @@ func (s *QuickQuerierOptimizeSuite) TestQueryInstantQuickQueryParam() { s.Require().NoError(err) defer resOpt.qry.Close() - return s.Equal(res.res, resOpt.res) + return s.True(resultEqual(res.res, resOpt.res, query)) } s.Require().NoError(quick.Check(f, nil)) @@ -367,7 +414,7 @@ func (s *QuickQuerierOptimizeSuite) TestQueryInstantQuickSubQueryParams() { s.Require().NoError(err) defer resOpt.qry.Close() - return s.Equal(res.res, resOpt.res) + return s.True(resultEqual(res.res, resOpt.res, query)) } s.Require().NoError(quick.Check(f, nil)) @@ -396,7 +443,7 @@ func (s *QuickQuerierOptimizeSuite) TestQueryInstantQuickModifierQueryParams() { s.Require().NoError(err) defer resOpt.qry.Close() - return s.Equal(res.res, resOpt.res) + return s.True(resultEqual(res.res, resOpt.res, query)) } s.Require().NoError(quick.Check(f, nil)) @@ -425,7 +472,7 @@ func (s *QuickQuerierOptimizeSuite) TestQueryInstantQuickOffsetQueryParams() { s.Require().NoError(err) defer resOpt.qry.Close() - return s.Equal(res.res, resOpt.res) + return s.True(resultEqual(res.res, resOpt.res, query)) } s.Require().NoError(quick.Check(f, nil)) diff --git a/pp/go/storage/querier/querier_optimize_test.go b/pp/go/storage/querier/querier_optimize_test.go index 99ff057ac4..fe1bacc07a 100644 --- a/pp/go/storage/querier/querier_optimize_test.go +++ b/pp/go/storage/querier/querier_optimize_test.go @@ -3,9 +3,12 @@ package querier_test import ( "context" "fmt" + "maps" "math" "os" "path/filepath" + "strconv" + "strings" "testing" "time" @@ -13,6 +16,7 @@ import ( "github.com/jonboulle/clockwork" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/prometheus/prometheus/model/labels" @@ -23,6 +27,7 @@ import ( "github.com/prometheus/prometheus/pp/go/storage/querier" "github.com/prometheus/prometheus/pp/go/storage/storagetest" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" prom_storage "github.com/prometheus/prometheus/storage" ) @@ -32,12 +37,18 @@ import ( type SwitchFuncOptimizeSuite struct { suite.Suite + + isPossibleToOptimize func() bool } func TestSwitchFuncOptimizeSuite(t *testing.T) { suite.Run(t, new(SwitchFuncOptimizeSuite)) } +func (s *SwitchFuncOptimizeSuite) SetupSuite() { + s.isPossibleToOptimize = func() bool { return true } +} + func (s *SwitchFuncOptimizeSuite) TestNone() { tests := []struct { hints *prom_storage.SelectHints @@ -78,7 +89,7 @@ func (s *SwitchFuncOptimizeSuite) TestNone() { } for _, test := range tests { - result := querier.SwitchFuncOptimize(test.hints, 0) + result := querier.SwitchFuncOptimize(test.hints, s.isPossibleToOptimize, 0) s.Require().Equal(test.expected, result) } } @@ -123,7 +134,7 @@ func (s *SwitchFuncOptimizeSuite) TestDropPoint() { } for _, test := range tests { - result := querier.SwitchFuncOptimize(test.hints, 1) + result := querier.SwitchFuncOptimize(test.hints, s.isPossibleToOptimize, 1) s.Require().Equal(test.expected, result) } } @@ -168,7 +179,7 @@ func (s *SwitchFuncOptimizeSuite) TestNewPoint() { } for _, test := range tests { - result := querier.SwitchFuncOptimize(test.hints, 2) + result := querier.SwitchFuncOptimize(test.hints, s.isPossibleToOptimize, 2) s.Require().Equal(test.expected, result) } } @@ -213,7 +224,7 @@ func (s *SwitchFuncOptimizeSuite) TestCrossSeries() { } for _, test := range tests { - result := querier.SwitchFuncOptimize(test.hints, 4) + result := querier.SwitchFuncOptimize(test.hints, s.isPossibleToOptimize, 4) s.Require().Equal(test.expected, result) } } @@ -258,7 +269,7 @@ func (s *SwitchFuncOptimizeSuite) TestAll() { } for _, test := range tests { - result := querier.SwitchFuncOptimize(test.hints, 7) + result := querier.SwitchFuncOptimize(test.hints, s.isPossibleToOptimize, 7) s.Require().Equal(test.expected, result) } } @@ -272,7 +283,7 @@ const ( defaultStartMs = 1779290789000 // defaultStep is the default step. - defaultStep = 15 * time.Second + defaultStep = 30 * time.Second ) // @@ -437,11 +448,12 @@ func queryInstant( } // -// QuerierOptimizeSuite +// querierOptimize // -type QuerierOptimizeSuite struct { - suite.Suite +// querierOptimize is the querier optimizer for testing. +type querierOptimize struct { + noErrorFunc storagetest.NoErrorFunc dataDir string head *storage.Head @@ -455,55 +467,63 @@ type QuerierOptimizeSuite struct { queryFuncs []queryFunc } -func (s *QuerierOptimizeSuite) SetupSuite() { +// setup sets up the querier optimizer. +func (s *querierOptimize) setup(ctx context.Context, baseDir string, noErrorFunc storagetest.NoErrorFunc) { + s.noErrorFunc = noErrorFunc s.start = time.UnixMilli(defaultStartMs) s.step = defaultStep s.end = s.start.Add(s.step * defaultCountOfSteps) // 480 steps - s.dataDir = s.createDataDirectory() + s.dataDir = filepath.Join(baseDir, "data") + s.noErrorFunc(os.MkdirAll(s.dataDir, os.ModeDir)) + s.head = s.mustCreateHead(0) - s.fillHead() + s.fillHead(ctx) + s.fillHeadWithCounter(ctx, querier.DefaultCountOfSeriesToOptimize) s.lookbackDelta = 5 * time.Minute s.queryOpts = promql.NewPrometheusQueryOpts(false, s.lookbackDelta) s.queryFuncs = []queryFunc{ + {name: "min_over_time", needRange: true}, // + + {name: "max_over_time", needRange: true}, // + + {name: "last_over_time", needRange: true}, // + + {name: "changes", needRange: true}, // + + {name: "min", needRange: false}, // + + {name: "min by(value) ", needRange: false}, // + + {name: "max", needRange: false}, // + + {name: "max by(value) ", needRange: false}, // + + {name: "sum", needRange: false}, // + + {name: "sum by(value) ", needRange: false}, // + + // {name: "rate", needRange: true}, // - // {name: "irate", needRange: true}, // - // {name: "delta", needRange: true}, // - // {name: "idelta", needRange: true}, // - // {name: "increase", needRange: true}, // - - {name: "min_over_time", needRange: true}, // + - {name: "max_over_time", needRange: true}, // + - {name: "last_over_time", needRange: true}, // + // {name: "sum_over_time", needRange: true}, // - // {name: "resets", needRange: true}, // - - {name: "changes", needRange: true}, // + } q, err := s.Querier(s.start.UnixMilli(), s.end.UnixMilli()) - s.Require().NoError(err) + s.noErrorFunc(err) - names, _, err := q.LabelValues(s.T().Context(), "__name__", &prom_storage.LabelHints{}) - s.Require().NoError(err) + names, _, err := q.LabelValues(ctx, "__name__", &prom_storage.LabelHints{}) + s.noErrorFunc(err) s.metricNames = querier.DeduplicateAndSortStringSlices(names) - s.Require().NoError(q.Close()) -} - -func (s *QuerierOptimizeSuite) TearDownSuite() { - s.Require().NoError(s.head.Close()) + s.noErrorFunc(q.Close()) } -func (s *QuerierOptimizeSuite) createDataDirectory() string { - dataDir := filepath.Join(s.T().TempDir(), "data") - s.Require().NoError(os.MkdirAll(dataDir, os.ModeDir)) - return dataDir +// close closes the querier optimizer. +func (s *querierOptimize) close() error { + return s.head.Close() } -func (s *QuerierOptimizeSuite) mustCreateCatalog() *catalog.Catalog { +// mustCreateHead creates a new head. +func (s *querierOptimize) mustCreateHead(unloadDataStorageInterval time.Duration) *storage.Head { l, err := catalog.NewFileLogV2(filepath.Join(s.dataDir, "catalog.log")) - s.Require().NoError(err) + s.noErrorFunc(err) c, err := catalog.New( clockwork.NewFakeClock(), @@ -512,28 +532,21 @@ func (s *QuerierOptimizeSuite) mustCreateCatalog() *catalog.Catalog { catalog.DefaultMaxLogFileSize, nil, ) - s.Require().NoError(err) + s.noErrorFunc(err) - return c -} - -func (s *QuerierOptimizeSuite) mustCreateHead(unloadDataStorageInterval time.Duration) *storage.Head { h, err := storage.NewBuilder( - s.mustCreateCatalog(), + c, s.dataDir, maxSegmentSize, prometheus.DefaultRegisterer, unloadDataStorageInterval, ).Build(0, numberOfShards) - s.Require().NoError(err) + s.noErrorFunc(err) return h } -func (s *QuerierOptimizeSuite) appendTimeSeries(timeSeries []storagetest.TimeSeries) { - storagetest.MustAppendTimeSeries(&s.Suite, s.head, timeSeries) -} - -func (s *QuerierOptimizeSuite) fillHead() { +// fillHead fills the head with the given time series. +func (s *querierOptimize) fillHead(ctx context.Context) { countOfSamples := (s.end.UnixMilli()-s.start.UnixMilli())/s.step.Milliseconds() + 1 timeSeries := []storagetest.TimeSeries{ { @@ -623,12 +636,49 @@ func (s *QuerierOptimizeSuite) fillHead() { valueCounter++ } - s.appendTimeSeries(timeSeries) + storagetest.MustAppendTimeSeries(ctx, s.noErrorFunc, s.head, timeSeries) +} + +// fillHeadWithCounter fills the head with the given number of counter metrics. +func (s *querierOptimize) fillHeadWithCounter(ctx context.Context, counter int) { + countOfSamples := (s.end.UnixMilli()-s.start.UnixMilli())/s.step.Milliseconds() + 1 + timeSeries := make([]storagetest.TimeSeries, 0, counter*2) + for i := range counter { + timeSeries = append(timeSeries, storagetest.TimeSeries{ + Labels: labels.FromStrings("__name__", "counter_metric", "value", "inc", "counter", strconv.Itoa(i)), + Samples: make([]cppbridge.Sample, 0, countOfSamples), + }) + } + + for i := range counter { + timeSeries = append(timeSeries, storagetest.TimeSeries{ + Labels: labels.FromStrings("__name__", "sin_cos_metric", "value", "sin_cos", "counter", strconv.Itoa(i)), + Samples: make([]cppbridge.Sample, 0, countOfSamples), + }) + } + + valueCounter := 1 + for ts := s.start; !ts.After(s.end); ts = ts.Add(s.step) { + tsMilli := ts.UnixMilli() + for i := range counter { + timeSeries[i].Samples = append(timeSeries[i].Samples, + cppbridge.Sample{Timestamp: tsMilli, Value: float64(valueCounter)}, + ) + + timeSeries[i+counter].Samples = append(timeSeries[i+counter].Samples, + cppbridge.Sample{Timestamp: tsMilli, Value: math.Sin(float64(i))*10 + math.Cos(float64(i))*10}, + ) + } + + valueCounter++ + } + + storagetest.MustAppendTimeSeries(ctx, s.noErrorFunc, s.head, timeSeries) } // Querier implements the [prom_storage.Queryable] interface. -func (s *QuerierOptimizeSuite) Querier(mint, maxt int64) (prom_storage.Querier, error) { - return querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, nil), nil +func (s *querierOptimize) Querier(mint, maxt int64) (prom_storage.Querier, error) { + return querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, mint, maxt, int64(s.step), nil), nil } // @@ -636,7 +686,8 @@ func (s *QuerierOptimizeSuite) Querier(mint, maxt int64) (prom_storage.Querier, // type MatrixQuerierOptimizeSuiteSuite struct { - QuerierOptimizeSuite + suite.Suite + querierOptimize queryEngine *promql.Engine steps []time.Duration @@ -650,7 +701,7 @@ func TestMatrixQuerierOptimizeSuiteSuite(t *testing.T) { } func (s *MatrixQuerierOptimizeSuiteSuite) SetupSuite() { - s.QuerierOptimizeSuite.SetupSuite() + s.querierOptimize.setup(s.T().Context(), s.T().TempDir(), s.Require().NoError) s.queryEngine = promql.NewEngine(promql.EngineOpts{ Logger: log.NewNopLogger(), @@ -677,6 +728,10 @@ func (s *MatrixQuerierOptimizeSuiteSuite) SetupSuite() { s.Require().NoError(q.Close()) } +func (s *MatrixQuerierOptimizeSuiteSuite) TearDownSuite() { + s.Suite.Require().NoError(s.querierOptimize.close()) +} + // rangeArgs runs the given function for all combinations of // query functions, metric names, steps, subqueries, modifiers and offsets. // @@ -699,17 +754,21 @@ func (s *MatrixQuerierOptimizeSuiteSuite) rangeArgs(fn func( fn(qFunc, metricName, step, subq, mod, o) } } + + if !qFunc.needRange { + break // skip subQuery + } } } } } } -// rangeArgsWithStep runs the given function for all combinations of +// rangeArgsWithoutStep runs the given function for all combinations of // query functions, metric names, subqueries, modifiers and offsets. // //revive:disable-next-line:cognitive-complexity // matrix test -func (s *MatrixQuerierOptimizeSuiteSuite) rangeArgsWithStep(fn func( +func (s *MatrixQuerierOptimizeSuiteSuite) rangeArgsWithoutStep(fn func( qFunc queryFunc, metricName string, subq subQuery, @@ -725,6 +784,10 @@ func (s *MatrixQuerierOptimizeSuiteSuite) rangeArgsWithStep(fn func( fn(qFunc, metricName, subq, mod, o) } } + + if !qFunc.needRange { + break // skip subQuery + } } } } @@ -744,7 +807,7 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryRange() { s.Require().NoError(err) defer resOpt.qry.Close() - s.Require().Equal(res.res, resOpt.res) + s.Require().True(resultEqual(res.res, resOpt.res, query)) }) }) } @@ -752,7 +815,7 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryRange() { func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryInstantStart() { ctx := s.T().Context() - s.rangeArgsWithStep(func(qFunc queryFunc, metricName string, subq subQuery, mod modifier, o offset) { + s.rangeArgsWithoutStep(func(qFunc queryFunc, metricName string, subq subQuery, mod modifier, o offset) { query := qFunc.toQueryString(metricName, subq, mod, o) s.Run(query, func() { res, err := queryInstant(ctx, "none", s.queryEngine, s, s.queryOpts, query, s.start) @@ -763,7 +826,7 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryInstantStart() { s.Require().NoError(err) defer resOpt.qry.Close() - s.Require().Equal(res.res, resOpt.res) + s.Require().True(resultEqual(res.res, resOpt.res, query)) }) }) } @@ -771,7 +834,7 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryInstantStart() { func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryInstantMiddle() { ctx := s.T().Context() - s.rangeArgsWithStep(func(qFunc queryFunc, metricName string, subq subQuery, mod modifier, o offset) { + s.rangeArgsWithoutStep(func(qFunc queryFunc, metricName string, subq subQuery, mod modifier, o offset) { query := qFunc.toQueryString(metricName, subq, mod, o) s.Run(query, func() { res, err := queryInstant(ctx, "none", s.queryEngine, s, s.queryOpts, query, s.start.Add(s.step*90)) @@ -782,7 +845,7 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryInstantMiddle() { s.Require().NoError(err) defer resOpt.qry.Close() - s.Require().Equal(res.res, resOpt.res) + s.Require().True(resultEqual(res.res, resOpt.res, query)) }) }) } @@ -790,7 +853,7 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryInstantMiddle() { func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryInstantEnd() { ctx := s.T().Context() - s.rangeArgsWithStep(func(qFunc queryFunc, metricName string, subq subQuery, mod modifier, o offset) { + s.rangeArgsWithoutStep(func(qFunc queryFunc, metricName string, subq subQuery, mod modifier, o offset) { query := qFunc.toQueryString(metricName, subq, mod, o) s.Run(query, func() { res, err := queryInstant(ctx, "none", s.queryEngine, s, s.queryOpts, query, s.end) @@ -801,7 +864,7 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryInstantEnd() { s.Require().NoError(err) defer resOpt.qry.Close() - s.Require().Equal(res.res, resOpt.res) + s.Require().True(resultEqual(res.res, resOpt.res, query)) }) }) } @@ -821,7 +884,317 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryRangeSingle() { s.Require().NoError(err) defer resOpt.qry.Close() - s.Require().Equal(res.res, resOpt.res) + s.Require().True(resultEqual(res.res, resOpt.res, query)) }) } } + +// +// resultEqual +// + +// defaultEpsilon is the default epsilon for comparing two values. +var defaultEpsilon = 0.0000000000001 + +// resultEqual compares two results. +// +//nolint:gocritic // unnamedResult // comporator +func resultEqual(exp, act *promql.Result, query string) (bool, string) { + if exp == nil && act == nil { + return true, "" + } + + if exp == nil || act == nil { + return false, fmt.Sprintf("query: %s\none of the results is nil", query) + } + + if exp.Err != act.Err { + return false, fmt.Sprintf("query: %s\nerror: %v, got %v", query, exp.Err, act.Err) + } + + if !maps.Equal(exp.Warnings, act.Warnings) { + return false, fmt.Sprintf("query: %s\nwarnings: %v, got %v", query, exp.Warnings, act.Warnings) + } + + if eq, result := valueEqual(exp.Value, act.Value); !eq { + return false, fmt.Sprintf("query: %s\n%s", query, result) + } + + return true, "" +} + +// valueEqual compares two values. +// +//nolint:gocritic // unnamedResult // comporator +func valueEqual(exp, act parser.Value) (bool, string) { + if exp == nil && act == nil { + return true, "" + } + + if exp == nil || act == nil { + return false, "one of the values is nil" + } + + if exp.Type() != act.Type() { + return false, fmt.Sprintf("value type: expected %s, got %s", exp.Type(), act.Type()) + } + + switch expType := exp.(type) { + case promql.Scalar: + return scalarEqual(expType, act.(promql.Scalar)) + + case promql.Vector: + return vectorEqual(expType, act.(promql.Vector)) + + case promql.Matrix: + return matrixEqual(expType, act.(promql.Matrix)) + + default: + return false, fmt.Sprintf("expected scalar, vector or matrix, got %T", exp) + } +} + +// scalarEqual compares two scalars. +// +//nolint:gocritic // unnamedResult // comporator +func scalarEqual(exp, act promql.Scalar) (bool, string) { + if exp.T != act.T || !inEpsilon(exp.V, act.V, defaultEpsilon) { + return false, fmt.Sprintf("scalar: %s != %s", exp, act) + } + + return true, "" +} + +// vectorEqual compares two vectors. +// +//nolint:gocritic // unnamedResult // comporator +func vectorEqual(exp, act promql.Vector) (bool, string) { + if len(exp) != len(act) { + return false, fmt.Sprintf("vector: length: %d != %d", len(exp), len(act)) + } + + msg := strings.Builder{} + _, _ = msg.WriteString("vector:\n") + isEqual := true + + for i, v := range exp { + if eq, result := sampleEqual(v, act[i]); !eq { + _, _ = msg.WriteString(result) + isEqual = false + } + } + + if isEqual { + msg.Reset() + } + + return isEqual, msg.String() +} + +// sampleEqual compares two samples. +// +//nolint:gocritic // unnamedResult // comporator +func sampleEqual(exp, act promql.Sample) (bool, string) { + if !labels.Equal(exp.Metric, act.Metric) { + return false, fmt.Sprintf("labels: %s != %s\n", exp.Metric, act.Metric) + } + + msg := strings.Builder{} + _, _ = fmt.Fprintf(&msg, "labels: %s\n", exp.Metric) + isEqual := true + + if exp.T != act.T || !inEpsilon(exp.F, act.F, defaultEpsilon) { + _, _ = fmt.Fprintf( + &msg, + "floats:\n %s != %s\n", + promql.FPoint{T: exp.T, F: exp.F}, + promql.FPoint{T: act.T, F: act.F}, + ) + isEqual = false + } + + if isEqual { + msg.Reset() + } + + return isEqual, msg.String() +} + +// matrixEqual compares two matrices. +// +//nolint:gocritic // unnamedResult // comporator +func matrixEqual(exp, act promql.Matrix) (bool, string) { + if len(exp) != len(act) { + return false, fmt.Sprintf("matrix: length: %d != %d", len(exp), len(act)) + } + + msg := strings.Builder{} + _, _ = msg.WriteString("matrix:\n") + isEqual := true + + for i, v := range exp { + if eq, result := seriesEqual(v, act[i]); !eq { + _, _ = msg.WriteString(result) + isEqual = false + } + } + + if isEqual { + msg.Reset() + } + + return isEqual, msg.String() +} + +// seriesEqual compares two series. +// +//nolint:gocritic // unnamedResult // comporator +func seriesEqual(exp, act promql.Series) (bool, string) { + if !labels.Equal(exp.Metric, act.Metric) { + return false, fmt.Sprintf("labels: %s != %s\n", exp.Metric, act.Metric) + } + + msg := strings.Builder{} + isEqual := true + _, _ = fmt.Fprintf(&msg, "labels: %s\n", exp.Metric) + + if len(exp.Floats) != len(act.Floats) { + _, _ = fmt.Fprintf(&msg, "floats: length: %d != %d\n", len(exp.Floats), len(act.Floats)) + _, _ = fmt.Fprintf(&msg, " exp: %s\n", exp.Floats) + _, _ = fmt.Fprintf(&msg, " act: %s\n", act.Floats) + return false, msg.String() + } + + _, _ = msg.WriteString("floats:\n") + + for i, v := range exp.Floats { + if v.T != act.Floats[i].T || !inEpsilon(v.F, act.Floats[i].F, defaultEpsilon) { + _, _ = fmt.Fprintf(&msg, " %s != %s\n", v, act.Floats[i]) + isEqual = false + } + } + + if isEqual { + msg.Reset() + } + + return isEqual, msg.String() +} + +// inEpsilon checks if two values are within epsilon. +func inEpsilon(expected, actual, epsilon float64) bool { + if math.IsNaN(expected) && math.IsNaN(actual) { + return true + } + + if math.IsNaN(expected) || math.IsNaN(actual) { + return false + } + + if expected == 0 && actual == 0 { + return true + } + + if expected == 0 || actual == 0 { + return false + } + + return calcRelative(expected, actual) <= epsilon +} + +// calcRelative calculates the relative between two values. +func calcRelative(expected, actual float64) float64 { + return math.Abs(expected-actual) / math.Abs(expected) +} + +// +// Benchmark +// + +func BenchmarkRangeQuery(b *testing.B) { + ctx := b.Context() + qo := &querierOptimize{} + qo.setup(ctx, b.TempDir(), func(err error, msgAndArgs ...any) { require.NoError(b, err, msgAndArgs) }) + qo.fillHeadWithCounter(ctx, 3) + defer qo.close() + + queryEngine := promql.NewEngine(promql.EngineOpts{ + Logger: log.NewNopLogger(), + MaxSamples: 100000, + Timeout: 10 * time.Second, + LookbackDelta: qo.lookbackDelta, + NoStepSubqueryIntervalFn: func(int64) int64 { return qo.lookbackDelta.Milliseconds() }, + EnableAtModifier: true, + EnableNegativeOffset: true, + }) + + query := "sum(counter_metric)" + // query := "sum by(value) (counter_metric)" + // query := "max_over_time(counter_metric[3600s])" + + step := qo.step + // step := qo.step * 4 + + b.Run("none", func(b *testing.B) { + b.ResetTimer() + for b.Loop() { + res, err := queryRange(ctx, "none", queryEngine, qo, qo.queryOpts, query, qo.start, qo.end, step) + require.NoError(b, err) + res.qry.Close() + } + }) + + b.Run("all", func(b *testing.B) { + b.ResetTimer() + for b.Loop() { + res, err := queryRange(ctx, "all", queryEngine, qo, qo.queryOpts, query, qo.start, qo.end, step) + require.NoError(b, err) + res.qry.Close() + } + }) +} + +func BenchmarkInstantQuery(b *testing.B) { + ctx := b.Context() + qo := &querierOptimize{} + qo.setup(ctx, b.TempDir(), func(err error, msgAndArgs ...any) { require.NoError(b, err, msgAndArgs) }) + qo.fillHeadWithCounter(ctx, 3) + defer qo.close() + + queryEngine := promql.NewEngine(promql.EngineOpts{ + Logger: log.NewNopLogger(), + MaxSamples: 100000, + Timeout: 10 * time.Second, + LookbackDelta: qo.lookbackDelta, + NoStepSubqueryIntervalFn: func(int64) int64 { return qo.lookbackDelta.Milliseconds() }, + EnableAtModifier: true, + EnableNegativeOffset: true, + }) + + // query := "sum(counter_metric)" + // query := "sum by(value) (counter_metric)" + query := "max_over_time(counter_metric[3600s])" + + ttt := 3700 * time.Second + mid := qo.start.Add(ttt) + // mid := qo.start + // mid := qo.end + + b.Run("none", func(b *testing.B) { + b.ResetTimer() + for b.Loop() { + res, err := queryInstant(ctx, "none", queryEngine, qo, qo.queryOpts, query, mid) + require.NoError(b, err) + res.qry.Close() + } + }) + + b.Run("all", func(b *testing.B) { + b.ResetTimer() + for b.Loop() { + res, err := queryInstant(ctx, "all", queryEngine, qo, qo.queryOpts, query, mid) + require.NoError(b, err) + res.qry.Close() + } + }) +} diff --git a/pp/go/storage/querier/querier_optimize_variables_test.go b/pp/go/storage/querier/querier_optimize_variables_test.go index c4aa406414..5da789160a 100644 --- a/pp/go/storage/querier/querier_optimize_variables_test.go +++ b/pp/go/storage/querier/querier_optimize_variables_test.go @@ -11,7 +11,7 @@ import ( const ( // defaultCountOfSteps is the default count of steps. - defaultCountOfSteps = 480 + defaultCountOfSteps = 240 ) // defaultSteps is the default steps. @@ -73,7 +73,7 @@ func (s *MatrixQuerierOptimizeSuiteSuite) TestQueryRangeWithStep() { s.Require().NoError(err) defer resOpt.qry.Close() - s.Require().Equal(res.res, resOpt.res) + s.Require().True(resultEqual(res.res, resOpt.res, query)) }) }) } diff --git a/pp/go/storage/querier/querier_test.go b/pp/go/storage/querier/querier_test.go index 977868af5a..50801dc0a7 100644 --- a/pp/go/storage/querier/querier_test.go +++ b/pp/go/storage/querier/querier_test.go @@ -42,6 +42,9 @@ type QuerierSuite struct { dataDir string context context.Context head *storage.Head + + hints *prom_storage.SelectHints + scrapeInterval int64 } func TestQuerierSuite(t *testing.T) { @@ -52,6 +55,12 @@ func (s *QuerierSuite) SetupTest() { s.dataDir = s.createDataDirectory() s.context = context.Background() s.head = s.mustCreateHead(1) + s.hints = &prom_storage.SelectHints{ + Start: 0, + End: 200, + Range: 100, + } + s.scrapeInterval = 1 } func (s *QuerierSuite) createDataDirectory() string { @@ -89,7 +98,7 @@ func (s *QuerierSuite) mustCreateHead(unloadDataStorageInterval time.Duration) * } func (s *QuerierSuite) appendTimeSeries(timeSeries []storagetest.TimeSeries) { - storagetest.MustAppendTimeSeries(&s.Suite, s.head, timeSeries) + storagetest.MustAppendTimeSeries(s.T().Context(), s.Require().NoError, s.head, timeSeries) } func (s *QuerierSuite) TestRangeQuery() { @@ -115,7 +124,7 @@ func (s *QuerierSuite) TestRangeQuery() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 2, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "metric") @@ -143,7 +152,7 @@ func (s *QuerierSuite) TestRangeQueryWithoutMatching() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 2, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "unknown_metric") @@ -196,7 +205,7 @@ func (s *QuerierSuite) TestRangeQueryWithDataStorageLoading() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 3, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 3, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "metric") @@ -234,7 +243,7 @@ func (s *QuerierSuite) TestInstantQuery() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 0, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 0, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "metric") @@ -287,7 +296,7 @@ func (s *QuerierSuite) TestInstantQueryWithDataStorageLoading() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 0, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 0, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "metric") @@ -331,7 +340,7 @@ func (s *QuerierSuite) TestLabelNames() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "metric0") s.Require().NoError(err) @@ -364,7 +373,7 @@ func (s *QuerierSuite) TestLabelNamesWithLimit() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "metric0") s.Require().NoError(err) @@ -397,7 +406,7 @@ func (s *QuerierSuite) TestLabelNamesNoMatches() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "metric3") s.Require().NoError(err) @@ -430,7 +439,7 @@ func (s *QuerierSuite) TestLabelValues() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchRegexp, "__name__", "metric.*") s.Require().NoError(err) @@ -463,7 +472,7 @@ func (s *QuerierSuite) TestLabelValuesNoMatches() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "metric2") s.Require().NoError(err) @@ -496,7 +505,7 @@ func (s *QuerierSuite) TestLabelValuesNoMatchesOnName() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, s.scrapeInterval, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchRegexp, "__name__", "metric.*") s.Require().NoError(err) diff --git a/pp/go/storage/querier/series.go b/pp/go/storage/querier/series.go index 41bf9e8943..3f4a7d2d42 100644 --- a/pp/go/storage/querier/series.go +++ b/pp/go/storage/querier/series.go @@ -178,12 +178,10 @@ func (s *Series) Iterator(it chunkenc.Iterator) chunkenc.Iterator { type SeriesSet struct { mint, maxt int64 - lssQueryResult *cppbridge.LSSQueryResult labelSetSnapshot *cppbridge.LabelSetSnapshot serializedData *cppbridge.DataStorageSerializedData - lastIndexFromLSSQueryResult int - series []Series + series []Series } func NewSeriesSet( @@ -195,7 +193,6 @@ func NewSeriesSet( return &SeriesSet{ mint: mint, maxt: maxt, - lssQueryResult: lssQueryResult, labelSetSnapshot: labelSetSnapshot, serializedData: serializedData, series: make([]Series, 0, lssQueryResult.Len()), diff --git a/pp/go/storage/querier/stalenan_series_set.go b/pp/go/storage/querier/stalenan_series_set.go index 19862ee8f2..2ee26d7257 100644 --- a/pp/go/storage/querier/stalenan_series_set.go +++ b/pp/go/storage/querier/stalenan_series_set.go @@ -15,16 +15,6 @@ import ( // floatStaleNaN is the float64 representation of the [value.StaleNaN] value. var floatStaleNaN = math.Float64frombits(value.StaleNaN) -// MakeTimestampsSliceWithDefault creates a slice with the default timestamp. -func MakeTimestampsSliceWithDefault(size int, defaultTimestamp int64) []int64 { - timestamps := make([]int64, size) - for i := range timestamps { - timestamps[i] = defaultTimestamp - } - - return timestamps -} - // NewStaleNaNSeriesSliceFromTimestamps creates [StaleNaNSeries] slice from timestamps. func NewStaleNaNSeriesSliceFromTimestamps(timestamps []int64) []StaleNaNSeries { seriesSlice := make([]StaleNaNSeries, len(timestamps)) diff --git a/pp/go/storage/storagetest/fixtures.go b/pp/go/storage/storagetest/fixtures.go index 2afcdb9010..844f575e70 100644 --- a/pp/go/storage/storagetest/fixtures.go +++ b/pp/go/storage/storagetest/fixtures.go @@ -28,7 +28,6 @@ import ( "github.com/prometheus/prometheus/pp/go/storage/querier" promstorage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/stretchr/testify/suite" ) // TimeSeries test data. @@ -74,30 +73,33 @@ func (tsd *timeSeriesDataSlice) Destroy() { tsd.timeSeries = nil } +// NoErrorFunc is a function that can be used to assert that an error is nil. +type NoErrorFunc func(err error, msgAndArgs ...any) + // MustAppendTimeSeries add time series to head. -func MustAppendTimeSeries(s *suite.Suite, head *storage.Head, timeSeries []TimeSeries) { +func MustAppendTimeSeries(ctx context.Context, noErrorFunc NoErrorFunc, head *storage.Head, timeSeries []TimeSeries) { headAppender := appender.New(head, services.CFViaRange) statelessRelabeler, err := cppbridge.NewStatelessRelabeler([]*cppbridge.RelabelConfig{}) - s.Require().NoError(err) + noErrorFunc(err) state := cppbridge.NewStateV2WithoutLock() state.SetStatelessRelabeler(statelessRelabeler) for i := range timeSeries { _, err = headAppender.Append( - context.Background(), - NewIncomingData(s, timeSeries[i].toModelTimeSeries()), + ctx, + NewIncomingData(noErrorFunc, timeSeries[i].toModelTimeSeries()), state, true) - s.NoError(err) + noErrorFunc(err) } } -func NewIncomingData(s *suite.Suite, timeSeries []model.TimeSeries) *appender.IncomingData { +func NewIncomingData(noErrorFunc NoErrorFunc, timeSeries []model.TimeSeries) *appender.IncomingData { tsd := timeSeriesDataSlice{timeSeries: timeSeries} hx, err := (cppbridge.HashdexFactory{}).GoModel(tsd.TimeSeries(), cppbridge.DefaultWALHashdexLimits()) - s.Require().NoError(err) + noErrorFunc(err) return &appender.IncomingData{Hashdex: hx, Data: &tsd} } @@ -341,8 +343,8 @@ func StaleNaNQuery( return &querier.StaleNaNSeriesSet{}, nil } - timestamps := querier.MakeTimestampsSliceWithDefault(lssQueryResult.Len(), valueNotFoundTimestampValue) - ds.QueryFirstTimestamps(lssQueryResult.IDs(), timestamps) + timestamps := make([]int64, lssQueryResult.Len()) + ds.QueryFirstTimestamps(lssQueryResult.IDs(), timestamps, valueNotFoundTimestampValue) return querier.NewStaleNaNSeriesSet( querier.NewStaleNaNSeriesSliceFromTimestamps(timestamps), diff --git a/pp/series_data/decoder/decorator/lookback_delta_iterator.h b/pp/series_data/decoder/decorator/lookback_delta_iterator.h index f5ceb664ba..21011ac90b 100644 --- a/pp/series_data/decoder/decorator/lookback_delta_iterator.h +++ b/pp/series_data/decoder/decorator/lookback_delta_iterator.h @@ -1,5 +1,6 @@ #pragma once +#include #include "primitives/primitives.h" #include "series_data/decoder/universal_decode_iterator.h" @@ -65,4 +66,4 @@ class LookbackDeltaIterator { } }; -} // namespace series_data::decoder::decorator \ No newline at end of file +} // namespace series_data::decoder::decorator diff --git a/pp/series_data/decoder/decorator/window_function_iterator.h b/pp/series_data/decoder/decorator/window_function_iterator.h index 12ff090b13..e470204e17 100644 --- a/pp/series_data/decoder/decorator/window_function_iterator.h +++ b/pp/series_data/decoder/decorator/window_function_iterator.h @@ -71,4 +71,4 @@ class WindowFunctionIterator { } }; -} // namespace series_data::decoder::decorator \ No newline at end of file +} // namespace series_data::decoder::decorator diff --git a/util/pool/slicepool.go b/util/pool/slicepool.go new file mode 100644 index 0000000000..7fe5439ce4 --- /dev/null +++ b/util/pool/slicepool.go @@ -0,0 +1,68 @@ +package pool + +import ( + "github.com/prometheus/prometheus/util/zeropool" +) + +// SlicePool is a pool of slices. +type SlicePool[T any] struct { + buckets []zeropool.Pool[[]T] + sizes []int +} + +// NewSlicePool creates a new [SlicePool]. +func NewSlicePool[T any](sizes []int) SlicePool[T] { + if len(sizes) == 0 { + panic("invalid sizes") + } + + for _, size := range sizes { + if size < 0 { + panic("invalid size") + } + } + + buckets := make([]zeropool.Pool[[]T], len(sizes)) + for i, size := range sizes { + buckets[i] = zeropool.New(func() []T { return make([]T, size) }) + } + + return SlicePool[T]{ + buckets: buckets, + sizes: sizes, + } +} + +// Get returns a new slice of the given size. +func (p *SlicePool[T]) Get(size int) []T { + if size < 0 { + panic("invalid size") + } + + for i, bktSize := range p.sizes { + if size > bktSize { + continue + } + + return p.buckets[i].Get()[:size] + } + + return make([]T, size) +} + +// Put adds a slice to the pool. +func (p *SlicePool[T]) Put(item []T) { + // If the item is larger than the largest size in the pool, don't put it back. + if cap(item) > p.sizes[len(p.sizes)-1] { + return + } + + for i, size := range p.sizes { + if cap(item) > size { + continue + } + + p.buckets[i].Put(item) + return + } +} diff --git a/util/pool/slicepool_test.go b/util/pool/slicepool_test.go new file mode 100644 index 0000000000..eb7c7aec3a --- /dev/null +++ b/util/pool/slicepool_test.go @@ -0,0 +1,40 @@ +package pool_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/util/pool" +) + +func TestSlicePool(t *testing.T) { + testPool := pool.NewSlicePool[int]([]int{0, 1, 2, 4}) + + cases := []struct { + size int + expectedCap int + }{ + { + size: 0, + expectedCap: 0, + }, + { + size: 2, + expectedCap: 2, + }, + { + size: 3, + expectedCap: 4, + }, + { + size: 5, + expectedCap: 5, + }, + } + for _, c := range cases { + ret := testPool.Get(c.size) + require.Equal(t, c.expectedCap, cap(ret)) + testPool.Put(ret) + } +}