Skip to content

Commit 4b0b352

Browse files
blackmwk and claude authored
Split arrow reader into smaller modules (#2358)
## Which issue does this PR close? - Closes #2309 ## What changes are included in this PR? Split the arrow reader module into smaller ones so that it would be easier to maintain. I didn't make any extra changes on purpose, to keep the PR easier to read. ## Are these changes tested? Unit tests. --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c82c42e commit 4b0b352

9 files changed

Lines changed: 5865 additions & 5583 deletions

File tree

crates/iceberg/src/arrow/reader.rs

Lines changed: 0 additions & 5583 deletions
This file was deleted.
Lines changed: 368 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,368 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Async Parquet file reader that adapts an Iceberg `FileRead` to parquet's `AsyncFileReader`.
19+
20+
use std::ops::Range;
21+
use std::sync::Arc;
22+
23+
use bytes::Bytes;
24+
use futures::future::BoxFuture;
25+
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
26+
use parquet::arrow::arrow_reader::ArrowReaderOptions;
27+
use parquet::arrow::async_reader::AsyncFileReader;
28+
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
29+
30+
use super::ParquetReadOptions;
31+
use crate::io::{FileMetadata, FileRead};
32+
33+
/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
pub struct ArrowFileReader {
    // File metadata; `meta.size` is handed to the Parquet metadata loader in
    // `get_metadata` to locate the footer.
    meta: FileMetadata,
    // Tuning knobs (metadata prefetch hint, page-index preloading, range
    // coalescing/concurrency) consumed by the `AsyncFileReader` impl below.
    parquet_read_options: ParquetReadOptions,
    // Underlying positional reader; every byte fetch goes through this.
    r: Box<dyn FileRead>,
}
39+
40+
impl ArrowFileReader {
41+
/// Create a new ArrowFileReader
42+
pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
43+
Self {
44+
meta,
45+
parquet_read_options: ParquetReadOptions::builder().build(),
46+
r,
47+
}
48+
}
49+
50+
/// Configure all Parquet read options.
51+
pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
52+
self.parquet_read_options = options;
53+
self
54+
}
55+
}
56+
57+
impl AsyncFileReader for ArrowFileReader {
    // Fetch a single byte range, mapping the Iceberg I/O error into parquet's
    // error type so the parquet reader can propagate it.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start..range.end)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    /// Override the default `get_byte_ranges` which calls `get_bytes` sequentially.
    /// The parquet reader calls this to fetch column chunks for a row group, so
    /// without this override each column chunk is a serial round-trip to object storage.
    /// Adapted from object_store's `coalesce_ranges` in `util.rs`.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
        let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
        // Clamp to at least 1: `buffered(0)` would never drive any fetch.
        let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);

        async move {
            // Merge nearby ranges to reduce the number of object store requests.
            let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
            let r = &self.r;

            // Fetch merged ranges concurrently.
            // `buffered` yields results in input order, so `fetched[i]`
            // corresponds to `fetch_ranges[i]` below.
            let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
                .map(|range| async move {
                    r.read(range)
                        .await
                        .map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
                })
                .buffered(concurrency)
                .try_collect()
                .await?;

            // Slice the fetched data back into the originally requested ranges.
            Ok(ranges
                .iter()
                .map(|range| {
                    // `fetch_ranges` is sorted by start and every requested range is
                    // contained in exactly one merged range, so the last merged range
                    // starting at or before `range.start` is its container and the
                    // `- 1` cannot underflow.
                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
                    let fetch_range = &fetch_ranges[idx];
                    let fetch_bytes = &fetched[idx];
                    let start = (range.start - fetch_range.start) as usize;
                    let end = (range.end - fetch_range.start) as usize;
                    // Clamp to the bytes actually returned; a read truncated by EOF
                    // yields a correspondingly shorter slice rather than a panic.
                    fetch_bytes.slice(start..end.min(fetch_bytes.len()))
                })
                .collect())
        }
        .boxed()
    }

    // TODO: currently we don't respect `ArrowReaderOptions` cause it don't expose any method to access the option field
    // we will fix it after `v55.1.0` is released in https://github.com/apache/arrow-rs/issues/7393
    //
    // Loads the Parquet footer metadata (and, depending on the configured
    // options, the page/column/offset indexes) using this reader itself as the
    // byte source.
    fn get_metadata(
        &mut self,
        _options: Option<&'_ ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new()
                .with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
                // Set the page policy first because it updates both column and offset policies.
                .with_page_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_page_index(),
                ))
                .with_column_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_column_index(),
                ))
                .with_offset_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_offset_index(),
                ));
            // The loader needs the total file size to find the footer.
            let size = self.meta.size;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}
