Skip to content

Commit 53a835b

Browse files
committed
scan planning
1 parent 4e8cf7f commit 53a835b

9 files changed

Lines changed: 699 additions & 1 deletion

File tree

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,7 @@ mod tests {
943943
partition_spec: None,
944944
name_mapping: None,
945945
case_sensitive: false,
946+
split_offsets: None,
946947
};
947948

948949
// Load the deletes - should handle both types without error

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ pub(crate) mod tests {
428428
partition_spec: None,
429429
name_mapping: None,
430430
case_sensitive: false,
431+
split_offsets: None,
431432
},
432433
FileScanTask {
433434
file_size_in_bytes: 0,
@@ -444,6 +445,7 @@ pub(crate) mod tests {
444445
partition_spec: None,
445446
name_mapping: None,
446447
case_sensitive: false,
448+
split_offsets: None,
447449
},
448450
];
449451

@@ -501,6 +503,7 @@ pub(crate) mod tests {
501503
partition_spec: None,
502504
name_mapping: None,
503505
case_sensitive: true,
506+
split_offsets: None,
504507
};
505508

506509
let filter = DeleteFilter::default();

crates/iceberg/src/arrow/reader.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2333,6 +2333,7 @@ message schema {
23332333
partition_spec: None,
23342334
name_mapping: None,
23352335
case_sensitive: false,
2336+
split_offsets: None,
23362337
})]
23372338
.into_iter(),
23382339
)) as FileScanTaskStream;
@@ -2656,6 +2657,7 @@ message schema {
26562657
partition_spec: None,
26572658
name_mapping: None,
26582659
case_sensitive: false,
2660+
split_offsets: None,
26592661
};
26602662

26612663
// Task 2: read the second and third row groups
@@ -2674,6 +2676,7 @@ message schema {
26742676
partition_spec: None,
26752677
name_mapping: None,
26762678
case_sensitive: false,
2679+
split_offsets: None,
26772680
};
26782681

26792682
let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2805,6 +2808,7 @@ message schema {
28052808
partition_spec: None,
28062809
name_mapping: None,
28072810
case_sensitive: false,
2811+
split_offsets: None,
28082812
})]
28092813
.into_iter(),
28102814
)) as FileScanTaskStream;
@@ -2979,6 +2983,7 @@ message schema {
29792983
partition_spec: None,
29802984
name_mapping: None,
29812985
case_sensitive: false,
2986+
split_offsets: None,
29822987
};
29832988

29842989
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3199,6 +3204,7 @@ message schema {
31993204
partition_spec: None,
32003205
name_mapping: None,
32013206
case_sensitive: false,
3207+
split_offsets: None,
32023208
};
32033209

32043210
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3412,6 +3418,7 @@ message schema {
34123418
partition_spec: None,
34133419
name_mapping: None,
34143420
case_sensitive: false,
3421+
split_offsets: None,
34153422
};
34163423

