Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
Expand Down Expand Up @@ -1821,9 +1822,17 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
_defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled();

// Start new realtime segment
// Start new realtime segment.
// Default path: build from indexLoadingConfig so tier overwrites and any instance-level mutations on
// IndexLoadingConfig flow through unchanged. When a FieldConfig.consumingOverride is configured, switch to a
// Builder constructed from the consuming-override-applied TableConfig so the mutable consuming segment can have
// a different (typically richer) index shape than what is persisted on disk — e.g. the table is RAW for
// storage efficiency but the consuming segment uses dictionary + inverted index for fast filtering. The
// committed/immutable segment ignores the override and uses the persisted table-config shape.
String consumerDir = realtimeTableDataManager.getConsumerDir();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig, _segmentLogger);
realtimeSegmentConfigBuilder
.setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentNameStr)
.setStreamName(streamTopic)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
Expand Down Expand Up @@ -180,9 +181,13 @@ public StatelessRealtimeSegmentWriter(SegmentZKMetadata segmentZKMetadata, Index
File statsHistoryFile = new File(tableDataDir, SEGMENT_STATS_FILE_NAME);
RealtimeSegmentStatsHistory statsHistory = RealtimeSegmentStatsHistory.deserializeFrom(statsHistoryFile);

// Initialize mutable segment with configurations
// Initialize mutable segment with configurations. Default path: build from indexLoadingConfig (unchanged
// behavior). Only switch to a TableConfig-driven Builder when a FieldConfig.consumingOverride is configured,
// so a column can have a richer in-memory index shape (e.g. dictionary + inverted) than what is persisted.
IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig, _logger);
realtimeSegmentConfigBuilder
.setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentName)
.setStreamName(_streamConfig.getTopicName())
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.realtime.converter;

import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
Expand Down Expand Up @@ -47,6 +48,7 @@
import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder;
import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
Expand All @@ -70,6 +72,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
Expand Down Expand Up @@ -1588,6 +1591,109 @@ private SegmentZKMetadata getSegmentZKMetadata(String segmentName) {
return segmentZKMetadata;
}

