[Delta] Add Changelog support#6794
Conversation
| Configuration hadoopConf, | ||
| SQLConf sqlConf, | ||
| boolean isCDCRead) { | ||
| CdcReadMode cdcReadMode) { |
There was a problem hiding this comment.
Handling different cdcReadModes
Problem. Before this PR
createDeltaParquetReaderFactorytookboolean isCDCRead. Two callers passedtruefor completely different semantics:SparkMicroBatchStreamfor streaming CDC (PartitionUtils owns schema augmentation +CDCReadFunctionwrap), andDeltaChangelogBatchfor Auto-CDF (outerCDCPartitionReaderFactoryowns CDC tail injection). The method had to disambiguate by inspectingreadDataSchemafor a_metadatafield — coupling a CDC decision to a row-tracking schema check.Options.
- Keep
isCDCReadboolean, continue the_metadata-presence heuristic — fragile; breaks if row tracking ever appears without Auto-CDF.- Split into two factory methods — duplicates the shared DV / row-tracking / format wiring.
- Single factory with
CdcReadMode { NONE, STREAMING, BATCH_CHANGELOG }.Chosen. Option 3. Dispatch contract is explicit at the call site, shared wiring stays in one place, and a future
BATCH_CHANGELOG_NETCHANGESmode (post apache/spark#55637) drops in without changing call signatures.
Like in the comment below I'm a bit confused on how batch/streaming should live with each other. Advice is appreciated.
| // CDCPartitionReaderFactory injects the CDC tail columns as constants instead. | ||
| Optional<CDCSchemaContext> cdcSchemaContext = | ||
| isCDCRead | ||
| cdcReadMode.injectsCdcAtReaderLevel() |
There was a problem hiding this comment.
Streaming CDC and Auto-CDF coexistence
Problem. The streaming CDC entrypoint shipped in #6359 mutates
readDataSchemawith CDC tail columns (_change_type,_commit_version,_commit_timestamp) and wraps the reader withCDCReadFunctionto inject those columns per row. Auto-CDF (DeltaChangelogBatch) wraps its delegate withCDCPartitionReaderFactory, which injects the same columns as per-partition constants (one value per CDC file change). If both fire, the schema sees the CDC tail twice and projection blows up withGenericInternalRow cannot be cast to Long.Options.
- Tear down the OSS streaming-CDC path and route streaming through Auto-CDF — invasive, breaks streaming tests until
toMicroBatchStreamlands.- Heuristic: skip the inner wrap when
_metadatais inreadDataSchema— works incidentally; same fragility called out above.- Explicit
CdcReadModedispatch:injectsCdcAtReaderLevel()is true only forSTREAMING.Chosen. Option 3. Both pathways stay live; the connector picks the right wrapper from the mode at the call site. Streaming behavior is unchanged from #6359.
Tbh, I was a bit overwhelmed on how to handle the streaming co-existence with our batch approach. I'm not sure if the chosen path is the correct one.
There was a problem hiding this comment.
Following on the comment on CdcReadMode: Consider Streaming write-time CDF a completely unrelated feature.
It's not possible to read both read-time CDF and write-time CDF in the same query, so it's not possible for the problem you describe to occur.
The important part here will be to make the distinction between the two features really clear to avoid confusion. This can simply be renaming isCDCRead to isWriteTimeCDCRead.
DeltaChangelogBatch will call createDeltaParquetReaderFactory passing isWriteTimeCDCRead=false
There was a problem hiding this comment.
Done. Renamed; DeltaChangelogBatch passes isWriteTimeCDCRead=false. Javadoc updated to make the read-time vs write-time distinction explicit.
| * classes (SparkTable, DeltaChangelog, SnapshotManagerFactory, V2SchemaUtils) which | ||
| * the sparkV1 module that hosts [[AbstractDeltaCatalog]] cannot depend on. | ||
| */ | ||
| abstract class DeltaCatalogChangelogSupport extends AbstractDeltaCatalog { |
There was a problem hiding this comment.
Why the loadChangelog override lives here, not in AbstractDeltaCatalog
**Problem.** The 1:1 port placed `loadChangelog` directly inside `AbstractDeltaCatalog`. That override references sparkV2 types (`SparkTable`, `DeltaChangelog`, `SnapshotManagerFactory`, `V2SchemaUtils`) — but `AbstractDeltaCatalog` lives in sparkV1, and sparkV2 already depends on sparkV1Filtered → sparkV1 transitively. A `sparkV1 → sparkV2` edge would close the dependency cycle and the module graph stops being a DAG.Options.
- Move the sparkV2 changelog classes (
DeltaChangelog,SparkTable,SnapshotManagerFactory, ...) into sparkV1 — pulls Kernel-based reader code into the V1 module against the Hybrid pattern introduced in [Spark] Hybrid Delta connector combining V1(default) and V2 connectors #5726. But I think we want to keep changes related to DsV2 to in V2, right?- Reflection lookup from sparkV1 — avoids compile-time deps but hides API contracts and is invisible to IDE tooling.
- Implement the override one level higher, in
spark-unified(which already depends on both V1 and V2), via an intermediate class.Chosen. Option 3 — this class. It mirrors how
DeltaCatalog.javaalready routesloadCatalogTable/loadPathTablebetween V1 and V2 fromspark-unified. The only structural difference from the internal-source port is where the override is defined; the implementation logic is unchanged.
I'm not sure if I set myself to limitations that do not exist. When I see V1 vs V2 I always think of clear separation. Is that the case? Or could I simply have implemented stuff in the sparkV1 world?
| vr.endingBoundInclusive()) | ||
| case tr: TimestampRange => | ||
| val catalogTable = resolveDeltaCatalogTable(ident) | ||
| val deltaLog = DeltaLog.forTable(spark, catalogTable) |
There was a problem hiding this comment.
Direct access to the delta log at this level of abstraction is a no-go imo.
buildChangelog is also doing a bit too much. For now, what you need is really just exposing the table schema. Later on surfacing containsCarryoverRows / ... may turn to be trickier and require more state to be loaded, but for now all the pre processing is a bit much.
A different structure will be simpler and delegate work to a later stage: have DeltaChangelog wrap a SparkTable (similar to https://docs.google.com/document/d/1NpJ9Q4wvU-tp3r67RC8MKnm-0J4Y0n7mSOC6iiiBJNo/edit?tab=t.0#heading=h.2gjhdgzfrm6g).
I would also just store the changelogInfo in DeltaChangelog, without trying to pre-process it. Making ChangelogSupport extension on the catalog really straightforward
We only support reading CDC here on DSv2 tables. That means SparkTable for Delta. You should reject reads on DeltaTableV2 below (which is a V1 table really, don't believe its name)
There was a problem hiding this comment.
Rejecting DeltaTableV2 reads with new Error DELTA_CHANGELOG_REQUIRES_V2_TABLE. Removed the direct DeltaLog access. Wrapping the SparkTable by deferring to the read path (DeltaChangelogScanBuilder.build and DeltaChangelogBatch.planInputPartitions)
| val engine = DefaultEngine.create(deltaLog.newDeltaHadoopConf()) | ||
| val snapshotManager = | ||
| SnapshotManagerFactory.create(tablePath, engine, Optional.of(catalogTable)) | ||
|
|
||
| // The latest snapshot is needed to default the ending version when the user did not | ||
| // specify one, and to surface a clear "start > latest" error before issuing a snapshot | ||
| // load that would otherwise fail with a low-level kernel error. | ||
| val latestVersion = snapshotManager.loadLatestSnapshot().getVersion() |
There was a problem hiding this comment.
You'll want to use the snapshotManager interface to resolve timestamps -> versions also, which is currently done by directly calling in DeltaLog.
E.g. startVersion probably through getActiveCommitAtTime.
Main point: you shouldn't create a snapshotManager yourself, and rely on the resolved SparkTable to do this. I'm not sure how well time-travel is implemented at this stage, so you may need to extend it.
E.g. we may want to start by resolving the SparkTable directly at startVersion / startTimestamp by calling loadTable(ident, version/timestamp)
There was a problem hiding this comment.
Currently no SnapshotManager is constructed inside the trait or DeltaChangelog. But I'm not using your approach through my own SnapshotManager (yet). I haven't yet switched to resolving the SparkTable directly at startVersion / startTimestamp via loadTable(ident, version/timestamp). The SparkTable resolved by the catalog gives us access to the snapshot manager for arbitrary version / timestamp lookups, so pinning the table to startVersion didn't seem necessary. I'll take a look at this on Monday.
| // CDCPartitionReaderFactory injects the CDC tail columns as constants instead. | ||
| Optional<CDCSchemaContext> cdcSchemaContext = | ||
| isCDCRead | ||
| cdcReadMode.injectsCdcAtReaderLevel() |
There was a problem hiding this comment.
Following on the comment on CdcReadMode: Consider Streaming write-time CDF a completely unrelated feature.
It's not possible to read both read-time CDF and write-time CDF in the same query, so it's not possible for the problem you describe to occur.
The important part here will be to make the distinction between the two features really clear to avoid confusion. This can simply be renaming isCDCRead to isWriteTimeCDCRead.
DeltaChangelogBatch will call createDeltaParquetReaderFactory passing isWriteTimeCDCRead=false
Please read my added comments in the conversation, they explain differences to the path I'd have expected to cleanly work.
Which Delta project/connector is this regarding?
Description
Guarded behing a feature flag, this PR Implements B.2 of the CDC SPIP, so the
Changeloginterface. Furthermore, it adds catalog-driven Auto-CDF (TableCatalog.loadChangelog) for the DSv2 connector, so the kernel-based V2 reader stack answersSELECT * FROM t CHANGES FROM VERSION/TIMESTAMP ...batch queries introduced by SPARK-55668 (apache/spark#55508).The streaming CDC entrypoint shipped in #6359 is preserved unchanged and coexists with the new batch path through an explicit
CdcReadModeflag.What the PR adds
DeltaCatalogChangelogSupport(new, inspark-unified) — abstract Scala class betweenAbstractDeltaCatalog(sparkV1) and the hybridDeltaCatalog(spark-unified). OverridesloadChangelog(ident, changelogInfo)and dispatches on theChangelogRangesubtype (version / timestamp / unbounded). Resolves the table throughloadTable(so it works for bothSparkTableandDeltaTableV2), loads the latest Kernel snapshot viaSnapshotManagerFactory, validates row tracking is enabled, applies bounds-inclusivity adjustments, and returns aDeltaChangelog.The class lives in
spark-unifiedbecause the implementation references sparkV2 classes (SparkTable,DeltaChangelog,SnapshotManagerFactory,V2SchemaUtils) andsparkV1cannot depend onsparkV2.DeltaCatalog.javanow extendsDeltaCatalogChangelogSupport.CdcReadMode(new enum,sparkV2) — replaces the priorboolean isCDCReadonPartitionUtils.createDeltaParquetReaderFactory:NONE— non-CDC scan (SparkBatch).STREAMING— opt-in streaming CDC (SparkMicroBatchStreamwhenreadChangeFeed = true).PartitionUtilsowns the CDC schema augmentation andCDCReadFunctionwrap, as before.BATCH_CHANGELOG— Auto-CDF (DeltaChangelogBatch).PartitionUtilsleaves schema and reader untouched;CDCPartitionReaderFactoryinDeltaChangelogBatchinjects_change_type/_commit_version/_commit_timestampas per-partition constants instead. This avoids the double-injection / schema-vs-reader misalignment that the previous sharedisCDCRead=truepath caused.Feature flag
DELTA_CHANGELOG_V2_ENABLED(changelogV2.enabled, internal, defaultfalse). When disabled,DeltaCatalogChangelogSupport.loadChangelogdelegates tosuper.loadChangelog, which surfaces the familiarUNSUPPORTED_FEATURE.CHANGE_DATA_CAPTUREerror. Lets the implementation land without changing user-visible behavior until tests catch up.Per-commit ordering in
DeltaChangelogBatch.planInputPartitions— emits allRemoveFilepartitions beforeAddFilepartitions within a single commit, so Spark's batch CDC post-processor (ResolveChangelogTable) sees preimage → postimage pairs regardless of the action order in the on-disk commit log. The Delta protocol does not contract action order; pinning the partition order here makes the test expectations stable across protocol implementations.Why this is split this way
The DSv2 Changelog interface (apache/spark#55426) covers both batch and streaming, but the two enter Delta through different paths today:
option("readChangeFeed", "true")→SparkMicroBatchStream→
PartitionUtilswithCdcReadMode.STREAMING. Existing surface from [kernel-spark][Part 4] CDC data reading: ReadFunc decorator, schema coordination, and reader factory wiring #6359; preserved by this PR.CHANGES FROM(SQL or DataFrame) → analyzer (SPARK-56686, SPARK-56687 / apache/spark#55637)→
TableCatalog.loadChangelog→DeltaCatalog→DeltaCatalogChangelogSupport.loadChangelog→DeltaChangelog→DeltaChangelogScan→DeltaChangelogBatch→PartitionUtilswithCdcReadMode.BATCH_CHANGELOG.The follow-up
DeltaChangelogScan.toMicroBatchStream()(so the catalog-driven Changelog also drives streaming reads, completing the surface that apache/spark#55637 builds on fordeduplicationMode = netChanges) is left as aTODOinDeltaChangelogScanfor a follow-up PR.Limitations / known follow-ups
requires row trackinganalysis error otherwise).UnboundedRangeis rejected withDELTA_CHANGELOG_UNBOUNDED_RANGE; Auto-CDF always operates over a bounded range.DeltaChangelogScandoes not yet implementtoMicroBatchStream()— see inlineTODO.DeltaChangelogBatch.planInputPartitionsstill goes throughStreamingHelper.getCommitActionsFromRangeUnsafe(markedTODOin the file). The helper is generic — only the class name is streaming-flavored. A separate rename / extract pass would be good.Build dependency
The new code uses the
Changelog/ChangelogInfo/ChangelogRangeinterfaces added in Spark 4.2 (apache/spark#55426). This PR is tested by cherry picking changes from #6657, which re-enables the Spark 4.2 snapshot cross-build row.How was this patch tested?
New tests (17 total, all green):
DeltaChangelogDirectBatchExecutionTest— exercises theDeltaChangelogScan→Batch→PartitionReaderpath directly, without going through SQL. Covers: initial insert + delete with paired DELETE/INSERT output, UPDATE-as-CoW producing paired preimage/postimage rows at the same commit, range slicing on non-zero start, and rowId / rowVersion field-reference contract.DeltaChangelogCatalogIntegrationTest— exercises the catalog-routed entrypoint (TableCatalog.loadChangelog) over both SQLCHANGES FROMand the DataFrame API. Covers all fourChangelogRangeshapes (version range, timestamp range, open-ended, exclusive bounds), boundary inclusivity, and the failure modes (timestamp before earliest commit, timestamp after latest commit, empty exclusive range, unbounded rejection).Pre-existing suites still pass:
SparkMicroBatchStreamCDCTest— confirms the streaming CDC entrypoint introduced in [kernel-spark][Part 4] CDC data reading: ReadFunc decorator, schema coordination, and reader factory wiring #6359 is unaffected by theCdcReadModerefactor.PartitionUtilsTest— updated to passCdcReadModeinstead ofboolean.Build / test command used locally:
Does this PR introduce any user-facing changes?
No, not while the feature flag is at its default.
With
changelogV2.enabled = true(internal flag, defaultfalse):SELECT * FROM <table> CHANGES FROM VERSION/TIMESTAMP ...and the DataFrame.changes(...)builder become functional on row-tracking-enabled Delta tables via the V2 connector. Previously the V2 catalog rejected these queries withUNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE.