Add FieldConfig.consumingOverride for richer in-memory shape on realtime consuming segments#18406

Open
xiangfu0 wants to merge 9 commits into apache:master from xiangfu0:worktree-realtime-segment-column-override

@xiangfu0 xiangfu0 commented May 3, 2026

Summary

Adds an opt-in per-column override (FieldConfig.consumingOverride) that lets a realtime table column have a richer in-memory shape on the mutable consuming segment than what is persisted on disk.

Typical use case: keep the table in raw encoding (smaller storage, no dictionary on disk) while giving the consuming segment a dictionary + inverted index for fast filtering during the consumption window. The committed/immutable segment uses the un-overridden table config — the override never leaks to disk.

Example

{
  "fieldConfigList": [{
    "name": "userId",
    "encodingType": "RAW",
    "consumingOverride": {
      "encodingType": "DICTIONARY",
      "indexes": { "inverted": { "enabled": true } }
    }
  }],
  "tableIndexConfig": {
    "noDictionaryColumns": ["userId"]
  }
}

Scope (intentionally narrow)

The allowlist is just encoding + indexing: encodingType and indexes (the typed JSON-tree per-index config — going forward this is the canonical API; the legacy flat indexTypes list is deprecated for new features).

Everything else is rejected at validate time with an explicit error:

| Override key | Status | Why |
| --- | --- | --- |
| encodingType | ✅ Allowed | High-level RAW vs DICTIONARY lever |
| indexes | ✅ Allowed | Modern typed per-index JSON config |
| indexTypes, indexType | ❌ Rejected | Legacy flat list, superseded by indexes |
| compressionCodec | ❌ Rejected | Only meaningful for raw on-disk forward indexes |
| properties, timestampConfig | ❌ Rejected | Influence cross-subsystem state (TransformPipeline, aggregation, timestamp index) that does NOT consult the consuming-override view |
| Unknown keys / typos | ❌ Rejected | Surfaces misconfiguration up front |

Validation also rejects a consumingOverride on non-realtime tables, sorted columns, and partition columns.
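
The rules in this section can be illustrated with a minimal sketch. This is not the PR's enforceConsumingOverrideInvariants helper: the class name, method signature, and error strings below are hypothetical, and the override is modeled as a plain Map rather than a JsonNode so the example only needs the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only; names and messages are assumptions, not Pinot's implementation. */
final class ConsumingOverrideAllowlistSketch {
  // The two keys the PR description lists as allowed inside consumingOverride.
  static final Set<String> ALLOWED_KEYS = Set.of("encodingType", "indexes");

  /** Returns one message per violated rule; an empty list means the override is accepted. */
  static List<String> validate(String column, Map<String, Object> override, boolean realtimeTable,
      Set<String> sortedColumns, Set<String> partitionColumns) {
    List<String> errors = new ArrayList<>();
    String prefix = "consumingOverride on column '" + column + "' ";
    if (!realtimeTable) {
      errors.add(prefix + "is only supported on REALTIME tables");
    }
    if (sortedColumns.contains(column)) {
      errors.add(prefix + "is not allowed on a sorted column");
    }
    if (partitionColumns.contains(column)) {
      errors.add(prefix + "is not allowed on a partition column");
    }
    // Allowlist check: anything not explicitly allowed (legacy keys, typos) is rejected.
    for (String key : override.keySet()) {
      if (!ALLOWED_KEYS.contains(key)) {
        errors.add(prefix + "has unsupported key '" + key + "'");
      }
    }
    return errors;
  }
}
```

Collecting every violation instead of failing on the first one is a choice made for this sketch; the PR only states that rejected configurations produce an explicit error at validate time.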

Architecture

  • SPI: FieldConfig.consumingOverride is a @Nullable JsonNode. Backward-compatible — older clients without this field deserialize cleanly via @JsonIgnoreProperties(ignoreUnknown=true) on BaseJsonConfig. The previous 9-arg @JsonCreator constructor is preserved as a @Deprecated overload for source/binary compatibility.

  • Single dispatch point: TableConfigUtils.buildConsumingSegmentConfigBuilder is used by both RealtimeSegmentDataManager and StatelessRealtimeSegmentWriter.

    • No override (default): returns the existing IndexLoadingConfig-driven Builder unchanged — zero behavior change for tables that don't opt in.
    • With override: routes the override-applied TableConfig back through a new IndexLoadingConfig (with a defensive schema clone) so instance config, tier overlay, and schema-derived timestamp index materialization all flow through unchanged.
    • On RuntimeException (a misconfiguration that slipped past validate): logs and falls back to the persisted shape so consumption keeps making forward progress.
  • Merge utility: TableConfigUtils.applyConsumingOverrides deep-copies the JSON tree before mutating to avoid aliasing into the cached TableConfig (other server threads may be reading it concurrently).

  • Validation: shared enforceConsumingOverrideInvariants helper used by both validate-time and apply-time paths so the rules can't drift. Merged shape is re-validated with diagnostics that point back to the originating consumingOverride.

  • Scrub policy: deny-list — any per-column string-array in tableIndexConfig (invertedIndexColumns, noDictionaryColumns, etc.) is scrubbed for overridden columns. Object-keyed maps and structurally-tied collections (sortedColumn, starTreeIndexConfigs, tierOverwrites) are intentionally left alone to avoid over-scrubbing feature configs whose keys could collide with column names.
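
The merge and scrub bullets above can be sketched roughly as follows. This is a simplified model under stated assumptions, not TableConfigUtils.applyConsumingOverrides: the table config is represented as nested Map/List values instead of a Jackson tree, the class and method names are hypothetical, and dropping the consumingOverride key from the merged copy is an assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only; a Map/List stand-in for the JSON tree described in the PR. */
final class ConsumingOverrideMergeSketch {
  // Collections the PR says are intentionally left alone by the scrub.
  static final Set<String> LEFT_ALONE = Set.of("sortedColumn", "starTreeIndexConfigs", "tierOverwrites");

  /** Recursively copies maps and lists so the result shares no mutable state with the input. */
  static Object deepCopy(Object node) {
    if (node instanceof Map) {
      Map<String, Object> copy = new LinkedHashMap<>();
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) node).entrySet()) {
        copy.put(String.valueOf(entry.getKey()), deepCopy(entry.getValue()));
      }
      return copy;
    }
    if (node instanceof List) {
      List<Object> copy = new ArrayList<>();
      for (Object item : (List<?>) node) {
        copy.add(deepCopy(item));
      }
      return copy;
    }
    return node; // strings, numbers, and booleans are immutable
  }

  @SuppressWarnings("unchecked")
  static Map<String, Object> applyOverrides(Map<String, Object> tableConfig) {
    // Mutate only the copy, never the (possibly shared) input.
    Map<String, Object> merged = (Map<String, Object>) deepCopy(tableConfig);
    Set<String> overridden = new LinkedHashSet<>();
    Object fieldConfigs = merged.get("fieldConfigList");
    if (fieldConfigs instanceof List) {
      for (Object item : (List<Object>) fieldConfigs) {
        Map<String, Object> fieldConfig = (Map<String, Object>) item;
        Object override = fieldConfig.remove("consumingOverride");
        if (override instanceof Map) {
          fieldConfig.putAll((Map<String, Object>) override); // override keys replace persisted ones
          overridden.add((String) fieldConfig.get("name"));
        }
      }
    }
    Object indexing = merged.get("tableIndexConfig");
    if (indexing instanceof Map) {
      for (Map.Entry<String, Object> entry : ((Map<String, Object>) indexing).entrySet()) {
        // Deny-list scrub: every list-valued entry is scrubbed unless explicitly left alone.
        if (entry.getValue() instanceof List && !LEFT_ALONE.contains(entry.getKey())) {
          ((List<Object>) entry.getValue()).removeAll(overridden);
        }
      }
    }
    return merged;
  }
}
```

Applied to the example config from this description, the merged copy would report encodingType DICTIONARY for userId and no longer list userId under noDictionaryColumns, while the input map is left as it was.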

Mixed-version safety

  • Older server reading a controller-written consumingOverride it doesn't understand: silently drops the unknown field (Jackson default) and behaves as if no override exists. The override only takes effect once the server is upgraded.
  • Older controller writing a TableConfig: cannot produce the new field, so newer servers see no override. Default behavior preserved.

Plugin teams: the SPI change is additive — please re-test against the new pinot-spi.jar if you build FieldConfig objects directly via positional constructor calls (the previous 9-arg signature is preserved as @Deprecated).
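
For plugin authors, the compatibility pattern described here (a new field added while the previous positional signature is kept as a deprecated overload) looks roughly like the sketch below. The class is a hypothetical three-field stand-in, not FieldConfig itself, and a String stands in for the JsonNode field.

```java
/** Illustrative only; a reduced stand-in for a config class that gained one field. */
final class ColumnConfigSketch {
  private final String name;
  private final String encodingType;
  private final String consumingOverride; // null means "no override" in this sketch

  /** New constructor carrying the added field. */
  ColumnConfigSketch(String name, String encodingType, String consumingOverride) {
    this.name = name;
    this.encodingType = encodingType;
    this.consumingOverride = consumingOverride;
  }

  /** Previous signature, kept so existing positional callers still compile and link. */
  @Deprecated
  ColumnConfigSketch(String name, String encodingType) {
    this(name, encodingType, null);
  }

  String getName() {
    return name;
  }

  String getEncodingType() {
    return encodingType;
  }

  boolean hasConsumingOverride() {
    return consumingOverride != null;
  }
}
```

Callers using the old two-argument form get an object with no override, which matches the "default behavior preserved" intent stated above.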

Test plan

  • 23 unit tests in TableConfigConsumingOverrideTest:
    • merge semantics (encoding replacement, scrub of legacy per-column lists)
    • input-mutation guard (cached TableConfig byte-for-byte unchanged after merge)
    • idempotence
    • validation rules (REALTIME-only, sorted-column rejection, partition-column rejection, allowlist enforcement at both validate-time and apply-time)
    • dispatch helper (no-override → IndexLoadingConfig path, valid override → merged path, fallback on failure)
    • JSON round-trip preservation
    • legacy FieldConfig JSON without consumingOverride still deserializes
  • End-to-end converter test (RealtimeSegmentConverterTest.testConsumingOverrideKeepsRawShapeOnImmutableSegment):
    • asserts the consuming MutableSegmentImpl has dictionary + inverted index in memory
    • asserts the committed immutable segment retains the persisted RAW shape with no inverted index
  • Integration test (ConsumingOverrideRealtimeTest) — full Kafka → consuming → forceCommit → immutable lifecycle:
    • Pre-commit: walks each server's TableDataManager, every consuming segment is MutableSegmentImpl, STRING_COLUMN has both dictionary and inverted index in memory, control column keeps default DICTIONARY shape.
    • Force-commit + wait: triggers forceCommit via admin REST and waits for ExternalView ONLINE.
    • Post-commit: every ImmutableSegmentImpl on every server has STRING_COLUMN with null dictionary AND null inverted index in memory, AND hasDictionary=false on the on-disk ColumnMetadata. Cross-checks both views so a loader can't silently re-create the index. Control column still dictionary-encoded.
    • Query parity: same COUNT(*) filter returns identical results before/after commit.
  • No regression in TableConfigUtilsTest (56), IndexLoadingConfigTest (3), BaseTableDataManagerTest (25), RealtimeSegmentConverterTest (161 total).
  • Spotless / Checkstyle / License all pass.

🤖 Generated with Claude Code

…ents

Adds an opt-in per-column override that lets a realtime table column have
a richer in-memory shape on the mutable consuming segment than what is
persisted on disk. Typical use: keep the table in raw encoding for storage
efficiency while giving the consuming segment a dictionary + inverted index
for fast filtering during the consumption window. The committed/immutable
segment and all loaded immutable segments use the un-overridden table
config — the override never leaks to disk.

Architecture:
- FieldConfig.consumingOverride is an optional JsonNode mirroring a partial
  FieldConfig (encodingType, indexTypes, indexes, compressionCodec).
- TableConfigUtils.buildConsumingSegmentConfigBuilder is the single
  dispatch point used by both RealtimeSegmentDataManager and
  StatelessRealtimeSegmentWriter. With no override it returns the existing
  IndexLoadingConfig-driven Builder unchanged. With an override it routes
  the merged TableConfig back through a new IndexLoadingConfig (with a
  defensive schema clone) preserving instance config, tier overlay and
  schema-derived timestamp index materialization. On RuntimeException it
  logs and falls back to the persisted shape so consumption keeps making
  forward progress.
- TableConfigUtils.applyConsumingOverrides deep-copies the JsonNode tree
  before mutating to avoid aliasing into the cached TableConfig.
- Validation: shared enforceConsumingOverrideInvariants helper used by
  both validate-time and apply-time paths so the rules can't drift.
  Rejects non-realtime tables, sorted columns, partition columns, and
  unknown override keys (allowlist: encodingType, indexTypes, indexes,
  compressionCodec). Merged shape is re-validated with diagnostics that
  point back to the consumingOverride.
- Scrub policy is a deny-list: any per-column string-array in
  tableIndexConfig is scrubbed for the overridden columns; object-keyed
  maps and structurally-tied collections (sortedColumn,
  starTreeIndexConfigs, tierOverwrites) are intentionally left alone to
  avoid over-scrubbing feature configs whose keys could collide with
  column names.

Tests: 23 unit tests in TableConfigConsumingOverrideTest cover merge
semantics, validation rules, JSON round-trip, idempotence, fallback path,
and input-mutation guards. End-to-end converter test verifies the
immutable segment retains the persisted (RAW) shape after commit even
when the consuming segment was built with a richer (DICTIONARY+INVERTED)
override.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@codecov-commenter

codecov-commenter commented May 3, 2026

Codecov Report

❌ Patch coverage is 72.48677% with 52 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.67%. Comparing base (80fb0e8) to head (393bc62).
⚠️ Report is 27 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...he/pinot/segment/local/utils/TableConfigUtils.java | 71.76% | 25 Missing and 23 partials ⚠️ |
| ...ealtime/writer/StatelessRealtimeSegmentWriter.java | 0.00% | 3 Missing ⚠️ |
| ...a/manager/realtime/RealtimeSegmentDataManager.java | 75.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18406      +/-   ##
============================================
+ Coverage     63.46%   63.67%   +0.20%     
- Complexity     1701     1735      +34     
============================================
  Files          3254     3254              
  Lines        199104   199627     +523     
  Branches      30830    31017     +187     
============================================
+ Hits         126354   127105     +751     
+ Misses        62665    62362     -303     
- Partials      10085    10160      +75     
| Flag | Coverage Δ |
| --- | --- |
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-21 | 63.67% <72.48%> (+0.20%) ⬆️ |
| temurin | 63.67% <72.48%> (+0.20%) ⬆️ |
| unittests | 63.66% <72.48%> (+0.20%) ⬆️ |
| unittests1 | 55.65% <13.75%> (+0.23%) ⬆️ |
| unittests2 | 35.00% <72.48%> (+0.02%) ⬆️ |

@xiangfu0 xiangfu0 added the real-time Related to realtime table ingestion and serving label May 4, 2026
xiangfu0 and others added 3 commits May 3, 2026 22:09
Drives a realtime table with consumingOverride active on STRING_COLUMN
(persisted RAW + no inverted index, consuming DICTIONARY + INVERTED) and
performs comprehensive checks against the actual Pinot cluster:

- Pre-commit: walks each server's TableDataManager, asserts every consuming
  segment is a MutableSegmentImpl, and verifies via the in-memory DataSource
  that STRING_COLUMN has both a dictionary (getDictionary != null) and an
  inverted index (getInvertedIndex != null) — proving the override is
  active on the consuming side. Also confirms the control column
  (STRING_COLUMN_NO_OVERRIDE) keeps its default DICTIONARY shape so the
  override scrub isn't over-aggressive.

- Force-commit via the admin REST API and wait for the ExternalView to
  reflect ONLINE segments (sealed immutable replacements).

- Post-commit: walks each server again, finds every ImmutableSegmentImpl,
  asserts BOTH the in-memory DataSource view (getDictionary == null,
  getInvertedIndex == null) AND the on-disk ColumnMetadata view
  (hasDictionary == false) — proving the persisted shape on disk matches
  the un-overridden table config and the override never leaks to disk.
  The control column remains DICTIONARY-encoded.

- Query parity: the same SELECT COUNT(*) WHERE STRING_COLUMN = 'alpha'
  returns identical results before and after commit, so the override
  changes only the in-memory index — not query semantics.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drops `indexes` and `compressionCodec` from the consumingOverride
allowlist. Encoding type and index types are sufficient for the documented
use case (e.g. RAW persisted, DICTIONARY + INVERTED on consuming) and
keep the SPI surface narrow:

- `indexes` is the per-index typed JSON config; broader than what the
  consuming segment can safely diverge on.
- `compressionCodec` only meaningfully shapes raw on-disk forward indexes;
  the consuming segment's in-memory layout doesn't honor it the same way.

Tests: replaced applyConsumingOverridesAppliesCompressionCodec and
applyConsumingOverridesReplacesIndexesSubtreeWholesale with their reject
counterparts (validateRejectsCompressionCodecKeyOutsideAllowlist,
validateRejectsIndexesKeyOutsideAllowlist) so misconfigurations surface
loudly at validate time. All other tests already use only encodingType +
indexTypes and pass unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The consumingOverride allowlist now permits `encodingType` and `indexes`
(the modern typed JSON-tree per-index config) instead of `indexTypes` (the
legacy flat list). Going forward all per-index configuration in Pinot is
driven by the typed `indexes` JSON tree; the flat `indexTypes` list is
deprecated for new features.

Override JSON before:
  { "encodingType": "DICTIONARY", "indexTypes": ["INVERTED"] }

Override JSON now:
  { "encodingType": "DICTIONARY", "indexes": { "inverted": { "enabled": true } } }

Also strengthened the converter test (testConsumingOverrideKeepsRawShape...)
with explicit consuming-side assertions on getDictionary() / getInvertedIndex()
so a future regression that breaks the in-memory shape is caught at unit-test
time rather than only at integration-test time.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0 xiangfu0 changed the title from "Add FieldConfig.consumingOverride for realtime mutable consuming segments" to "Add FieldConfig.consumingOverride for richer in-memory shape on realtime consuming segments" May 4, 2026
Copilot AI left a comment

Pull request overview

This PR adds an opt-in FieldConfig.consumingOverride for realtime tables so mutable consuming segments can use a different in-memory encoding/index shape than the persisted immutable segment shape. It fits into Pinot’s realtime ingestion path by letting servers build richer consuming-segment indexes without changing what gets committed to disk.

Changes:

  • Adds FieldConfig.consumingOverride support in SPI/config classes and exposes a getter/builder path for the new JSON field.
  • Introduces consuming-override merge/validation logic in TableConfigUtils and routes realtime mutable-segment construction through that logic from both consuming writers.
  • Adds unit, converter, and integration coverage for consuming-segment-only dictionary/inverted-index overrides.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java | Adds a JSON key constant used by consuming-override merge logic. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java | Extends FieldConfig with the new consumingOverride field, constructor arg, getter, and builder support. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Implements override detection, validation, merge/scrub behavior, and the new consuming-segment builder dispatch. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java | Switches realtime consuming-segment creation to the new override-aware builder helper. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java | Applies the same override-aware builder path for stateless realtime segment writing. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingOverrideTest.java | Adds focused unit tests for merge behavior, validation rules, serde, and fallback behavior. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java | Verifies the override affects the mutable consuming segment but not the committed immutable segment. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingOverrideRealtimeTest.java | Adds end-to-end realtime coverage across consume, force-commit, immutable load, and query parity. |

xiangfu0 and others added 4 commits May 4, 2026 02:05
- Add ServerMeter.CONSUMING_OVERRIDE_FALLBACK; bump it whenever
  buildConsumingSegmentConfigBuilder catches a merge failure and falls
  back to the persisted shape so the silent-degradation case is
  observable through metrics, not just per-segment logs.
- Document the IndexLoadingConfig contract on
  buildConsumingSegmentConfigBuilder: only instanceDataManagerConfig
  and segmentTier are read on the override path; other in-place
  IndexLoadingConfig mutations made by the caller are not preserved.
- Integration test: drop the brittle assertNull(getDictionary()) /
  assertNull(getInvertedIndex()) on the immutable-segment DataSource
  view and add a SegmentDirectory.hasIndexFor check instead, mirroring
  the converter test pattern. The on-disk reader proves the file is
  physically absent rather than relying on lifecycle-dependent
  in-memory state.
- Integration test: replace the integer-division per-replica check
  with an exact NUM_DOCS * numReplicas equality so silent drift
  between replicas (e.g. one double-consumes, one drops) cannot
  silently average out.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Move metric emission out of TableConfigUtils — buildConsumingSegmentConfigBuilder
  now takes a Runnable onFallback callback, so the per-table metric is emitted
  by the call site (RealtimeSegmentDataManager) where it belongs. Keeps
  TableConfigUtils role-agnostic. StatelessRealtimeSegmentWriter passes null
  (one-shot offline tool, error log is enough).
- Flip ServerMeter.CONSUMING_OVERRIDE_FALLBACK to global=false to match every
  other per-table failure meter in the section (the meter is emitted via
  addMeteredTableValue per table). Avoids dashboard/alerting drift.
- Document the schema-clone asymmetry on buildConsumingSegmentConfigBuilder:
  the override path clones because TimestampIndexUtils.applyTimestampIndex
  mutates its inputs, and the override path constructs IndexLoadingConfig
  per-segment-build (vs the non-override path which constructs it once at
  table-data-manager-load time). Includes a TODO to fix the underlying
  TimestampIndexUtils mutation so the clone can be removed.
- Document the fallback recovery contract: a failed override merge produces
  a consuming segment matching the persisted shape for its full lifetime —
  no auto-retry; operators must reload the table after fixing the override.
- Strengthen the integration test: also assert the new consuming segments
  that come up after forceCommit have the override applied (guards against
  a regression where the override is honored only on first segment creation).
- Update the converter unit test to use buildConsumingSegmentConfigBuilder
  instead of constructing the Builder directly — the unit test now exercises
  the same dispatch path as production.
- Add an assertion in the fallback unit test that the onFallback callback
  is invoked exactly once.
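
A minimal sketch of the dispatch-with-callback shape described in this commit, assuming a generic builder type. The class name, parameter names, and log text are hypothetical; the real buildConsumingSegmentConfigBuilder takes Pinot-specific arguments that are not reproduced here.

```java
import java.util.function.Supplier;

/** Illustrative only; models the three dispatch outcomes with plain suppliers. */
final class ConsumingDispatchSketch {
  /**
   * @param hasOverride whether any column declares a consumingOverride
   * @param persistedShape builds the config from the un-overridden table config
   * @param overriddenShape builds the config from the merged table config; may throw
   * @param onFallback invoked once if the override path fails; may be null
   */
  static <T> T build(boolean hasOverride, Supplier<T> persistedShape, Supplier<T> overriddenShape,
      Runnable onFallback) {
    if (!hasOverride) {
      return persistedShape.get(); // default path, unchanged behavior
    }
    try {
      return overriddenShape.get();
    } catch (RuntimeException e) {
      // Log, notify the caller (for example so it can bump a meter), and keep consuming.
      System.err.println("consumingOverride merge failed, using persisted shape: " + e);
      if (onFallback != null) {
        onFallback.run();
      }
      return persistedShape.get();
    }
  }
}
```

Passing the callback in, rather than emitting the metric inside the utility, keeps the helper free of server-side metric dependencies; a caller that does not need a metric can pass null, as the commit message says StatelessRealtimeSegmentWriter does.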

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… nits

- Normalize FieldConfig._consumingOverride to NullNode.getInstance() to
  match the contract of sibling JsonNode accessors getIndexes() /
  getTierOverwrites(); drop the @Nullable getter contract that was
  inconsistent with the rest of the class. Update the legacy-JSON test
  to assert NullNode rather than null.
- Preserve segmentVersion + readMode from the original IndexLoadingConfig
  on the consuming-override path so common caller-side mutations are
  carried across (was only carrying segmentTier before).
- Fix CONSUMING_OVERRIDE_FALLBACK ServerMeter doc and unit: emission
  is per-event (per consuming-segment build), not per-table; doc updated
  to match.
- Reword the StatelessRealtimeSegmentWriter comment to match the new
  single-dispatch architecture.
- Use AtomicInteger instead of int[] sentinel in the fallback unit test.
- Strengthen the deny-list scrub comment in TableConfigUtils to be
  explicit about the contract for future IndexingConfig fields.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… log

- Drop the consumingIlc.setSegmentVersion / setReadMode calls. Both are
  already derived by the IndexLoadingConfig constructor from the same
  inputs (instanceDataManagerConfig + indexingConfig), so the explicit
  copies were redundant. Also: setSegmentVersion is annotated "For tests
  only" in IndexLoadingConfig — production code shouldn't depend on it.
- Include a per-column override snippet in the fallback ERROR log so
  operators reading the log can triage without fetching the table config
  from ZK separately. The metric (CONSUMING_OVERRIDE_FALLBACK) remains
  the alerting hook; this log is the diagnostic.
- Switch CONSUMING_OVERRIDE_FALLBACK metric unit from "events" to
  "segments" for consistency with sibling per-segment failure meters in
  the same block (SEGMENT_BUILD_FAILURE etc.).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@xiangfu0 xiangfu0 left a comment

Found a couple of high-signal config-safety issues; see inline comments.

@xiangfu0 xiangfu0 force-pushed the worktree-realtime-segment-column-override branch from 92ff5f9 to 393bc62 Compare May 8, 2026 01:31
…arters

In suite mode (CustomClusterIntegrationTest TestNG suite), only the
first test instance to run @BeforeSuite populates _serverStarters; every
subsequent test instance has an empty list. The test was inspecting its
own _serverStarters and silently finding 0 segments — the
"consumingSegmentsInspected > 0" assertion would then fire on any run
where ConsumingOverrideRealtimeTest is not the first suite member.

- Add CustomDataQueryClusterIntegrationTest.getSharedServerStarters()
  helper alongside the existing getSharedHelixResourceManager / etc.
  pattern. Returns _sharedClusterTestSuite._serverStarters.
- Switch ConsumingOverrideRealtimeTest's two inspection loops to use the
  helper. Java's protected-access rule blocks reaching into a sibling
  subclass's _serverStarters via static reference, so the helper method
  on the common parent (which has access via 'this') is required.

Test passes both standalone and (by construction) when running second
in the custom-cluster suite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0 xiangfu0 force-pushed the worktree-realtime-segment-column-override branch from 393bc62 to 3fe03d0 Compare May 8, 2026 05:19