@Test
public void testConsumingOverrideKeepsRawShapeOnImmutableSegment()
    throws Exception {
  // STRING_COLUMN1 is declared RAW with no indexes in the table config — that is the shape that must be
  // persisted. A consumingOverride upgrades the column to DICTIONARY + INVERTED while the segment is
  // consuming, for fast filtering during the consuming window. After commit, the converter must emit an
  // immutable segment matching the persisted (RAW) shape: the override is consuming-only and must leave
  // no trace on disk.
  File workDir = new File(TMP_DIR, "tmp_consuming_override_" + System.currentTimeMillis());

  ObjectNode consumingOverride = JsonUtils.newObjectNode();
  consumingOverride.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name());
  consumingOverride.set("indexes",
      JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true)));
  FieldConfig overriddenFieldConfig = new FieldConfig.Builder(STRING_COLUMN1)
      .withEncodingType(FieldConfig.EncodingType.RAW)
      .withConsumingOverride(consumingOverride)
      .build();

  Schema schema = new Schema.SchemaBuilder()
      .setSchemaName("testTableConsumingOverride")
      .addSingleValueDimension(STRING_COLUMN1, DataType.STRING)
      .addSingleValueDimension(STRING_COLUMN2, DataType.STRING)
      .addDateTime(DATE_TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
      .build();
  TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
      .setTableName("testTableConsumingOverride")
      .setTimeColumnName(DATE_TIME_COLUMN)
      .setNoDictionaryColumns(List.of(STRING_COLUMN1))
      .setFieldConfigList(List.of(overriddenFieldConfig))
      .setColumnMajorSegmentBuilderEnabled(false)
      .build();

  String tableNameWithType = tableConfig.getTableName();
  String segmentName = "testTableConsumingOverride__0__0__123456";

  // Build the mutable consuming segment from the consuming-override-applied table config, mirroring what
  // the production RealtimeSegmentDataManager / StatelessRealtimeSegmentWriter paths now do, so
  // STRING_COLUMN1 carries an in-memory dictionary.
  TableConfig consumingTableConfig = TableConfigUtils.applyConsumingOverrides(tableConfig);
  RealtimeSegmentConfig consumingSegmentConfig = new RealtimeSegmentConfig.Builder(consumingTableConfig, schema)
      .setTableNameWithType(tableNameWithType)
      .setSegmentName(segmentName)
      .setStreamName(tableNameWithType)
      .setSchema(schema)
      .setTimeColumnName(DATE_TIME_COLUMN)
      .setCapacity(1000)
      .setAvgNumMultiValues(3)
      .setSegmentZKMetadata(getSegmentZKMetadata(segmentName))
      .setOffHeap(true)
      .setMemoryManager(new DirectMemoryManager(segmentName))
      .setStatsHistory(RealtimeSegmentStatsHistory.deserializeFrom(new File(workDir, "stats")))
      .setConsumerDir(new File(workDir, "consumerDir").getAbsolutePath())
      .build();

  MutableSegmentImpl consumingSegment = new MutableSegmentImpl(consumingSegmentConfig, null);
  try {
    for (int rowId = 0; rowId < 5; rowId++) {
      GenericRow row = new GenericRow();
      row.putValue(STRING_COLUMN1, "Hello" + rowId);
      row.putValue(STRING_COLUMN2, "World" + rowId);
      row.putValue(DATE_TIME_COLUMN, 1697814309L + rowId);
      consumingSegment.index(row, null);
    }

    // Sanity-check the consuming side first: the override MUST have given STRING_COLUMN1 a dictionary and
    // an inverted index in memory. If this trips, the integration test would fail too — this assertion is
    // the fast reproducer.
    assertNotNull(consumingSegment.getDataSource(STRING_COLUMN1).getDictionary(),
        STRING_COLUMN1 + " must have a dictionary on the consuming segment due to consumingOverride");
    assertNotNull(consumingSegment.getDataSource(STRING_COLUMN1).getInvertedIndex(),
        STRING_COLUMN1 + " must have an inverted index on the consuming segment due to consumingOverride");

    File outputDir = new File(workDir, "outputDir");
    SegmentZKPropsConfig zkPropsConfig = new SegmentZKPropsConfig();
    zkPropsConfig.setStartOffset("1");
    zkPropsConfig.setEndOffset("100");
    // The converter is handed the original (un-overridden) table config: the override never applies to
    // what is written to disk.
    RealtimeSegmentConverter converter =
        new RealtimeSegmentConverter(consumingSegment, zkPropsConfig, outputDir.getAbsolutePath(), schema,
            tableNameWithType, tableConfig, segmentName, false);
    converter.build(SegmentVersion.v3);

    File indexDir = new File(outputDir, segmentName);
    SegmentMetadataImpl committedMetadata = new SegmentMetadataImpl(indexDir);
    ColumnMetadata overriddenColumnMetadata = committedMetadata.getColumnMetadataFor(STRING_COLUMN1);
    assertFalse(overriddenColumnMetadata.hasDictionary(),
        STRING_COLUMN1 + " should remain RAW-encoded on the immutable segment — consumingOverride only affects"
            + " the consuming segment");
    ColumnMetadata defaultColumnMetadata = committedMetadata.getColumnMetadataFor(STRING_COLUMN2);
    assertTrue(defaultColumnMetadata.hasDictionary(),
        STRING_COLUMN2 + " must keep its dictionary (default DICTIONARY encoding)");

    // The inverted index added by the override must not appear on disk — the persisted shape has none.
    try (SegmentDirectory segmentDirectory = new SegmentLocalFSDirectory(indexDir, ReadMode.mmap);
        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
      assertFalse(reader.hasIndexFor(STRING_COLUMN1, StandardIndexes.inverted()),
          STRING_COLUMN1 + " must not carry an inverted index on the immutable segment");
    }
  } finally {
    consumingSegment.destroy();
  }
}

@AfterMethod
public void tearDownTest() {
FileUtils.deleteQuietly(TMP_DIR);
Expand Down
Loading
Loading