Skip to content

Commit 1f08192

Browse files
committed
fixes
1 parent 47c254b commit 1f08192

3 files changed

Lines changed: 202 additions & 84 deletions

File tree

crates/iceberg/src/scan/context.rs

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -242,12 +242,19 @@ impl PlanContext {
242242
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
243243
let mut filtered_mfcs = vec![];
244244
for manifest_file in manifest_files {
245-
// For incremental scans, skip delete manifests entirely —
246-
// we only care about newly added data files.
247-
if self.snapshot_range.is_some()
248-
&& manifest_file.content == ManifestContentType::Deletes
249-
{
250-
continue;
245+
// For incremental scans, skip manifests that can't contain relevant entries:
246+
// 1. Delete manifests — we only care about newly added data files.
247+
// 2. Data manifests whose added_snapshot_id is outside the scan range —
248+
// they can't contain entries added in the snapshots we care about.
249+
// (We still keep the entry-level filter because a manifest can contain
250+
// entries from multiple snapshots via manifest reuse.)
251+
if let Some(ref range) = self.snapshot_range {
252+
if manifest_file.content == ManifestContentType::Deletes {
253+
continue;
254+
}
255+
if !range.contains(manifest_file.added_snapshot_id) {
256+
continue;
257+
}
251258
}
252259

253260
let tx = if manifest_file.content == ManifestContentType::Deletes {

crates/iceberg/src/scan/mod.rs

Lines changed: 146 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@ use crate::runtime::spawn;
4242
use crate::spec::{DataContentType, Operation, SnapshotRef, TableMetadataRef};
4343
use crate::table::Table;
4444
use crate::util::available_parallelism;
45+
use crate::util::snapshot::ancestors_between;
4546
use crate::{Error, ErrorKind, Result};
4647

4748
/// A stream of arrow [`RecordBatch`]es.
@@ -75,46 +76,85 @@ impl SnapshotRange {
7576
to_snapshot_id: i64,
7677
from_inclusive: bool,
7778
) -> Result<Self> {
78-
let mut snapshot_ids = HashSet::new();
79-
let mut current_id = Some(to_snapshot_id);
80-
81-
// Walk backwards from to_snapshot to from_snapshot
82-
while let Some(id) = current_id {
83-
let snapshot = table_metadata.snapshot_by_id(id).ok_or_else(|| {
84-
Error::new(ErrorKind::DataInvalid, format!("Snapshot {id} not found"))
79+
// Verify from_snapshot exists and determine the exclusive stop point.
80+
let from_snapshot = table_metadata
81+
.snapshot_by_id(from_snapshot_id)
82+
.ok_or_else(|| {
83+
Error::new(
84+
ErrorKind::DataInvalid,
85+
format!("Snapshot {from_snapshot_id} not found"),
86+
)
8587
})?;
8688

87-
// Validate operation is APPEND
89+
// ancestors_between returns (oldest_exclusive, latest_inclusive].
90+
// For inclusive mode, stop at from's parent so from itself is included.
91+
let oldest_exclusive = if from_inclusive {
92+
from_snapshot.parent_snapshot_id()
93+
} else {
94+
Some(from_snapshot_id)
95+
};
96+
97+
let snapshots: Vec<_> =
98+
ancestors_between(table_metadata, to_snapshot_id, oldest_exclusive).collect();
99+
100+
// ancestors_between silently returns the full chain to root if
101+
// oldest_exclusive isn't in the ancestry chain. Detect this:
102+
// if we got snapshots but from_snapshot_id wasn't encountered as
103+
// the stop point, the chain doesn't connect.
104+
if from_snapshot_id == to_snapshot_id {
105+
// Edge case: from == to. In exclusive mode, range is empty.
106+
// In inclusive mode, we should have exactly one snapshot.
107+
if !from_inclusive {
108+
return Ok(Self {
109+
snapshot_ids: HashSet::new(),
110+
});
111+
}
112+
} else if snapshots.is_empty() {
113+
// Nothing was collected although from != to (e.g. to_snapshot_id doesn't exist), so report that from is not an ancestor of to
114+
return Err(Error::new(
115+
ErrorKind::DataInvalid,
116+
format!(
117+
"from_snapshot {from_snapshot_id} is not an ancestor of to_snapshot {to_snapshot_id}",
118+
),
119+
));
120+
} else {
121+
// Verify the oldest snapshot in our walk is actually connected
122+
// to from_snapshot_id. The last snapshot's parent (for exclusive)
123+
// or the last snapshot itself (for inclusive) should be from_snapshot_id.
124+
let oldest_collected = snapshots.last().unwrap();
125+
let connects = if from_inclusive {
126+
oldest_collected.snapshot_id() == from_snapshot_id
127+
} else {
128+
oldest_collected.parent_snapshot_id() == Some(from_snapshot_id)
129+
};
130+
if !connects {
131+
return Err(Error::new(
132+
ErrorKind::DataInvalid,
133+
format!(
134+
"from_snapshot {from_snapshot_id} is not an ancestor of to_snapshot {to_snapshot_id}",
135+
),
136+
));
137+
}
138+
}
139+
140+
// Validate all snapshots have APPEND operations and collect IDs.
141+
let mut snapshot_ids = HashSet::with_capacity(snapshots.len());
142+
for snapshot in &snapshots {
88143
if snapshot.summary().operation != Operation::Append {
89144
return Err(Error::new(
90145
ErrorKind::FeatureUnsupported,
91146
format!(
92147
"Incremental scan only supports APPEND operations, \
93148
snapshot {} has operation: {:?}",
94-
id,
149+
snapshot.snapshot_id(),
95150
snapshot.summary().operation
96151
),
97152
));
98153
}
99-
100-
if id == from_snapshot_id {
101-
if from_inclusive {
102-
snapshot_ids.insert(id);
103-
}
104-
return Ok(Self { snapshot_ids });
105-
}
106-
107-
snapshot_ids.insert(id);
108-
current_id = snapshot.parent_snapshot_id();
154+
snapshot_ids.insert(snapshot.snapshot_id());
109155
}
110156

111-
// If we get here, from_snapshot was not found in the ancestry chain
112-
Err(Error::new(
113-
ErrorKind::DataInvalid,
114-
format!(
115-
"from_snapshot {from_snapshot_id} is not an ancestor of to_snapshot {to_snapshot_id}",
116-
),
117-
))
157+
Ok(Self { snapshot_ids })
118158
}
119159

120160
/// Check if a snapshot_id is within this range
@@ -2494,7 +2534,10 @@ pub mod tests {
24942534

24952535
assert!(result.is_err());
24962536
let err = result.unwrap_err();
2497-
assert!(err.to_string().contains("not an ancestor"));
2537+
assert!(
2538+
err.to_string().contains("not found"),
2539+
"Expected 'not found' error, got: {err}"
2540+
);
24982541
}
24992542

25002543
#[test]
@@ -2656,4 +2699,80 @@ pub mod tests {
26562699
assert!(range.contains(3055729675574597004), "S2 should be in range");
26572700
assert!(range.contains(3056729675574597004), "S3 should be in range");
26582701
}
2702+
2703+
#[tokio::test]
2704+
async fn test_incremental_scan_returns_only_added_files_in_range() {
2705+
// Fixture has S1 (append) -> S2 (append, current)
2706+
// Manifest contains:
2707+
// 1.parquet: status=Added, snapshot=S2
2708+
// 2.parquet: status=Deleted, snapshot=S1
2709+
// 3.parquet: status=Existing, snapshot=S1
2710+
let mut fixture = TableTestFixture::new();
2711+
fixture.setup_manifest_files().await;
2712+
2713+
let current_snapshot = fixture.table.metadata().current_snapshot().unwrap();
2714+
let parent_snapshot_id = current_snapshot.parent_snapshot_id().unwrap();
2715+
2716+
// Incremental scan from S1 (exclusive) to S2 should return only 1.parquet
2717+
let table_scan = fixture
2718+
.table
2719+
.incremental_append_scan(parent_snapshot_id)
2720+
.to_snapshot(current_snapshot.snapshot_id())
2721+
.build()
2722+
.unwrap();
2723+
2724+
let tasks: Vec<_> = table_scan
2725+
.plan_files()
2726+
.await
2727+
.unwrap()
2728+
.try_collect()
2729+
.await
2730+
.unwrap();
2731+
2732+
assert_eq!(
2733+
tasks.len(),
2734+
1,
2735+
"Incremental scan should return exactly 1 file"
2736+
);
2737+
assert_eq!(
2738+
tasks[0].data_file_path,
2739+
format!("{}/1.parquet", &fixture.table_location),
2740+
"Should only return the file added in S2"
2741+
);
2742+
}
2743+
2744+
#[tokio::test]
2745+
async fn test_incremental_scan_exclusive_same_snapshot_returns_empty() {
2746+
// Fixture has S1 (append) -> S2 (append, current)
2747+
let mut fixture = TableTestFixture::new();
2748+
fixture.setup_manifest_files().await;
2749+
2750+
let current_snapshot_id = fixture
2751+
.table
2752+
.metadata()
2753+
.current_snapshot()
2754+
.unwrap()
2755+
.snapshot_id();
2756+
2757+
// Incremental scan from S2 to S2 (exclusive) should return nothing
2758+
let table_scan = fixture
2759+
.table
2760+
.incremental_append_scan(current_snapshot_id)
2761+
.to_snapshot(current_snapshot_id)
2762+
.build()
2763+
.unwrap();
2764+
2765+
let tasks: Vec<_> = table_scan
2766+
.plan_files()
2767+
.await
2768+
.unwrap()
2769+
.try_collect()
2770+
.await
2771+
.unwrap();
2772+
2773+
assert!(
2774+
tasks.is_empty(),
2775+
"Exclusive scan from=to should return no files"
2776+
);
2777+
}
26592778
}

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 43 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -1004,30 +1004,27 @@ mod tests {
10041004
let table = get_test_table_from_metadata_file().await;
10051005
let snapshots: Vec<_> = table.metadata().snapshots().collect();
10061006

1007-
// Need at least 2 snapshots for incremental scan
10081007
assert!(snapshots.len() >= 2);
10091008
let from_id = snapshots[0].snapshot_id();
10101009
let to_id = snapshots[snapshots.len() - 1].snapshot_id();
10111010

10121011
let provider =
1013-
IcebergStaticTableProvider::try_new_incremental(table.clone(), from_id, to_id).await;
1014-
1015-
// May fail due to non-APPEND operations in test data, that's OK
1016-
if let Ok(provider) = provider {
1017-
let ctx = SessionContext::new();
1018-
let state = ctx.state();
1019-
1020-
let scan_plan = provider.scan(&state, None, &[], None).await.unwrap();
1021-
let iceberg_scan = scan_plan
1022-
.as_any()
1023-
.downcast_ref::<IcebergTableScan>()
1024-
.expect("Expected IcebergTableScan");
1025-
1026-
// Verify incremental scan parameters are set
1027-
assert_eq!(iceberg_scan.from_snapshot_id(), Some(from_id));
1028-
assert!(!iceberg_scan.from_snapshot_inclusive());
1029-
assert_eq!(iceberg_scan.snapshot_id(), Some(to_id));
1030-
}
1012+
IcebergStaticTableProvider::try_new_incremental(table.clone(), from_id, to_id)
1013+
.await
1014+
.unwrap();
1015+
1016+
let ctx = SessionContext::new();
1017+
let state = ctx.state();
1018+
1019+
let scan_plan = provider.scan(&state, None, &[], None).await.unwrap();
1020+
let iceberg_scan = scan_plan
1021+
.as_any()
1022+
.downcast_ref::<IcebergTableScan>()
1023+
.expect("Expected IcebergTableScan");
1024+
1025+
assert_eq!(iceberg_scan.from_snapshot_id(), Some(from_id));
1026+
assert!(!iceberg_scan.from_snapshot_inclusive());
1027+
assert_eq!(iceberg_scan.snapshot_id(), Some(to_id));
10311028
}
10321029

10331030
#[tokio::test]
@@ -1046,22 +1043,20 @@ mod tests {
10461043
from_id,
10471044
to_id,
10481045
)
1049-
.await;
1046+
.await
1047+
.unwrap();
10501048

1051-
if let Ok(provider) = provider {
1052-
let ctx = SessionContext::new();
1053-
let state = ctx.state();
1049+
let ctx = SessionContext::new();
1050+
let state = ctx.state();
10541051

1055-
let scan_plan = provider.scan(&state, None, &[], None).await.unwrap();
1056-
let iceberg_scan = scan_plan
1057-
.as_any()
1058-
.downcast_ref::<IcebergTableScan>()
1059-
.expect("Expected IcebergTableScan");
1052+
let scan_plan = provider.scan(&state, None, &[], None).await.unwrap();
1053+
let iceberg_scan = scan_plan
1054+
.as_any()
1055+
.downcast_ref::<IcebergTableScan>()
1056+
.expect("Expected IcebergTableScan");
10601057

1061-
// Verify inclusive flag is set
1062-
assert_eq!(iceberg_scan.from_snapshot_id(), Some(from_id));
1063-
assert!(iceberg_scan.from_snapshot_inclusive());
1064-
}
1058+
assert_eq!(iceberg_scan.from_snapshot_id(), Some(from_id));
1059+
assert!(iceberg_scan.from_snapshot_inclusive());
10651060
}
10661061

10671062
#[tokio::test]
@@ -1074,25 +1069,22 @@ mod tests {
10741069
assert!(!snapshots.is_empty());
10751070
let from_id = snapshots[0].snapshot_id();
10761071

1077-
let provider =
1078-
IcebergStaticTableProvider::try_new_appends_after(table.clone(), from_id).await;
1079-
1080-
if let Ok(provider) = provider {
1081-
let ctx = SessionContext::new();
1082-
let state = ctx.state();
1083-
1084-
let scan_plan = provider.scan(&state, None, &[], None).await.unwrap();
1085-
let iceberg_scan = scan_plan
1086-
.as_any()
1087-
.downcast_ref::<IcebergTableScan>()
1088-
.expect("Expected IcebergTableScan");
1089-
1090-
// Verify appends_after configuration
1091-
assert_eq!(iceberg_scan.from_snapshot_id(), Some(from_id));
1092-
assert!(!iceberg_scan.from_snapshot_inclusive());
1093-
// snapshot_id should be None (uses current)
1094-
assert_eq!(iceberg_scan.snapshot_id(), None);
1095-
}
1072+
let provider = IcebergStaticTableProvider::try_new_appends_after(table.clone(), from_id)
1073+
.await
1074+
.unwrap();
1075+
1076+
let ctx = SessionContext::new();
1077+
let state = ctx.state();
1078+
1079+
let scan_plan = provider.scan(&state, None, &[], None).await.unwrap();
1080+
let iceberg_scan = scan_plan
1081+
.as_any()
1082+
.downcast_ref::<IcebergTableScan>()
1083+
.expect("Expected IcebergTableScan");
1084+
1085+
assert_eq!(iceberg_scan.from_snapshot_id(), Some(from_id));
1086+
assert!(!iceberg_scan.from_snapshot_inclusive());
1087+
assert_eq!(iceberg_scan.snapshot_id(), None);
10961088
}
10971089

10981090
#[tokio::test]

0 commit comments

Comments
 (0)