Skip to content

Commit 1f87e13

Browse files
committed
fix(datafusion): format
1 parent 70bc487 commit 1f87e13

2 files changed

Lines changed: 17 additions & 50 deletions

File tree

crates/integrations/datafusion/src/table/bucketing.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,7 @@ fn literal_to_array(lit: &Literal, dt: &DataType) -> Option<ArrayRef> {
178178
(PrimitiveLiteral::Int(v), DataType::Date32) => Arc::new(Date32Array::from(vec![*v])),
179179
(PrimitiveLiteral::Long(v), DataType::Int64) => Arc::new(Int64Array::from(vec![*v])),
180180
(PrimitiveLiteral::Float(v), DataType::Float32) => Arc::new(Float32Array::from(vec![v.0])),
181-
(PrimitiveLiteral::Double(v), DataType::Float64) => {
182-
Arc::new(Float64Array::from(vec![v.0]))
183-
}
181+
(PrimitiveLiteral::Double(v), DataType::Float64) => Arc::new(Float64Array::from(vec![v.0])),
184182
(PrimitiveLiteral::String(v), DataType::Utf8) => {
185183
Arc::new(StringArray::from(vec![v.as_str()]))
186184
}

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 16 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ use datafusion::logical_expr::dml::InsertOp;
4343
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
4444
use datafusion::physical_expr::PhysicalExpr;
4545
use datafusion::physical_expr::expressions::Column;
46-
use datafusion::physical_plan::ExecutionPlan;
47-
use datafusion::physical_plan::Partitioning;
4846
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
47+
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
4948
use futures::TryStreamExt;
5049
use iceberg::arrow::schema_to_arrow_schema;
5150
use iceberg::inspect::MetadataTableType;
@@ -937,8 +936,8 @@ mod tests {
937936

938937
// ── Bucketed scan tests ──────────────────────────────────────────────────
939938

940-
async fn make_catalog_and_table_for_bucketing(
941-
) -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
939+
async fn make_catalog_and_table_for_bucketing()
940+
-> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
942941
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
943942
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
944943
use iceberg::{CatalogBuilder, TableCreation};
@@ -1054,10 +1053,7 @@ mod tests {
10541053
.scan(&ctx_with_target_partitions(8).state(), None, &[], None)
10551054
.await
10561055
.unwrap();
1057-
let scan = plan
1058-
.as_any()
1059-
.downcast_ref::<IcebergTableScan>()
1060-
.unwrap();
1056+
let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
10611057

10621058
assert_eq!(scan.buckets().len(), 1);
10631059
assert_eq!(scan.buckets()[0].len(), 0);
@@ -1083,10 +1079,7 @@ mod tests {
10831079
.scan(&ctx_with_target_partitions(3).state(), None, &[], None)
10841080
.await
10851081
.unwrap();
1086-
let scan = plan
1087-
.as_any()
1088-
.downcast_ref::<IcebergTableScan>()
1089-
.unwrap();
1082+
let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
10901083

10911084
let total_files: usize = scan.buckets().iter().map(|b| b.len()).sum();
10921085
assert_eq!(total_files, 5);
@@ -1113,10 +1106,7 @@ mod tests {
11131106
.scan(&ctx_with_target_partitions(16).state(), None, &[], None)
11141107
.await
11151108
.unwrap();
1116-
let scan = plan
1117-
.as_any()
1118-
.downcast_ref::<IcebergTableScan>()
1119-
.unwrap();
1109+
let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
11201110

11211111
assert_eq!(scan.buckets().len(), 2);
11221112
}
@@ -1136,17 +1126,14 @@ mod tests {
11361126
.scan(&ctx_with_target_partitions(1).state(), None, &[], None)
11371127
.await
11381128
.unwrap();
1139-
let scan = plan
1140-
.as_any()
1141-
.downcast_ref::<IcebergTableScan>()
1142-
.unwrap();
1129+
let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
11431130

11441131
assert_eq!(scan.buckets().len(), 1);
11451132
assert_eq!(scan.buckets()[0].len(), 4);
11461133
}
11471134

1148-
async fn make_partitioned_catalog_and_table_for_bucketing(
1149-
) -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
1135+
async fn make_partitioned_catalog_and_table_for_bucketing()
1136+
-> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
11501137
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
11511138
use iceberg::spec::{
11521139
NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec,
@@ -1215,9 +1202,7 @@ mod tests {
12151202
table_name: &str,
12161203
partition_values: Vec<&str>,
12171204
) {
1218-
use iceberg::spec::{
1219-
DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct,
1220-
};
1205+
use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};
12211206
use iceberg::transaction::{ApplyTransactionAction, Transaction};
12221207

12231208
let table = catalog
@@ -1264,12 +1249,9 @@ mod tests {
12641249

12651250
let (catalog, namespace, table_name, _temp_dir) =
12661251
make_partitioned_catalog_and_table_for_bucketing().await;
1267-
append_partitioned_fake_data_files(
1268-
&catalog,
1269-
&namespace,
1270-
&table_name,
1271-
vec!["a", "b", "c", "a", "b", "c"],
1272-
)
1252+
append_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec![
1253+
"a", "b", "c", "a", "b", "c",
1254+
])
12731255
.await;
12741256

12751257
let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
@@ -1279,10 +1261,7 @@ mod tests {
12791261
.scan(&ctx_with_target_partitions(3).state(), None, &[], None)
12801262
.await
12811263
.unwrap();
1282-
let scan = plan
1283-
.as_any()
1284-
.downcast_ref::<IcebergTableScan>()
1285-
.unwrap();
1264+
let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
12861265

12871266
let total_files: usize = scan.buckets().iter().map(|b| b.len()).sum();
12881267
assert_eq!(total_files, 6);
@@ -1309,13 +1288,7 @@ mod tests {
13091288

13101289
let (catalog, namespace, table_name, _temp_dir) =
13111290
make_partitioned_catalog_and_table_for_bucketing().await;
1312-
append_partitioned_fake_data_files(
1313-
&catalog,
1314-
&namespace,
1315-
&table_name,
1316-
vec!["a", "b"],
1317-
)
1318-
.await;
1291+
append_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec!["a", "b"]).await;
13191292

13201293
let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
13211294
.await
@@ -1331,15 +1304,11 @@ mod tests {
13311304
)
13321305
.await
13331306
.unwrap();
1334-
let scan = plan
1335-
.as_any()
1336-
.downcast_ref::<IcebergTableScan>()
1337-
.unwrap();
1307+
let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
13381308

13391309
assert!(matches!(
13401310
scan.properties().partitioning,
13411311
Partitioning::UnknownPartitioning(_)
13421312
));
13431313
}
1344-
13451314
}

0 commit comments

Comments
 (0)