Summary
Increase the throughput of validate_duplicate_files by parallelizing requests to manifest files.
Context
In newer versions of the library, we make one sequential request for each manifest file when checking for duplicates. For cases where the latest snapshot has many manifest files, this makes the check very slow. In past versions of the library, fetching was done in a batched manner (see the attached code snippets).
Past versions (batched fetch via the manifests metadata table scan):

```rust
let mut manifest_stream = self
    .snapshot_produce_action
    .tx
    .table
    .inspect()
    .manifests()
    .scan()
    .await?;
let mut referenced_files = Vec::new();
while let Some(batch) = manifest_stream.try_next().await? {
    let file_path_array = batch
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                "Failed to downcast file_path column to StringArray",
            )
        })?;
    for i in 0..batch.num_rows() {
        let file_path = file_path_array.value(i);
        if new_files.contains(file_path) {
            referenced_files.push(file_path.to_string());
        }
    }
}
```
Newer versions (one sequential request per manifest file):

```rust
let manifest_list = current_snapshot
    .load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
    .await?;
for manifest_list_entry in manifest_list.entries() {
    let manifest = manifest_list_entry
        .load_manifest(self.table.file_io())
        .await?;
    for entry in manifest.entries() {
        let file_path = entry.file_path();
        if new_files.contains(file_path) && entry.is_alive() {
            referenced_files.push(file_path.to_string());
        }
    }
}
```