diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 2e40a399f988..9c705203c7e4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -102,6 +102,11 @@ public enum ServerMeter implements AbstractMetrics.Meter { SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES("segments", false), SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES("segments", false), SEGMENT_BUILD_FAILURE("segments", false), + /// Counts consuming-segment-build events where `FieldConfig.consumingOverride` merge failed and the + /// server fell back to the persisted shape. A non-zero value means at least one consuming segment is running + /// with the wrong (un-overridden) in-memory shape — operators should inspect server logs and fix the override + /// config. Emitted per-table via `addMeteredTableValue`. + CONSUMING_OVERRIDE_FALLBACK("segments", false), SEGMENT_UPLOAD_FAILURE("segments", false), SEGMENT_UPLOAD_SUCCESS("segments", false), // Emitted only by Server to Deep-store segment uploader. 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index fa423c7d081c..34b85a059a7f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -69,6 +69,7 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.segment.local.upsert.UpsertContext; import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.SegmentVersion; @@ -1821,9 +1822,19 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf _isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation(); _defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled(); - // Start new realtime segment + // Start new realtime segment. + // Default path: build from indexLoadingConfig so tier overwrites and any instance-level mutations on + // IndexLoadingConfig flow through unchanged. When a FieldConfig.consumingOverride is configured, switch to a + // Builder constructed from the consuming-override-applied TableConfig so the mutable consuming segment can have + // a different (typically richer) index shape than what is persisted on disk — e.g. the table is RAW for + // storage efficiency but the consuming segment uses dictionary + inverted index for fast filtering. The + // committed/immutable segment ignores the override and uses the persisted table-config shape. 
String consumerDir = realtimeTableDataManager.getConsumerDir(); - RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig) + RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = + TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig, _segmentLogger, + () -> _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.CONSUMING_OVERRIDE_FALLBACK, + 1L)); + realtimeSegmentConfigBuilder .setTableNameWithType(_tableNameWithType) .setSegmentName(_segmentNameStr) .setStreamName(streamTopic) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingOverrideRealtimeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingOverrideRealtimeTest.java new file mode 100644 index 000000000000..91fa5817c6fd --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingOverrideRealtimeTest.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.custom; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.File; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.helix.model.ExternalView; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/// 
End-to-end coverage for [FieldConfig#getConsumingOverride()] on a realtime table: +/// +/// 1. The table persists `STRING_COLUMN` as RAW with no inverted index. +/// 2. The `consumingOverride` lifts it to `DICTIONARY` + `INVERTED` for the in-memory consuming segment so that +/// filters during the consumption window can hit an inverted index. +/// 3. After [#forceCommitAndWait()] commits the consuming segments, the immutable segments on disk must match +/// the persisted shape (RAW + no inverted index, no dictionary), proving that the override never leaks into the +/// committed segment. +/// +/// Comprehensive checks executed in [#testConsumingOverrideEndToEnd()]: +/// +/// - Pre-commit: walks each server's [TableDataManager], asserts every consuming segment is a +/// [MutableSegmentImpl], and verifies via the in-memory [DataSource] that `STRING_COLUMN` has both a dictionary +/// and an inverted index — i.e. the override took effect. +/// - Post-commit: walks each server again, finds every [ImmutableSegmentImpl], asserts the on-disk +/// [ColumnMetadata] (`hasDictionary()`) AND the on-disk [SegmentDirectory] reader (`hasIndexFor`) confirm the +/// persisted shape is RAW with no dictionary and no inverted index. Also verifies that an unrelated dimension +/// column (`STRING_COLUMN_NO_OVERRIDE`) keeps its default DICTIONARY shape on both the consuming and immutable +/// segments — guards against the override scrub being too aggressive. +/// - Query parity: the same SQL queries return identical row counts before and after commit, so the override does +/// not affect query semantics — only the in-memory index that serves them. 
+@Test(suiteName = "CustomClusterIntegrationTest") +public class ConsumingOverrideRealtimeTest extends CustomDataQueryClusterIntegrationTest { + private static final String TABLE_NAME = "ConsumingOverrideRealtimeTest"; + private static final int NUM_DOCS = 200; + + /// Carries the `consumingOverride`: persisted RAW (no dictionary, no inverted), consuming DICTIONARY + INVERTED. + private static final String STRING_COLUMN = "stringCol"; + /// Control column: no override anywhere; expected to be DICTIONARY-encoded both pre- and post-commit. + private static final String STRING_COLUMN_NO_OVERRIDE = "stringColNoOverride"; + private static final String INT_COLUMN = "intCol"; + private static final String TIME_COLUMN = "tsMillis"; + + /// Distinct values cycled through `STRING_COLUMN` so equality filters have non-trivial selectivity. + private static final String[] STRING_VALUES = {"alpha", "beta", "gamma", "delta"}; + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public boolean isRealtimeTable() { + return true; + } + + @Override + protected long getCountStarResult() { + return NUM_DOCS; + } + + @Override + protected int getRealtimeSegmentFlushSize() { + /// Keep flush size larger than NUM_DOCS so that the entire dataset stays in the consuming segment until the + /// test explicitly forceCommits — letting the test inspect the consuming-side shape with full data. 
+ return NUM_DOCS * 10; + } + + @Override + public Schema createSchema() { + return new Schema.SchemaBuilder().setSchemaName(getTableName()) + .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING) + .addSingleValueDimension(STRING_COLUMN_NO_OVERRIDE, FieldSpec.DataType.STRING) + .addMetric(INT_COLUMN, FieldSpec.DataType.INT) + .addDateTimeField(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") + .build(); + } + + @Override + public String getTimeColumnName() { + return TIME_COLUMN; + } + + @Nullable + @Override + protected String getSortedColumn() { + /// No sorted column: validation rejects consumingOverride on a sorted column, and we want the override active. + return null; + } + + @Override + protected List getNoDictionaryColumns() { + /// Persisted shape: STRING_COLUMN is RAW. STRING_COLUMN_NO_OVERRIDE stays default (DICTIONARY). + return Collections.singletonList(STRING_COLUMN); + } + + @Override + protected List getInvertedIndexColumns() { + /// Persisted shape: no inverted index anywhere. + return List.of(); + } + + @Override + protected List getFieldConfigs() { + /// FieldConfig drives the override: persisted RAW, consuming DICTIONARY + INVERTED via the typed `indexes` tree. 
+ ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig overridden = new FieldConfig.Builder(STRING_COLUMN) + .withEncodingType(FieldConfig.EncodingType.RAW) + .withConsumingOverride(override) + .build(); + return List.of(overridden); + } + + protected TableConfig createRealtimeTableConfig(File sampleAvroFile) { + AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile; + return new TableConfigBuilder(TableType.REALTIME) + .setTableName(getTableName()) + .setStreamConfigs(getStreamConfigs()) + .setTimeColumnName(getTimeColumnName()) + .setSortedColumn(getSortedColumn()) + .setInvertedIndexColumns(getInvertedIndexColumns()) + .setNoDictionaryColumns(getNoDictionaryColumns()) + .setFieldConfigList(getFieldConfigs()) + .setNumReplicas(getNumReplicas()) + .build(); + } + + @Override + public List createAvroFiles() + throws Exception { + org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); + org.apache.avro.Schema stringSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING); + org.apache.avro.Schema intSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT); + org.apache.avro.Schema longSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG); + avroSchema.setFields(List.of( + new org.apache.avro.Schema.Field(STRING_COLUMN, stringSchema, null, null), + new org.apache.avro.Schema.Field(STRING_COLUMN_NO_OVERRIDE, stringSchema, null, null), + new org.apache.avro.Schema.Field(INT_COLUMN, intSchema, null, null), + new org.apache.avro.Schema.Field(TIME_COLUMN, longSchema, null, null))); + + long now = System.currentTimeMillis(); + try (AvroFilesAndWriters avroFilesAndWriters = createAvroFilesAndWriters(avroSchema)) { + List> writers = 
avroFilesAndWriters.getWriters(); + for (int i = 0; i < NUM_DOCS; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(STRING_COLUMN, STRING_VALUES[i % STRING_VALUES.length]); + record.put(STRING_COLUMN_NO_OVERRIDE, "fixed"); + record.put(INT_COLUMN, i); + record.put(TIME_COLUMN, now + i); + writers.get(i % getNumAvroFiles()).append(record); + } + return avroFilesAndWriters.getAvroFiles(); + } + } + + @Test + public void testConsumingOverrideEndToEnd() + throws Exception { + String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(getTableName()); + + /// 1. Wait until all docs land in the consuming segment. + waitForAllDocsLoaded(60_000L); + + /// 2. Sanity-check the query result before commit. + long countAlphaPreCommit = queryStringCount(STRING_VALUES[0]); + long expectedAlphaCount = (NUM_DOCS + STRING_VALUES.length - 1) / STRING_VALUES.length; + assertEquals(countAlphaPreCommit, expectedAlphaCount, + "Pre-commit count for " + STRING_VALUES[0] + " mismatch"); + + /// 3. Inspect the consuming segment(s) on each server: STRING_COLUMN MUST have a dictionary AND an inverted + /// index (the override is in effect). STRING_COLUMN_NO_OVERRIDE MUST also have a dictionary (default). 
+ int consumingSegmentsInspected = 0; + int totalDocsAcrossConsumingSegments = 0; + for (BaseServerStarter serverStarter : getSharedServerStarters()) { + TableDataManager tableDataManager = serverStarter.getServerInstance().getInstanceDataManager() + .getTableDataManager(realtimeTableName); + if (tableDataManager == null) { + continue; + } + List segmentDataManagers = tableDataManager.acquireAllSegments(); + try { + for (SegmentDataManager sdm : segmentDataManagers) { + IndexSegment segment = sdm.getSegment(); + assertTrue(segment instanceof MutableSegmentImpl, + "Pre-commit segment must be mutable, got: " + segment.getClass().getSimpleName() + + " for segment " + sdm.getSegmentName()); + DataSource overriddenDs = segment.getDataSource(STRING_COLUMN); + assertNotNull(overriddenDs, "DataSource missing for " + STRING_COLUMN + " on consuming segment"); + assertNotNull(overriddenDs.getDictionary(), + STRING_COLUMN + " must have a dictionary on the consuming segment due to consumingOverride"); + assertNotNull(overriddenDs.getInvertedIndex(), + STRING_COLUMN + " must have an inverted index on the consuming segment due to consumingOverride"); + DataSource controlDs = segment.getDataSource(STRING_COLUMN_NO_OVERRIDE); + assertNotNull(controlDs.getDictionary(), + STRING_COLUMN_NO_OVERRIDE + " must keep its default dictionary on the consuming segment"); + consumingSegmentsInspected++; + totalDocsAcrossConsumingSegments += segment.getSegmentMetadata().getTotalDocs(); + } + } finally { + for (SegmentDataManager sdm : segmentDataManagers) { + tableDataManager.releaseSegment(sdm); + } + } + } + assertTrue(consumingSegmentsInspected > 0, "Expected at least one consuming segment to be inspected"); + /// Total docs summed across every replica must be exactly NUM_DOCS * numReplicas — guards against silent drift + /// where one replica double-consumes and another drops a row (integer-division check would mask that). 
+ assertEquals(totalDocsAcrossConsumingSegments, NUM_DOCS * getNumReplicas(), + "Total docs across all consuming-segment replicas must equal NUM_DOCS * numReplicas"); + + /// 4. Force-commit consuming segments to seal them as immutable, and wait for ExternalView to reflect ONLINE. + forceCommitAndWait(realtimeTableName); + + /// 5. Query parity: counts must be identical post-commit. + long countAlphaPostCommit = queryStringCount(STRING_VALUES[0]); + assertEquals(countAlphaPostCommit, countAlphaPreCommit, + "Post-commit count for " + STRING_VALUES[0] + " must match pre-commit (override does not affect semantics)"); + + /// 6. Inspect every segment on every server. Sealed (immutable) segments MUST carry the persisted RAW shape + /// (no dictionary, no inverted index) on disk. New consuming segments that came up after force-commit + /// MUST also have the override applied — guards against a regression where the override is honored only + /// on first-ever segment creation but lost on subsequent rollovers. + int immutableSegmentsInspected = 0; + int newConsumingSegmentsInspected = 0; + for (BaseServerStarter serverStarter : getSharedServerStarters()) { + TableDataManager tableDataManager = serverStarter.getServerInstance().getInstanceDataManager() + .getTableDataManager(realtimeTableName); + if (tableDataManager == null) { + continue; + } + List segmentDataManagers = tableDataManager.acquireAllSegments(); + try { + for (SegmentDataManager sdm : segmentDataManagers) { + IndexSegment segment = sdm.getSegment(); + + if (segment instanceof MutableSegmentImpl) { + /// New consuming segment that replaced a force-committed one: must still have the override applied. + DataSource overriddenDs = segment.getDataSource(STRING_COLUMN); + assertNotNull(overriddenDs.getDictionary(), + STRING_COLUMN + " must have a dictionary on the new consuming segment after forceCommit " + + "— consumingOverride must be re-applied to every consuming segment, not just the first one. 
" + + "Segment: " + sdm.getSegmentName()); + assertNotNull(overriddenDs.getInvertedIndex(), + STRING_COLUMN + " must have an inverted index on the new consuming segment after forceCommit. " + + "Segment: " + sdm.getSegmentName()); + newConsumingSegmentsInspected++; + continue; + } + if (!(segment instanceof ImmutableSegmentImpl)) { + continue; + } + + /// On-disk ColumnMetadata view: this is the authoritative persisted shape — independent of any in-memory + /// loader/lifecycle state that might mask a real regression where the override leaks to disk. + SegmentMetadata segmentMetadata = segment.getSegmentMetadata(); + ColumnMetadata overriddenMeta = segmentMetadata.getColumnMetadataFor(STRING_COLUMN); + assertNotNull(overriddenMeta, "ColumnMetadata missing for " + STRING_COLUMN); + assertFalse(overriddenMeta.hasDictionary(), + STRING_COLUMN + " on-disk ColumnMetadata must report hasDictionary=false. Segment: " + + sdm.getSegmentName()); + ColumnMetadata controlMeta = segmentMetadata.getColumnMetadataFor(STRING_COLUMN_NO_OVERRIDE); + assertTrue(controlMeta.hasDictionary(), + STRING_COLUMN_NO_OVERRIDE + " on-disk ColumnMetadata must report hasDictionary=true. Segment: " + + sdm.getSegmentName()); + + /// Cross-check via the on-disk SegmentDirectory reader so the assertion proves the inverted index file is + /// physically absent rather than merely lazy-loaded. + ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment; + File indexDir = immutableSegment.getSegmentMetadata().getIndexDir(); + try (SegmentDirectory segmentDirectory = new SegmentLocalFSDirectory(indexDir, ReadMode.mmap); + SegmentDirectory.Reader reader = segmentDirectory.createReader()) { + assertFalse(reader.hasIndexFor(STRING_COLUMN, StandardIndexes.inverted()), + STRING_COLUMN + " must not carry an inverted index on the immutable segment. 
Segment: " + + sdm.getSegmentName()); + assertFalse(reader.hasIndexFor(STRING_COLUMN, StandardIndexes.dictionary()), + STRING_COLUMN + " must not carry a dictionary on the immutable segment. Segment: " + + sdm.getSegmentName()); + } + + immutableSegmentsInspected++; + } + } finally { + for (SegmentDataManager sdm : segmentDataManagers) { + tableDataManager.releaseSegment(sdm); + } + } + } + assertTrue(immutableSegmentsInspected > 0, + "Expected at least one immutable (sealed) segment after forceCommit, got " + immutableSegmentsInspected); + assertTrue(newConsumingSegmentsInspected > 0, + "Expected at least one new consuming segment after forceCommit, got " + newConsumingSegmentsInspected); + + /// 7. Re-run a different filter to make sure the post-commit query path actually executes against the + /// immutable segment (which has no inverted index — exercises the brute-force scan path) and still returns + /// correct results. + long countBetaPostCommit = queryStringCount(STRING_VALUES[1]); + assertEquals(countBetaPostCommit, expectedAlphaCount, + "Post-commit count for " + STRING_VALUES[1] + " mismatch"); + } + + /// Triggers `forceCommit` on the realtime table and waits until the ExternalView shows at least one ONLINE + /// segment (a previously-consuming segment now sealed). Only the ONLINE count is asserted; the number of new + /// CONSUMING replacement segments is deliberately not checked here.
+ private void forceCommitAndWait(String realtimeTableName) + throws Exception { + getOrCreateAdminClient().getTableClient().forceCommit(getTableName()); + TestUtils.waitForCondition(aVoid -> { + ExternalView externalView = getSharedHelixResourceManager().getTableExternalView(realtimeTableName); + if (externalView == null) { + return false; + } + int onlineSegments = 0; + for (String segmentName : externalView.getPartitionSet()) { + if (!LLCSegmentName.isLLCSegment(segmentName)) { + continue; + } + for (String state : externalView.getStateMap(segmentName).values()) { + if (CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) { + onlineSegments++; + break; + } + } + } + return onlineSegments > 0; + }, 200L, 60_000L, "ExternalView did not converge to having ONLINE segments after forceCommit"); + } + + private long queryStringCount(String value) { + return getPinotConnection().execute( + "SELECT COUNT(*) FROM " + getTableName() + " WHERE " + STRING_COLUMN + " = '" + value + "'") + .getResultSet(0).getLong(0); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java index 865e87b3bb95..e954595dd4ca 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java @@ -46,6 +46,7 @@ import org.apache.pinot.integration.tests.BaseClusterIntegrationTest; import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties; +import org.apache.pinot.server.starter.helix.BaseServerStarter; import org.apache.pinot.spi.config.table.TableConfig; import 
org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; @@ -340,6 +341,13 @@ protected String getSharedKafkaBrokerList() { return _sharedClusterTestSuite.getKafkaBrokerList(); } + /// Returns the live server starters from the shared suite instance. Tests that need to inspect server-side state + /// (e.g. walk TableDataManager, acquire segments) must use this rather than `_serverStarters` on `this`, because in + /// suite mode only the shared instance has the populated list. + protected List getSharedServerStarters() { + return _sharedClusterTestSuite._serverStarters; + } + /** * Creates a Kafka topic on the shared suite's Kafka cluster. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java index c7f623b6c90f..1a4772d5d119 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java @@ -43,6 +43,7 @@ import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -180,9 +181,16 @@ public StatelessRealtimeSegmentWriter(SegmentZKMetadata segmentZKMetadata, Index File statsHistoryFile = new File(tableDataDir, SEGMENT_STATS_FILE_NAME); RealtimeSegmentStatsHistory statsHistory = RealtimeSegmentStatsHistory.deserializeFrom(statsHistoryFile); - // 
Initialize mutable segment with configurations + // Initialize mutable segment with configurations via the dispatch helper. Helper falls back to the + // indexLoadingConfig-driven Builder when no FieldConfig.consumingOverride is present (default), and switches + // to a TableConfig-driven Builder when one is — so a column can have a richer in-memory index shape (e.g. + // dictionary + inverted) than what is persisted. IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig(); - RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig) + /// StatelessRealtimeSegmentWriter is a one-shot tool (segment re-ingestion) and has no per-instance metrics + /// pipeline; pass null for the fallback callback. The error log is sufficient for this offline use. + RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = + TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig, _logger, null); + realtimeSegmentConfigBuilder .setTableNameWithType(_tableNameWithType) .setSegmentName(_segmentName) .setStreamName(_streamConfig.getTopicName()) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java index 6fafe69cca6a..40f913ca20b5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java @@ -544,9 +544,7 @@ protected void writeMetadata() CommonsConfigurationUtils.saveToFile(properties, metadataFile); } - /** - * Adds column metadata information to the properties configuration. - */ + /// Adds column metadata information to the properties configuration. 
public static void addColumnMetadataInfo(PropertiesConfiguration properties, String column, ColumnStatistics columnStatistics, int totalDocs, FieldSpec fieldSpec, boolean hasDictionary, int dictionaryElementSize, boolean autoGenerated) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 76147e49b5b5..52744391aced 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.utils; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -48,7 +49,9 @@ import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.aggregator.ValueAggregator; import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.index.DictionaryIndexConfig; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; @@ -185,6 +188,7 @@ public static void validate(TableConfig tableConfig, Schema schema, @Nullable St } validateTierConfigList(tableConfig.getTierConfigsList()); validateIndexingConfigAndFieldConfigList(tableConfig, schema); + validateConsumingOverrides(tableConfig, schema); validateInstancePartitionsTypeMapConfig(tableConfig); 
validatePartitionedReplicaGroupInstance(tableConfig); validateInstanceAssignmentConfigs(tableConfig); @@ -2111,6 +2115,403 @@ private static void overwriteConfig(JsonNode oldCfg, JsonNode newCfg) { } } + // Top-level keys in `tableIndexConfig` that look like per-column collections (string-array of column + // names) but must NOT be touched by the consuming-override scrub. Anything else of string-array shape is treated + // as a per-column list and the overridden column name is stripped from it. Inverting the policy this way + // (deny-list rather than allow-list) keeps the scrub correct as new per-column index config keys are added to + // [IndexingConfig] - there is no hand-curated list to drift out of sync. Today every string-array in + // [IndexingConfig] IS a per-column list (`invertedIndexColumns`, `noDictionaryColumns`, + // `bloomFilterColumns`, `rangeIndexColumns`, `jsonIndexColumns`, `onHeapDictionaryColumns`, + // `varLengthDictionaryColumns`), so this is correct. **Important:** any future non-per-column string-array field + // added to [IndexingConfig] MUST be added to this exclusion set, otherwise the scrub will silently strip + // entries from it whenever an overridden column name happens to match. + // + // - sortedColumn is structurally tied to the segment layout; a separate invariant check rejects overrides + // on sorted columns. + // - starTreeIndexConfigs is a list of objects (not column names); leave it alone. + // - tierOverwrites is the tier-overlay JSON, handled by a separate transform. + private static final Set CONSUMING_OVERRIDE_SCRUB_EXCLUDED_KEYS = Set.of( + "sortedColumn", + "starTreeIndexConfigs", + "tierOverwrites"); + private static final String NO_DICTIONARY_CONFIG_KEY = "noDictionaryConfig"; + + /// Returns a [TableConfig] with each [FieldConfig#getConsumingOverride()] merged into its parent + /// [FieldConfig]. The result is intended for building the realtime *mutable consuming* segment only. 
+ /// The committed/immutable segment and all loaded immutable segments use the un-overridden table config — that is + /// the persisted shape on disk. A typical use is keeping the table in raw encoding for storage efficiency while + /// giving the consuming segment a dictionary + inverted index for fast filtering during the consuming window. + /// + /// Supported override keys: `encodingType`, `indexes`. Other [FieldConfig] fields are intentionally not + /// overridable — see [#ALLOWED_CONSUMING_OVERRIDE_KEYS] for the rationale. + /// + /// Merge semantics: every supported field present in `consumingOverride` replaces the parent field wholesale. + /// For `indexes` the entire JSON subtree is replaced rather than merged key-by-key. + /// + /// For each overridden column, the column is also scrubbed from legacy per-column lists in `tableIndexConfig` + /// (e.g. `invertedIndexColumns`, `rangeIndexColumns`, ...) and `noDictionaryConfig` so that those legacy configs do + /// not contradict the merged [FieldConfig]. + /// + /// If no [FieldConfig] declares a consuming override, the original [TableConfig] is returned unchanged. + /// + /// @param tableConfig original table config; not mutated + /// @return a new table config with overrides merged, or the original config when no overrides are configured + public static TableConfig applyConsumingOverrides(TableConfig tableConfig) { + if (!hasConsumingOverride(tableConfig)) { + return tableConfig; + } + // Enforce the same invariants that `validateConsumingOverrides` enforces, so a hand-edited or + // older-controller-written TableConfig can't reach the merge with an unsupported override and silently produce + // a wrong-shape consuming segment. We re-check here (rather than relying solely on validate-time) because the + // merge runs on every server at consuming-segment build time and may see stale configs. + enforceConsumingOverrideInvariants(tableConfig); + try { + // Deep-copy the JSON tree before mutating it. 
`toJsonNode` may alias subtrees on JsonNode-typed fields + // of FieldConfig (notably `_indexes`, `_tierOverwrites`, `_consumingOverride`); mutating + // those in place would silently corrupt the cached TableConfig that other threads are concurrently reading. + JsonNode tblCfgJson = tableConfig.toJsonNode().deepCopy(); + JsonNode fieldCfgListJson = tblCfgJson.get(TableConfig.FIELD_CONFIG_LIST_KEY); + // hasConsumingOverride already proved at least one FieldConfig has a non-empty override, so the JSON view + // must surface the field config list as an array. If not, the TableConfig (de)serialization layer is + // inconsistent — fail loudly rather than silently shipping the un-overridden shape. + Preconditions.checkState(fieldCfgListJson != null && fieldCfgListJson.isArray(), + "fieldConfigList missing from JSON view of table %s despite hasConsumingOverride==true", + tableConfig.getTableName()); + JsonNode tblIdxCfgJson = tblCfgJson.get(TableConfig.INDEXING_CONFIG_KEY); + Set overriddenColumns = new HashSet<>(); + Iterator fieldCfgItr = fieldCfgListJson.elements(); + while (fieldCfgItr.hasNext()) { + JsonNode fieldCfgJson = fieldCfgItr.next(); + JsonNode overrideJson = fieldCfgJson.get(TableConfig.CONSUMING_OVERRIDE_KEY); + if (overrideJson == null || !overrideJson.isObject() || overrideJson.isEmpty()) { + continue; + } + JsonNode nameNode = fieldCfgJson.get("name"); + // FieldConfig's @JsonCreator already guarantees name is non-null, but we re-parsed via toJsonNode so guard + // defensively. A missing name is a bug — fail loudly rather than silently dropping the override. 
+ Preconditions.checkState(nameNode != null && nameNode.isTextual(), + "FieldConfig with consumingOverride is missing a textual 'name' field in table: %s", + tableConfig.getTableName()); + overriddenColumns.add(nameNode.asText()); + ObjectNode fieldCfgObj = (ObjectNode) fieldCfgJson; + overwriteConfig(fieldCfgJson, overrideJson); + if (overrideJson.has("indexes")) { + fieldCfgObj.remove("indexType"); + fieldCfgObj.remove("indexTypes"); + } + // Remove the override marker so downstream consumers see only the merged shape. + fieldCfgObj.remove(TableConfig.CONSUMING_OVERRIDE_KEY); + } + Preconditions.checkState(!overriddenColumns.isEmpty(), + "no overridden columns produced from JSON view of table %s despite hasConsumingOverride==true", + tableConfig.getTableName()); + if (tblIdxCfgJson != null && tblIdxCfgJson.isObject()) { + scrubOverriddenColumnsFromTableIndexConfig((ObjectNode) tblIdxCfgJson, overriddenColumns); + } + return JsonUtils.jsonNodeToObject(tblCfgJson, TableConfig.class); + } catch (IOException e) { + // Surface override parse/deserialization failures (e.g. unknown keys, type mismatches) loudly so the bad + // config is rejected at validate time rather than silently falling back to the un-overridden shape — that + // would build the consuming segment with the wrong layout and the user would get no signal. + throw new IllegalStateException( + "Failed to apply consuming overrides for table: " + tableConfig.getTableName(), e); + } + } + + /// For each top-level entry in `tableIndexConfig` that holds an array of column-name strings (legacy + /// per-column lists like `invertedIndexColumns`, `noDictionaryColumns`, etc.), strip any name in + /// `overriddenColumns`. The legacy column-keyed `noDictionaryConfig` map is scrubbed the same way because its keys + /// are also RAW/no-dictionary signals. Entries whose key is in [#CONSUMING_OVERRIDE_SCRUB_EXCLUDED_KEYS], that are + /// not arrays, or that contain non-string elements, are left alone. 
+ /// + /// Object-shaped values other than `noDictionaryConfig` (e.g. `segmentPartitionConfig.columnPartitionMap`, feature + /// configs) are intentionally NOT scrubbed: the heuristic "drop any object key matching an overridden column name" + /// would silently drop unrelated feature-config keys whose names happen to collide with a column name. Validation + /// already rejects overrides on partition columns up front. + private static void scrubOverriddenColumnsFromTableIndexConfig(ObjectNode tblIdxCfgObj, + Set overriddenColumns) { + Iterator> entries = tblIdxCfgObj.properties().iterator(); + while (entries.hasNext()) { + Map.Entry entry = entries.next(); + String key = entry.getKey(); + if (CONSUMING_OVERRIDE_SCRUB_EXCLUDED_KEYS.contains(key)) { + continue; + } + JsonNode value = entry.getValue(); + if (NO_DICTIONARY_CONFIG_KEY.equals(key) && value != null && value.isObject()) { + for (String overriddenColumn : overriddenColumns) { + ((ObjectNode) value).remove(overriddenColumn); + } + continue; + } + if (value == null || !value.isArray() || value.isEmpty()) { + continue; + } + ArrayNode arr = (ArrayNode) value; + for (JsonNode element : arr) { + if (!element.isTextual()) { + // Non-string-array entry — not a per-column list; leave alone. + arr = null; + break; + } + } + if (arr == null) { + continue; + } + List kept = new ArrayList<>(arr.size()); + for (JsonNode element : arr) { + if (!overriddenColumns.contains(element.asText())) { + kept.add(element); + } + } + if (kept.size() != arr.size()) { + ArrayNode replacement = JsonUtils.newArrayNode(); + for (JsonNode element : kept) { + replacement.add(element); + } + tblIdxCfgObj.set(key, replacement); + } + } + } + + /// Builds the [RealtimeSegmentConfig.Builder] used to construct a mutable consuming segment, applying any + /// [FieldConfig#getConsumingOverride()] present on the table config. 
When no override is configured, the + /// un-modified [IndexLoadingConfig]-driven builder is returned so existing code paths (tier overwrites, + /// instance-level mutations on `IndexLoadingConfig`) flow through unchanged. If the override merge throws (a + /// misconfiguration that slipped past validate), the call falls back to the `IndexLoadingConfig`-driven builder, + /// logs the error, and invokes `onFallback` so the caller can emit a metric / surface the degradation. The + /// fallback-applied builder produces a consuming segment matching the persisted shape (no override) for the + /// full lifetime of that consuming segment — there is no auto-retry; operators must reload the table after + /// fixing the override config to pick up the corrected shape on the next consuming-segment build. + /// + /// **IndexLoadingConfig contract on the override path:** the helper rebuilds a fresh [IndexLoadingConfig] from + /// `(indexLoadingConfig.getInstanceDataManagerConfig(), mergedTableConfig, schemaCopy)` and re-applies the + /// `segmentTier`. Any other in-place mutation the caller made on the supplied `indexLoadingConfig` (segment + /// version, star-tree configs, etc.) is **not** carried over on the override path. Callers that need such + /// mutations preserved must either set them on the returned Builder after this call, or extend this helper. + /// + /// **Schema clone:** the override path defensively deep-copies the schema via JSON round-trip because the + /// override-path `IndexLoadingConfig` constructor invokes `TimestampIndexUtils.applyTimestampIndex` which + /// mutates the schema under a lock keyed to the *new* (per-call) TableConfig instance. The non-override path + /// constructs `IndexLoadingConfig` once at table-data-manager-load time (not per-segment-build), so the lock + /// is not racing per call there. The clone is therefore needed only on the override path. TODO: fix + /// TimestampIndexUtils to not mutate its inputs so this clone can be removed. 
+ /// + /// @param tableConfig source table config; not mutated + /// @param schema schema in scope; deep-copied before being passed to the override-path IndexLoadingConfig + /// @param indexLoadingConfig fallback / no-override path; on the override path only `instanceDataManagerConfig` + /// and `segmentTier` are read (see contract above) + /// @param logger logger to emit error on fallback + /// @param onFallback optional callback invoked when the override merge fails and the helper falls back to the + /// persisted shape; intended for the caller to bump a metric (e.g. + /// [ServerMeter#CONSUMING_OVERRIDE_FALLBACK]) tagged with its own table-name context. May + /// be `null` to skip metric emission (e.g. in unit tests). + /// @return a Builder ready for the caller to chain `.set...` calls and `.build()` + public static RealtimeSegmentConfig.Builder buildConsumingSegmentConfigBuilder(TableConfig tableConfig, + Schema schema, IndexLoadingConfig indexLoadingConfig, Logger logger, @Nullable Runnable onFallback) { + if (hasConsumingOverride(tableConfig)) { + try { + TableConfig consumingTableConfig = applyConsumingOverrides(tableConfig); + Schema schemaCopy; + try { + schemaCopy = JsonUtils.jsonNodeToObject(schema.toJsonObject(), Schema.class); + } catch (IOException ioe) { + throw new IllegalStateException("Failed to clone schema for consumingOverride path on table: " + + tableConfig.getTableName(), ioe); + } + IndexLoadingConfig consumingIlc = new IndexLoadingConfig(indexLoadingConfig.getInstanceDataManagerConfig(), + consumingTableConfig, schemaCopy); + /// `segmentVersion` and `readMode` are already derived by the constructor from the same + /// `instanceDataManagerConfig` + `consumingTableConfig.indexingConfig` inputs the source ILC used, so they + /// don't need to be copied across. Tier overlay, however, is set by the caller post-construction (it is + /// not in either input), so we re-apply it explicitly. 
+ String segmentTier = indexLoadingConfig.getSegmentTier(); + if (segmentTier != null) { + consumingIlc.setSegmentTier(segmentTier); + } + return new RealtimeSegmentConfig.Builder(consumingIlc); + } catch (RuntimeException e) { + /// Include the override snippet so the consuming-segment build log line is self-sufficient for triage — + /// operators reading the log do not need to also fetch the table config from ZK to see which override + /// failed. The metric (CONSUMING_OVERRIDE_FALLBACK) is the alerting hook; this log is the diagnostic. + logger.error("Failed to apply consumingOverride for table: {} (override snippet: {}); falling back to " + + "persisted shape — consuming segment will run with the un-overridden shape for its full lifetime", + tableConfig.getTableName(), summarizeConsumingOverrides(tableConfig), e); + if (onFallback != null) { + onFallback.run(); + } + } + } + return new RealtimeSegmentConfig.Builder(indexLoadingConfig); + } + + /// Returns a `column → override` short summary for the failure log. 
+ private static String summarizeConsumingOverrides(TableConfig tableConfig) { + List fieldConfigs = tableConfig.getFieldConfigList(); + if (fieldConfigs == null) { + return "{}"; + } + StringBuilder sb = new StringBuilder("{"); + boolean first = true; + for (FieldConfig fieldConfig : fieldConfigs) { + JsonNode override = fieldConfig.getConsumingOverride(); + if (override == null || override.isNull() || (override.isObject() && override.isEmpty())) { + continue; + } + if (!first) { + sb.append(", "); + } + sb.append(fieldConfig.getName()).append('=').append(override); + first = false; + } + sb.append('}'); + return sb.toString(); + } + + public static boolean hasConsumingOverride(TableConfig tableConfig) { + List fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + return false; + } + for (FieldConfig fieldConfig : fieldConfigList) { + JsonNode overrideJson = fieldConfig.getConsumingOverride(); + if (isConsumingOverrideConfigured(overrideJson)) { + return true; + } + } + return false; + } + + private static boolean isConsumingOverrideConfigured(@Nullable JsonNode overrideJson) { + return overrideJson != null && !overrideJson.isNull() && (!overrideJson.isObject() || !overrideJson.isEmpty()); + } + + // Allowlist of JSON keys that [FieldConfig#getConsumingOverride()] may carry. Anything outside this set is + // either a typo or a not-yet-supported override. We reject at validate time so misconfigurations surface up front + // rather than getting silently dropped by Jackson via `@JsonIgnoreProperties(ignoreUnknown = true)` on + // [org.apache.pinot.spi.config.BaseJsonConfig]. + // + // Scope is intentionally narrow to encoding + indexing only: + // - Allows `encodingType` (RAW vs DICTIONARY) as the high-level encoding lever. + // - Allows `indexes` (typed per-index JSON tree, e.g. `{"inverted": {"enabled": true}}`) - the + // canonical modern API for per-index configuration. 
+ // - Excludes the legacy `indexType` (singular) and `indexTypes` (plural) flat lists: superseded by + // `indexes`. New features should use the typed JSON tree. + // - Excludes `compressionCodec`: only meaningful for raw-encoded columns on disk; the consuming segment's + // in-memory layout doesn't honor the on-disk codec the same way. + // - Excludes `properties` and `timestampConfig`: these influence cross-subsystem state + // (TransformPipeline, aggregation pipeline, timestamp index materialization) that does NOT consult the + // consuming-override view, so overriding them on the consuming side would cause silent inconsistency between + // the segment's index shape and the surrounding ingestion plumbing. + private static final Set ALLOWED_CONSUMING_OVERRIDE_KEYS = Set.of("encodingType", "indexes"); + + /// Validates [FieldConfig#getConsumingOverride()] entries on the given table config. The override only affects + /// the realtime mutable consuming segment, so the rules are limited: + /// + /// - Only realtime tables make sense as targets — the override is a no-op on offline tables and is rejected to + /// avoid silently misleading configurations. + /// - Each override JSON value must be an object, and that object must only contain keys from + /// [#ALLOWED_CONSUMING_OVERRIDE_KEYS]; typo'd keys + /// would otherwise be silently dropped by Jackson's `@JsonIgnoreProperties(ignoreUnknown = true)` on + /// [org.apache.pinot.spi.config.BaseJsonConfig]. + /// + /// The merged override-applied table config is also re-validated so any conflicting indexes specified inside + /// the override (e.g. requesting INVERTED while encoding=RAW) surface at validate time. 
+ private static void validateConsumingOverrides(TableConfig tableConfig, Schema schema) { + if (!hasConsumingOverride(tableConfig)) { + return; + } + Set overriddenColumns = enforceConsumingOverrideInvariants(tableConfig); + + // Re-run index/fieldconfig validation against the merged consuming-side shape so conflicts inside the override + // (e.g. requesting INVERTED on a column whose merged encoding is RAW with no dictionary) surface here. Wrap any + // validation error with a clear pointer back to the consumingOverride that caused it so the user sees their + // override (not the merged config they didn't author) in the diagnostic. + TableConfig merged = applyConsumingOverrides(tableConfig); + if (merged != tableConfig) { + // Defensively clone the schema before re-validating: validateIndexingConfigAndFieldConfigList may construct + // an IndexLoadingConfig internally, whose constructor calls TimestampIndexUtils.applyTimestampIndex which + // mutates the schema. Match the same defensive-copy that buildConsumingSegmentConfigBuilder applies. + Schema schemaCopy; + try { + schemaCopy = JsonUtils.jsonNodeToObject(schema.toJsonObject(), Schema.class); + } catch (IOException ioe) { + throw new IllegalStateException( + "Failed to clone schema while validating consumingOverride for table: " + tableConfig.getTableName(), ioe); + } + try { + validateIndexingConfigAndFieldConfigList(merged, schemaCopy); + } catch (RuntimeException e) { + throw new IllegalStateException( + "FieldConfig.consumingOverride for one of " + overriddenColumns + + " produces an invalid merged shape: " + e.getMessage(), e); + } + } + } + + /// Shared invariant check used by both the validate path and the apply path so the rules can't drift. Verifies + /// the table is REALTIME, that every [FieldConfig#getConsumingOverride()] is a JSON object carrying only keys in + /// [#ALLOWED_CONSUMING_OVERRIDE_KEYS], and that overrides do not target sorted or partition columns. 
Returns the set + /// of column names that have a non-empty override. + private static Set enforceConsumingOverrideInvariants(TableConfig tableConfig) { + Preconditions.checkArgument(tableConfig.getTableType() == TableType.REALTIME, + "FieldConfig.consumingOverride is only supported on REALTIME tables; table %s is %s", + tableConfig.getTableName(), tableConfig.getTableType()); + Set overriddenColumns = new HashSet<>(); + List fieldConfigs = tableConfig.getFieldConfigList(); + if (fieldConfigs == null) { + return overriddenColumns; + } + for (FieldConfig fieldConfig : fieldConfigs) { + JsonNode overrideJson = fieldConfig.getConsumingOverride(); + if (overrideJson == null || overrideJson.isNull() || (overrideJson.isObject() && overrideJson.isEmpty())) { + continue; + } + Preconditions.checkArgument(overrideJson.isObject(), + "FieldConfig.consumingOverride must be a JSON object on column %s; got: %s", + fieldConfig.getName(), overrideJson.getNodeType()); + overriddenColumns.add(fieldConfig.getName()); + Iterator keys = overrideJson.fieldNames(); + while (keys.hasNext()) { + String key = keys.next(); + Preconditions.checkArgument(ALLOWED_CONSUMING_OVERRIDE_KEYS.contains(key), + "Unknown FieldConfig.consumingOverride key: '%s' on column %s; allowed keys: %s", + key, fieldConfig.getName(), ALLOWED_CONSUMING_OVERRIDE_KEYS); + } + } + enforceConsumingOverrideStructuralInvariants(tableConfig, overriddenColumns); + return overriddenColumns; + } + + private static void enforceConsumingOverrideStructuralInvariants(TableConfig tableConfig, + Set overriddenColumns) { + if (overriddenColumns.isEmpty() || tableConfig.getIndexingConfig() == null) { + return; + } + IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); + + // sortedColumn is structural for the realtime consuming segment too — Builder(IndexLoadingConfig) auto-adds an + // inverted index to dictionary-enabled sorted columns. 
Allowing the override to change the sorted column's + // encoding/index shape would create a contradiction with that auto-handling. Reject up front. + List sortedColumns = indexingConfig.getSortedColumn(); + if (sortedColumns != null) { + for (String sortedColumn : sortedColumns) { + Preconditions.checkState(!overriddenColumns.contains(sortedColumn), + "FieldConfig.consumingOverride is not allowed on sorted column: %s", sortedColumn); + } + } + + // segmentPartitionConfig is keyed by column name; the consuming-side scrub would strip an overridden partition + // column, breaking partition routing on the consuming segment. Reject up front rather than over-scrub. + SegmentPartitionConfig segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig(); + if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap() != null) { + for (String partitionColumn : segmentPartitionConfig.getColumnPartitionMap().keySet()) { + Preconditions.checkState(!overriddenColumns.contains(partitionColumn), + "FieldConfig.consumingOverride is not allowed on partition column: %s", partitionColumn); + } + } + } + /** * Get the partition column from tableConfig instance assignment config map. 
* @param tableConfig table config diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java index b8d1913cdaee..71bc04dd0620 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.realtime.converter; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.File; import java.io.IOException; import java.math.BigDecimal; @@ -47,6 +48,7 @@ import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder; import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory; import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory; +import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.DictionaryIndexConfig; @@ -70,9 +72,11 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.ByteArray; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -1588,6 +1592,112 @@ private SegmentZKMetadata getSegmentZKMetadata(String segmentName) { return segmentZKMetadata; } + @Test + public void 
testConsumingOverrideKeepsRawShapeOnImmutableSegment() + throws Exception { + // Table config has STRING_COLUMN1 as RAW with no indexes (the persisted/immutable shape). The consumingOverride + // lifts the consuming segment to DICTIONARY + INVERTED for fast filtering during the consuming window. After + // commit, the converter must produce an immutable segment that matches the persisted (RAW) shape — the override + // does NOT influence what gets written to disk. + File tmpDir = new File(TMP_DIR, "tmp_consuming_override_" + System.currentTimeMillis()); + + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig fieldConfigWithOverride = new FieldConfig.Builder(STRING_COLUMN1) + .withEncodingType(FieldConfig.EncodingType.RAW) + .withConsumingOverride(override) + .build(); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME) + .setTableName("testTableConsumingOverride") + .setTimeColumnName(DATE_TIME_COLUMN) + .setNoDictionaryColumns(List.of(STRING_COLUMN1)) + .setFieldConfigList(List.of(fieldConfigWithOverride)) + .setColumnMajorSegmentBuilderEnabled(false) + .build(); + Schema schema = new Schema.SchemaBuilder() + .setSchemaName("testTableConsumingOverride") + .addSingleValueDimension(STRING_COLUMN1, DataType.STRING) + .addSingleValueDimension(STRING_COLUMN2, DataType.STRING) + .addDateTime(DATE_TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") + .build(); + + String tableNameWithType = tableConfig.getTableName(); + String segmentName = "testTableConsumingOverride__0__0__123456"; + + /// Mutable consuming segment: built via the production dispatch helper so STRING_COLUMN1 has a dictionary in + /// memory the same way RealtimeSegmentDataManager / StatelessRealtimeSegmentWriter would build it. 
Using the + /// dispatch helper here (instead of `new Builder(consumingTableConfig, schema)`) makes the test exercise the + /// real production code path so a regression in the helper is caught at unit-test time. + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema); + RealtimeSegmentConfig realtimeSegmentConfig = + TableConfigUtils.buildConsumingSegmentConfigBuilder(tableConfig, schema, indexLoadingConfig, + LoggerFactory.getLogger(RealtimeSegmentConverterTest.class), null) + .setTableNameWithType(tableNameWithType) + .setSegmentName(segmentName) + .setStreamName(tableNameWithType) + .setSchema(schema) + .setTimeColumnName(DATE_TIME_COLUMN) + .setCapacity(1000) + .setAvgNumMultiValues(3) + .setSegmentZKMetadata(getSegmentZKMetadata(segmentName)) + .setOffHeap(true) + .setMemoryManager(new DirectMemoryManager(segmentName)) + .setStatsHistory(RealtimeSegmentStatsHistory.deserializeFrom(new File(tmpDir, "stats"))) + .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath()) + .build(); + + MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfig, null); + try { + for (int i = 0; i < 5; i++) { + GenericRow row = new GenericRow(); + row.putValue(STRING_COLUMN1, "Hello" + i); + row.putValue(STRING_COLUMN2, "World" + i); + row.putValue(DATE_TIME_COLUMN, 1697814309L + i); + mutableSegmentImpl.index(row, null); + } + + /// Sanity-check the consuming side: the override MUST have given STRING_COLUMN1 a dictionary AND an inverted + /// index in memory. If this fails, the integration test will fail too — keep this assertion as a fast + /// reproducer. 
+ assertNotNull(mutableSegmentImpl.getDataSource(STRING_COLUMN1).getDictionary(), + STRING_COLUMN1 + " must have a dictionary on the consuming segment due to consumingOverride"); + assertNotNull(mutableSegmentImpl.getDataSource(STRING_COLUMN1).getInvertedIndex(), + STRING_COLUMN1 + " must have an inverted index on the consuming segment due to consumingOverride"); + + File outputDir = new File(tmpDir, "outputDir"); + SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig(); + segmentZKPropsConfig.setStartOffset("1"); + segmentZKPropsConfig.setEndOffset("100"); + // Converter receives the original (un-overridden) table config; the override is consuming-only. + RealtimeSegmentConverter converter = + new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, outputDir.getAbsolutePath(), schema, + tableNameWithType, tableConfig, segmentName, false); + converter.build(SegmentVersion.v3); + + File indexDir = new File(outputDir, segmentName); + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); + ColumnMetadata col1Metadata = segmentMetadata.getColumnMetadataFor(STRING_COLUMN1); + assertFalse(col1Metadata.hasDictionary(), + STRING_COLUMN1 + " should remain RAW-encoded on the immutable segment — consumingOverride only affects" + + " the consuming segment"); + ColumnMetadata col2Metadata = segmentMetadata.getColumnMetadataFor(STRING_COLUMN2); + assertTrue(col2Metadata.hasDictionary(), + STRING_COLUMN2 + " must keep its dictionary (default DICTIONARY encoding)"); + + // Inverted index added by the override must not appear on disk — the persisted shape has no inverted index. 
+ try (SegmentDirectory segmentDirectory = new SegmentLocalFSDirectory(indexDir, ReadMode.mmap); + SegmentDirectory.Reader reader = segmentDirectory.createReader()) { + assertFalse(reader.hasIndexFor(STRING_COLUMN1, StandardIndexes.inverted()), + STRING_COLUMN1 + " must not carry an inverted index on the immutable segment"); + } + } finally { + mutableSegmentImpl.destroy(); + } + } + @AfterMethod public void tearDownTest() { FileUtils.deleteQuietly(TMP_DIR); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingOverrideTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingOverrideTest.java new file mode 100644 index 000000000000..591f493baffc --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingOverrideTest.java @@ -0,0 +1,814 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.utils; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + + +/// Tests for [TableConfigUtils#applyConsumingOverrides(TableConfig)] and the validation rules around +/// [FieldConfig#getConsumingOverride()]. The override is consuming-segment-only — it merges into the [FieldConfig] +/// that drives the mutable consuming segment, while the persisted/immutable segment shape comes from the +/// un-overridden table config. 
+public class TableConfigConsumingOverrideTest { + private static final String TABLE_NAME = "overrideTable"; + private static final String TIME_COLUMN = "ts"; + + private static Schema buildSchema() { + return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("colA", FieldSpec.DataType.STRING) + .addSingleValueDimension("colB", FieldSpec.DataType.LONG) + .addSingleValueDimension("colC", FieldSpec.DataType.STRING) + .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") + .build(); + } + + private static Map streamConfigs() { + return Map.of( + "streamType", "kafka", + "stream.kafka.consumer.type", "lowlevel", + "stream.kafka.topic.name", "test", + "stream.kafka.decoder.class.name", + "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", + "stream.kafka.consumer.factory.class.name", + "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", + "stream.kafka.broker.list", "localhost:9092", + "realtime.segment.flush.threshold.rows", "1000"); + } + + /// Builds a [FieldConfig] that on the persisted/immutable side is RAW with no indexes, but with a + /// `consumingOverride` that lifts it to DICTIONARY + INVERTED for the mutable consuming segment. + private static FieldConfig buildRawColumnWithRichConsumingOverride(String name, ObjectNode override) { + return new FieldConfig.Builder(name) + .withEncodingType(FieldConfig.EncodingType.RAW) + .withConsumingOverride(override) + .build(); + } + + @Test + public void applyConsumingOverridesIsNoOpWhenAbsent() { + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setNoDictionaryColumns(List.of("colA")) + .build(); + assertSame(TableConfigUtils.applyConsumingOverrides(tableConfig), tableConfig); + } + + @Test + public void applyConsumingOverridesUpgradesEncodingAndIndexes() { + /// Persisted: RAW, no indexes. 
Consuming: DICTIONARY + INVERTED for fast filtering. + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + /// Persisted shape says noDictionary on colA — the consuming-side merge must scrub this so the consuming + /// segment can actually have a dictionary. + .setNoDictionaryColumns(List.of("colA")) + .build(); + + TableConfig consumingView = TableConfigUtils.applyConsumingOverrides(tableConfig); + assertNotSame(consumingView, tableConfig); + + FieldConfig consumingCol = consumingView.getFieldConfigList().get(0); + assertEquals(consumingCol.getName(), "colA"); + assertEquals(consumingCol.getEncodingType(), FieldConfig.EncodingType.DICTIONARY, + "Consuming-side encoding should be lifted to DICTIONARY by the override"); + JsonNode mergedIndexes = consumingCol.getIndexes(); + assertTrue(mergedIndexes.has("inverted"), + "Consuming-side indexes should contain the inverted entry from the override"); + assertTrue(mergedIndexes.path("inverted").path("enabled").asBoolean(false), + "Consuming-side inverted index should be enabled"); + JsonNode overrideMarker = consumingCol.getConsumingOverride(); + assertTrue(overrideMarker == null || overrideMarker.isNull() || overrideMarker.isEmpty(), + "consumingOverride marker should be cleared after merge, got: " + overrideMarker); + + /// colA must be scrubbed from noDictionaryColumns in the consuming view so the dictionary actually applies. 
+ IndexingConfig indexingConfig = consumingView.getIndexingConfig(); + List noDictionaryColumns = indexingConfig.getNoDictionaryColumns(); + assertFalse(noDictionaryColumns != null && noDictionaryColumns.contains("colA"), + "colA should be scrubbed from noDictionaryColumns on the consuming view"); + + /// Original tableConfig must not have been mutated — persisted/immutable side stays RAW. + assertTrue(tableConfig.getIndexingConfig().getNoDictionaryColumns().contains("colA"), + "applyConsumingOverrides must not mutate the input table config"); + } + + @Test + public void applyConsumingOverridesScrubsNoDictionaryConfig() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + tableConfig.getIndexingConfig().setNoDictionaryConfig(Map.of("colA", "RAW", "colB", "RAW")); + + TableConfig consumingView = TableConfigUtils.applyConsumingOverrides(tableConfig); + Map noDictionaryConfig = consumingView.getIndexingConfig().getNoDictionaryConfig(); + assertFalse(noDictionaryConfig.containsKey("colA"), + "colA should be scrubbed from noDictionaryConfig on the consuming view"); + assertEquals(noDictionaryConfig.get("colB"), "RAW", + "unrelated noDictionaryConfig entries must be preserved"); + assertTrue(tableConfig.getIndexingConfig().getNoDictionaryConfig().containsKey("colA"), + "applyConsumingOverrides must not mutate the input noDictionaryConfig"); + } + + @Test + public void applyConsumingOverridesPreservesUnrelatedColumns() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = 
buildRawColumnWithRichConsumingOverride("colA", override); + FieldConfig regular = new FieldConfig.Builder("colB") + .withEncodingType(FieldConfig.EncodingType.RAW) + .build(); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden, regular)) + .build(); + + TableConfig consumingView = TableConfigUtils.applyConsumingOverrides(tableConfig); + FieldConfig consumingB = consumingView.getFieldConfigList().stream() + .filter(fc -> "colB".equals(fc.getName())).findFirst().orElseThrow(); + assertEquals(consumingB.getEncodingType(), FieldConfig.EncodingType.RAW); + } + + @Test + public void applyConsumingOverridesIgnoresEmptyOverride() { + FieldConfig empty = new FieldConfig.Builder("colA") + .withEncodingType(FieldConfig.EncodingType.RAW) + .withConsumingOverride(JsonUtils.newObjectNode()) + .build(); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(empty)) + .build(); + + assertSame(TableConfigUtils.applyConsumingOverrides(tableConfig), tableConfig, + "Empty override object must be treated as no-op"); + } + + @Test + public void validateRejectsConsumingOverrideOnOfflineTable() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure for consuming override on offline table"); + } catch (IllegalArgumentException e) { + 
assertTrue(e.getMessage().contains("REALTIME"), + "Expected message to mention REALTIME requirement, got: " + e.getMessage()); + } + } + + @Test + public void validateRejectsTypoedOverrideKey() { + // Jackson's @JsonIgnoreProperties(ignoreUnknown = true) on BaseJsonConfig would otherwise drop a typo'd key + // silently. validateConsumingOverrides enforces an explicit allowlist so the user gets a clear error before the + // table config is persisted. + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingTpye", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure for typo'd override key"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Unknown FieldConfig.consumingOverride key"), + "Expected unknown-key message, got: " + e.getMessage()); + assertTrue(e.getMessage().contains("encodingTpye"), + "Expected message to surface the bad key, got: " + e.getMessage()); + } + } + + @Test + public void validateRejectsNonObjectConsumingOverride() { + FieldConfig overridden = new FieldConfig.Builder("colA") + .withEncodingType(FieldConfig.EncodingType.RAW) + .withConsumingOverride(JsonUtils.newArrayNode()) + .build(); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure for non-object consumingOverride"); + } catch (IllegalArgumentException e) { + 
assertTrue(e.getMessage().contains("must be a JSON object"), + "Expected object-shape message, got: " + e.getMessage()); + } + } + + @Test + public void validateAcceptsValidConsumingOverride() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .setNoDictionaryColumns(List.of("colA")) + .build(); + + // Should not throw — colA is RAW persisted, DICTIONARY + INVERTED on consuming. + TableConfigUtils.validate(tableConfig, buildSchema()); + } + + @Test + public void validateAcceptsValidConsumingOverrideWithNoDictionaryConfig() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + tableConfig.getIndexingConfig().setNoDictionaryConfig(Map.of("colA", "RAW")); + + TableConfigUtils.validate(tableConfig, buildSchema()); + } + + @Test + public void applyConsumingOverridesIsIdempotent() { + // Calling applyConsumingOverrides twice on the same input should not corrupt the merged shape: the second call + // sees a config without overrides (because the override marker is stripped on first 
merge) and returns it + // unchanged. This guards against accidental double-application during tests or future refactors. + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + TableConfig once = TableConfigUtils.applyConsumingOverrides(tableConfig); + TableConfig twice = TableConfigUtils.applyConsumingOverrides(once); + assertSame(twice, once, + "Second applyConsumingOverrides on a merged config must be a no-op (override marker already cleared)"); + } + + @Test + public void applyConsumingOverridesRejectsTypoedKeyOnStaleConfig() { + // Simulates a hand-edited config that bypassed validate(): the merge utility itself must reject typo'd keys so + // the consuming segment never gets built with a silently-dropped override. 
+ ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingTpye", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.applyConsumingOverrides(tableConfig); + fail("Expected applyConsumingOverrides to reject a typo'd override key"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Unknown FieldConfig.consumingOverride key"), + "Expected unknown-key message, got: " + e.getMessage()); + } + } + + @Test + public void applyConsumingOverridesRejectsNonObjectOverrideOnStaleConfig() { + FieldConfig overridden = new FieldConfig.Builder("colA") + .withEncodingType(FieldConfig.EncodingType.RAW) + .withConsumingOverride(JsonUtils.newArrayNode()) + .build(); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.applyConsumingOverrides(tableConfig); + fail("Expected applyConsumingOverrides to reject a non-object override"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("must be a JSON object"), + "Expected object-shape message, got: " + e.getMessage()); + } + } + + @Test + public void applyConsumingOverridesRejectsOnOfflineTable() { + // Same defense-in-depth: an offline table that somehow carries a consumingOverride must be rejected at apply + // time, not silently treated as if it were realtime. 
+ ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.applyConsumingOverrides(tableConfig); + fail("Expected applyConsumingOverrides to reject offline table with consumingOverride"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("REALTIME"), + "Expected REALTIME message, got: " + e.getMessage()); + } + } + + @Test + public void validateRejectsPropertiesKeyOutsideAllowlist() { + // 'properties' is intentionally NOT in the allowlist because TransformPipeline / aggregation read FieldConfig + // properties from the un-overridden table config, which would create silent inconsistency. Confirm the user + // gets an explicit error rather than silently-applied-then-ignored behavior. 
+ ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("properties", JsonUtils.newObjectNode().put("foo", "bar")); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure for unsupported override key 'properties'"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Unknown FieldConfig.consumingOverride key"), + "Expected unknown-key message, got: " + e.getMessage()); + assertTrue(e.getMessage().contains("properties"), + "Expected message to surface the bad key, got: " + e.getMessage()); + } + } + + @Test + public void validateRejectsIndexTypesKeyOutsideAllowlist() { + /// `indexTypes` is the legacy flat-list API; the consumingOverride allowlist is the modern typed `indexes` + /// JSON tree. Confirm the user gets an explicit error rather than a silently-honored override. 
+ ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexTypes", JsonUtils.newArrayNode().add(FieldConfig.IndexType.INVERTED.name())); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure for unsupported override key 'indexTypes'"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Unknown FieldConfig.consumingOverride key"), + "Expected unknown-key message, got: " + e.getMessage()); + assertTrue(e.getMessage().contains("indexTypes"), + "Expected message to surface the bad key, got: " + e.getMessage()); + } + } + + @Test + public void applyConsumingOverridesClearsLegacyIndexTypesWhenIndexesOverridePresent() { + ObjectNode override = JsonUtils.newObjectNode(); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig overridden = new FieldConfig.Builder("colA") + .withEncodingType(FieldConfig.EncodingType.DICTIONARY) + .withIndexTypes(List.of(FieldConfig.IndexType.INVERTED)) + .withConsumingOverride(override) + .build(); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + TableConfig consumingView = TableConfigUtils.applyConsumingOverrides(tableConfig); + FieldConfig consumingCol = consumingView.getFieldConfigList().get(0); + assertTrue(consumingCol.getIndexTypes().isEmpty(), + "consumingOverride.indexes must replace and clear legacy indexTypes"); 
+    assertTrue(consumingCol.getIndexes().path("inverted").path("enabled").asBoolean(false),
+        "modern indexes override should be preserved");
+    assertEquals(tableConfig.getFieldConfigList().get(0).getIndexTypes(), List.of(FieldConfig.IndexType.INVERTED),
+        "applyConsumingOverrides must not mutate legacy indexTypes on the input");
+  }
+
+  @Test
+  public void applyConsumingOverridesScrubsAllPerColumnLists() {
+    // Confirm the deny-list scrub touches every per-column list/map in tableIndexConfig, not just a hard-coded
+    // subset. Populate the table config with several per-column collections that all reference colA.
+    ObjectNode override = JsonUtils.newObjectNode();
+    override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name());
+    FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override);
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN)
+        .setStreamConfigs(streamConfigs())
+        .setFieldConfigList(List.of(overridden))
+        .setNoDictionaryColumns(List.of("colA", "colB"))
+        .setInvertedIndexColumns(List.of("colA", "colB"))
+        .setRangeIndexColumns(List.of("colA"))
+        .setBloomFilterColumns(List.of("colA"))
+        .setVarLengthDictionaryColumns(List.of("colA"))
+        .build();
+
+    TableConfig consumingView = TableConfigUtils.applyConsumingOverrides(tableConfig);
+    IndexingConfig indexingConfig = consumingView.getIndexingConfig();
+    assertFalse(indexingConfig.getNoDictionaryColumns().contains("colA"),
+        "colA must be scrubbed from noDictionaryColumns");
+    assertTrue(indexingConfig.getNoDictionaryColumns().contains("colB"),
+        "colB must remain in noDictionaryColumns");
+    assertFalse(indexingConfig.getInvertedIndexColumns().contains("colA"),
+        "colA must be scrubbed from invertedIndexColumns");
+    assertFalse(indexingConfig.getRangeIndexColumns() != null
+        && indexingConfig.getRangeIndexColumns().contains("colA"),
+        "colA must be scrubbed from rangeIndexColumns");
+    
assertFalse(indexingConfig.getBloomFilterColumns() != null + && indexingConfig.getBloomFilterColumns().contains("colA"), + "colA must be scrubbed from bloomFilterColumns"); + assertFalse(indexingConfig.getVarLengthDictionaryColumns() != null + && indexingConfig.getVarLengthDictionaryColumns().contains("colA"), + "colA must be scrubbed from varLengthDictionaryColumns"); + } + + @Test + public void buildConsumingSegmentConfigBuilderUsesIndexLoadingConfigWhenNoOverride() { + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setInvertedIndexColumns(List.of("colA")) + .build(); + IndexLoadingConfig ilc = new IndexLoadingConfig(tableConfig, buildSchema()); + RealtimeSegmentConfig.Builder builder = TableConfigUtils.buildConsumingSegmentConfigBuilder( + tableConfig, buildSchema(), ilc, LoggerFactory.getLogger(TableConfigConsumingOverrideTest.class), null); + RealtimeSegmentConfig built = builder.build(); + FieldIndexConfigs colA = built.getIndexConfigByCol().get("colA"); + assertTrue(colA != null && colA.getConfig(StandardIndexes.inverted()).isEnabled(), + "no-override path must yield IndexLoadingConfig-driven shape (inverted index enabled on colA)"); + } + + @Test + public void buildConsumingSegmentConfigBuilderAppliesOverrideWhenPresent() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .setNoDictionaryColumns(List.of("colA")) + .build(); + IndexLoadingConfig ilc = 
new IndexLoadingConfig(tableConfig, buildSchema()); + RealtimeSegmentConfig.Builder builder = TableConfigUtils.buildConsumingSegmentConfigBuilder( + tableConfig, buildSchema(), ilc, LoggerFactory.getLogger(TableConfigConsumingOverrideTest.class), null); + RealtimeSegmentConfig built = builder.build(); + FieldIndexConfigs colA = built.getIndexConfigByCol().get("colA"); + assertTrue(colA.getConfig(StandardIndexes.dictionary()).isEnabled(), + "override must lift colA to dictionary on the consuming-side builder"); + assertTrue(colA.getConfig(StandardIndexes.inverted()).isEnabled(), + "override must add inverted on colA on the consuming-side builder"); + } + + @Test + public void applyConsumingOverridesDoesNotMutateInput() { + /// The input TableConfig is shared across server threads (cached on the data manager). A successful merge must + /// not mutate any of its sub-objects — particularly the JsonNode-typed fields on FieldConfig (consumingOverride, + /// indexes, tierOverwrites) which are at risk of aliasing through tableConfig.toJsonNode(). + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .setNoDictionaryColumns(List.of("colA")) + .build(); + + // Snapshot the input shape. 
+ String beforeJson = tableConfig.toJsonString(); + JsonNode beforeOverride = tableConfig.getFieldConfigList().get(0).getConsumingOverride(); + String beforeOverrideJson = beforeOverride.toString(); + + TableConfigUtils.applyConsumingOverrides(tableConfig); + + // Source TableConfig must be byte-for-byte identical, AND its consumingOverride sub-tree must still hold the + // original override (i.e. not stripped of encodingType / indexTypes by the in-place merge). + assertEquals(tableConfig.toJsonString(), beforeJson, + "applyConsumingOverrides must not mutate the input TableConfig"); + assertEquals(tableConfig.getFieldConfigList().get(0).getConsumingOverride().toString(), beforeOverrideJson, + "applyConsumingOverrides must not mutate the input FieldConfig.consumingOverride sub-tree"); + assertTrue(tableConfig.getIndexingConfig().getNoDictionaryColumns().contains("colA"), + "applyConsumingOverrides must not mutate the input IndexingConfig per-column lists"); + } + + @Test + public void fieldConfigJsonWithoutConsumingOverrideStillDeserializes() + throws Exception { + /// Backward-compat: any FieldConfig JSON written by an older controller (no consumingOverride field) must + /// deserialize cleanly; getConsumingOverride() returns a NullNode (matching sibling getIndexes() + /// / getTierOverwrites() contract). 
+ String legacyJson = "{" + + "\"name\":\"colA\"," + + "\"encodingType\":\"RAW\"," + + "\"indexTypes\":[]" + + "}"; + FieldConfig fc = JsonUtils.stringToObject(legacyJson, FieldConfig.class); + assertEquals(fc.getName(), "colA"); + assertEquals(fc.getEncodingType(), FieldConfig.EncodingType.RAW); + JsonNode override = fc.getConsumingOverride(); + assertNotNull(override, "getConsumingOverride() must never return null"); + assertTrue(override.isNull(), "Absent override must be a NullNode, got: " + override); + } + + @Test + public void validateRejectsConsumingOverrideOnSortedColumn() { + /// sortedColumn is structural for the consuming segment — Builder(IndexLoadingConfig) auto-adds an inverted + /// index on dictionary-enabled sorted columns. Allowing the override would create a contradiction with that + /// auto-handling. + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .setSortedColumn("colA") + .build(); + + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure for consuming override on sorted column"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("sorted column"), + "Expected sorted-column message, got: " + e.getMessage()); + } + } + + @Test + public void applyConsumingOverridesRejectsSortedColumnOnStaleConfig() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .setSortedColumn("colA") + .build(); + + try { + TableConfigUtils.applyConsumingOverrides(tableConfig); + fail("Expected applyConsumingOverrides to reject sorted column override"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("sorted column"), + "Expected sorted-column message, got: " + e.getMessage()); + } + } + + @Test + public void validateRejectsCompressionCodecKeyOutsideAllowlist() { + /// `compressionCodec` is intentionally NOT in the allowlist — only `encodingType` + `indexes` are allowed. + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.RAW.name()); + override.put("compressionCodec", FieldConfig.CompressionCodec.LZ4.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure for unsupported override key 'compressionCodec'"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Unknown FieldConfig.consumingOverride key"), + "Expected unknown-key message, got: " + e.getMessage()); + assertTrue(e.getMessage().contains("compressionCodec"), + "Expected message to surface the bad key, got: " + e.getMessage()); + } + } + + @Test + public void buildConsumingSegmentConfigBuilderFallsBackOnInvariantFailure() { + /// Simulates a stale config that bypassed validate: the override carries a typo'd key. 
The dispatch helper + /// must catch the resulting RuntimeException, log, and fall back to the IndexLoadingConfig-driven Builder so + /// consumption keeps making forward progress on the persisted shape. + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingTpye", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .setNoDictionaryColumns(List.of("colA")) + .build(); + IndexLoadingConfig ilc = new IndexLoadingConfig(tableConfig, buildSchema()); + /// Track that the onFallback callback fired — this is the hook RealtimeSegmentDataManager uses to bump the + /// CONSUMING_OVERRIDE_FALLBACK metric, so the test asserts the contract the production caller relies on. + AtomicInteger fallbackInvocations = new AtomicInteger(); + /// Should NOT throw — the helper catches and logs. + RealtimeSegmentConfig.Builder builder = TableConfigUtils.buildConsumingSegmentConfigBuilder( + tableConfig, buildSchema(), ilc, LoggerFactory.getLogger(TableConfigConsumingOverrideTest.class), + fallbackInvocations::incrementAndGet); + RealtimeSegmentConfig built = builder.build(); + FieldIndexConfigs colA = built.getIndexConfigByCol().get("colA"); + /// Persisted shape on colA is RAW (no dictionary); the fallback must reflect that, NOT the override. 
+ assertFalse(colA.getConfig(StandardIndexes.dictionary()).isEnabled(), + "Fallback path must use the persisted RAW shape, not the override-attempted dictionary shape"); + assertEquals(fallbackInvocations.get(), 1, + "onFallback callback must be invoked exactly once when the override merge fails"); + } + + @Test + public void validateRejectsConsumingOverrideOnPartitionColumn() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + ColumnPartitionConfig partitionCfg = new ColumnPartitionConfig("Murmur", 4); + SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(Map.of("colA", partitionCfg)); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .setSegmentPartitionConfig(segmentPartitionConfig) + .build(); + + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure for consuming override on partition column"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("partition column"), + "Expected partition-column message, got: " + e.getMessage()); + } + } + + @Test + public void applyConsumingOverridesRejectsPartitionColumnOnStaleConfig() { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + ColumnPartitionConfig partitionCfg = new ColumnPartitionConfig("Murmur", 4); + SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(Map.of("colA", partitionCfg)); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) 
+ .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .setSegmentPartitionConfig(segmentPartitionConfig) + .build(); + + try { + TableConfigUtils.applyConsumingOverrides(tableConfig); + fail("Expected applyConsumingOverrides to reject partition column override"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("partition column"), + "Expected partition-column message, got: " + e.getMessage()); + } + } + + @Test + public void tableConfigJsonRoundTripPreservesConsumingOverride() + throws Exception { + /// Mirrors the production code path in applyConsumingOverrides which uses tableConfig.toJsonNode() — guarantees + /// the override survives the same serialization/deserialization that the merge utility relies on. + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig overridden = buildRawColumnWithRichConsumingOverride("colA", override); + + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(overridden)) + .build(); + + JsonNode tcJson = tableConfig.toJsonNode(); + TableConfig roundTripped = JsonUtils.jsonNodeToObject(tcJson, TableConfig.class); + FieldConfig fc = roundTripped.getFieldConfigList().get(0); + JsonNode overrideAfter = fc.getConsumingOverride(); + assertTrue(overrideAfter.isObject() && !overrideAfter.isEmpty(), + "consumingOverride must survive a TableConfig JSON round-trip; got: " + overrideAfter); + assertEquals(overrideAfter.get("encodingType").asText(), "DICTIONARY"); + assertTrue(overrideAfter.path("indexes").path("inverted").path("enabled").asBoolean(false), + "inverted index entry must survive a TableConfig JSON round-trip"); + } + + @Test + public 
void fieldConfigSerdePreservesConsumingOverride() + throws Exception { + ObjectNode override = JsonUtils.newObjectNode(); + override.put("encodingType", FieldConfig.EncodingType.DICTIONARY.name()); + override.set("indexes", + JsonUtils.newObjectNode().set("inverted", JsonUtils.newObjectNode().put("enabled", true))); + FieldConfig fieldConfig = new FieldConfig.Builder("colA") + .withEncodingType(FieldConfig.EncodingType.RAW) + .withConsumingOverride(override) + .build(); + + String json = fieldConfig.toJsonString(); + FieldConfig roundTripped = JsonUtils.stringToObject(json, FieldConfig.class); + JsonNode consumingOverride = roundTripped.getConsumingOverride(); + assertEquals(consumingOverride.get("encodingType").asText(), "DICTIONARY"); + assertTrue(consumingOverride.path("indexes").path("inverted").path("enabled").asBoolean(false), + "inverted index entry must survive FieldConfig serde round-trip"); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java index 064526695d45..d3acc60f462d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java @@ -76,6 +76,7 @@ public class FieldConfig extends BaseJsonConfig { private final List<IndexType> _indexTypes; private final JsonNode _indexes; private final JsonNode _tierOverwrites; + private final JsonNode _consumingOverride; private final CompressionCodec _compressionCodec; private final Map<String, String> _properties; private final TimestampConfig _timestampConfig; @@ -83,19 +84,28 @@ public class FieldConfig extends BaseJsonConfig { @Deprecated public FieldConfig(String name, EncodingType encodingType, @Nullable IndexType indexType, @Nullable CompressionCodec compressionCodec, @Nullable Map<String, String> properties) { - this(name, encodingType, indexType, null, compressionCodec, null, null, properties, null); + this(name, encodingType, 
indexType, null, compressionCodec, null, null, properties, null, null); } public FieldConfig(String name, EncodingType encodingType, @Nullable List<IndexType> indexTypes, @Nullable CompressionCodec compressionCodec, @Nullable Map<String, String> properties) { - this(name, encodingType, null, indexTypes, compressionCodec, null, null, properties, null); + this(name, encodingType, null, indexTypes, compressionCodec, null, null, properties, null, null); } @Deprecated public FieldConfig(String name, EncodingType encodingType, @Nullable IndexType indexType, @Nullable List<IndexType> indexTypes, @Nullable CompressionCodec compressionCodec, @Nullable TimestampConfig timestampConfig, @Nullable Map<String, String> properties) { - this(name, encodingType, indexType, indexTypes, compressionCodec, timestampConfig, null, properties, null); + this(name, encodingType, indexType, indexTypes, compressionCodec, timestampConfig, null, properties, null, null); + } + + @Deprecated + public FieldConfig(String name, EncodingType encodingType, @Nullable IndexType indexType, + @Nullable List<IndexType> indexTypes, @Nullable CompressionCodec compressionCodec, + @Nullable TimestampConfig timestampConfig, @Nullable JsonNode indexes, + @Nullable Map<String, String> properties, @Nullable JsonNode tierOverwrites) { + this(name, encodingType, indexType, indexTypes, compressionCodec, timestampConfig, indexes, properties, + tierOverwrites, null); } @JsonCreator @@ -107,7 +117,8 @@ public FieldConfig(@JsonProperty(value = "name", required = true) String name, @JsonProperty(value = "timestampConfig") @Nullable TimestampConfig timestampConfig, @JsonProperty(value = "indexes") @Nullable JsonNode indexes, @JsonProperty(value = "properties") @Nullable Map<String, String> properties, - @JsonProperty(value = "tierOverwrites") @Nullable JsonNode tierOverwrites) { + @JsonProperty(value = "tierOverwrites") @Nullable JsonNode tierOverwrites, + @JsonProperty(value = "consumingOverride") @Nullable JsonNode consumingOverride) { Preconditions.checkArgument(name != null, "'name' must be configured"); _name = 
name; _encodingType = encodingType == null ? EncodingType.DICTIONARY : encodingType; @@ -118,6 +129,7 @@ public FieldConfig(@JsonProperty(value = "name", required = true) String name, _properties = properties; _indexes = indexes == null ? NullNode.getInstance() : indexes; _tierOverwrites = tierOverwrites == null ? NullNode.getInstance() : tierOverwrites; + _consumingOverride = consumingOverride == null ? NullNode.getInstance() : consumingOverride; } // If null, we will create dictionary encoded forward index by default @@ -197,6 +209,23 @@ public JsonNode getTierOverwrites() { return _tierOverwrites; } + /// Returns optional per-column overrides applied only to the realtime mutable consuming segment. When set, the + /// supported fields in this [FieldConfig] are replaced by the corresponding fields in the override at + /// consuming-segment build time. The committed/immutable segment and all loaded immutable segments continue to + /// use the un-overridden table config — that is the persisted shape on disk. Typical use is to add a dictionary + /// or inverted index for fast filtering on the consuming segment while keeping the rest of the table in raw + /// encoding. + /// + /// Supported override keys: `encodingType` and `indexes`. Other top-level [FieldConfig] fields are + /// intentionally not overridable. Unknown keys are rejected at table-config validation time. + /// + /// Never returns `null`; absent override is represented as a `NullNode` (matching the contract of sibling + /// `JsonNode` accessors `getIndexes()` and `getTierOverwrites()`). Validation rejects non-object override values; + /// use `overrideJson.isObject() && !overrideJson.isEmpty()` to detect an object override. 
+ public JsonNode getConsumingOverride() { + return _consumingOverride; + } + @Nullable public CompressionCodec getCompressionCodec() { return _compressionCodec; @@ -221,6 +250,7 @@ public static class Builder { private Map<String, String> _properties; private TimestampConfig _timestampConfig; private JsonNode _tierOverwrites; + private JsonNode _consumingOverride; public Builder(String name) { _name = name; @@ -235,6 +265,7 @@ public Builder(FieldConfig other) { _properties = other._properties; _timestampConfig = other._timestampConfig; _tierOverwrites = other._tierOverwrites; + _consumingOverride = other._consumingOverride; } public Builder withIndexes(JsonNode indexes) { @@ -277,9 +308,14 @@ public Builder withTierOverwrites(JsonNode tierOverwrites) { return this; } + public Builder withConsumingOverride(JsonNode consumingOverride) { + _consumingOverride = consumingOverride; + return this; + } + public FieldConfig build() { return new FieldConfig(_name, _encodingType, null, _indexTypes, _compressionCodec, _timestampConfig, _indexes, - _properties, _tierOverwrites); + _properties, _tierOverwrites, _consumingOverride); } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java index 7d214bf8d678..4efc5a0af06b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java @@ -64,6 +64,7 @@ public class TableConfig extends BaseJsonConfig { public static final String TIER_CONFIGS_LIST_KEY = "tierConfigs"; public static final String TUNER_CONFIG_LIST_KEY = "tunerConfigs"; public static final String TIER_OVERWRITES_KEY = "tierOverwrites"; + public static final String CONSUMING_OVERRIDE_KEY = "consumingOverride"; public static final String TABLE_SAMPLERS_KEY = "tableSamplers"; public static final String DESCRIPTION_KEY = "description"; public static final String TAGS_KEY 
= "tags";