@@ -34,17 +34,15 @@ use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
3434use arrow_schema:: SchemaRef as ArrowSchemaRef ;
3535use futures:: TryStreamExt ;
3636use iceberg:: arrow:: schema_to_arrow_schema;
37- use iceberg:: memory:: { MemoryCatalogBuilder , MEMORY_CATALOG_WAREHOUSE } ;
38- use iceberg:: spec:: {
39- NestedField , PartitionSpec , PrimitiveType , Schema , SortOrder , Type ,
40- } ;
37+ use iceberg:: memory:: { MEMORY_CATALOG_WAREHOUSE , MemoryCatalogBuilder } ;
38+ use iceberg:: spec:: { NestedField , PartitionSpec , PrimitiveType , Schema , SortOrder , Type } ;
4139use iceberg:: transaction:: { ApplyTransactionAction , Transaction } ;
4240use iceberg:: writer:: base_writer:: data_file_writer:: DataFileWriterBuilder ;
41+ use iceberg:: writer:: file_writer:: ParquetWriterBuilder ;
4342use iceberg:: writer:: file_writer:: location_generator:: {
4443 DefaultFileNameGenerator , DefaultLocationGenerator ,
4544} ;
4645use iceberg:: writer:: file_writer:: rolling_writer:: RollingFileWriterBuilder ;
47- use iceberg:: writer:: file_writer:: ParquetWriterBuilder ;
4846use iceberg:: writer:: { IcebergWriter , IcebergWriterBuilder } ;
4947use iceberg:: { Catalog , CatalogBuilder , TableCreation } ;
5048use parquet:: file:: properties:: WriterProperties ;
@@ -103,10 +101,7 @@ async fn run_benchmark(num_files: usize, rows_per_file: usize) {
103101 . unwrap ( ) ;
104102
105103 let ns = iceberg:: NamespaceIdent :: new ( "bench_ns" . to_string ( ) ) ;
106- catalog
107- . create_namespace ( & ns, HashMap :: new ( ) )
108- . await
109- . unwrap ( ) ;
104+ catalog. create_namespace ( & ns, HashMap :: new ( ) ) . await . unwrap ( ) ;
110105
111106 let schema = create_schema ( ) ;
112107 let table_creation = TableCreation :: builder ( )
@@ -119,9 +114,8 @@ async fn run_benchmark(num_files: usize, rows_per_file: usize) {
119114 let mut table = catalog. create_table ( & ns, table_creation) . await . unwrap ( ) ;
120115
121116 // Derive Arrow schema from Iceberg schema (includes field ID metadata)
122- let arrow_schema: ArrowSchemaRef = Arc :: new (
123- schema_to_arrow_schema ( table. metadata ( ) . current_schema ( ) ) . unwrap ( ) ,
124- ) ;
117+ let arrow_schema: ArrowSchemaRef =
118+ Arc :: new ( schema_to_arrow_schema ( table. metadata ( ) . current_schema ( ) ) . unwrap ( ) ) ;
125119
126120 // Phase 1: Write N small files (simulating micro-batch ingestion)
127121 let write_start = Instant :: now ( ) ;
@@ -131,8 +125,7 @@ async fn run_benchmark(num_files: usize, rows_per_file: usize) {
131125 let start_id = ( file_idx * rows_per_file) as i64 ;
132126 let batch = generate_batch ( & arrow_schema, start_id, rows_per_file) ;
133127
134- let location_gen =
135- DefaultLocationGenerator :: new ( table. metadata ( ) . clone ( ) ) . unwrap ( ) ;
128+ let location_gen = DefaultLocationGenerator :: new ( table. metadata ( ) . clone ( ) ) . unwrap ( ) ;
136129 let file_name_gen = DefaultFileNameGenerator :: new (
137130 format ! ( "frag_{:04}" , file_idx) ,
138131 None ,
@@ -189,8 +182,7 @@ async fn run_benchmark(num_files: usize, rows_per_file: usize) {
189182
190183 // Phase 3: Write compacted file -- this is the compaction WRITE path
191184 let compact_write_start = Instant :: now ( ) ;
192- let location_gen =
193- DefaultLocationGenerator :: new ( table. metadata ( ) . clone ( ) ) . unwrap ( ) ;
185+ let location_gen = DefaultLocationGenerator :: new ( table. metadata ( ) . clone ( ) ) . unwrap ( ) ;
194186 let file_name_gen = DefaultFileNameGenerator :: new (
195187 "compacted" . to_string ( ) ,
196188 None ,
@@ -222,11 +214,7 @@ async fn run_benchmark(num_files: usize, rows_per_file: usize) {
222214
223215 // Phase 4: Commit replacement via ReplaceDataFilesAction
224216 let commit_start = Instant :: now ( ) ;
225- let snapshot_id = table
226- . metadata ( )
227- . current_snapshot ( )
228- . unwrap ( )
229- . snapshot_id ( ) ;
217+ let snapshot_id = table. metadata ( ) . current_snapshot ( ) . unwrap ( ) . snapshot_id ( ) ;
230218
231219 let tx = Transaction :: new ( & table) ;
232220 let action = tx
@@ -285,10 +273,7 @@ async fn run_benchmark(num_files: usize, rows_per_file: usize) {
285273
286274 // Snapshot verification
287275 let snapshots: Vec < _ > = table. metadata ( ) . snapshots ( ) . collect ( ) ;
288- println ! (
289- "Snapshots: {} (append + replace)" ,
290- snapshots. len( )
291- ) ;
276+ println ! ( "Snapshots: {} (append + replace)" , snapshots. len( ) ) ;
292277 let current = table. metadata ( ) . current_snapshot ( ) . unwrap ( ) ;
293278 println ! (
294279 "Current snapshot operation: {:?}" ,
0 commit comments