34173424
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3519,6 +3526,7 @@ message schema {
35193526
partition_spec: None,
35203527
name_mapping: None,
35213528
case_sensitive: false,
3529+
split_offsets: None,
35223530
})]
35233531
.into_iter(),
35243532
)) as FileScanTaskStream;
@@ -3620,6 +3628,7 @@ message schema {
36203628
partition_spec: None,
36213629
name_mapping: None,
36223630
case_sensitive: false,
3631+
split_offsets: None,
36233632
})]
36243633
.into_iter(),
36253634
)) as FileScanTaskStream;
@@ -3710,6 +3719,7 @@ message schema {
37103719
partition_spec: None,
37113720
name_mapping: None,
37123721
case_sensitive: false,
3722+
split_offsets: None,
37133723
})]
37143724
.into_iter(),
37153725
)) as FileScanTaskStream;
@@ -3814,6 +3824,7 @@ message schema {
38143824
partition_spec: None,
38153825
name_mapping: None,
38163826
case_sensitive: false,
3827+
split_offsets: None,
38173828
})]
38183829
.into_iter(),
38193830
)) as FileScanTaskStream;
@@ -3947,6 +3958,7 @@ message schema {
39473958
partition_spec: None,
39483959
name_mapping: None,
39493960
case_sensitive: false,
3961+
split_offsets: None,
39503962
})]
39513963
.into_iter(),
39523964
)) as FileScanTaskStream;
@@ -4047,6 +4059,7 @@ message schema {
40474059
partition_spec: None,
40484060
name_mapping: None,
40494061
case_sensitive: false,
4062+
split_offsets: None,
40504063
})]
40514064
.into_iter(),
40524065
)) as FileScanTaskStream;
@@ -4160,6 +4173,7 @@ message schema {
41604173
partition_spec: None,
41614174
name_mapping: None,
41624175
case_sensitive: false,
4176+
split_offsets: None,
41634177
})]
41644178
.into_iter(),
41654179
)) as FileScanTaskStream;
@@ -4254,6 +4268,7 @@ message schema {
42544268
partition_spec: None,
42554269
name_mapping: None,
42564270
case_sensitive: false,
4271+
split_offsets: None,
42574272
}),
42584273
Ok(FileScanTask {
42594274
file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
@@ -4272,6 +4287,7 @@ message schema {
42724287
partition_spec: None,
42734288
name_mapping: None,
42744289
case_sensitive: false,
4290+
split_offsets: None,
42754291
}),
42764292
Ok(FileScanTask {
42774293
file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
@@ -4290,6 +4306,7 @@ message schema {
42904306
partition_spec: None,
42914307
name_mapping: None,
42924308
case_sensitive: false,
4309+
split_offsets: None,
42934310
}),
42944311
];
42954312

@@ -4472,6 +4489,7 @@ message schema {
44724489
partition_spec: Some(partition_spec),
44734490
name_mapping: None,
44744491
case_sensitive: false,
4492+
split_offsets: None,
44754493
})]
44764494
.into_iter(),
44774495
)) as FileScanTaskStream;
@@ -4888,6 +4906,7 @@ message schema {
48884906
partition_spec: None,
48894907
name_mapping: None,
48904908
case_sensitive: false,
4909+
split_offsets: None,
48914910
})]
48924911
.into_iter(),
48934912
)) as FileScanTaskStream;
@@ -4956,6 +4975,7 @@ message schema {
49564975
partition_spec: None,
49574976
name_mapping: None,
49584977
case_sensitive: false,
4978+
split_offsets: None,
49594979
};
49604980

