This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 2f64b60b9 chore: Run Spark SQL tests with `native_datafusion` in CI (#3393)
2f64b60b9 is described below
commit 2f64b60b926c1f3183ec59acafe77f23629391c7
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 5 15:03:05 2026 -0700
chore: Run Spark SQL tests with `native_datafusion` in CI (#3393)
---
.github/workflows/spark_sql_test.yml | 12 +-
dev/diffs/3.5.8.diff | 668 ++++++++++++++++++---
.../org/apache/comet/rules/CometScanRule.scala | 10 +-
3 files changed, 590 insertions(+), 100 deletions(-)
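
The workflow change below swaps the Spark 3.5 `native_iceberg_compat` CI job for a
`native_datafusion` one, and the updated `dev/diffs/3.5.8.diff` adds
`IgnoreCometNativeDataFusion` tags (each carrying a tracking-issue URL) to Spark SQL
tests that are not yet expected to pass under that scan implementation. As a rough,
hypothetical sketch of how such a tag is attached to a test -- the tag definition here
is illustrative only, not the patched source in IgnoreComet.scala:

    import org.scalatest.Tag
    import org.scalatest.funsuite.AnyFunSuite

    // Hypothetical stand-in for the tag class this patch adds; the constructor
    // argument records the tracking issue explaining why the test is skipped.
    case class IgnoreCometNativeDataFusion(reason: String)
      extends Tag("org.apache.spark.sql.IgnoreCometNativeDataFusion")

    class ExampleSuite extends AnyFunSuite {
      // Tagged in the same style as the patched Spark suites: the test still runs
      // under other scan implementations and is only skipped for native_datafusion.
      test("input_file_name works with FileScanRDD",
          IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
        assert(true) // real assertions elided
      }
    }
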
diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 5d35f3aee..610baa9f2 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -120,13 +120,13 @@ jobs:
# - auto scan: all Spark versions (3.4, 3.5, 4.0)
# - native_iceberg_compat: Spark 3.5 only
config:
- - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto', scan-env: ''}
- - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto', scan-env: ''}
- - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_iceberg_compat', scan-env: 'COMET_PARQUET_SCAN_IMPL=native_iceberg_compat'}
- - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
+ - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto'}
+ - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'}
+ - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_datafusion'}
+ - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'}
# Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
exclude:
- - config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
+ - config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'}
module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
fail-fast: false
name: spark-sql-${{ matrix.config.scan-impl }}-${{ matrix.module.name }}/spark-${{ matrix.config.spark-full }}
@@ -155,7 +155,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
- ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ${{ matrix.config.scan-env }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
+ NOLINT_ON_COMPILE=true ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=${{ matrix.config.scan-impl }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log
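
The rest of the patch updates `dev/diffs/3.5.8.diff`, the diff Comet applies to the
Spark 3.5.8 sources before running Spark's own SQL test suites. Its core mechanism,
added to `SQLTestUtils.scala` near the end of this patch, registers tagged tests with
`ignore()` when Comet is enabled with a matching scan implementation. A simplified,
hypothetical sketch of that dispatch follows; helper names such as `cometScanImpl` are
stand-ins for Comet's real configuration API, not the patched code itself:

    import org.scalactic.source.Position
    import org.scalatest.Tag
    import org.scalatest.funsuite.AnyFunSuite

    // Sketch of the test-registration override: a test carrying the
    // IgnoreCometNativeDataFusion tag is turned into an ignored test when the
    // suite runs with the native_datafusion scan implementation.
    abstract class CometAwareSuite extends AnyFunSuite {
      // Stand-ins for helpers that the real patch reads from Comet's configuration.
      protected def isCometEnabled: Boolean
      protected def cometScanImpl: String

      override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(
          implicit pos: Position): Unit = {
        val skipForDataFusion = isCometEnabled &&
          cometScanImpl == "native_datafusion" &&
          testTags.exists(_.name.contains("IgnoreCometNativeDataFusion"))
        if (skipForDataFusion) {
          // Register as ignored so the suite still reports the test, with a reason.
          ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
        } else {
          super.test(testName, testTags: _*)(testFun)
        }
      }
    }
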
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index c617fa75b..10f579da6 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
-index a0e25ce4d8d..b95fba458f2 100644
+index edd2ad57880..77a975ea48f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
@@ -38,7 +38,7 @@ index a0e25ce4d8d..b95fba458f2 100644
</dependencyManagement>
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
-index e3d324c8edb..22342150522 100644
+index bc00c448b80..82068d7a2eb 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -77,6 +77,10 @@
@@ -238,6 +238,20 @@ index e5494726695..00937f025c2 100644
}
test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+index 9e8d77c53f3..855e3ada7d1 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
+ }
+ }
+
+- test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
++ test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD",
++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
+ withTempPath { dir =>
+ val data = sparkContext.parallelize(0 to 10).toDF("id")
+ data.write.parquet(dir.getCanonicalPath)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 6f3090d8908..c08a60fb0c2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -374,7 +388,7 @@ index a1d5d579338..c201d39cc78 100644
}
case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
-index f32b32ffc5a..447d7c6416e 100644
+index c4fb4fa943c..a04b23870a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.{LeftAnti,
LeftSemi}
@@ -396,7 +410,7 @@ index f32b32ffc5a..447d7c6416e 100644
assert(exchanges.size == 2)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..0e1499a24ca 100644
+index f33432ddb6f..42eb9fd1cb7 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -447,7 +461,17 @@ index f33432ddb6f..0e1499a24ca 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
val df = sql(
""" WITH v as (
-@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+ * Check the static scan metrics with and without DPP
+ */
+ test("static scan metrics",
+- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"))
{
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
@@ -457,10 +481,20 @@ index f33432ddb6f..0e1499a24ca 100644
}
assert(scanOption.isDefined)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-index a206e97c353..fea1149b67d 100644
+index a206e97c353..79813d8e259 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+ }
+ }
+
+- test("explain formatted - check presence of subquery in case of DPP") {
++ test("explain formatted - check presence of subquery in case of DPP",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"))
{
+ withTable("df1", "df2") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
}
}
@@ -470,7 +504,7 @@ index a206e97c353..fea1149b67d 100644
withTempDir { dir =>
Seq("parquet", "orc", "csv", "json").foreach { fmt =>
val basePath = dir.getCanonicalPath + "/" + fmt
-@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
}
}
@@ -482,7 +516,7 @@ index a206e97c353..fea1149b67d 100644
test("SPARK-35884: Explain Formatted") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 93275487f29..33b2e7ad3b1 100644
+index 93275487f29..510e3087e0f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
@@ -510,7 +544,17 @@ index 93275487f29..33b2e7ad3b1 100644
checkErrorMatchPVals(
exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
-@@ -955,6 +959,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+
+ Seq("parquet", "orc").foreach { format =>
+- test(s"Spark native readers should respect spark.sql.caseSensitive -
${format}") {
++ test(s"Spark native readers should respect spark.sql.caseSensitive -
${format}",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ withTempDir { dir =>
+ val tableName = s"spark_25132_${format}_native"
+ val tableDir = dir.getCanonicalPath + s"/$tableName"
+@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
@@ -518,7 +562,7 @@ index 93275487f29..33b2e7ad3b1 100644
}
assert(smJoinExec.nonEmpty)
}
-@@ -1015,6 +1020,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -526,7 +570,7 @@ index 93275487f29..33b2e7ad3b1 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1056,6 +1062,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -534,7 +578,7 @@ index 93275487f29..33b2e7ad3b1 100644
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1240,6 +1247,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -546,7 +590,7 @@ index 93275487f29..33b2e7ad3b1 100644
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
new file mode 100644
-index 00000000000..5691536c114
+index 00000000000..1ee842b6f62
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
@@ -0,0 +1,45 @@
@@ -586,8 +630,8 @@ index 00000000000..5691536c114
+ * Helper trait that disables Comet for all tests regardless of default
config values.
+ */
+trait IgnoreCometSuite extends SQLTestUtils {
-+ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
-+ (implicit pos: Position): Unit = {
++ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)(implicit
++ pos: Position): Unit = {
+ if (isCometEnabled) {
+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
+ } else {
@@ -1040,6 +1084,20 @@ index 04702201f82..5ee11f83ecf 100644
}
assert(exchanges.size === 1)
}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+index 9f8e979e3fb..3bc9dab8023 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
+ spark.catalog.dropTempView("tmp_table")
+ }
+
+- test("SPARK-8005 input_file_name") {
++ test("SPARK-8005 input_file_name",
++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
+ withTempPath { dir =>
+ val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
+ data.write.parquet(dir.getCanonicalPath)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index d269290e616..13726a31e07 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -1104,18 +1162,31 @@ index d269290e616..13726a31e07 100644
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..c6fcfd7bd08 100644
+index cfc8b2cc845..b7c234e1437 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
+ import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.SparkConf
- import org.apache.spark.sql.{AnalysisException, QueryTest}
+-import org.apache.spark.sql.{AnalysisException, QueryTest}
++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion,
QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
+@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with
SharedSparkSession {
+ }
+ }
+
+- test("Fallback Parquet V2 to V1") {
++ test("Fallback Parquet V2 to V1",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
+ Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach {
format =>
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+ val commands = ArrayBuffer.empty[(String, LogicalPlan)]
+@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
val df = spark.read.format(format).load(path.getCanonicalPath)
checkAnswer(df, inputData.toDF())
assert(
@@ -1379,6 +1450,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+index a1147c16cc8..c7a29496328 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+
+ import org.apache.spark.{SparkArithmeticException, SparkException,
SparkFileNotFoundException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
+ import org.apache.spark.sql.catalyst.parser.ParseException
+@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with
SQLTestUtils {
+ }
+ }
+
+- test("alter temporary view should follow current storeAnalyzedPlanForView
config") {
++ test("alter temporary view should follow current storeAnalyzedPlanForView
config",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314"))
{
+ withTable("t") {
+ Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+ withView("v1") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index eec396b2e39..bf3f1c769d6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -1877,7 +1970,7 @@ index 2f8e401e743..a4f94417dcc 100644
assert(o1.semanticEquals(o2), "Different output column order after
AQE optimization")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
-index bf496d6db21..9bb57a9b4c6 100644
+index fd52d038ca6..154c800be67 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Concat
@@ -1888,7 +1981,7 @@ index bf496d6db21..9bb57a9b4c6 100644
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions._
-@@ -868,6 +869,8 @@ abstract class SchemaPruningSuite
+@@ -884,6 +885,8 @@ abstract class SchemaPruningSuite
val fileSourceScanSchemata =
collect(df.queryExecution.executedPlan) {
case scan: FileSourceScanExec => scan.requiredSchema
@@ -1898,7 +1991,7 @@ index bf496d6db21..9bb57a9b4c6 100644
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
-index ce43edb79c1..4dbb5942bc3 100644
+index 5fd27410dcb..468abb1543a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
@@ -1909,7 +2002,7 @@ index ce43edb79c1..4dbb5942bc3 100644
import org.apache.spark.sql.execution.{QueryExecution, SortExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf
-@@ -225,6 +226,7 @@ class V1WriteCommandSuite extends QueryTest with
SharedSparkSession with V1Write
+@@ -243,6 +244,7 @@ class V1WriteCommandSuite extends QueryTest with
SharedSparkSession with V1Write
// assert the outer most sort in the executed plan
assert(plan.collectFirst {
case s: SortExec => s
@@ -1917,7 +2010,7 @@ index ce43edb79c1..4dbb5942bc3 100644
}.exists {
case SortExec(Seq(
SortOrder(AttributeReference("key", IntegerType, _, _),
Ascending, NullsFirst, _),
-@@ -272,6 +274,7 @@ class V1WriteCommandSuite extends QueryTest with
SharedSparkSession with V1Write
+@@ -290,6 +292,7 @@ class V1WriteCommandSuite extends QueryTest with
SharedSparkSession with V1Write
// assert the outer most sort in the executed plan
assert(plan.collectFirst {
case s: SortExec => s
@@ -1972,8 +2065,124 @@ index 07e2849ce6f..3e73645b638 100644
val extraOptions = Map[String, String](
ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
+index 5e01d3f447c..284d6657d4f 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
+@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
+ import scala.collection.JavaConverters._
+
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.{QueryTest, Row}
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest, Row}
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata,
MetadataBuilder, StringType, StructType}
+@@ -30,7 +30,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
+ private def withId(id: Int): Metadata =
+ new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY,
id).build()
+
+- test("Parquet reads infer fields using field ids correctly") {
++ test("Parquet reads infer fields using field ids correctly",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
+ withTempDir { dir =>
+ val readSchema =
+ new StructType()
+@@ -78,7 +79,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
+ }
+ }
+
+- test("absence of field ids") {
++ test("absence of field ids",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
+ withTempDir { dir =>
+ val readSchema =
+ new StructType()
+@@ -107,7 +109,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
+ }
+ }
+
+- test("SPARK-38094: absence of field ids: reading nested schema") {
++ test("SPARK-38094: absence of field ids: reading nested schema",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
+ withTempDir { dir =>
+ // now with nested schema/complex type
+ val readSchema =
+@@ -136,7 +139,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
+ }
+ }
+
+- test("multiple id matches") {
++ test("multiple id matches",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
+ withTempDir { dir =>
+ val readSchema =
+ new StructType()
+@@ -163,7 +167,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
+ }
+ }
+
+- test("read parquet file without ids") {
++ test("read parquet file without ids",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
+ withTempDir { dir =>
+ val readSchema =
+ new StructType()
+@@ -196,7 +201,8 @@ class ParquetFieldIdIOSuite extends QueryTest with
ParquetTest with SharedSparkS
+ }
+ }
+
+- test("global read/write flag should work correctly") {
++ test("global read/write flag should work correctly",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316"))
{
+ withTempDir { dir =>
+ val readSchema =
+ new StructType()
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
+index c10e1799702..ba6629abfd9 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
+@@ -16,7 +16,7 @@
+ */
+ package org.apache.spark.sql.execution.datasources.parquet
+
+-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
++import org.apache.spark.sql.{AnalysisException, DataFrame,
IgnoreCometNativeDataFusion, QueryTest}
+ import org.apache.spark.sql.execution.datasources.FileFormat
+ import org.apache.spark.sql.functions.{col, lit}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -154,7 +154,8 @@ class ParquetFileMetadataStructRowIndexSuite extends
QueryTest with SharedSparkS
+ }
+ }
+
+- test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present in a
table") {
++ test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present in a table",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317"))
{
+ // File format supporting row index generation populates the column with
row indexes.
+ withReadDataFrame("parquet", extraSchemaFields =
+ Seq(StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType))) { df =>
+@@ -172,7 +173,8 @@ class ParquetFileMetadataStructRowIndexSuite extends
QueryTest with SharedSparkS
+ }
+ }
+
+- test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table") {
++ test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317"))
{
+ withReadDataFrame("parquet", extraCol = ROW_INDEX_TEMPORARY_COLUMN_NAME)
{ df =>
+ // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always
populated with
+ // generated row indexes, rather than read from the file.
+@@ -189,7 +191,8 @@ class ParquetFileMetadataStructRowIndexSuite extends
QueryTest with SharedSparkS
+ }
+ }
+
+- test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition col") {
++ test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition col",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317"))
{
+ withReadDataFrame("parquet", partitionCol =
ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
+ // Column values are set for each partition, rather than populated with
generated row indexes.
+ assert(df
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 8e88049f51e..8f3cf8a0f80 100644
+index 8e88049f51e..49f2001dc6b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
@@ -1999,7 +2208,17 @@ index 8e88049f51e..8f3cf8a0f80 100644
import testImplicits._
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-@@ -1580,7 +1585,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+@@ -1548,7 +1553,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+ }
+ }
+
+- test("SPARK-31026: Parquet predicate pushdown for fields having dots in the
names") {
++ test("SPARK-31026: Parquet predicate pushdown for fields having dots in the
names",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320"))
{
+ import testImplicits._
+
+ withAllParquetReaders {
+@@ -1580,13 +1586,18 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
// than the total length but should not be a single record.
// Note that, if record level filtering is enabled, it should be a
single record.
// If no filter is pushed down to Parquet, it should be the total
length of data.
@@ -2012,7 +2231,15 @@ index 8e88049f51e..8f3cf8a0f80 100644
}
}
}
-@@ -1607,7 +1616,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
+ }
+
+- test("Filters should be pushed down for Parquet readers at row group
level") {
++ test("Filters should be pushed down for Parquet readers at row group level",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320"))
{
+ import testImplicits._
+
+ withSQLConf(
+@@ -1607,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
// than the total length but should not be a single record.
// Note that, if record level filtering is enabled, it should be a
single record.
// If no filter is pushed down to Parquet, it should be the total
length of data.
@@ -2025,7 +2252,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
}
}
}
-@@ -1699,7 +1712,7 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1699,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
(attr, value) => sources.StringContains(attr, value))
}
@@ -2034,7 +2261,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
import testImplicits._
// keep() should take effect on StartsWith/EndsWith/Contains
Seq(
-@@ -1743,7 +1756,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1743,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2044,7 +2271,17 @@ index 8e88049f51e..8f3cf8a0f80 100644
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
-@@ -1984,7 +1998,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+ }
+ }
+
+- test("SPARK-25207: exception when duplicate fields in case-insensitive
mode") {
++ test("SPARK-25207: exception when duplicate fields in case-insensitive
mode",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ withTempPath { dir =>
+ val count = 10
+ val tableName = "spark_25207"
+@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2054,7 +2291,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
// block 1:
// null count min
max
// page-0 0 0
99
-@@ -2044,7 +2059,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
+@@ -2044,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
}
@@ -2064,7 +2301,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
-@@ -2276,7 +2292,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2276,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
@@ -2077,7 +2314,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
} else {
assert(selectedFilters.isEmpty, "There is filter pushed down")
}
-@@ -2336,7 +2356,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2336,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain
${filterClass}.")
@@ -2091,10 +2328,30 @@ index 8e88049f51e..8f3cf8a0f80 100644
case _ =>
throw new AnalysisException("Can not match ParquetTable in the
query.")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 8ed9ef1630e..eed2a6f5ad5 100644
+index 8ed9ef1630e..f312174b182 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+ }
+ }
+
+- test("SPARK-35640: read binary as timestamp should throw schema
incompatible error") {
++ test("SPARK-35640: read binary as timestamp should throw schema
incompatible error",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ val data = (1 to 4).map(i => Tuple1(i.toString))
+ val readSchema = StructType(Seq(StructField("_1",
DataTypes.TimestampType)))
+
+@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
+ }
+ }
+
+- test("SPARK-35640: int as long should throw schema incompatible error") {
++ test("SPARK-35640: int as long should throw schema incompatible error",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ val data = (1 to 4).map(i => Tuple1(i))
+ val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
+
+@@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
}
}
@@ -2105,10 +2362,20 @@ index 8ed9ef1630e..eed2a6f5ad5 100644
checkAnswer(
// "fruit" column in this file is encoded using
DELTA_LENGTH_BYTE_ARRAY.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index f6472ba3d9d..7a8f5317ed7 100644
+index f6472ba3d9d..ce39ebb52e6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -998,7 +998,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ }
+ }
+
+- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
++ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ val data = (1 to 1000).map { i =>
+ val ts = new java.sql.Timestamp(i)
+ Row(ts)
+@@ -998,7 +999,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
}
}
@@ -2118,7 +2385,17 @@ index f6472ba3d9d..7a8f5317ed7 100644
withAllParquetReaders {
withTempPath { path =>
// Repeated values for dictionary encoding.
-@@ -1067,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -1051,7 +1053,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
+ }
+
+- test("SPARK-34212 Parquet should read decimals correctly") {
++ test("SPARK-34212 Parquet should read decimals correctly",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ def readParquet(schema: String, path: File): DataFrame = {
+ spark.read.schema(schema).parquet(path.toString)
+ }
+@@ -1067,7 +1070,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
checkAnswer(readParquet(schema, path), df)
}
@@ -2128,7 +2405,7 @@ index f6472ba3d9d..7a8f5317ed7 100644
val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
checkAnswer(readParquet(schema1, path), df)
val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1089,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -1089,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c,
CAST('1.2' AS BINARY) d")
df.write.parquet(path.toString)
@@ -2138,7 +2415,17 @@ index f6472ba3d9d..7a8f5317ed7 100644
checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT
123456.0"))
-@@ -1148,7 +1151,7 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
+ }
+ }
+
+- test("row group skipping doesn't overflow when reading into larger type") {
++ test("row group skipping doesn't overflow when reading into larger type",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ withTempPath { path =>
+ Seq(0).toDF("a").write.parquet(path.toString)
+ // The vectorized and non-vectorized readers will produce different
exceptions, we don't need
+@@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
.where(s"a < ${Long.MaxValue}")
.collect()
}
@@ -2243,14 +2530,14 @@ index 5c0b7def039..151184bc98c 100644
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
-index 3f47c5e506f..bc1ee1ec0ba 100644
+index 3f47c5e506f..92a5eafec84 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -27,6 +27,7 @@ import
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type._
import org.apache.spark.SparkException
-+import org.apache.spark.sql.IgnoreComet
++import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion}
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
import
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.desc
@@ -2264,6 +2551,26 @@ index 3f47c5e506f..bc1ee1ec0ba 100644
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath,
vectorizedReaderEnabled = false)
val expectedMessage = "Encountered error while reading file"
+@@ -1046,7 +1048,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ }
+ }
+
+- test("schema mismatch failure error message for parquet vectorized reader")
{
++ test("schema mismatch failure error message for parquet vectorized reader",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ withTempPath { dir =>
+ val e = testSchemaMismatch(dir.getCanonicalPath,
vectorizedReaderEnabled = true)
+ assert(e.getCause.isInstanceOf[SparkException])
+@@ -1087,7 +1090,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+ }
+ }
+
+- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to
array<timestamp_ntz>") {
++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to
array<timestamp_ntz>",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
{
+ import testImplicits._
+
+ withTempPath { dir =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index b8f3ea3c6f3..bbd44221288 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -2313,18 +2620,30 @@ index 5cdbdc27b32..307fba16578 100644
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").saveAsTable("testDataForScan")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..d9125f658ad 100644
+index 0ab8691801d..7b81f3a8f6d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,9 @@
+
package org.apache.spark.sql.execution.python
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython,
BatchEvalPython, Limit, LocalLimit}
+import org.apache.spark.sql.comet._
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan,
SparkPlanTest}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+ assert(arrowEvalNodes.size == 2)
+ }
+
+- test("Python UDF should not break column pruning/filter pushdown -- Parquet
V1") {
++ test("Python UDF should not break column pruning/filter pushdown -- Parquet
V1",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+ withTempPath { f =>
+ spark.range(10).select($"id".as("a"), $"id".as("b"))
+@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
@@ -2332,7 +2651,7 @@ index 0ab8691801d..d9125f658ad 100644
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
@@ -2351,7 +2670,7 @@ index 0ab8691801d..d9125f658ad 100644
}
}
}
-@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -2359,7 +2678,7 @@ index 0ab8691801d..d9125f658ad 100644
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
+@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with
SharedSparkSession {
val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -2384,7 +2703,7 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..a773971d3c1 100644
+index 746f289c393..5b9e31c1fa6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
@@ -2505,7 +2824,14 @@ index 746f289c393..a773971d3c1 100644
s"expected sort in the right child to be $sortRight but
found\n${joinOperator.right}")
// check the output partitioning
-@@ -836,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -831,16 +869,17 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ }
+ }
+
+- test("disable bucketing when the output doesn't contain all bucketing
columns") {
++ test("disable bucketing when the output doesn't contain all bucketing
columns",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8,
"i").saveAsTable("bucketed_table")
val scanDF = spark.table("bucketed_table").select("j")
@@ -2519,7 +2845,7 @@ index 746f289c393..a773971d3c1 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
-@@ -895,7 +933,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -895,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions")
{
@@ -2530,7 +2856,7 @@ index 746f289c393..a773971d3c1 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -914,7 +955,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -914,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than
bucket number") {
@@ -2541,7 +2867,7 @@ index 746f289c393..a773971d3c1 100644
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
-@@ -944,7 +988,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+@@ -944,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
}
test("bucket coalescing eliminates shuffle") {
@@ -2552,7 +2878,17 @@ index 746f289c393..a773971d3c1 100644
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// The side with bucketedTableTestSpec1 will be coalesced to have 4
output partitions.
-@@ -1029,15 +1076,21 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
+@@ -1013,7 +1061,8 @@ abstract class BucketedReadSuite extends QueryTest with
SQLTestUtils with Adapti
+ }
+ }
+
+- test("bucket coalescing is applied when join expressions match with
partitioning expressions") {
++ test("bucket coalescing is applied when join expressions match with
partitioning expressions",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
+@@ -1029,15 +1078,21 @@ abstract class BucketedReadSuite extends QueryTest
with SQLTestUtils with Adapti
Seq(true, false).foreach { aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key ->
aqeEnabled.toString) {
val plan = sql(query).queryExecution.executedPlan
@@ -2601,13 +2937,15 @@ index 6f897a9c0b7..b0723634f68 100644
protected override lazy val sql = spark.sql _
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index d675503a8ba..659fa686fb7 100644
+index d675503a8ba..c386a8cb686 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,8 @@
+
package org.apache.spark.sql.sources
- import org.apache.spark.sql.QueryTest
+-import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest}
+import org.apache.spark.sql.comet.CometScanExec
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
@@ -2624,11 +2962,61 @@ index d675503a8ba..659fa686fb7 100644
assert(bucketedScan.length == expectedNumBucketedScan)
}
+@@ -83,7 +87,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("SPARK-32859: disable unnecessary bucketed table scan - basic test") {
++ test("SPARK-32859: disable unnecessary bucketed table scan - basic test",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -124,7 +129,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins
test") {
++ test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins
test",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -167,7 +173,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("SPARK-32859: disable unnecessary bucketed table scan - multiple
bucketed columns test") {
++ test("SPARK-32859: disable unnecessary bucketed table scan - multiple
bucketed columns test",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2")
+@@ -198,7 +205,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("SPARK-32859: disable unnecessary bucketed table scan - other
operators test") {
++ test("SPARK-32859: disable unnecessary bucketed table scan - other
operators test",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1", "t2", "t3") {
+ df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+ df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -239,7 +247,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+ }
+ }
+
+- test("Aggregates with no groupby over tables having 1 BUCKET, return
multiple rows") {
++ test("Aggregates with no groupby over tables having 1 BUCKET, return
multiple rows",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
{
+ withTable("t1") {
+ withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+ sql(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
-index 1954cce7fdc..73d1464780e 100644
+index 7f6fa2a123e..c778b4e2c48 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
-@@ -34,6 +34,7 @@ import org.apache.spark.paths.SparkPath
+@@ -35,6 +35,7 @@ import org.apache.spark.paths.SparkPath
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.util.stringToFile
@@ -2636,7 +3024,7 @@ index 1954cce7fdc..73d1464780e 100644
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2Relation, FileScan, FileTable}
-@@ -761,6 +762,8 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+@@ -777,6 +778,8 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
val fileScan = df.queryExecution.executedPlan.collect {
case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
batch.scan.asInstanceOf[FileScan]
@@ -2758,6 +3146,72 @@ index aad91601758..201083bd621 100644
})
}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+index b5cf13a9c12..ac17603fb7f 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
+
+ import org.apache.spark.{SparkException, TestUtils}
+ import org.apache.spark.internal.Logging
+-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset,
Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset,
IgnoreCometNativeDataFusion, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn,
Shuffle, Uuid}
+ import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef,
CTERelationRef, LocalRelation}
+@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
+ )
+ }
+
+- test("SPARK-41198: input row calculation with CTE") {
++ test("SPARK-41198: input row calculation with CTE",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
+ withTable("parquet_tbl", "parquet_streaming_tbl") {
+ spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
+ .write.format("parquet").saveAsTable("parquet_tbl")
+@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
+ }
+ }
+
+- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2
streaming sources") {
++ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2
streaming sources",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
+ withTable("parquet_streaming_tbl") {
+ val streamInput = MemoryStream[Int]
+ val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS
value_stream")
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
+index 8f099c31e6b..ce4b7ad25b3 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
+@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming
+ import org.scalatest.BeforeAndAfter
+ import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+-import org.apache.spark.sql.SaveMode
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, SaveMode}
+ import org.apache.spark.sql.connector.catalog.Identifier
+ import org.apache.spark.sql.execution.streaming.MemoryStream
+ import org.apache.spark.sql.streaming.test.{InMemoryStreamTable,
InMemoryStreamTableCatalog}
+@@ -42,7 +42,8 @@ class StreamingSelfUnionSuite extends StreamTest with
BeforeAndAfter {
+ sqlContext.streams.active.foreach(_.stop())
+ }
+
+- test("self-union, DSv1, read via DataStreamReader API") {
++ test("self-union, DSv1, read via DataStreamReader API",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401"))
{
+ withTempPath { dir =>
+ val dataLocation = dir.getAbsolutePath
+ spark.range(1, 4).write.format("parquet").save(dataLocation)
+@@ -66,7 +67,8 @@ class StreamingSelfUnionSuite extends StreamTest with
BeforeAndAfter {
+ }
+ }
+
+- test("self-union, DSv1, read via table API") {
++ test("self-union, DSv1, read via table API",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401"))
{
+ withTable("parquet_streaming_tbl") {
+ spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING
parquet")
+
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index abe606ad9c1..2d930b64cca 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -2782,7 +3236,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index e937173a590..ca06132102d 100644
+index e937173a590..7d20538bc68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2801,37 +3255,42 @@ index e937173a590..ca06132102d 100644
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -126,7 +128,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite
with SQLTestUtilsBase with
- }
- }
- } else {
-- super.test(testName, testTags: _*)(testFun)
-+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
-+ ignore(testName + " (disabled when Comet is on)", testTags:
_*)(testFun)
-+ } else {
-+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
-+ val isNativeIcebergCompat = cometScanImpl ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
-+ cometScanImpl == CometConf.SCAN_AUTO
-+ val isNativeDataFusion = cometScanImpl ==
CometConf.SCAN_NATIVE_DATAFUSION ||
-+ cometScanImpl == CometConf.SCAN_AUTO
-+ if (isCometEnabled && isNativeIcebergCompat &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
-+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)",
testTags: _*)(testFun)
-+ } else if (isCometEnabled && isNativeDataFusion &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
-+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags:
_*)(testFun)
-+ } else if (isCometEnabled && (isNativeDataFusion ||
isNativeIcebergCompat) &&
-+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
-+ ignore(testName + " (disabled for NATIVE_DATAFUSION and
NATIVE_ICEBERG_COMPAT)",
-+ testTags: _*)(testFun)
-+ } else {
-+ super.test(testName, testTags: _*)(testFun)
-+ }
-+ }
- }
- }
-@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase
+@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite
with SQLTestUtilsBase with
+ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
+ (implicit pos: Position): Unit = {
++ // Check Comet skip tags first, before DisableAdaptiveExecution handling
++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++ return
++ }
++ if (isCometEnabled) {
++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
++ val isNativeIcebergCompat = cometScanImpl ==
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
++ cometScanImpl == CometConf.SCAN_AUTO
++ val isNativeDataFusion = cometScanImpl ==
CometConf.SCAN_NATIVE_DATAFUSION ||
++ cometScanImpl == CometConf.SCAN_AUTO
++ if (isNativeIcebergCompat &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags:
_*)(testFun)
++ return
++ }
++ if (isNativeDataFusion &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags:
_*)(testFun)
++ return
++ }
++ if ((isNativeDataFusion || isNativeIcebergCompat) &&
++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
++ ignore(testName + " (disabled for NATIVE_DATAFUSION and
NATIVE_ICEBERG_COMPAT)",
++ testTags: _*)(testFun)
++ return
++ }
++ }
+ if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
+ super.test(testName, testTags: _*) {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
protected override def _sqlContext: SQLContext = self.spark.sqlContext
}
@@ -2861,7 +3320,7 @@ index e937173a590..ca06132102d 100644
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit):
Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
-@@ -435,6 +481,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -435,6 +488,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
@@ -2963,6 +3422,29 @@ index de3b1ffccf0..2a76d127093 100644
override def beforeEach(): Unit = {
super.beforeEach()
+diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+index f3be79f9022..b4b1ea8dbc4 100644
+---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
++++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+@@ -34,7 +34,7 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
+ import org.apache.hadoop.io.{LongWritable, Writable}
+
+ import org.apache.spark.{SparkException, SparkFiles, TestUtils}
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion,
QueryTest, Row}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
+ import org.apache.spark.sql.catalyst.plans.logical.Project
+ import org.apache.spark.sql.execution.WholeStageCodegenExec
+@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with
TestHiveSingleton with SQLTestUtils {
+ }
+ }
+
+- test("SPARK-11522 select input_file_name from non-parquet table") {
++ test("SPARK-11522 select input_file_name from non-parquet table",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312"))
{
+
+ withTempDir { tempDir =>
+
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6160c3e5f6c..0956d7d9edc 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
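The tagged tests above rely on skip tags that the patched SQLTestUtils now checks
before its DisableAdaptiveExecution handling. A minimal sketch of how those tags are
assumed to be shaped (plain ScalaTest Tag subclasses whose reason string carries the
tracking-issue URL, mirroring the existing IgnoreComet tag); the real definitions ship
elsewhere in the Comet test patch and may use different tag names:

    package org.apache.spark.sql

    import org.scalatest.Tag

    // Sketch only: assumed definitions of the skip tags referenced in the hunks above.
    // The tag name strings are illustrative, not taken from the Comet sources.
    case class IgnoreCometNativeDataFusion(reason: String)
      extends Tag("DisableForCometNativeDataFusion")

    case class IgnoreCometNativeIcebergCompat(reason: String)
      extends Tag("DisableForCometNativeIcebergCompat")

    case class IgnoreCometNativeScan(reason: String)
      extends Tag("DisableForCometNativeScan")

With tags shaped like this, a suite tags a test exactly as in the hunks above, and the
overridden test method reports it as ignored whenever the matching scan implementation
is active.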
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 45faa4d94..4be2fe501 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -50,7 +50,7 @@ import org.apache.comet.objectstore.NativeConfig
import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled,
isEncryptionConfigSupported}
import org.apache.comet.serde.operator.CometNativeScan
-import org.apache.comet.shims.CometTypeShim
+import org.apache.comet.shims.{CometTypeShim, ShimFileFormat}
/**
* Spark physical optimizer rule for replacing Spark scans with Comet scans.
@@ -193,6 +193,14 @@ case class CometScanRule(session: SparkSession) extends
Rule[SparkPlan] with Com
withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support
encryption")
return None
}
+ if (scanExec.fileConstantMetadataColumns.nonEmpty) {
+ withInfo(scanExec, "Native DataFusion scan does not support metadata
columns")
+ return None
+ }
+ if
(ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
+ withInfo(scanExec, "Native DataFusion scan does not support row index
generation")
+ return None
+ }
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
return None
}
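The two new guards make CometScanRule fall back to Spark whenever a native_datafusion
scan would have to produce file constant metadata columns or generated row indexes. A
minimal sketch of the row-index check, assuming the index is requested through Spark's
temporary metadata column name; the rule itself resolves the column via
ShimFileFormat.findRowIndexColumnIndexInSchema, whose lookup may differ:

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // Hypothetical stand-in for ShimFileFormat.findRowIndexColumnIndexInSchema:
    // returns the position of the row-index metadata column, or -1 when absent.
    def findRowIndexColumn(schema: StructType): Int =
      schema.fieldNames.indexOf("_tmp_metadata_row_index")

    val requiredSchema = StructType(Seq(
      StructField("value", StringType),
      StructField("_tmp_metadata_row_index", LongType)))

    // A required schema like this one would keep the original Spark scan.
    assert(findRowIndexColumn(requiredSchema) >= 0)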
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]