Skip to content

Commit 5654cf9

Browse files
committed
add improvements
1 parent 11f1389 commit 5654cf9

3 files changed

Lines changed: 57 additions & 37 deletions

File tree

crates/iceberg/src/transaction/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl Transaction {
147147
///
148148
/// RowDelta supports:
149149
/// - Adding new data files (inserts)
150-
/// - Removing data files (deletes in COW mode)
150+
/// - Removing data files (deletes in Copy-on-Write (COW) mode)
151151
/// - Both operations in a single transaction (updates/merges)
152152
pub fn row_delta(&self) -> RowDeltaAction {
153153
RowDeltaAction::new()

crates/iceberg/src/transaction/row_delta.rs

Lines changed: 52 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,16 @@ use crate::transaction::{ActionCommit, TransactionAction};
4646
/// 1. Write new rows to data files
4747
/// 2. Add files via `add_data_files()`
4848
///
49-
/// # Future: Merge-on-Read (MOR) Strategy
49+
/// # Future: Merge-on-Read Strategy
5050
///
51-
/// The `add_delete_files()` method is reserved for future MOR support, which uses
51+
/// The `add_delete_files()` method is reserved for future Merge-on-Read support, which uses
5252
/// delete files instead of rewriting data files.
5353
pub struct RowDeltaAction {
5454
/// New data files to add (for inserts or rewritten files in COW mode)
5555
added_data_files: Vec<DataFile>,
5656
/// Data files to mark as deleted (for COW mode when rewriting files)
5757
removed_data_files: Vec<DataFile>,
58-
/// Delete files to add (reserved for future MOR mode support)
58+
/// Delete files to add (reserved for future Merge-on-Read mode support)
5959
added_delete_files: Vec<DataFile>,
6060
/// Optional commit UUID for manifest file naming
6161
commit_uuid: Option<Uuid>,
@@ -77,37 +77,25 @@ impl RowDeltaAction {
7777
}
7878
}
7979

80-
/// Add new data files to the snapshot.
81-
///
82-
/// Used for:
80+
/// Add new data files to the snapshot. Used for:
8381
/// - New rows from INSERT operations
8482
/// - Rewritten data files in COW mode (after applying UPDATE/DELETE)
85-
///
86-
/// Corresponds to `addRows(DataFile)` in Java implementation.
8783
pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
8884
self.added_data_files.extend(data_files);
8985
self
9086
}
9187

9288
/// Mark data files as deleted in the snapshot.
93-
///
94-
/// Used in COW mode to mark original files as deleted when they've been rewritten
95-
/// with modifications.
96-
///
89+
/// Used in COW mode to mark original files as deleted when they've been rewritten with modifications.
9790
/// Corresponds to `removeRows(DataFile)` in Java implementation.
9891
pub fn remove_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
9992
self.removed_data_files.extend(data_files);
10093
self
10194
}
10295