49614981
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! First-fit-decreasing bin packing with lookback.
19+
//!
20+
//! Used by [`TableScan::plan_tasks()`] to combine split scan tasks into
21+
//! balanced groups whose total weight is roughly `target_weight`.
22+
23+
use std::collections::VecDeque;
24+
25+
struct Bin<T> {
26+
items: Vec<T>,
27+
weight: u64,
28+
}
29+
30+
/// Bin-pack `items` into groups whose total weight is roughly `target_weight`.
31+
///
32+
/// Uses a first-fit-decreasing strategy with a sliding `lookback` window of
33+
/// open bins, matching the algorithm in Java Iceberg's `BinPacking.java`.
34+
///
35+
/// Items heavier than `target_weight` are placed in their own bin.
36+
pub(crate) fn bin_pack<T, F>(
37+
mut items: Vec<T>,
38+
target_weight: u64,
39+
lookback: usize,
40+
weight_fn: F,
41+
) -> Vec<Vec<T>>
42+
where
43+
F: Fn(&T) -> u64,
44+
{
45+
if items.is_empty() {
46+
return vec![];
47+
}
48+
49+
let lookback = lookback.max(1);
50+
51+
// Compute weights and sort descending (heaviest first)
52+
let mut weighted: Vec<(T, u64)> = items
53+
.drain(..)
54+
.map(|item| {
55+
let w = weight_fn(&item);
56+
(item, w)
57+
})
58+
.collect();
59+
weighted.sort_by(|a, b| b.1.cmp(&a.1));
60+
61+
let mut result: Vec<Vec<T>> = Vec::new();
62+
let mut open_bins: VecDeque<Bin<T>> = VecDeque::new();
63+
64+
for (item, weight) in weighted {
65+
// Try to fit into an existing open bin
66+
let fit_idx = open_bins
67+
.iter()
68+
.position(|bin| bin.weight + weight <= target_weight);
69+
70+
if let Some(idx) = fit_idx {
71+
open_bins[idx].weight += weight;
72+
open_bins[idx].items.push(item);
73+
} else {
74+
// Evict the largest bin if we've exceeded lookback
75+
if open_bins.len() >= lookback {
76+
let max_idx = open_bins
77+
.iter()
78+
.enumerate()
79+
.max_by_key(|(_, b)| b.weight)
80+
.map(|(i, _)| i)
81+
.unwrap();
82+
let evicted = open_bins.remove(max_idx).unwrap();
83+
result.push(evicted.items);
84+
}
85+
86+
open_bins.push_back(Bin {
87+
items: vec![item],
88+
weight,
89+
});
90+
}
91+
}
92+
93+
// Flush remaining bins
94+
for bin in open_bins {
95+
result.push(bin.items);
96+
}
97+
98+
result
99+
}
100+
101+
#[cfg(test)]
102+
mod tests {
103+
use super::*;
104+
105+
#[test]
106+
fn test_empty_input() {
107+
let items: Vec<u64> = vec![];
108+
let result = bin_pack(items, 100, 10, |&x| x);
109+
assert!(result.is_empty());
110+
}
111+
112+
#[test]
113+
fn test_single_item_fits() {
114+
let result = bin_pack(vec![50u64], 100, 10, |&x| x);
115+
assert_eq!(result.len(), 1);
116+
assert_eq!(result[0], vec![50]);
117+
}
118+
119+
#[test]
120+
fn test_single_oversized_item() {
121+
let result = bin_pack(vec![200u64], 100, 10, |&x| x);
122+
assert_eq!(result.len(), 1);
123+
assert_eq!(result[0], vec![200]);
124+
}
125+
126+
#[test]
127+
fn test_multiple_small_items_pack_together() {
128+
let result = bin_pack(vec![30u64, 20, 10, 25, 15], 100, 10, |&x| x);
129+
// Total weight = 100, fits in one bin
130+
assert_eq!(result.len(), 1);
131+
assert_eq!(result[0].iter().sum::<u64>(), 100);
132+
}
133+
134+
#[test]
135+
fn test_items_split_into_multiple_bins() {
136+
let result = bin_pack(vec![60u64, 60, 60], 100, 10, |&x| x);
137+
// Each 60 can pair with at most one other 60 (120 > 100), so need at least 2 bins
138+
// With first-fit-decreasing: first 60 -> bin1, second 60 -> bin2, third 60 -> bin3
139+
// (none can combine since 60+60=120 > 100)
140+
assert_eq!(result.len(), 3);
141+
}
142+
143+
#[test]
144+
fn test_bin_packing_balances_load() {
145+
// 4 items: 50, 40, 30, 20 with target 70
146+
let result = bin_pack(vec![50u64, 40, 30, 20], 70, 10, |&x| x);
147+
// Sorted descending: 50, 40, 30, 20
148+
// 50 -> bin1(50), 40 -> bin2(40), 30 -> bin2(70), 20 -> bin1(70)
149+
assert_eq!(result.len(), 2);
150+
for bin in &result {
151+
let sum: u64 = bin.iter().sum();
152+
assert!(sum <= 70, "Bin weight {sum} exceeds target 70");
153+
}
154+
}
155+
156+
#[test]
157+
fn test_lookback_limits_open_bins() {
158+
// With lookback=1, only one bin is kept open at a time
159+
let result = bin_pack(vec![10u64, 10, 10, 10], 100, 1, |&x| x);
160+
// All items are same weight (10). With lookback=1:
161+
// item1(10)->bin1(10), item2(10)->bin1(20), item3(10)->bin1(30), item4(10)->bin1(40)
162+
// They all fit, so 1 bin
163+
assert_eq!(result.len(), 1);
164+
}
165+
166+
#[test]
167+
fn test_lookback_causes_suboptimal_packing() {
168+
// With lookback=1, a tight fit may be missed
169+
// Items: 80, 70, 30, 20 with target=100, lookback=1
170+
let result = bin_pack(vec![80u64, 70, 30, 20], 100, 1, |&x| x);
171+
// Sorted: 80, 70, 30, 20
172+
// 80->bin1(80). lookback=1, only bin1 open.
173+
// 70 doesn't fit in bin1(80+70=150>100). Evict bin1([80]). 70->bin2(70).
174+
// 30 fits in bin2(70+30=100). bin2(100).
175+
// 20 doesn't fit in bin2(100+20=120>100). Evict bin2([70,30]). 20->bin3(20).
176+
assert_eq!(result.len(), 3);
177+
}
178+
179+
#[test]
180+
fn test_custom_weight_function() {
181+
// Weight function that doubles the value
182+
let result = bin_pack(vec![30u64, 30, 30], 100, 10, |&x| x * 2);
183+
// Effective weights: 60, 60, 60
184+
// 60+60=120 > 100, so each in its own bin
185+
assert_eq!(result.len(), 3);
186+
}
187+
}

crates/iceberg/src/scan/context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ impl ManifestEntryContext {
140140
// TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
141141
name_mapping: None,
142142
case_sensitive: self.case_sensitive,
143+
split_offsets: self
144+
.manifest_entry
145+
.data_file()
146+
.split_offsets()
147+
.map(|s| s.to_vec()),
143148
})
144149
}
145150
}

0 commit comments

Comments
 (0)