Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4aee3d6
Merge remote-tracking branch 'origin/promql_cpp' into agg_series_set
u-veles-a May 4, 2026
48c9998
WIP: fix test
u-veles-a May 4, 2026
994326c
WIP: created methods for lss
u-veles-a May 5, 2026
b982bd4
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 7, 2026
d926a61
WIP: fix test
u-veles-a May 7, 2026
58464d2
WIP: add AggSeriesSet
u-veles-a May 8, 2026
3ed9d23
WIP: add test, lookback
u-veles-a May 12, 2026
ec057cc
WIP: add test
u-veles-a May 12, 2026
62c8c6f
WIP: add PROMPP_FEATURES select_func_optimization
u-veles-a May 12, 2026
7451285
WIP: fix naming
u-veles-a May 12, 2026
b92bfef
WIP: add work with stalenan_series_set, fix bugs
u-veles-a May 13, 2026
ea2abd9
WIP: add hints_by
u-veles-a May 14, 2026
c78aaef
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 14, 2026
63ae14a
WIP: fix isCrossSeriesFunc
u-veles-a May 14, 2026
a25c050
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 14, 2026
c5e1f37
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 15, 2026
057fcea
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 18, 2026
cb7d11a
WIP: rebuild flags
u-veles-a May 18, 2026
d755175
WIP: fix adapter
u-veles-a May 18, 2026
01977e8
WIP: rebuild SwitchFuncOptimize
u-veles-a May 18, 2026
97e5987
WIP: fix
u-veles-a May 18, 2026
834b158
WIP: small fix
u-veles-a May 18, 2026
d8950ed
WIP: rebuild SetSelectFuncOptimize
u-veles-a May 19, 2026
584b51b
WIP: rebuild flag select_func_optimization
u-veles-a May 19, 2026
5a7efe6
WIP: fix and test
u-veles-a May 19, 2026
f4e0596
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 27, 2026
2f1ba7e
WIP: add cross test
u-veles-a May 28, 2026
9c52ba2
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 28, 2026
70a406d
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 29, 2026
8bf7cf8
WIP: add result equal
u-veles-a May 29, 2026
daa80dd
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 29, 2026
6a7126c
WIP: clearing
u-veles-a May 29, 2026
d81d41d
WIP: refactoring test
u-veles-a May 29, 2026
9776f65
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 29, 2026
27c19e4
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a May 29, 2026
b3042e7
WIP: add BenchmarkRangeQuery
u-veles-a May 29, 2026
bca8982
WIP: add benchmark, opt
u-veles-a Jun 2, 2026
e8882ed
WIP: fix iterator dtor
u-veles-a Jun 2, 2026
f855e64
Merge branch 'multiseries_iterator' into agg_series_set
u-veles-a Jun 4, 2026
16725ba
WIP: add iterator reset
u-veles-a Jun 4, 2026
d8893be
WIP: add scrape interval
u-veles-a Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2240,7 +2240,7 @@ func readPromPPFeatures(logger log.Logger) {
remotewriter.DefaultSampleAgeLimit = defaultSampleAgeLimit

case "select_func_optimization":
if err := querier.SetSelectFuncOptimize(strings.TrimSpace(fvalue)); err != nil {
if err := selectFuncOptimization(strings.TrimSpace(fvalue)); err != nil {
level.Error(logger).Log(
"msg", "[FEATURE] Error parsing select_func_optimization value",
"err", err,
Expand All @@ -2255,3 +2255,15 @@ func readPromPPFeatures(logger log.Logger) {
}
}
}

// selectFuncOptimization sets the select function optimization.
func selectFuncOptimization(fvalue string) error {
for opt := range strings.SplitSeq(fvalue, "|") {
if err := querier.SetSelectFuncOptimize(opt); err != nil {
querier.SetDefaultOptimizeType() // reset to default if error
return err
}
}

return nil
}
26 changes: 19 additions & 7 deletions pp-pkg/storage/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Adapter struct {
hashdexLimits atomic.Value // stores cppbridge.WALHashdexLimits
transparentState *cppbridge.StateV2
mergeOutOfOrderChunks func()
scrapeInterval atomic.Int64

// stat
activeQuerierMetrics *querier.Metrics
Expand Down Expand Up @@ -132,7 +133,10 @@ func (ar *Adapter) AppendSnappyProtobuf(
state *cppbridge.StateV2,
commitToWal bool,
) error {
hx, err := cppbridge.NewWALSnappyProtobufHashdex(compressedData.Bytes(), ar.hashdexLimits.Load().(cppbridge.WALHashdexLimits))
hx, err := cppbridge.NewWALSnappyProtobufHashdex(
compressedData.Bytes(),
ar.hashdexLimits.Load().(cppbridge.WALHashdexLimits),
)
compressedData.Destroy()
if err != nil {
return err
Expand Down Expand Up @@ -247,11 +251,12 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
// Label limit fields follow the same 0 = no limit semantics as scrape config.
func (ar *Adapter) ApplyConfig(cfg *config.Config) error {
limits := cppbridge.WALHashdexLimits{
MaxLabelNamesPerTimeseries: uint32(cfg.GlobalConfig.LabelLimit),
MaxLabelNameLength: uint32(cfg.GlobalConfig.LabelNameLengthLimit),
MaxLabelValueLength: uint32(cfg.GlobalConfig.LabelValueLengthLimit),
MaxLabelNamesPerTimeseries: uint32(cfg.GlobalConfig.LabelLimit), // #nosec G115 // no overflow
MaxLabelNameLength: uint32(cfg.GlobalConfig.LabelNameLengthLimit), // #nosec G115 // no overflow
MaxLabelValueLength: uint32(cfg.GlobalConfig.LabelValueLengthLimit), // #nosec G115 // no overflow
}
ar.hashdexLimits.Store(limits)
ar.scrapeInterval.Store(int64(cfg.GlobalConfig.ScrapeInterval))
return nil
}

Expand All @@ -269,7 +274,7 @@ func (ar *Adapter) HeadQuerier(mint, maxt int64) (storage.Querier, error) {
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
nil,
ar.scrapeInterval.Load(),
ar.activeQuerierMetrics,
), nil
}
Expand All @@ -296,7 +301,14 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {
ahead := ar.proxy.Get()
queriers = append(
queriers,
querier.NewQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.activeQuerierMetrics),
querier.NewQuerier(
ahead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.scrapeInterval.Load(),
ar.activeQuerierMetrics,
),
)

for _, head := range ar.proxy.Heads() {
Expand All @@ -316,7 +328,7 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
nil,
ar.scrapeInterval.Load(),
ar.storageQuerierMetrics,
),
)
Expand Down
16 changes: 14 additions & 2 deletions pp-pkg/storage/batch_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,23 @@ func (bs *BatchStorage) Commit(ctx context.Context) error {
// CommitWithState adds aggregated series from [pp_storage.TransactionHead] to the [Head] with [cppbridge.StateV2].
func (bs *BatchStorage) CommitWithState(ctx context.Context, state *cppbridge.StateV2) error {
s := bs.transactionHead.Shards()[0]
_, err := bs.adapter.AppendGoHeadHashdex(ctx, cppbridge.NewGoHeadHashdex(s.LSS().Target(), s.DataStorage().Raw()), state, false)
_, err := bs.adapter.AppendGoHeadHashdex(
ctx,
cppbridge.NewGoHeadHashdex(s.LSS().Target(), s.DataStorage().Raw()),
state,
false,
)
return err
}

// Querier returns a [querier.Querier] over the transaction head for the given mint and maxt.
func (bs *BatchStorage) Querier(mint, maxt int64) (storage.Querier, error) {
return querier.NewQuerier(bs.transactionHead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, nil), nil
return querier.NewQuerier(
bs.transactionHead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
bs.adapter.scrapeInterval.Load(),
nil,
), nil
}
22 changes: 13 additions & 9 deletions pp/entrypoint/series_data_data_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,27 +219,31 @@ extern "C" void prompp_series_data_data_storage_query_final(void* args) {
}
}

extern "C" void prompp_series_data_data_storage_query_first_timestamps(void* args, void* res) {
extern "C" void prompp_series_data_data_storage_query_first_timestamps(void* args) {
using PromPP::Primitives::Timestamp;
using series_data::Decoder;

struct Arguments {
DataStoragePtr data_storage;
Timestamp not_found_timestamp_value;
SliceView<LabelSetID> series_ids;
};

struct Result {
Slice<Timestamp> timestamps;
};

const auto in = static_cast<Arguments*>(args);
const auto out = static_cast<Result*>(res);

assert(in->series_ids.size() == out->timestamps.size());
assert(in->series_ids.size() == in->timestamps.size());
const auto& storage = *in->data_storage;

std::ranges::transform(in->series_ids, out->timestamps.begin(),
[&storage](uint32_t series_id) { return Decoder::get_series_min_timestamp(storage, series_id); });
std::ranges::transform(in->series_ids, in->timestamps.begin(), [&storage, in](uint32_t series_id) {
if (storage.open_chunks.size() > series_id) [[likely]] {
if (!storage.open_chunks[series_id].is_empty()) [[likely]] {
return Decoder::get_series_min_timestamp(storage, series_id);
}
}

return in->not_found_timestamp_value;
});
}

extern "C" void prompp_series_data_data_storage_allocated_memory(void* args, void* res) {
Expand Down Expand Up @@ -497,4 +501,4 @@ extern "C" void prompp_series_data_data_storage_loader_dtor(void* args) {
};

static_cast<Arguments*>(args)->~Arguments();
}
}
11 changes: 5 additions & 6 deletions pp/entrypoint/series_data_data_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,13 @@ void prompp_series_data_data_storage_instant_query(void* args, void* res);
* @brief Get the first sample timestamp per series
*
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* seriesIds []uint32 // series ids
* }
* @param res {
* timestamps []int64 // same length as seriesIds; filled from storage
* dataStorage uintptr // pointer to constructed data storage
* notFoundTimestampValue int64 // timestamp value to return if series is not found in storage
* seriesIds []uint32 // series ids
* timestamps []int64 // same length as seriesIds; filled from storage
* }
*/
void prompp_series_data_data_storage_query_first_timestamps(void* args, void* res);
void prompp_series_data_data_storage_query_first_timestamps(void* args);

/**
* @brief finishes all Queriers after data load.
Expand Down
4 changes: 2 additions & 2 deletions pp/go/cppbridge/data_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (ds *DataStorage) InstantQuery(targetTimestamp int64, labelSetIDs []uint32,
}

// QueryFirstTimestamps fills timestamps with the first sample timestamp (Prometheus ms) for each series in seriesIDs
func (ds *DataStorage) QueryFirstTimestamps(seriesIDs []uint32, timestamps []int64) {
seriesDataDataStorageQueryFirstTimestamps(ds.dataStorage, seriesIDs, timestamps)
func (ds *DataStorage) QueryFirstTimestamps(seriesIDs []uint32, timestamps []int64, notFoundTimestampValue int64) {
seriesDataDataStorageQueryFirstTimestamps(ds.dataStorage, notFoundTimestampValue, seriesIDs, timestamps)
runtime.KeepAlive(ds)
}

Expand Down
27 changes: 14 additions & 13 deletions pp/go/cppbridge/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,24 +1573,22 @@ func primitivesLSSQueryLabelValues(lss uintptr, label_name string, matchers []mo
return res.status, res.values
}

func primitivesLSSGetLabelNameIDs(lss uintptr, names []string) []uint32 {
func primitivesLSSGetLabelNameIDs(lss uintptr, names []string, nameIDs []uint32) {
args := struct {
lss uintptr
names []string
}{lss, names}

res := struct {
outIDs []uint32
}{make([]uint32, len(names))}
}{nameIDs}

testGC()
fastcgo.UnsafeCall2(
C.prompp_primitives_lss_get_label_name_ids,
uintptr(unsafe.Pointer(&args)),
uintptr(unsafe.Pointer(&res)),
)

return res.outIDs
}

func primitivesLSSCreateSnapshotLSS(lss uintptr) uintptr {
Expand Down Expand Up @@ -2145,20 +2143,23 @@ func seriesDataDataStorageInstantQuery(dataStorage uintptr, labelSetIDs []uint32
return res
}

func seriesDataDataStorageQueryFirstTimestamps(dataStorage uintptr, seriesIDs []uint32, timestamps []int64) {
func seriesDataDataStorageQueryFirstTimestamps(
dataStorage uintptr,
notFoundTimestampValue int64,
seriesIDs []uint32,
timestamps []int64,
) {
args := struct {
dataStorage uintptr
seriesIDs []uint32
}{dataStorage, seriesIDs}
res := struct {
timestamps []int64
}{timestamps}
dataStorage uintptr
notFoundTimestampValue int64
seriesIDs []uint32
timestamps []int64
}{dataStorage, notFoundTimestampValue, seriesIDs, timestamps}

testGC()
fastcgo.UnsafeCall2(
fastcgo.UnsafeCall1(
C.prompp_series_data_data_storage_query_first_timestamps,
uintptr(unsafe.Pointer(&args)),
uintptr(unsafe.Pointer(&res)),
)
}

Expand Down
11 changes: 5 additions & 6 deletions pp/go/cppbridge/entrypoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -1507,14 +1507,13 @@ void prompp_series_data_data_storage_instant_query(void* args, void* res);
* @brief Get the first sample timestamp per series
*
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* seriesIds []uint32 // series ids
* }
* @param res {
* timestamps []int64 // same length as seriesIds; filled from storage
* dataStorage uintptr // pointer to constructed data storage
* notFoundTimestampValue int64 // timestamp value to return if series is not found in storage
* seriesIds []uint32 // series ids
* timestamps []int64 // same length as seriesIds; filled from storage
* }
*/
void prompp_series_data_data_storage_query_first_timestamps(void* args, void* res);
void prompp_series_data_data_storage_query_first_timestamps(void* args);

/**
* @brief finishes all Queriers after data load.
Expand Down
46 changes: 37 additions & 9 deletions pp/go/cppbridge/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/prometheus/prometheus/pp/go/storage/querier"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/pp/go/cppbridge"
"github.com/prometheus/prometheus/pp/go/model"
Expand Down Expand Up @@ -183,7 +182,7 @@ func (s *HeadSuite) TestInstantQuery() {
// Arrange
dataStorage := cppbridge.NewDataStorage()
encoder := cppbridge.NewHeadEncoderWithDataStorage(dataStorage)
var series = []struct {
series := []struct {
SeriesID uint32
cppbridge.Sample
}{
Expand Down Expand Up @@ -213,19 +212,19 @@ func (s *HeadSuite) TestInstantQuery() {
result := dataStorage.InstantQuery(targetTimestamp, seriesIDs, uintptr(unsafe.Pointer(unsafe.SliceData(instantSeries))))

// Assert
require.Equal(s.T(), cppbridge.DataStorageQueryStatusSuccess, result.Status)
s.Require().Equal(cppbridge.DataStorageQueryStatusSuccess, result.Status)

s.Equal(defaultTimestamp, instantSeries[0].Timestamp)
s.Equal(series[2].Sample, cppbridge.Sample{Timestamp: instantSeries[1].Timestamp, Value: instantSeries[1].Value})
s.Equal(series[5].Sample, cppbridge.Sample{Timestamp: instantSeries[2].Timestamp, Value: instantSeries[2].Value})
s.Equal(series[6].Sample, cppbridge.Sample{Timestamp: instantSeries[3].Timestamp, Value: instantSeries[3].Value})
s.Equal(cppbridge.Sample{Timestamp: instantSeries[1].Timestamp, Value: instantSeries[1].Value}, series[2].Sample)
s.Equal(cppbridge.Sample{Timestamp: instantSeries[2].Timestamp, Value: instantSeries[2].Value}, series[5].Sample)
s.Equal(cppbridge.Sample{Timestamp: instantSeries[3].Timestamp, Value: instantSeries[3].Value}, series[6].Sample)
}

func (s *HeadSuite) TestQueryFirstTimestampsWithEmptySeriesIds() {
// Arrange

// Act
s.dataStorage.QueryFirstTimestamps(nil, nil)
s.dataStorage.QueryFirstTimestamps(nil, nil, 0)

// Assert
}
Expand All @@ -242,7 +241,7 @@ func (s *HeadSuite) TestQueryFirstTimestamps() {

// Act
timestamps := make([]int64, 2)
s.dataStorage.QueryFirstTimestamps([]uint32{1, 0}, timestamps)
s.dataStorage.QueryFirstTimestamps([]uint32{1, 0}, timestamps, 0)

// Assert
s.Equal([]int64{2, 5}, timestamps)
Expand All @@ -259,12 +258,41 @@ func (s *HeadSuite) TestQueryFirstTimestampsInFinalizedChunk() {

// Act
timestamps := make([]int64, 1)
s.dataStorage.QueryFirstTimestamps([]uint32{0}, timestamps)
s.dataStorage.QueryFirstTimestamps([]uint32{0}, timestamps, 0)

// Assert
s.Equal([]int64{5}, timestamps)
}

// TestQueryFirstTimestampsRotatedLSS checks that series which are registered in
// the LSS but have no encoded samples are reported with the not-found value.
func (s *HeadSuite) TestQueryFirstTimestampsRotatedLSS() {
	// Arrange
	const notFound int64 = -1
	for _, job := range []string{"1", "2"} {
		s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("job", job).Build())
	}

	// Act
	got := make([]int64, 2)
	s.dataStorage.QueryFirstTimestamps([]uint32{1, 0}, got, notFound)

	// Assert
	s.Equal([]int64{notFound, notFound}, got)
}

// TestQueryFirstTimestampsRotatedLSSWithEmptySeries checks a mix of series:
// series 1 has encoded samples and yields its first timestamp, while series 0
// has none and yields the not-found value.
func (s *HeadSuite) TestQueryFirstTimestampsRotatedLSSWithEmptySeries() {
	// Arrange
	const notFound int64 = -1
	for _, job := range []string{"1", "2"} {
		s.lss.FindOrEmplace(model.NewLabelSetBuilder().Set("job", job).Build())
	}

	// Only series 1 receives samples; series 0 stays empty.
	s.encoder.Encode(1, 5, 1.0)
	s.encoder.Encode(1, 9, 1.0)

	// Act
	got := make([]int64, 2)
	s.dataStorage.QueryFirstTimestamps([]uint32{1, 0}, got, notFound)

	// Assert
	want := []int64{5, notFound}
	s.Equal(want, got)
}

type DataStorageSerializedDataMultiSeriesIteratorSuite struct {
suite.Suite
lss *cppbridge.LabelSetStorage
Expand Down
3 changes: 2 additions & 1 deletion pp/go/cppbridge/lss_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ func (lss *LabelSetSnapshot) Query(selector uintptr) *LSSQueryResult {
return result
}

// SeriesGroups is the result of grouping series by label names.
type SeriesGroups struct {
// Groups is filled by GroupSeriesByLabelNames; presumably each inner slice
// holds the series IDs of one group — TODO confirm against the C++ entrypoint.
Groups [][]uint32
}

// GroupSeriesByLabelNames groups series by label names.
func (lss *LabelSetSnapshot) GroupSeriesByLabelNames(seriesIDs []uint32, labelNameIDs []uint32) *SeriesGroups {
func (lss *LabelSetSnapshot) GroupSeriesByLabelNames(seriesIDs, labelNameIDs []uint32) *SeriesGroups {
result := &SeriesGroups{
Groups: primitivesGroupSeriesByLabelNames(lss.pointer, seriesIDs, labelNameIDs),
}
Expand Down
Loading
Loading