136+
137+
/// Merge overlapping or nearby byte ranges, combining ranges with gaps <= `coalesce` bytes.
/// Adapted from object_store's `merge_ranges` in `util.rs`.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    if ranges.is_empty() {
        return vec![];
    }

    // Work on a copy sorted by start offset so a single linear pass suffices.
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged = Vec::with_capacity(sorted.len());
    // Accumulate the current merged range; flush it whenever the next range
    // starts more than `coalesce` bytes past its end.
    let mut current = sorted[0].clone();
    for next in &sorted[1..] {
        // `checked_sub` returning None means `next` overlaps `current`.
        let close_enough = next
            .start
            .checked_sub(current.end)
            .map(|gap| gap <= coalesce)
            .unwrap_or(true);
        if close_enough {
            current.end = current.end.max(next.end);
        } else {
            merged.push(current);
            current = next.clone();
        }
    }
    merged.push(current);

    merged
}
172+
173+
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use parquet::arrow::async_reader::AsyncFileReader;

    use super::{ArrowFileReader, ParquetReadOptions, merge_ranges};
    use crate::io::{FileMetadata, FileRead};

    #[test]
    fn test_merge_ranges_empty() {
        assert_eq!(merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
    }

    #[test]
    fn test_merge_ranges_no_coalesce() {
        // Ranges far apart should not be merged
        let ranges = vec![0..100, 1_000_000..1_000_100];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
    }

    #[test]
    fn test_merge_ranges_coalesce() {
        // Ranges within the gap threshold should be merged
        let ranges = vec![0..100, 200..300, 500..600];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    #[test]
    fn test_merge_ranges_overlapping() {
        // Overlapping ranges merge even with a zero coalesce threshold.
        let ranges = vec![0..200, 100..300];
        let merged = merge_ranges(&ranges, 0);
        assert_eq!(merged, vec![0..300]);
    }

    #[test]
    fn test_merge_ranges_unsorted() {
        // Input order must not matter: merge_ranges sorts internally.
        let ranges = vec![500..600, 0..100, 200..300];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    /// Mock FileRead backed by a flat byte buffer.
    struct MockFileRead {
        data: bytes::Bytes,
    }

    impl MockFileRead {
        fn new(size: usize) -> Self {
            // Fill with sequential byte values so slices are verifiable.
            let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
            Self {
                data: bytes::Bytes::from(data),
            }
        }
    }

    #[async_trait::async_trait]
    impl FileRead for MockFileRead {
        async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
            Ok(self.data.slice(range.start as usize..range.end as usize))
        }
    }

    #[tokio::test]
    async fn test_get_byte_ranges_no_coalesce() {
        // With coalescing disabled, each requested range maps to its own fetch
        // but the returned slices must still match the requested bytes exactly.
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_with_coalesce() {
        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);
        let expected_2 = mock.data.slice(500..600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(1024)
                    .build(),
            );

        // All ranges within coalesce threshold — should merge into one fetch.
        let result = reader
            .get_byte_ranges(vec![0..100, 200..300, 500..600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_empty() {
        // No requested ranges — no fetches, empty result.
        let mock = MockFileRead::new(1024);
        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock));

        let result = reader.get_byte_ranges(vec![]).await.unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn test_get_byte_ranges_coalesce_max() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(u64::MAX)
                    .build(),
            );

        // u64::MAX coalesce — all ranges merge into a single fetch.
        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_zero() {
        // concurrency=0 is clamped to 1, so this should not hang.
        let mock = MockFileRead::new(1024);
        let expected = mock.data.slice(0..100);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_fetch_concurrency(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300])
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_one() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(500..600);
        let expected_2 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .with_range_fetch_concurrency(1)
                    .build(),
            );

        // concurrency=1 with no coalescing — sequential fetches.
        let result = reader
            .get_byte_ranges(vec![0..100, 500..600, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }
}

0 commit comments

Comments
 (0)