Skip to content

fix: complete native_datafusion Parquet schema-mismatch rejections#4229

Merged
andygrove merged 29 commits into
apache:mainfrom
andygrove:native-df-type-promotion-validation
May 17, 2026
Merged

fix: complete native_datafusion Parquet schema-mismatch rejections#4229
andygrove merged 29 commits into
apache:mainfrom
andygrove:native-df-type-promotion-validation

Conversation

@andygrove
Copy link
Copy Markdown
Member

@andygrove andygrove commented May 5, 2026

Which issues does this PR close?

Closes #3720 — umbrella for native_datafusion silent acceptance of incompatible Parquet reads, now fully split into specific child issues.
Closes #4297 — primitive numeric / date / timestamp conversions Spark rejects (long → int, double → float, float|double → int*, int|long → float, int → timestamp, long → date|timestamp, date → timestamp(LTZ), timestamp → date).
Closes #4343 — decimal-to-decimal precision/scale narrowing (scaleIncrease < 0 OR precisionIncrease < scaleIncrease).
Closes #4344 — integer-to-decimal narrowing (INT32 source needs precision − scale ≥ 10, INT64 source ≥ 20).
Closes #4351 — plain BINARY (no DecimalLogicalTypeAnnotation) read as DecimalType.
Closes #4298 — deprecation of spark.comet.schemaEvolution.enabled.

Not closed:

Rationale for this change

Under spark.comet.scan.impl=native_datafusion, several Spark SQL tests that expect SchemaColumnConvertNotSupportedException on incompatible Parquet reads were passing silently — DataFusion's reader was coercing mismatched numeric / decimal / binary types instead of erroring, producing wrong answers (silent overflow on narrowing, silent precision loss on widening, raw-byte reinterpretation for int → timestamp, etc.). This PR adds the rejections Spark's vectorized reader performs in ParquetVectorUpdaterFactory.getUpdater, formatted to match Spark's _LEGACY_ERROR_TEMP_2063 (3.x) / FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH (4.x) error params byte-for-byte.

What changes are included in this PR?

