@@ -46,16 +46,16 @@ use crate::transaction::{ActionCommit, TransactionAction};
4646/// 1. Write new rows to data files
4747/// 2. Add files via `add_data_files()`
4848///
49- /// # Future: Merge-on-Read (MOR) Strategy
49+ /// # Future: Merge-on-Read Strategy
5050///
51- /// The `add_delete_files()` method is reserved for future MOR support, which uses
51+ /// The `add_delete_files()` method is reserved for future Merge-on-Read support, which uses
5252/// delete files instead of rewriting data files.
5353pub struct RowDeltaAction {
5454 /// New data files to add (for inserts or rewritten files in COW mode)
5555 added_data_files : Vec < DataFile > ,
5656 /// Data files to mark as deleted (for COW mode when rewriting files)
5757 removed_data_files : Vec < DataFile > ,
58- /// Delete files to add (reserved for future MOR mode support)
58+ /// Delete files to add (reserved for future Merge-on-Read mode support)
5959 added_delete_files : Vec < DataFile > ,
6060 /// Optional commit UUID for manifest file naming
6161 commit_uuid : Option < Uuid > ,
@@ -77,37 +77,25 @@ impl RowDeltaAction {
7777 }
7878 }
7979
80- /// Add new data files to the snapshot.
81- ///
82- /// Used for:
80+ /// Add new data files to the snapshot. Used for:
8381 /// - New rows from INSERT operations
8482 /// - Rewritten data files in COW mode (after applying UPDATE/DELETE)
85- ///
86- /// Corresponds to `addRows(DataFile)` in Java implementation.
8783 pub fn add_data_files ( mut self , data_files : impl IntoIterator < Item = DataFile > ) -> Self {
8884 self . added_data_files . extend ( data_files) ;
8985 self
9086 }
9187
9288 /// Mark data files as deleted in the snapshot.
93- ///
94- /// Used in COW mode to mark original files as deleted when they've been rewritten
95- /// with modifications.
96- ///
89+ /// Used in COW mode to mark original files as deleted when they've been rewritten with modifications.
9790 /// Corresponds to `removeRows(DataFile)` in Java implementation.
9891 pub fn remove_data_files ( mut self , data_files : impl IntoIterator < Item = DataFile > ) -> Self {
9992 self . removed_data_files . extend ( data_files) ;
10093 self
10194 }
10295
103- /// Add delete files to the snapshot (reserved for future MOR mode).
104- ///
105- /// Corresponds to `addDeletes(DeleteFile)` in Java implementation.
106- ///
107- /// # Note
108- ///
109- /// This is not yet implemented and is reserved for future Merge-on-Read (MOR)
110- /// optimization where delete files are used instead of rewriting data files.
96+ /// Add delete files to the snapshot (reserved for future Merge-on-Read mode).
97+ /// #Note: This is not yet implemented and is reserved for future Merge-on-Read optimization
98+ /// where delete files are used instead of rewriting data files.
11199 pub fn add_delete_files ( mut self , delete_files : impl IntoIterator < Item = DataFile > ) -> Self {
112100 self . added_delete_files . extend ( delete_files) ;
113101 self
@@ -126,10 +114,7 @@ impl RowDeltaAction {
126114 }
127115
128116 /// Validate that the operation is applied on top of a specific snapshot.
129- ///
130117 /// This can be used for conflict detection in concurrent modification scenarios.
131- ///
132- /// Corresponds to `validateFromSnapshot(long snapshotId)` in Java implementation.
133118 pub fn validate_from_snapshot ( mut self , snapshot_id : i64 ) -> Self {
134119 self . starting_snapshot_id = Some ( snapshot_id) ;
135120 self
@@ -208,7 +193,6 @@ impl SnapshotProduceOperation for RowDeltaOperation {
208193 }
209194
210195 /// Returns manifest entries for files that should be marked as deleted.
211- ///
212196 /// This creates DELETED entries for removed data files in COW mode.
213197 async fn delete_entries (
214198 & self ,
@@ -217,22 +201,30 @@ impl SnapshotProduceOperation for RowDeltaOperation {
217201 let snapshot_id = snapshot_produce. table . metadata ( ) . current_snapshot_id ( ) ;
218202
219203 // Create DELETED manifest entries for removed data files
220- let deleted_entries = self
204+ let deleted_entries: Vec < ManifestEntry > = self
221205 . removed_data_files
222206 . iter ( )
223207 . map ( |data_file| {
224- if let Some ( snapshot_id) = snapshot_id {
208+ let entry = if let Some ( snapshot_id) = snapshot_id {
225209 ManifestEntry :: builder ( )
226210 . status ( ManifestStatus :: Deleted )
227211 . snapshot_id ( snapshot_id)
212+ // TODO: Get actual sequence numbers from original manifest entries
213+ // For now, use 0 as a placeholder - this should be the sequence
214+ // number from when the file was originally added
215+ . sequence_number ( 0 )
216+ . file_sequence_number ( 0 )
228217 . data_file ( data_file. clone ( ) )
229218 . build ( )
230219 } else {
231220 ManifestEntry :: builder ( )
232221 . status ( ManifestStatus :: Deleted )
222+ . sequence_number ( 0 )
223+ . file_sequence_number ( 0 )
233224 . data_file ( data_file. clone ( ) )
234225 . build ( )
235- }
226+ } ;
227+ entry
236228 } )
237229 . collect ( ) ;
238230
@@ -241,9 +233,12 @@ impl SnapshotProduceOperation for RowDeltaOperation {
241233
242234 /// Returns existing manifest files that should be included in the new snapshot.
243235 ///
244- /// For RowDelta:
245- /// - Include all existing manifests (they contain unchanged data)
246- /// - The snapshot producer will add new manifests for added/deleted entries
236+ /// For RowDelta in Copy-on-Write mode:
237+ /// - We're rewriting entire data files (not just modifying rows)
238+ /// - Files being deleted are completely replaced by new files
239+ /// - We should NOT carry forward manifests that contain any of the deleted files
240+ ///
241+ /// Note: For future precision COW or Merge-on-Read modes, this logic may need refinement.
247242 async fn existing_manifest (
248243 & self ,
249244 snapshot_produce : & SnapshotProducer < ' _ > ,
@@ -259,13 +254,37 @@ impl SnapshotProduceOperation for RowDeltaOperation {
259254 )
260255 . await ?;
261256
262- // Include all existing manifests - unchanged data is still valid
263- Ok ( manifest_list
264- . entries ( )
257+ // In COW mode, we rewrite entire files, so we need to exclude manifests
258+ // that contain any files we're deleting. Create a set of deleted file paths for fast lookup.
259+ let deleted_file_paths: std:: collections:: HashSet < String > = self
260+ . removed_data_files
265261 . iter ( )
266- . filter ( |entry| entry. has_added_files ( ) || entry. has_existing_files ( ) )
267- . cloned ( )
268- . collect ( ) )
262+ . map ( |f| f. file_path ( ) . to_string ( ) )
263+ . collect ( ) ;
264+
265+ // Filter out manifests that contain deleted files
266+ let mut filtered_manifests = Vec :: new ( ) ;
267+ for manifest_file in manifest_list. entries ( ) . iter ( ) {
268+ if manifest_file. has_added_files ( ) || manifest_file. has_existing_files ( ) {
269+ // Load the manifest to check if it contains any deleted files
270+ let manifest = manifest_file
271+ . load_manifest ( snapshot_produce. table . file_io ( ) )
272+ . await ?;
273+
274+ // Check if any entries in this manifest are files we're deleting
275+ let contains_deleted_file = manifest
276+ . entries ( )
277+ . iter ( )
278+ . any ( |entry| deleted_file_paths. contains ( entry. data_file ( ) . file_path ( ) ) ) ;
279+
280+ if !contains_deleted_file {
281+ // This manifest doesn't contain any files we're deleting, keep it
282+ filtered_manifests. push ( manifest_file. clone ( ) ) ;
283+ }
284+ }
285+ }
286+
287+ Ok ( filtered_manifests)
269288 }
270289}
271290
0 commit comments