Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public enum ServerMeter implements AbstractMetrics.Meter {
SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES("segments", false),
SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES("segments", false),
SEGMENT_BUILD_FAILURE("segments", false),
/// Counts consuming-segment-build events where `FieldConfig.consumingOverride` merge failed and the
/// server fell back to the persisted shape. A non-zero value means at least one consuming segment is running
/// with the wrong (un-overridden) in-memory shape — operators should inspect server logs and fix the override
/// config. Emitted per-table via `addMeteredTableValue`.
CONSUMING_OVERRIDE_FALLBACK("segments", false),
SEGMENT_UPLOAD_FAILURE("segments", false),
SEGMENT_UPLOAD_SUCCESS("segments", false),
// Emitted only by Server to Deep-store segment uploader.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
Expand Down Expand Up @@ -1821,9 +1822,19 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
_defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled();

// Start new realtime segment
// Start new realtime segment.
// Default path: build from indexLoadingConfig so tier overwrites and any instance-level mutations on
// IndexLoadingConfig flow through unchanged. When a FieldConfig.consumingOverride is configured, switch to a
// Builder constructed from the consuming-override-applied TableConfig so the mutable consuming segment can have
// a different (typically richer) index shape than what is persisted on disk — e.g. the table is RAW for
// storage efficiency but the consuming segment uses dictionary + inverted index for fast filtering. The
// committed/immutable segment ignores the override and uses the persisted table-config shape.
String consumerDir = realtimeTableDataManager.getConsumerDir();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig, _segmentLogger,
() -> _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.CONSUMING_OVERRIDE_FALLBACK,
1L));
realtimeSegmentConfigBuilder
.setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentNameStr)
.setStreamName(streamTopic)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -340,6 +341,13 @@ protected String getSharedKafkaBrokerList() {
return _sharedClusterTestSuite.getKafkaBrokerList();
}

/// Returns the live server starters from the shared suite instance. Tests that need to inspect server-side state
/// (e.g. walk TableDataManager, acquire segments) must use this rather than `_serverStarters` on `this`, because in
/// suite mode only the shared instance has the populated list.
protected List<BaseServerStarter> getSharedServerStarters() {
return _sharedClusterTestSuite._serverStarters;
}

/**
* Creates a Kafka topic on the shared suite's Kafka cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
Expand Down Expand Up @@ -180,9 +181,16 @@ public StatelessRealtimeSegmentWriter(SegmentZKMetadata segmentZKMetadata, Index
File statsHistoryFile = new File(tableDataDir, SEGMENT_STATS_FILE_NAME);
RealtimeSegmentStatsHistory statsHistory = RealtimeSegmentStatsHistory.deserializeFrom(statsHistoryFile);

// Initialize mutable segment with configurations
// Initialize mutable segment with configurations via the dispatch helper. Helper falls back to the
// indexLoadingConfig-driven Builder when no FieldConfig.consumingOverride is present (default), and switches
// to a TableConfig-driven Builder when one is — so a column can have a richer in-memory index shape (e.g.
// dictionary + inverted) than what is persisted.
IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
/// StatelessRealtimeSegmentWriter is a one-shot tool (segment re-ingestion) and has no per-instance metrics
/// pipeline; pass null for the fallback callback. The error log is sufficient for this offline use.
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig, _logger, null);
realtimeSegmentConfigBuilder
.setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentName)
.setStreamName(_streamConfig.getTopicName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,7 @@ protected void writeMetadata()
CommonsConfigurationUtils.saveToFile(properties, metadataFile);
}

/**
* Adds column metadata information to the properties configuration.
*/
/// Adds column metadata information to the properties configuration.
public static void addColumnMetadataInfo(PropertiesConfiguration properties, String column,
ColumnStatistics columnStatistics, int totalDocs, FieldSpec fieldSpec, boolean hasDictionary,
int dictionaryElementSize, boolean autoGenerated) {
Expand Down
Loading