Schema adapter (native/core/src/parquet/schema_adapter.rs)

  • New RejectOnNonEmpty PhysicalExpr that returns an empty array for batches with zero rows and raises SparkError::ParquetSchemaConvert otherwise. Used to defer rejection to evaluation time so files with no row groups (e.g. empty DataFrame.write) pass silently — mirrors Spark's per-row-group getUpdater check (SPARK-26709).
  • Plan-time rejections in replace_with_spark_cast:
    • BINARY/STRING source → non-string/binary target (#4088 / #4351): rejects all non-string/binary target types, including Decimal128/256 (Arrow already exposes decimal-annotated BINARY as Decimal128, so observing physical Binary here unambiguously means a non-decimal source).
    • Decimal-to-decimal narrowing (#4343): rejects dst_scale < src_scale OR dst_precision − dst_scale < src_precision − src_scale.
    • Integer-to-decimal narrowing (#4344): rejects when the requested decimal cannot represent the source integer type's range.
  • Runtime-deferred rejections via RejectOnNonEmpty:
    • Spark-3.x type-promotion gating (INT32 → INT64, FLOAT → DOUBLE, INT32 → DOUBLE) when allow_type_promotion is false.
    • Primitive numeric / date / timestamp conversions Spark rejects on every supported version (#4297).
  • Same BINARY → non-string/binary rejection added to the wrap_all_type_mismatches fallback path so the rejection fires whether the default adapter constructs a CastColumnExpr (typical) or fails (which happens for Binary → Decimal128 because DataFusion has no built-in cast for that pair).
  • parquet_primitive_name and spark_catalog_name produce Spark-compatible names (INT32, INT64, FIXED_LEN_BYTE_ARRAY, decimal(p,s), timestamp_ntz, …) for error params.

Configuration

  • Deprecate the public spark.comet.schemaEvolution.enabled conf in favor of a per-Spark-version constant in ShimCometConf (false on 3.x, true on 4.x) — Spark 3.x's vectorized reader rejects these widenings unconditionally, Spark 4.x always accepts them.

Spark-SQL test diffs (dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diff)

The remaining IgnoreCometNativeDataFusion annotations now point at specific issues — #3720 has zero references in dev/diffs/. Final state per issue:

Issue 3.4.3 3.5.8 4.0.2 4.1.1
#4316 (missing file path) 1 1 1 1
#4352 (parquet-mr permissive) 6 6
#4354 (3.x cause-chain wrapping) 2 2
#4219 (LTZ → NTZ Spark 3.x) 1 1
"cannot be pushed down" (no issue) 1
#3720 0 0 0 0

Specifically:

  • SPARK-34212 Parquet should read decimals correctly is unignored on 4.0.2 and 4.1.1 (passes thanks to the #4351 schema-adapter fix). Stays ignored against #4354 on 3.x where the shim's extra cause-chain layer makes the strict intercept(...).getCause.isInstanceOf assertion fail.
  • The five ParquetTypeWideningSuite test groups in 4.1.1.diff that commit 80836b18d had unignored prematurely are re-ignored against #4352, along with the matching tests in 4.0.2.diff. The schema-adapter rejection is correct on the vectorized=true branch; these tests assert parquet-mr's permissive behavior on the vectorized=false branch, which Comet doesn't replicate (no parquet-mr fallback).
  • The parquet decimal type change Decimal(5, 2) → Decimal(3, 2) overflows with parquet-mr test in 4.0.2.diff and 4.1.1.diff is ignored against #4352 (had been unannotated, would fail otherwise).

Documentation (docs/source/user-guide/latest/compatibility/scans.md)

New Schema Mismatch Handling subsection in the parquet scan compat guide. States explicitly that these gaps apply only when the requested read schema differs from the file schema (explicit user schema or schema-evolution / partitioned reads), not to plain spark.read.parquet(path). Notes per-version differences and lists the only remaining user-visible gap (#4316 — missing file path in error). Mentions across-Spark-version differences (Spark 3.x's schemaEvolution.enabled-gated widenings vs 4.0+ unconditional acceptance, TimestampLTZ → TimestampNTZ).

How are these changes tested?

Rust unit tests in schema_adapter.rs — 18 new tests covering each rejection class plus the empty-file pass-through:

  • parquet_empty_file_disallowed_widening, parquet_non_empty_file_disallowed_widening_errors
  • parquet_int_read_as_string_errors, parquet_string_read_as_int_errors
  • parquet_binary_read_as_decimal_errors (regression for #4351)
  • parquet_int32_read_as_narrow_decimal_errors, parquet_int64_read_as_narrow_decimal_errors, parquet_int32_read_as_wide_decimal_succeeds, parquet_int32_read_as_decimal_with_scale_errors
  • parquet_decimal_precision_narrowing_errors, parquet_decimal_int_precision_narrowing_errors, parquet_decimal_scale_widening_without_precision_errors, parquet_decimal_widening_succeeds
  • parquet_long_read_as_int_errors, parquet_long_read_as_double_errors, parquet_double_read_as_float_errors, parquet_float_read_as_long_errors, parquet_double_read_as_long_errors, parquet_int_read_as_float_errors
  • parquet_int_read_as_timestamp_ntz_errors, parquet_long_read_as_date_errors, parquet_date_read_as_ltz_timestamp_errors, parquet_timestamp_read_as_date_errors

JVM regression test in ParquetReadSuite.scala:

  • native_datafusion rejects BINARY (no decimal annotation) read as DecimalType — mirrors the BINARY iteration of SPARK-34212. Walks the cause chain because Spark 3.x produces an extra SparkException layer (#4354); verified locally on Spark 3.4.3, 3.5.8, 4.0.2, and 4.1.1.

Spark SQL CI — the affected Spark-SQL tests run under CI with the regenerated diffs. The previously-failing 37 tests in 4.1.1.diff (CI logs from 80836b1) are now either unignored and passing (SPARK-34212) or re-ignored under #4352 with a clear architectural reason.

andygrove and others added 5 commits May 5, 2026 12:52
When COMET_SCHEMA_EVOLUTION_ENABLED is false, the native_datafusion scan
path now rejects reading Parquet INT32 as INT64, FLOAT as DOUBLE, and
INT32 as DOUBLE — matching the existing validation in native_iceberg_compat.

The allow_type_promotion flag is passed from JVM via protobuf and checked
in replace_with_spark_cast() before allowing widening casts.

Closes apache#3720

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Format the SchemaColumnConvertNotSupportedException message produced by
the type-promotion check so it matches Spark's vectorized reader output:
column rendered as [name], expected as Spark catalog string (bigint),
found as Parquet primitive name (INT32). This lets the SPARK-35640 and
"row group skipping doesn't overflow" tests pass, and updates 3.4.3.diff
to remove their IgnoreCometNativeDataFusion tags.

The TimestampLTZ to TimestampNTZ case (SPARK-36182) and decimal
precision/scale case (SPARK-34212) remain ignored, tracked under apache#4219
and apache#3720 respectively. Also reverts the cfg(test) gate on
parquet/util/test_common so the parquet_read benchmark builds.
Run the 3720-tagged tests in dev/diffs/3.5.8.diff, 4.0.2.diff, and
4.1.1.diff against patched Spark trees with the type-promotion fix
applied, then drop the IgnoreCometNativeDataFusion tag for tests that
now pass and keep it on tests that still fail.

3.5.8: Drop tags for SPARK-35640 (int as long) and "row group skipping
doesn't overflow", repoint SPARK-36182 at issue apache#4219. Same scope as
3.4.3, since the test source matches.

4.0.2 and 4.1.1: Drop tags for SPARK-47447 (TimestampLTZ as TimestampNTZ)
and "row group skipping". 4.1.1 also drops the tag for SPARK-45604
(timestamp_ntz to array<timestamp_ntz>). Tests for SPARK-35640 (binary as
timestamp), SPARK-34212 (decimal precision/scale), the schema-mismatch
vectorized-reader test, and the parameterized ParquetTypeWideningSuite
cases (unsupported parquet conversion, unsupported parquet timestamp
conversion, parquet decimal precision change, parquet decimal precision
and scale change) still fail and remain ignored under apache#3720.
The test reads a partitioned dataset where one partition is an
empty parquet file written with INT32 schema and the other has 10
rows of INT64. Spark's vectorized reader silently skips the type
check for the empty file because no row groups are scanned. The
native_datafusion adapter rejects the INT32 to INT64 promotion at
plan time regardless of file row count, so the test now fails
when allow_type_promotion is false (Spark 3.x default).

Tag the test with IgnoreCometNativeDataFusion under the existing
3720 umbrella in 3.4.3.diff and 3.5.8.diff. Spark 4.x defaults
allow_type_promotion to true so its diffs are unaffected.
@andygrove andygrove marked this pull request as ready for review May 5, 2026 23:04
@andygrove andygrove added this to the 0.16.0 milestone May 6, 2026
@andygrove andygrove moved this to In progress in Comet Development May 6, 2026
@mbutrovich
Copy link
Copy Markdown
Contributor

Nice reduction in ignored tests. One concern on scope.

The three-case match in replace_with_spark_cast exactly mirrors the three Comet-specific permissive branches in Comet's TypeUtil.checkParquetType (INT32→Long and FLOAT→Double gated on allowTypePromotion, INT32→Double gated on isSpark40Plus), so native_datafusion and native_iceberg_compat now line up for those three. Good.

But native_datafusion also silently accepts a bunch of conversions that Spark's vectorized reader rejects on every supported version (Spark's ParquetVectorUpdaterFactory.getUpdater falls through to constructConvertNotSupportedException). From Spark's ParquetTypeWideningSuite expectError = true list on 4.0.2: Long→Int, Double→Float, Float→Long, Long→Double, Int→Float, Int→TimestampType, Date→TimestampType. None of these is gated by the new flag, and none of them throws today.

I ran a probe against this PR on Spark 3.5 with COMET_NATIVE_SCAN_IMPL=native_datafusion, sweeping spark.comet.schemaEvolution.enabled on and off. Results:

Case Written Spark ref behavior schemaEvolution=false schemaEvolution=true
int→long [1, 2, 3] throws (3.x) / ok (4.0) throws [1, 2, 3]
float→double [1.0, 2.0, 3.0] throws (3.x) / ok (4.0) throws [1.0, 2.0, 3.0]
int→double [1, 2, 3] throws (3.x) / ok (4.0) throws [1.0, 2.0, 3.0]
long→int (narrowing) [1, 2, 3, 2147483652] throws [1, 2, 3, -2147483644] [1, 2, 3, -2147483644]
double→float (narrowing) [1.5, 2.5, 1e40] throws [1.5, 2.5, Infinity] [1.5, 2.5, Infinity]
float→long [1.5, 2.5] throws [1, 2] (truncated) [1, 2] (truncated)
long→double [1, 2, 2^54+1] throws [1.0, 2.0, 1.8014398509481984E16] (lost +1) same
int→float [1, 2, 2^25+1] throws [1.0, 2.0, 3.3554432E7] (lost +1) same
int→timestamp [1, 2, 3] throws [1969-12-31 16:00:01 … 03] (PST, int-as-seconds) same
double→long [1.0, 2.0, 3.0] throws [1, 2, 3] [1, 2, 3]

The top three rows are what the PR fixes and look right under both settings. The bottom seven are wrong-answer paths under both settings: silent overflow on narrowing, silent precision loss on widening Spark doesn't allow, silent raw-int-as-epoch-seconds reinterpretation for int→timestamp. These are the same class of gap #3720 enumerates for STRING→INT and decimal precision narrowing, just for primitive-to-primitive conversions.

Not asking you to fix all of them in this PR. But I think the framing in the commit message and code comment (mirrors TypeUtil.checkParquetType) undersells the remaining surface. Two options worth considering:

  1. Invert the check to an allowlist of Spark-supported (physical, target) pairs (essentially mirror the accept cases in Spark's ParquetVectorUpdaterFactory.getUpdater per Spark version), so anything else raises ParquetSchemaConvert. This closes the whole category.
  2. Land this as-is and file a followup issue tracking the seven cases above, linking this probe so behavior is captured.

Either is fine by me. I'd lean toward (2) to keep this PR scoped.


Probe used (slimmed, put under spark/src/test/scala/org/apache/comet/parquet/, runs with ./mvnw test -Pspark-3.5 -Dtest=none -Dsuites=org.apache.comet.parquet.TypePromotionProbeSuite -Dscalastyle.skip=true):

package org.apache.comet.parquet

import scala.util.Try
import org.apache.spark.sql.{CometTestBase, DataFrame}
import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.CometConf

class TypePromotionProbeSuite extends CometTestBase {
  import testImplicits._

  private def probe(label: String)(body: => Any): Unit = {
    val result = Try(body)
    // scalastyle:off println
    println(s"[PROBE] $label -> ${result match {
        case scala.util.Success(v) => s"OK value=$v"
        case scala.util.Failure(e) => s"THROW ${e.getClass.getSimpleName}"
      }}")
    // scalastyle:on println
  }

  private def runAll(ev: Boolean): Unit = withSQLConf(
    CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
    CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> ev.toString,
    SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
    def run(label: String, df: DataFrame, writeType: String, readAs: String): Unit =
      probe(s"$label (ev=$ev)") {
        withTempPath { dir =>
          df.selectExpr(s"cast(c as $writeType) as c").write.parquet(dir.getCanonicalPath)
          spark.read.schema(s"c $readAs").parquet(dir.getCanonicalPath)
            .collect().map(_.get(0)).toSeq
        }
      }
    run("int->long",              Seq(1, 2, 3).toDF("c"),                              "int",    "bigint")
    run("float->double",          Seq(1.0f, 2.0f, 3.0f).toDF("c"),                     "float",  "double")
    run("int->double",            Seq(1, 2, 3).toDF("c"),                              "int",    "double")
    run("long->int narrowing",    Seq(1L, 2L, 3L, Int.MaxValue.toLong + 5L).toDF("c"), "bigint", "int")
    run("double->float narrowing",Seq(1.5, 2.5, 1e40).toDF("c"),                       "double", "float")
    run("float->long",            Seq(1.5f, 2.5f).toDF("c"),                           "float",  "bigint")
    run("long->double",           Seq(1L, 2L, (1L << 54) + 1L).toDF("c"),              "bigint", "double")
    run("int->float",             Seq(1, 2, (1 << 25) + 1).toDF("c"),                  "int",    "float")
    run("int->timestamp",         Seq(1, 2, 3).toDF("c"),                              "int",    "timestamp")
    run("double->long",           Seq(1.0, 2.0, 3.0).toDF("c"),                        "double", "bigint")
  }

  test("probe ev=false") { runAll(ev = false) }
  test("probe ev=true")  { runAll(ev = true) }
}

@mbutrovich
Copy link
Copy Markdown
Contributor

I guess the bigger question to me becomes: why do we have spark.comet.schemaEvolution.enabled config anymore? Maybe we should deprecate that first and help us simplify the story. I think it's legacy from when Comet's Parquet decoder could be called from Iceberg, which has different schema evolution semantics.

@andygrove
Copy link
Copy Markdown
Member Author

Thanks for the probe — that table makes the remaining surface concrete.

Going with option (2): filed #4297 to track the seven unconditionally-rejected conversions, with your probe table and code copied over so the behavior is captured. Tightened the code comment and PR description here to be explicit that this PR only closes the three schemaEvolution-gated widenings, not the broader category.

On the bigger question about deprecating spark.comet.schemaEvolution.enabled — filed #4298 for that investigation. It looks like the per-version defaults (false on 3.x, true on 4.x via ShimCometConf) already encode what Spark itself does, so if there are no remaining callers flipping it, hardcoding it into the shim seems reasonable.

andygrove added 3 commits May 12, 2026 08:09
…ion-validation

# Conflicts:
#	dev/diffs/3.4.3.diff
#	dev/diffs/3.5.8.diff
#	native/core/src/parquet/parquet_support.rs
#	native/proto/src/proto/operator.proto
#	spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
…-Spark-version default

Removes the public `spark.comet.schemaEvolution.enabled` ConfigEntry and reads
type-promotion permissiveness directly from the per-Spark-version constant in
`ShimCometConf` (false on 3.x, true on 4.x). Mirrors what Spark's vectorized
reader does without requiring a user-tunable knob that historically existed only
for the now-dead Java Iceberg-Comet integration.

- Promote `COMET_SCHEMA_EVOLUTION_ENABLED` to a public `val` in `ShimCometConf`
- Drop the `internal()` ConfigEntry from `CometConf`
- Swap Java callers in `AbstractColumnReader` and `TypeUtil` to read the constant
- Swap `CometNativeScan` to set the proto field from the constant
- Rewrite `ParquetReadSuite` tests that flipped the conf:
  - `schema evolution`: drop the parametrization, branch on the constant
  - `type widening` and `read byte, int, short, long together`: gate with
    `assume(...COMET_SCHEMA_EVOLUTION_ENABLED)` since they only apply on 4.x

Closes apache#4298.

Also regenerates `dev/diffs/3.4.3.diff` and `dev/diffs/3.5.8.diff` to include the
SPARK-26709 IgnoreCometNativeDataFusion tag alongside main's SPARK-33084 tag
(both branches added separate hunks to SQLQuerySuite; merge needed a re-diff).
@andygrove
Copy link
Copy Markdown
Member Author

@mbutrovich I removed COMET_SCHEMA_EVOLUTION_ENABLED. I did not remove the references from the Iceberg diffs yet, but there is no harm in them being there.

andygrove added 3 commits May 13, 2026 07:32
…ion-validation

# Conflicts:
#	dev/diffs/4.0.2.diff
#	dev/diffs/4.1.1.diff
…-26709

The plan-time check in `replace_with_spark_cast` rejects the three widenings
(INT32->INT64, FLOAT->DOUBLE, INT32->DOUBLE) regardless of file row count.
Spark's vectorized reader only invokes `ParquetVectorUpdaterFactory.getUpdater`
while decoding a row group, so a Parquet file with no row groups (e.g. written
from an empty DataFrame) passes silently. SPARK-26709's mixed-partition case
hit this: one partition is an empty INT32 file, another has 10 rows of INT64.

Replace the eager `return Err(...)` with a `RejectOnNonEmpty` PhysicalExpr that
returns an empty array of the target type when the input batch has 0 rows and
raises `ParquetSchemaConvert` otherwise. The JVM shim converts the error to
`SchemaColumnConvertNotSupportedException` with the same Spark-compatible
column-name and type formatting.

Drops the `IgnoreCometNativeDataFusion` tag for SPARK-26709 in 3.4.3.diff and
3.5.8.diff (both diffs regenerated from clean Spark trees).
Match Spark's `_LEGACY_ERROR_TEMP_2063` exactly for the two BINARY-related
rejection paths in `replace_with_spark_cast`:

- Existing BINARY -> non-string/binary/decimal rejection: format column as
  `[name]`, emit Parquet primitive names (`BINARY`) and Spark catalog names
  (`int`, `timestamp`, ...) instead of Arrow datatype debug names.
- New non-BINARY -> string/binary rejection: Spark's vectorized reader has no
  `int -> string` or `long -> string` updater in `ParquetVectorUpdaterFactory`,
  so reject these to match (previously we silently produced strings via
  `spark_expr::Cast`).

Extends `parquet_primitive_name` / `spark_catalog_name` with Utf8, Binary,
Date32, and Timestamp entries needed by the new error format.

Un-ignores tests now passing:
- `schema mismatch failure error message for parquet vectorized reader`
  (all four diffs, tests both directions)
- `SPARK-35640: read binary as timestamp should throw schema incompatible error`
  (4.0.2, 4.1.1)
- `SPARK-35640: int as long should throw schema incompatible error` (3.4.3,
  3.5.8) was already enabled by the existing type-promotion rejection but the
  upmerge regen had re-added the ignore tag; drop it here.

Replaces the existing `parquet_roundtrip_int_as_string` test (which was
asserting silent wrong-answer behavior) with `parquet_int_read_as_string_errors`
plus a companion `parquet_string_read_as_int_errors`.
@andygrove andygrove changed the title fix: reject disallowed type promotions in native_datafusion scan fix: align native_datafusion Parquet schema checks with Spark's vectorized reader May 13, 2026
@andygrove
Copy link
Copy Markdown
Member Author

I iterated more on this PR and it now unignores more tests and no longer adds any new ignores - let's see if CI passes

@andygrove andygrove marked this pull request as draft May 13, 2026 15:05
@andygrove
Copy link
Copy Markdown
Member Author

@mbutrovich this is ready for another look

andygrove added 2 commits May 15, 2026 04:24
…ion gaps

- Tests pointing at apache#3720 in 4.1.1.diff now reference apache#4297 (primitive
  narrowing), apache#4343 (decimal-to-decimal narrowing), or apache#4344 (integer-to-
  decimal narrowing).
- Replace the high-level scan-compat note with three concrete entries.
…, apache#4344)

Extend the native_datafusion schema adapter to mirror the rejections in
`ParquetVectorUpdaterFactory.getUpdater` / `isDecimalTypeMatched`:

- Integer -> decimal where the requested decimal cannot represent the
  source integer type's range (apache#4344). INT32 sources require
  `precision - scale >= 10`, INT64 sources `>= 20`.
- Decimal -> decimal where `scaleIncrease < 0` OR
  `precisionIncrease < scaleIncrease` (apache#4343). Generalises the prior
  scale-only narrowing check to also reject precision narrowing and
  scale widening that overflows the integer side.
- Primitive numeric / date / timestamp conversions Spark rejects on
  every supported version (apache#4297): `long -> int`, `double -> float`,
  `float|double -> int*`, `int|long -> float`, `int -> timestamp`,
  `long -> date|timestamp`, `date -> timestamp(LTZ)`, and
  `timestamp -> date`. Deferred to runtime via `RejectOnNonEmpty` so
  empty Parquet files pass through (SPARK-26709).

`spark_catalog_name` now returns String so it can format
`decimal(p,s)` per Spark's `catalogString()`. `parquet_primitive_name`
gains decimal / date / timestamp arms so error messages report the
underlying Parquet primitive (INT32/INT64/FIXED_LEN_BYTE_ARRAY)
instead of UNKNOWN.

Coverage: 18 new Rust unit tests across the three rejection classes,
plus expanded JVM regression tests in ParquetReadSuite. The four
Spark SQL tests pointing at apache#4343/apache#4344/apache#4297 in dev/diffs/4.1.1.diff
are unignored (no umbrella-apache#3720 references remain). The compat doc
collapses the type-promotion bullet, leaving only the
TimestampLTZ-as-TimestampNTZ pre-Spark-4.0 caveat.
@andygrove
Copy link
Copy Markdown
Member Author

I pushed more changes and attempted to unignore all 4.1.1 tests (except for the one that depends on the error message containing the file name). If CI passes I will go ahead and unignore tests in the other diffs.

…pache#4351, apache#4352)

Two follow-ups to 80836b1, which unignored four `ParquetTypeWideningSuite`
test groups in `dev/diffs/4.1.1.diff` and surfaced a remaining schema-adapter
gap on top.

Schema adapter (apache#4351): a Parquet BINARY column without a
`DecimalLogicalTypeAnnotation` was silently allowed through when the requested
schema was `DecimalType`. The cast then fell through and Arrow's
`RecordBatch::try_new` raised a generic `column types must match schema types`
error as `CometNativeException` instead of the Spark-equivalent
`SchemaColumnConvertNotSupportedException`. Drop `Decimal128/Decimal256` from
the BINARY/STRING allowed-target match in `replace_with_spark_cast`, and add
the same rejection in `wrap_all_type_mismatches` so the fallback path doesn't
construct a `CometCastColumnExpr` for a cast it can't actually perform. New
unit test `parquet_binary_read_as_decimal_errors`. With this fix, Spark's
SPARK-34212 in `ParquetQuerySuite` passes without an ignore tag.

Test diffs (apache#4352): re-add `IgnoreCometNativeDataFusion` to the five test
groups in `dev/diffs/4.1.1.diff`'s `ParquetTypeWideningSuite` whose failure
isn't a rejection-logic bug but a parquet-mr-only behavior assertion
(`expectError = vectorized` paths and the explicit `overflows with parquet-mr`
test). Same annotation added to the previously-unannotated overflow test in
`dev/diffs/4.0.2.diff`. The schema-adapter rejection is correct; Comet just
has no parquet-mr-equivalent backend that produces silent overflow on the
non-vectorized path.

Docs: new `Schema Mismatch Handling` subsection in the parquet scan compat
guide. Explains that these gaps only apply to explicit schemas or schema
evolution (not plain `spark.read.parquet(path)`), notes cross-version
differences, and lists the one remaining gap (apache#4316: missing file path in
`ParquetSchemaConvert` errors).
@andygrove andygrove marked this pull request as draft May 16, 2026 01:11
andygrove added 3 commits May 15, 2026 19:12
Mirrors the BINARY -> DECIMAL(...) iteration in Spark's `SPARK-34212 Parquet
should read decimals correctly`. Walks the exception cause chain rather than
using `intercept[SparkException].getCause` directly because Spark 3.x's task
error handling wraps the shim's `SparkException` once more on the way back to
the driver, producing a two-level chain (`SparkException -> SparkException ->
SchemaColumnConvertNotSupportedException`); Spark 4.0+ produces a one-level
chain (`SparkException -> SchemaColumnConvertNotSupportedException`).

Validated locally against Spark 3.4.3, 3.5.8, 4.0.2, and 4.1.1. The strict
`intercept(...).getCause.isInstanceOf` check Spark's own SPARK-34212 uses
matches Comet on 4.x but not on 3.x, so SPARK-34212 itself can be unignored
on 4.0.2 / 4.1.1 but should stay ignored on 3.4.3 / 3.5.8 until the 3.x shim
no longer requires the extra wrapping.
…ic issues

Migrate the remaining `IgnoreCometNativeDataFusion("…apache/issues/3720")` references:

- `dev/diffs/4.0.2.diff`:
  - Unignore `SPARK-34212 Parquet should read decimals correctly` in
    `ParquetQuerySuite`. The schema-adapter rejection from apache#4351 covers all
    three iterations (INT32/INT64/BINARY -> DECIMAL); the cause chain on
    Spark 4.x is single-layer so the test's strict
    `intercept(...).getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]`
    assertion is satisfied.
  - Move the five `ParquetTypeWideningSuite` test groups from apache#3720 to apache#4352
    (parquet-mr permissive-overflow umbrella).
- `dev/diffs/3.4.3.diff` and `dev/diffs/3.5.8.diff`:
  - Re-tag `SPARK-34212` and `row group skipping doesn't overflow when reading
    into larger type` from apache#3720 to apache#4354 (Spark-3.x cause-chain wrapping).
    These tests will start passing once the 3.x shim no longer adds the extra
    `SparkException` layer; the schema-adapter rejection itself is correct.

After this change `apache#3720` has zero references in `dev/diffs/` and can be
closed as superseded by apache#4297, apache#4343, apache#4344, apache#4351, apache#4352, apache#4354.

Validated locally with the new `native_datafusion rejects BINARY (no decimal
annotation) read as DecimalType` test (added in the previous commit) on Spark
3.4.3, 3.5.8, 4.0.2, and 4.1.1.
@andygrove andygrove changed the title fix: align native_datafusion Parquet schema checks with Spark's vectorized reader fix: complete native_datafusion Parquet schema-mismatch rejections May 16, 2026
@andygrove andygrove marked this pull request as ready for review May 16, 2026 02:37
Copy link
Copy Markdown
Contributor

@mbutrovich mbutrovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sticking with this one @andygrove. Another round of feedback. In general, it feels like the diff could be trimmed dramatically with a simplify skill from Claude and asking it to reduce wordy comments. There are a lot of "what" not "why" comments and fluffy wording.

child: Arc<dyn PhysicalExpr>,
target_field: FieldRef,
column: String,
physical_type: String,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parquet_primitive_name returns &'static str (line 266), but the field stores String and every construction site (690, 725, 768, 801, 871) calls .to_string() on the static. Any reason not to make the field &'static str and skip the allocation? On the same struct, would column and spark_type work as Arc<str> so with_new_children (line 1123) does not have to clone three strings each time?

DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary
)
{
let rejection: Arc<dyn PhysicalExpr> = Arc::new(RejectOnNonEmpty {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three sites build the same five-field struct literal with the same column: format!("[{}]", cast.input_field().name()) shape and the same parquet_primitive_name(...).to_string() and spark_catalog_name(...) calls. Would a small constructor like

fn make_rejection(child: Arc<dyn PhysicalExpr>, cast: &CastColumnExpr,
                  physical: &DataType, target: &DataType) -> Arc<dyn PhysicalExpr>

pull the duplication out? The four SparkError::ParquetSchemaConvert literals at 574, 648, 722, 765 repeat the same shape and could share a sibling helper.

column: cast.input_field().name().to_string(),
physical_type: physical_type.to_string(),
spark_type: target_type.to_string(),
column: format!("[{}]", cast.input_field().name()),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seven call sites do format!("[{}]", cast.input_field().name()). Is there a reason the bracket framing has to live at the call site rather than inside SparkError::ParquetSchemaConvert's Display impl (or a constructor on the variant)?

/// producing nulls (mirrors `spark.sql.parquet.fieldId.read.ignoreMissing`).
pub ignore_missing_field_id: bool,
/// Whether type promotion (schema evolution) is allowed, e.g. INT32 -> INT64,
/// FLOAT -> DOUBLE. Mirrors spark.comet.schemaEvolution.enabled.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark.comet.schemaEvolution.enabled was removed in this PR. Should this comment point at the per-version ShimCometConf.COMET_SCHEMA_EVOLUTION_ENABLED constant instead?

DataType::Int64,
DataType::Int8 | DataType::Int16 | DataType::Int32,
)
// Long -> floating point.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of these inline comments say the same thing as the match arm directly below them, e.g. // Long -> narrower int. above (DataType::Int64, DataType::Int8 | DataType::Int16 | DataType::Int32). The arms read fine on their own. The ones that carry actual context (the IntegerToDoubleUpdater note at 844, the "raw INT32; DATE-annotated columns surface as Date32" parenthetical at 849, the SPARK-26709 references) seem worth keeping. Would dropping the others make this block easier to read?

let filename = get_temp_filename();
let filename = filename.as_path().as_os_str().to_str().unwrap().to_string();
let file = File::create(&filename)?;
let writer = ArrowWriter::try_new(file, Arc::clone(&file_schema), None)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both tests build ArrowWriter and FileScanConfigBuilder and expr_adapter_factory from scratch, even though roundtrip at schema_adapter.rs:815 already does that setup. Could threading an expect_empty: bool (or factoring the shared setup out) let both tests go through roundtrip and drop the duplicated ~30 lines?

@andygrove
Copy link
Copy Markdown
Member Author

CI is green as of ac0e989

andygrove added 4 commits May 16, 2026 07:09
…pache#4352)

Comet's native_datafusion scan rejects Parquet-to-Spark conversions that
Spark's vectorized reader rejects, but Spark's parquet-mr (non-vectorized)
path silently overflows / nulls. Disabling PARQUET_VECTORIZED_READER_ENABLED
opts into parquet-mr semantics that Comet has no equivalent for, so fall
back to Spark in that case. Re-enables the affected ParquetTypeWideningSuite
tests in 4.0.2 and 4.1.1 diffs.
@andygrove
Copy link
Copy Markdown
Member Author

@kazuyukitanimura @parthchandra @comphead @mbutrovich This PR is now ready for review. It unignores almost all of the Spark SQL tests that were previously ignored for native_datafision. The only real issue left is #4354 which I will look at in a follow on PR.

Copy link
Copy Markdown
Contributor

@mbutrovich mbutrovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @andygrove!

@mbutrovich
Copy link
Copy Markdown
Contributor

2026-05-16T14:47:16.2261622Z [info] - unsupported parquet conversion ByteType -> DecimalType(1,0) *** FAILED *** (142 milliseconds)
2026-05-16T14:47:16.2263947Z [info]   org.apache.spark.SparkException: [FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH] Encountered error while reading file . Data type mismatches when reading Parquet column [a]. Expected Spark type decimal(1,0), actual Parquet type INT32. SQLSTATE: KD001
2026-05-16T14:47:16.2269764Z [info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.parquetColumnDataTypeMismatchError(QueryExecutionErrors.scala:889)
2026-05-16T14:47:16.2278111Z [info]   at org.apache.spark.sql.comet.shims.ShimSparkErrorConverter.convertErrorType(ShimSparkErrorConverter.scala:342)
2026-05-16T14:47:16.2279782Z [info]   at org.apache.spark.sql.comet.shims.ShimSparkErrorConverter.convertErrorType$(ShimSparkErrorConverter.scala:80)
2026-05-16T14:47:16.2281492Z [info]   at org.apache.comet.SparkErrorConverter$.convertErrorType(SparkErrorConverter.scala:43)
2026-05-16T14:47:16.2282782Z [info]   at org.apache.comet.SparkErrorConverter$.convertToSparkException(SparkErrorConverter.scala:108)
2026-05-16T14:47:16.2284025Z [info]   at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:172)
2026-05-16T14:47:16.2285126Z [info]   at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:215)
2026-05-16T14:47:16.2285975Z [info]   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:611)
2026-05-16T14:47:16.2286779Z [info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
2026-05-16T14:47:16.2287514Z [info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
2026-05-16T14:47:16.2288235Z [info]   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1820)
2026-05-16T14:47:16.2288988Z [info]   at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1304)
2026-05-16T14:47:16.2289736Z [info]   at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1304)
2026-05-16T14:47:16.2292094Z [info]   at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2536)

:(

@andygrove
Copy link
Copy Markdown
Member Author

andygrove commented May 16, 2026

2026-05-16T14:47:16.2261622Z [info] - unsupported parquet conversion ByteType -> DecimalType(1,0) *** FAILED *** (142 milliseconds)
2026-05-16T14:47:16.2263947Z [info]   org.apache.spark.SparkException: [FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH] Encountered error while reading file . Data type mismatches when reading Parquet column [a]. Expected Spark type decimal(1,0), actual Parquet type INT32. SQLSTATE: KD001
2026-05-16T14:47:16.2269764Z [info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.parquetColumnDataTypeMismatchError(QueryExecutionErrors.scala:889)
2026-05-16T14:47:16.2278111Z [info]   at org.apache.spark.sql.comet.shims.ShimSparkErrorConverter.convertErrorType(ShimSparkErrorConverter.scala:342)
2026-05-16T14:47:16.2279782Z [info]   at org.apache.spark.sql.comet.shims.ShimSparkErrorConverter.convertErrorType$(ShimSparkErrorConverter.scala:80)
2026-05-16T14:47:16.2281492Z [info]   at org.apache.comet.SparkErrorConverter$.convertErrorType(SparkErrorConverter.scala:43)
2026-05-16T14:47:16.2282782Z [info]   at org.apache.comet.SparkErrorConverter$.convertToSparkException(SparkErrorConverter.scala:108)
2026-05-16T14:47:16.2284025Z [info]   at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:172)
2026-05-16T14:47:16.2285126Z [info]   at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:215)
2026-05-16T14:47:16.2285975Z [info]   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:611)
2026-05-16T14:47:16.2286779Z [info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
2026-05-16T14:47:16.2287514Z [info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
2026-05-16T14:47:16.2288235Z [info]   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1820)
2026-05-16T14:47:16.2288988Z [info]   at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1304)
2026-05-16T14:47:16.2289736Z [info]   at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1304)
2026-05-16T14:47:16.2292094Z [info]   at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2536)

:(

My bad. I had tried one more fix then failed to revert it correctly (forgot to re-ignore the tests).

Commit 2ddece0 reverted the parquet-mr fallback added in apache#4352 but did
not restore the IgnoreCometNativeDataFusion annotations that apache#4352 had
removed. Without the fallback, those tests run against Comet's native
scan and fail because Comet rejects conversions that parquet-mr would
silently overflow/null.
@andygrove andygrove merged commit dc08a96 into apache:main May 17, 2026
125 checks passed
@github-project-automation github-project-automation Bot moved this from In progress to Done in Comet Development May 17, 2026
@andygrove andygrove deleted the native-df-type-promotion-validation branch May 17, 2026 02:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment