@@ -128,7 +128,7 @@ public void close() {
 
   protected void initNative() {
     LOG.debug("initializing the native column reader");
-    DataType readType = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() ? type : null;
+    DataType readType = CometConf.COMET_SCHEMA_EVOLUTION_ENABLED() ? type : null;
     boolean useLegacyDateTimestampOrNTZ =
         useLegacyDateTimestamp || type == TimestampNTZType$.MODULE$;
     nativeHandle =
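The call site above can drop the `ConfigEntry` read because `CometConf` mixes in the per-Spark-version `ShimCometConf` trait changed later in this PR, making `COMET_SCHEMA_EVOLUTION_ENABLED` a plain compile-time Boolean. A minimal Scala sketch of that wiring, with hypothetical names (`ShimCometConf3x`, `ShimCometConf4x`, `CometConfSketch`) standing in for the real single-named shim selected by the build:

// Sketch only: the real shims both define a trait named ShimCometConf and only one is on the
// classpath per Spark profile; two names are used here just to show both defaults side by side.
trait ShimCometConf3x { val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = false } // spark-3.x default
trait ShimCometConf4x { val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = true }  // spark-4.x default

object CometConfSketch extends ShimCometConf4x
// Java callers such as initNative() now read the field directly, e.g.
// CometConfSketch.COMET_SCHEMA_EVOLUTION_ENABLED(), with no SQLConf lookup at runtime.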
10 changes: 5 additions & 5 deletions common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -130,7 +130,7 @@ public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkT
     PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
     LogicalTypeAnnotation logicalTypeAnnotation =
         descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-    boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
+    boolean allowTypePromotion = CometConf.COMET_SCHEMA_EVOLUTION_ENABLED();
 
     if (sparkType instanceof NullType) {
       return;
@@ -150,8 +150,8 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 32)) {
           // fallbacks. We read them as long values.
           return;
         } else if (sparkType == DataTypes.LongType && allowTypePromotion) {
-          // In Comet we allow schema evolution from int to long, if
-          // `spark.comet.schemaEvolution.enabled` is enabled.
+          // INT32 -> LONG widening is allowed when Comet's per-Spark-version
+          // type-promotion default permits it (Spark 4.x). See ShimCometConf.
           return;
         } else if (sparkType == DataTypes.ByteType || sparkType == DataTypes.ShortType) {
           return;
@@ -198,8 +198,8 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 64)) {
         break;
       case FLOAT:
         if (sparkType == DataTypes.FloatType) return;
-        // In Comet we allow schema evolution from float to double, if
-        // `spark.comet.schemaEvolution.enabled` is enabled.
+        // FLOAT -> DOUBLE widening is allowed when Comet's per-Spark-version
+        // type-promotion default permits it (Spark 4.x). See ShimCometConf.
         if (sparkType == DataTypes.DoubleType && allowTypePromotion) return;
         break;
       case DOUBLE:
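Taken together, the gate in checkParquetType accepts a widening match only when the requested Spark type is the wider one and the per-version flag allows promotion. A small Scala sketch of that decision, under the assumption that the helper name and the string-typed Parquet primitive are illustrative rather than the actual TypeUtil API:

object PromotionSketch {
  import org.apache.spark.sql.types._

  // Returns true when the (Parquet primitive, requested Spark type) pair is readable.
  def promotionAllowed(parquetType: String, sparkType: DataType, allowTypePromotion: Boolean): Boolean =
    (parquetType, sparkType) match {
      case ("INT32", IntegerType) | ("FLOAT", FloatType) => true               // exact match always allowed
      case ("INT32", LongType)                           => allowTypePromotion // int -> long widening
      case ("FLOAT", DoubleType)                         => allowTypePromotion // float -> double widening
      case _                                             => false              // defer to the remaining checks
    }
}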
10 changes: 0 additions & 10 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -727,16 +727,6 @@ object CometConf extends ShimCometConf
       .booleanConf
       .createWithDefault(true)
 
-  val COMET_SCHEMA_EVOLUTION_ENABLED: ConfigEntry[Boolean] =
-    conf("spark.comet.schemaEvolution.enabled")
-      .internal()
-      .category(CATEGORY_SCAN)
-      .doc("Whether to enable schema evolution in Comet. For instance, promoting a integer " +
-        "column to a long column, a float column to a double column, etc. This is automatically" +
-        "enabled when reading from Iceberg tables.")
-      .booleanConf
-      .createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT)
-
   val COMET_ENABLE_PARTIAL_HASH_AGGREGATE: ConfigEntry[Boolean] =
     conf("spark.comet.testing.aggregate.partialMode.enabled")
       .internal()
@@ -20,5 +20,12 @@
 package org.apache.comet.shims
 
 trait ShimCometConf {
-  protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = false
+
+  /**
+   * Whether Comet's Parquet scan paths allow widening type promotions (e.g. INT32 → INT64, FLOAT
+   * → DOUBLE). Spark 3.x's vectorized reader rejects these on read, so Comet matches by
+   * defaulting to false on 3.x. Reads from the deprecated `spark.comet.schemaEvolution.enabled`
+   * SQL conf were removed in favor of this per-version constant; see #4298.
+   */
+  val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = false
 }
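To make the 3.x default concrete, here is a hedged sketch of the read pattern this constant governs; the output path is made up for the example, and the exact exception surfaced depends on the scan implementation, so the comments describe expectations rather than guarantees (the SPARK-35640 test in the dev/diffs below exercises the same case):

// Sketch: on Spark 3.x (constant = false) Comet mirrors Spark's vectorized reader and
// rejects widening reads instead of silently promoting them.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
spark.range(4).selectExpr("CAST(id AS INT) AS _1")
  .write.mode("overwrite").parquet("/tmp/comet_int32_demo")

// Reading the INT32 column back as LongType is expected to fail with a
// schema-incompatibility error on 3.x.
val widened = StructType(Seq(StructField("_1", LongType)))
spark.read.schema(widened).parquet("/tmp/comet_int32_demo").collect()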
@@ -20,5 +20,12 @@
 package org.apache.comet.shims
 
 trait ShimCometConf {
-  protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
+
+  /**
+   * Whether Comet's Parquet scan paths allow widening type promotions (e.g. INT32 → INT64, FLOAT
+   * → DOUBLE, INT32 → DOUBLE). Spark 4.x's vectorized reader accepts these by default. Reads from
+   * the deprecated `spark.comet.schemaEvolution.enabled` SQL conf were removed in favor of this
+   * per-version constant; see #4298.
+   */
+  val COMET_SCHEMA_EVOLUTION_ENABLED: Boolean = true
 }
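And the 4.x counterpart, reusing the session and file written in the sketch above: with the constant fixed to true, the same widening read is expected to succeed, which is also why several test expectations in the dev/diffs patches below change.

// Sketch: on Spark 4.x (constant = true) the INT32 column can be read back as BIGINT.
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val widened = StructType(Seq(StructField("_1", LongType)))
spark.read.schema(widened).parquet("/tmp/comet_int32_demo").show()
// expected: values 0..3 printed as longs, no schema-incompatibility error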
40 changes: 11 additions & 29 deletions dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index d3544881af1..d075572c5b3 100644
index d3544881af1..1126f287096 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -1969,7 +1969,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 104b4e416cd..b8af360fa14 100644
index 104b4e416cd..4adb273170a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
@@ -2121,28 +2121,10 @@ index 104b4e416cd..b8af360fa14 100644
case _ =>
throw new AnalysisException("Can not match ParquetTable in the query.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 8670d95c65e..9411af57a26 100644
index 8670d95c65e..b624c3811dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
import org.apache.spark.sql._
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-35640: int as long should throw schema incompatible error") {
+ test("SPARK-35640: int as long should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
val data = (1 to 4).map(i => Tuple1(i))
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))

@@ -1335,7 +1337,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

@@ -2153,7 +2135,7 @@ index 8670d95c65e..9411af57a26 100644
checkAnswer(
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 29cb224c878..ee5a87fa200 100644
index 29cb224c878..1f7a0ebf0bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -2309,7 +2291,7 @@ index 5c0b7def039..151184bc98c 100644
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index bf5c51b89bb..dc3aac281c3 100644
index bf5c51b89bb..7e143a0e0f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
@@ -2336,7 +2318,7 @@ index bf5c51b89bb..dc3aac281c3 100644

- test("schema mismatch failure error message for parquet vectorized reader") {
+ test("schema mismatch failure error message for parquet vectorized reader",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) {
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[SparkException])
@@ -2882,7 +2864,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index dd55fcfe42c..99bc018008a 100644
index dd55fcfe42c..cd18a23d4de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2948,7 +2930,7 @@ index dd55fcfe42c..99bc018008a 100644
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
@@ -434,6 +487,8 @@ private[sql] trait SQLTestUtilsBase
@@ -434,6 +469,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
@@ -2958,7 +2940,7 @@ index dd55fcfe42c..99bc018008a 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..a5ea58146ad 100644
index ed2e309fa07..25b798d2c1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
@@ -3071,7 +3053,7 @@ index a902cb3a69e..800a3acbe99 100644

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 07361cfdce9..97dab2a3506 100644
index 07361cfdce9..4fdbcd18656 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -55,25 +55,41 @@ object TestHive
32 changes: 11 additions & 21 deletions dev/diffs/3.5.8.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index edd2ad57880..d5273840330 100644
index edd2ad57880..15a0947abf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
@@ -1958,7 +1958,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8e88049f51e..20d7ef7b1bc 100644
index 8e88049f51e..097c518a19a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -2104,20 +2104,10 @@ index 8e88049f51e..20d7ef7b1bc 100644
case _ =>
throw new AnalysisException("Can not match ParquetTable in the query.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 8ed9ef1630e..71e22972a47 100644
index 8ed9ef1630e..eed2a6f5ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1075,7 +1075,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-35640: int as long should throw schema incompatible error") {
+ test("SPARK-35640: int as long should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
val data = (1 to 4).map(i => Tuple1(i))
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))

@@ -1345,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

@@ -2128,7 +2118,7 @@ index 8ed9ef1630e..71e22972a47 100644
checkAnswer(
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index f6472ba3d9d..5ea2d938664 100644
index f6472ba3d9d..0d54d2f0410 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -2276,7 +2266,7 @@ index 5c0b7def039..151184bc98c 100644
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 3f47c5e506f..2ac0868407e 100644
index 3f47c5e506f..80b7ef6c46a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
@@ -2303,7 +2293,7 @@ index 3f47c5e506f..2ac0868407e 100644

- test("schema mismatch failure error message for parquet vectorized reader") {
+ test("schema mismatch failure error message for parquet vectorized reader",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) {
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[SparkException])
@@ -2834,7 +2824,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index e937173a590..7d20538bc68 100644
index e937173a590..3134078a122 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2900,7 +2890,7 @@ index e937173a590..7d20538bc68 100644
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
@@ -435,6 +488,8 @@ private[sql] trait SQLTestUtilsBase
@@ -435,6 +470,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
@@ -2910,7 +2900,7 @@ index e937173a590..7d20538bc68 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..a5ea58146ad 100644
index ed2e309fa07..25b798d2c1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
@@ -3023,7 +3013,7 @@ index 6160c3e5f6c..0956d7d9edc 100644

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1d646f40b3e..5babe505301 100644
index 1d646f40b3e..df108c17c42 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -53,25 +53,41 @@ object TestHive