Skip to content

Commit d8bd5bc

Browse files
viirya and claude committed
feat(datafusion): Add Binary scalar value conversion for predicate pushdown
Add support for converting Binary and LargeBinary DataFusion ScalarValue types to Iceberg Datum, enabling binary predicates to be pushed down to the Iceberg storage layer.

This conversion allows SQL queries with binary hex literals (X'...') to push predicates down to Iceberg, improving query performance by filtering data at the storage level rather than in DataFusion.

The integration test verifies that binary predicates are successfully pushed down end-to-end:
- Without conversion: predicate stays in FilterExec with predicate:[]
- With conversion: predicate pushed to IcebergTableScan

Other scalar types (Boolean, Timestamp, Decimal) were investigated but excluded because they are not reachable through practical usage:
- Boolean: DataFusion aggressively optimizes comparisons (e.g., x=true becomes just x) before reaching the converter
- Timestamp/Decimal: SQL literals are converted to strings/other types before reaching the converter

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 76cdf28 commit d8bd5bc

2 files changed

Lines changed: 78 additions & 0 deletions

File tree

crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -212,6 +212,8 @@ fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
212212
ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
213213
ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
214214
ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
215+
ScalarValue::Binary(Some(v)) => Some(Datum::binary(v.clone())),
216+
ScalarValue::LargeBinary(Some(v)) => Some(Datum::binary(v.clone())),
215217
ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
216218
ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
217219
_ => None,
@@ -429,4 +431,31 @@ mod tests {
429431
let predicate = convert_to_iceberg_predicate(sql);
430432
assert_eq!(predicate, None);
431433
}
434+
435+
// Unit test for the binary arms of `scalar_value_to_datum`: both `Binary` and
// `LargeBinary` scalars holding bytes must convert to `Datum::binary` with the
// same bytes, and a `Binary` scalar holding `None` must convert to `None`.
#[test]
436+
fn test_scalar_value_to_datum_binary() {
437+
use datafusion::common::ScalarValue;
438+
439+
let bytes = vec![1u8, 2u8, 3u8];
440+
// `Binary(Some(..))` converts to a binary datum carrying the same bytes.
let datum = super::scalar_value_to_datum(&ScalarValue::Binary(Some(bytes.clone())));
441+
assert_eq!(datum, Some(Datum::binary(bytes.clone())));
442+
443+
// `LargeBinary(Some(..))` is expected to produce the same datum as `Binary`.
let datum = super::scalar_value_to_datum(&ScalarValue::LargeBinary(Some(bytes.clone())));
444+
assert_eq!(datum, Some(Datum::binary(bytes)));
445+
446+
// A `Binary` scalar with no value is expected to yield no datum.
let datum = super::scalar_value_to_datum(&ScalarValue::Binary(None));
447+
assert_eq!(datum, None);
448+
}
449+
450+
// Checks that a SQL filter combining an integer comparison with a hex binary
// literal (`X'0102'`) converts to an Iceberg predicate in which the literal
// becomes a binary datum holding the bytes [1, 2].
#[test]
451+
fn test_predicate_conversion_with_binary() {
452+
let sql = "foo = 1 and bar = X'0102'";
453+
// `unwrap` makes the test fail if the filter cannot be converted at all.
let predicate = convert_to_iceberg_predicate(sql).unwrap();
454+
// Binary literals are converted to Datum::binary
455+
// Note: SQL literal 1 is converted to Long by DataFusion
456+
let expected_predicate = Reference::new("foo")
457+
.equal_to(Datum::long(1))
458+
.and(Reference::new("bar").equal_to(Datum::binary(vec![1u8, 2u8])));
459+
assert_eq!(predicate, expected_predicate);
460+
}
432461
}

crates/integrations/datafusion/tests/integration_datafusion_test.rs

Lines changed: 49 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -807,6 +807,55 @@ async fn test_insert_into_nested() -> Result<()> {
807807
Ok(())
808808
}
809809

810+
// Integration test: creates a table with an optional binary column, registers
// the Iceberg catalog with a DataFusion session, runs EXPLAIN on a query that
// filters the binary column with an X'...' literal, and asserts that the plan
// text contains the predicate in its `predicate:[...]` section.
#[tokio::test]
811+
async fn test_binary_predicate_pushdown() -> Result<()> {
812+
let iceberg_catalog = get_iceberg_catalog().await;
813+
let namespace = NamespaceIdent::new("ns".to_string());
814+
set_test_namespace(&iceberg_catalog, &namespace).await?;
815+
816+
// Create a schema with binary type
817+
let schema = Schema::builder()
818+
.with_schema_id(0)
819+
.with_fields(vec![
820+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
821+
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Binary)).into(),
822+
])
823+
.build()?;
824+
let creation = get_table_creation(temp_path(), "binary_table", Some(schema))?;
825+
iceberg_catalog.create_table(&namespace, creation).await?;
826+
827+
let client = Arc::new(iceberg_catalog);
828+
let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
829+
830+
let ctx = SessionContext::new();
831+
// The name "catalog" registered here is the prefix used in the SQL below.
ctx.register_catalog("catalog", catalog);
832+
833+
// Test binary predicate pushdown using X'...' syntax for binary literals
834+
let records = ctx
835+
.sql("EXPLAIN select * from catalog.ns.binary_table where data = X'0102'")
836+
.await
837+
.unwrap()
838+
.collect()
839+
.await
840+
.unwrap();
841+
assert_eq!(1, records.len());
842+
// NOTE(review): column 1 is presumably the plan-text column of the EXPLAIN
// result (column 0 being the plan type) — confirm against DataFusion.
let plan = records[0]
843+
.column(1)
844+
.as_any()
845+
.downcast_ref::<StringArray>()
846+
.unwrap();
847+
// NOTE(review): row 1 is presumably the physical plan (row 0 the logical
// plan) — confirm; this indexing panics if the batch has fewer than two rows.
let plan_str = plan.value(1);
848+
849+
// The key validation: binary predicate should be pushed down to IcebergTableScan
850+
// Without binary Datum conversion, this would stay in a FilterExec
851+
// Two spellings are accepted: the predicate with and without a wrapping
// parenthesis.
assert!(
852+
plan_str.contains("predicate:[data = ") || plan_str.contains("predicate:[(data = "),
853+
"Binary predicate should be pushed down. Plan: {plan_str}"
854+
);
855+
856+
Ok(())
857+
}
858+
810859
#[tokio::test]
811860
async fn test_insert_into_partitioned() -> Result<()> {
812861
let iceberg_catalog = get_iceberg_catalog().await;

0 commit comments

Comments
 (0)