103-
/// Add delete files to the snapshot (reserved for future MOR mode).
104-
///
105-
/// Corresponds to `addDeletes(DeleteFile)` in Java implementation.
106-
///
107-
/// # Note
108-
///
109-
/// This is not yet implemented and is reserved for future Merge-on-Read (MOR)
110-
/// optimization where delete files are used instead of rewriting data files.
96+
/// Add delete files to the snapshot (reserved for future Merge-on-Read mode).
97+
/// #Note: This is not yet implemented and is reserved for future Merge-on-Read optimization
98+
/// where delete files are used instead of rewriting data files.
11199
pub fn add_delete_files(mut self, delete_files: impl IntoIterator<Item = DataFile>) -> Self {
112100
self.added_delete_files.extend(delete_files);
113101
self
@@ -126,10 +114,7 @@ impl RowDeltaAction {
126114
}
127115

128116
/// Validate that the operation is applied on top of a specific snapshot.
129-
///
130117
/// This can be used for conflict detection in concurrent modification scenarios.
131-
///
132-
/// Corresponds to `validateFromSnapshot(long snapshotId)` in Java implementation.
133118
pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self {
134119
self.starting_snapshot_id = Some(snapshot_id);
135120
self
@@ -208,7 +193,6 @@ impl SnapshotProduceOperation for RowDeltaOperation {
208193
}
209194

210195
/// Returns manifest entries for files that should be marked as deleted.
211-
///
212196
/// This creates DELETED entries for removed data files in COW mode.
213197
async fn delete_entries(
214198
&self,
@@ -217,19 +201,26 @@ impl SnapshotProduceOperation for RowDeltaOperation {
217201
let snapshot_id = snapshot_produce.table.metadata().current_snapshot_id();
218202

219203
// Create DELETED manifest entries for removed data files
220-
let deleted_entries = self
204+
let deleted_entries: Vec<ManifestEntry> = self
221205
.removed_data_files
222206
.iter()
223207
.map(|data_file| {
224208
if let Some(snapshot_id) = snapshot_id {
225209
ManifestEntry::builder()
226210
.status(ManifestStatus::Deleted)
227211
.snapshot_id(snapshot_id)
212+
// TODO: Get actual sequence numbers from original manifest entries
213+
// For now, use 0 as a placeholder - this should be the sequence
214+
// number from when the file was originally added
215+
.sequence_number(0)
216+
.file_sequence_number(0)
228217
.data_file(data_file.clone())
229218
.build()
230219
} else {
231220
ManifestEntry::builder()
232221
.status(ManifestStatus::Deleted)
222+
.sequence_number(0)
223+
.file_sequence_number(0)
233224
.data_file(data_file.clone())
234225
.build()
235226
}
@@ -241,9 +232,12 @@ impl SnapshotProduceOperation for RowDeltaOperation {
241232

242233
/// Returns existing manifest files that should be included in the new snapshot.
243234
///
244-
/// For RowDelta:
245-
/// - Include all existing manifests (they contain unchanged data)
246-
/// - The snapshot producer will add new manifests for added/deleted entries
235+
/// For RowDelta in Copy-on-Write mode:
236+
/// - We're rewriting entire data files (not just modifying rows)
237+
/// - Files being deleted are completely replaced by new files
238+
/// - We should NOT carry forward manifests that contain any of the deleted files
239+
///
240+
/// Note: For future precision COW or Merge-on-Read modes, this logic may need refinement.
247241
async fn existing_manifest(
248242
&self,
249243
snapshot_produce: &SnapshotProducer<'_>,
@@ -259,13 +253,37 @@ impl SnapshotProduceOperation for RowDeltaOperation {
259253
)
260254
.await?;
261255

262-
// Include all existing manifests - unchanged data is still valid
263-
Ok(manifest_list
264-
.entries()
256+
// In COW mode, we rewrite entire files, so we need to exclude manifests
257+
// that contain any files we're deleting. Create a set of deleted file paths for fast lookup.
258+
let deleted_file_paths: std::collections::HashSet<String> = self
259+
.removed_data_files
265260
.iter()
266-
.filter(|entry| entry.has_added_files() || entry.has_existing_files())
267-
.cloned()
268-
.collect())
261+
.map(|f| f.file_path().to_string())
262+
.collect();
263+
264+
// Filter out manifests that contain deleted files
265+
let mut filtered_manifests = Vec::new();
266+
for manifest_file in manifest_list.entries().iter() {
267+
if manifest_file.has_added_files() || manifest_file.has_existing_files() {
268+
// Load the manifest to check if it contains any deleted files
269+
let manifest = manifest_file
270+
.load_manifest(snapshot_produce.table.file_io())
271+
.await?;
272+
273+
// Check if any entries in this manifest are files we're deleting
274+
let contains_deleted_file = manifest
275+
.entries()
276+
.iter()
277+
.any(|entry| deleted_file_paths.contains(entry.data_file().file_path()));
278+
279+
if !contains_deleted_file {
280+
// This manifest doesn't contain any files we're deleting, keep it
281+
filtered_manifests.push(manifest_file.clone());
282+
}
283+
}
284+
}
285+
286+
Ok(filtered_manifests)
269287
}
270288
}
271289

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,15 @@ impl<'a> SnapshotProducer<'a> {
327327
if delete_entries.is_empty() {
328328
return Err(Error::new(
329329
ErrorKind::PreconditionFailed,
330-
"No delete entries found when write a delete manifest file",
330+
"No delete entries found when writing a delete manifest file",
331331
));
332332
}
333333

334334
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
335335
for entry in delete_entries {
336-
writer.add_entry(entry)?;
336+
// Use add_delete_entry() to preserve Deleted status instead of add_entry()
337+
// which always overwrites status to Added
338+
writer.add_delete_entry(entry)?;
337339
}
338340
writer.write_manifest_file().await
339341
}

0 commit comments

Comments
 (0)