This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f64b60b9 chore: Run Spark SQL tests with `native_datafusion` in CI 
(#3393)
2f64b60b9 is described below

commit 2f64b60b926c1f3183ec59acafe77f23629391c7
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 5 15:03:05 2026 -0700

    chore: Run Spark SQL tests with `native_datafusion` in CI (#3393)
---
 .github/workflows/spark_sql_test.yml               |  12 +-
 dev/diffs/3.5.8.diff                               | 668 ++++++++++++++++++---
 .../org/apache/comet/rules/CometScanRule.scala     |  10 +-
 3 files changed, 590 insertions(+), 100 deletions(-)

diff --git a/.github/workflows/spark_sql_test.yml 
b/.github/workflows/spark_sql_test.yml
index 5d35f3aee..610baa9f2 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -120,13 +120,13 @@ jobs:
         # - auto scan: all Spark versions (3.4, 3.5, 4.0)
         # - native_iceberg_compat: Spark 3.5 only
         config:
-          - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 
'auto', scan-env: ''}
-          - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 
'auto', scan-env: ''}
-          - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 
'native_iceberg_compat', scan-env: 
'COMET_PARQUET_SCAN_IMPL=native_iceberg_compat'}
-          - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 
'auto', scan-env: ''}
+          - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 
'auto'}
+          - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 
'auto'}
+          - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 
'native_datafusion'}
+          - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 
'auto'}
         # Skip sql_hive-1 for Spark 4.0 due to 
https://github.com/apache/datafusion-comet/issues/2946
         exclude:
-          - config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, 
scan-impl: 'auto', scan-env: ''}
+          - config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, 
scan-impl: 'auto'}
             module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- 
-l org.apache.spark.tags.ExtendedHiveTest -l 
org.apache.spark.tags.SlowHiveTest"}
       fail-fast: false
     name: spark-sql-${{ matrix.config.scan-impl }}-${{ matrix.module.name 
}}/spark-${{ matrix.config.spark-full }}
@@ -155,7 +155,7 @@ jobs:
         run: |
           cd apache-spark
           rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet 
cache requires cleanups
-          ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ${{ 
matrix.config.scan-env }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ 
github.event.inputs.collect-fallback-logs || 'false' }} \
+          NOLINT_ON_COMPILE=true ENABLE_COMET=true ENABLE_COMET_ONHEAP=true 
COMET_PARQUET_SCAN_IMPL=${{ matrix.config.scan-impl }} 
ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs 
|| 'false' }} \
             build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ 
matrix.module.args2 }}"
           if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; 
then
             find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h 
"Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot 
accelerate/' | sort -u > fallback.log
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index c617fa75b..10f579da6 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index a0e25ce4d8d..b95fba458f2 100644
+index edd2ad57880..77a975ea48f 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -152,6 +152,8 @@
@@ -38,7 +38,7 @@ index a0e25ce4d8d..b95fba458f2 100644
    </dependencyManagement>
  
 diff --git a/sql/core/pom.xml b/sql/core/pom.xml
-index e3d324c8edb..22342150522 100644
+index bc00c448b80..82068d7a2eb 100644
 --- a/sql/core/pom.xml
 +++ b/sql/core/pom.xml
 @@ -77,6 +77,10 @@
@@ -238,6 +238,20 @@ index e5494726695..00937f025c2 100644
    }
  
    test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+index 9e8d77c53f3..855e3ada7d1 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSparkSession {
+     }
+   }
+ 
+-  test("input_file_name, input_file_block_start, input_file_block_length - 
FileScanRDD") {
++  test("input_file_name, input_file_block_start, input_file_block_length - 
FileScanRDD",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312";))
 {
+     withTempPath { dir =>
+       val data = sparkContext.parallelize(0 to 10).toDF("id")
+       data.write.parquet(dir.getCanonicalPath)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 index 6f3090d8908..c08a60fb0c2 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -374,7 +388,7 @@ index a1d5d579338..c201d39cc78 100644
            }
          case _ => false
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
-index f32b32ffc5a..447d7c6416e 100644
+index c4fb4fa943c..a04b23870a8 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
 @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.{LeftAnti, 
LeftSemi}
@@ -396,7 +410,7 @@ index f32b32ffc5a..447d7c6416e 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..0e1499a24ca 100644
+index f33432ddb6f..42eb9fd1cb7 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -447,7 +461,17 @@ index f33432ddb6f..0e1499a24ca 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        val df = sql(
          """ WITH v as (
-@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+    * Check the static scan metrics with and without DPP
+    */
+   test("static scan metrics",
+-    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
++    DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313";))
 {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+       SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
@@ -457,10 +481,20 @@ index f33432ddb6f..0e1499a24ca 100644
              }
            assert(scanOption.isDefined)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-index a206e97c353..fea1149b67d 100644
+index a206e97c353..79813d8e259 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+     }
+   }
+ 
+-  test("explain formatted - check presence of subquery in case of DPP") {
++  test("explain formatted - check presence of subquery in case of DPP",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313";))
 {
+     withTable("df1", "df2") {
+       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+         SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
      }
    }
  
@@ -470,7 +504,7 @@ index a206e97c353..fea1149b67d 100644
      withTempDir { dir =>
        Seq("parquet", "orc", "csv", "json").foreach { fmt =>
          val basePath = dir.getCanonicalPath + "/" + fmt
-@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
    }
  }
  
@@ -482,7 +516,7 @@ index a206e97c353..fea1149b67d 100644
  
    test("SPARK-35884: Explain Formatted") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 93275487f29..33b2e7ad3b1 100644
+index 93275487f29..510e3087e0f 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 @@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
@@ -510,7 +544,17 @@ index 93275487f29..33b2e7ad3b1 100644
              checkErrorMatchPVals(
                exception = intercept[SparkException] {
                  testIgnoreMissingFiles(options)
-@@ -955,6 +959,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
+   }
+ 
+   Seq("parquet", "orc").foreach { format =>
+-    test(s"Spark native readers should respect spark.sql.caseSensitive - 
${format}") {
++    test(s"Spark native readers should respect spark.sql.caseSensitive - 
${format}",
++      
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+       withTempDir { dir =>
+         val tableName = s"spark_25132_${format}_native"
+         val tableDir = dir.getCanonicalPath + s"/$tableName"
+@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
              assert(bJoinExec.isEmpty)
              val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
                case smJoin: SortMergeJoinExec => smJoin
@@ -518,7 +562,7 @@ index 93275487f29..33b2e7ad3b1 100644
              }
              assert(smJoinExec.nonEmpty)
            }
-@@ -1015,6 +1020,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -526,7 +570,7 @@ index 93275487f29..33b2e7ad3b1 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1056,6 +1062,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
  
            val fileScan = df.queryExecution.executedPlan collectFirst {
              case BatchScanExec(_, f: FileScan, _, _, _, _) => f
@@ -534,7 +578,7 @@ index 93275487f29..33b2e7ad3b1 100644
            }
            assert(fileScan.nonEmpty)
            assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1240,6 +1247,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
            val filters = df.queryExecution.executedPlan.collect {
              case f: FileSourceScanLike => f.dataFilters
              case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -546,7 +590,7 @@ index 93275487f29..33b2e7ad3b1 100644
          }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
 new file mode 100644
-index 00000000000..5691536c114
+index 00000000000..1ee842b6f62
 --- /dev/null
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
 @@ -0,0 +1,45 @@
@@ -586,8 +630,8 @@ index 00000000000..5691536c114
 + * Helper trait that disables Comet for all tests regardless of default 
config values.
 + */
 +trait IgnoreCometSuite extends SQLTestUtils {
-+  override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)
-+    (implicit pos: Position): Unit = {
++  override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)(implicit
++      pos: Position): Unit = {
 +    if (isCometEnabled) {
 +      ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
 +    } else {
@@ -1040,6 +1084,20 @@ index 04702201f82..5ee11f83ecf 100644
        }
        assert(exchanges.size === 1)
      }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+index 9f8e979e3fb..3bc9dab8023 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
+     spark.catalog.dropTempView("tmp_table")
+   }
+ 
+-  test("SPARK-8005 input_file_name") {
++  test("SPARK-8005 input_file_name",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312";))
 {
+     withTempPath { dir =>
+       val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
+       data.write.parquet(dir.getCanonicalPath)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 index d269290e616..13726a31e07 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -1104,18 +1162,31 @@ index d269290e616..13726a31e07 100644
                }
              }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..c6fcfd7bd08 100644
+index cfc8b2cc845..b7c234e1437 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
+ import scala.collection.mutable.ArrayBuffer
+ 
  import org.apache.spark.SparkConf
- import org.apache.spark.sql.{AnalysisException, QueryTest}
+-import org.apache.spark.sql.{AnalysisException, QueryTest}
++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, 
QueryTest}
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
Table, TableCapability}
  import org.apache.spark.sql.connector.read.ScanBuilder
  import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest 
with SharedSparkSession {
+@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with 
SharedSparkSession {
+     }
+   }
+ 
+-  test("Fallback Parquet V2 to V1") {
++  test("Fallback Parquet V2 to V1",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315";))
 {
+     Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { 
format =>
+       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+         val commands = ArrayBuffer.empty[(String, LogicalPlan)]
+@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest 
with SharedSparkSession {
              val df = spark.read.format(format).load(path.getCanonicalPath)
              checkAnswer(df, inputData.toDF())
              assert(
@@ -1379,6 +1450,28 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+index a1147c16cc8..c7a29496328 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.{SparkArithmeticException, SparkException, 
SparkFileNotFoundException}
+ import org.apache.spark.sql._
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
+ import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
+ import org.apache.spark.sql.catalyst.parser.ParseException
+@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
+     }
+   }
+ 
+-  test("alter temporary view should follow current storeAnalyzedPlanForView 
config") {
++  test("alter temporary view should follow current storeAnalyzedPlanForView 
config",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314";))
 {
+     withTable("t") {
+       Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+       withView("v1") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 index eec396b2e39..bf3f1c769d6 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -1877,7 +1970,7 @@ index 2f8e401e743..a4f94417dcc 100644
          assert(o1.semanticEquals(o2), "Different output column order after 
AQE optimization")
        }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
-index bf496d6db21..9bb57a9b4c6 100644
+index fd52d038ca6..154c800be67 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Concat
@@ -1888,7 +1981,7 @@ index bf496d6db21..9bb57a9b4c6 100644
  import org.apache.spark.sql.execution.FileSourceScanExec
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.functions._
-@@ -868,6 +869,8 @@ abstract class SchemaPruningSuite
+@@ -884,6 +885,8 @@ abstract class SchemaPruningSuite
      val fileSourceScanSchemata =
        collect(df.queryExecution.executedPlan) {
          case scan: FileSourceScanExec => scan.requiredSchema
@@ -1898,7 +1991,7 @@ index bf496d6db21..9bb57a9b4c6 100644
      assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
        s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
-index ce43edb79c1..4dbb5942bc3 100644
+index 5fd27410dcb..468abb1543a 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
 @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
@@ -1909,7 +2002,7 @@ index ce43edb79c1..4dbb5942bc3 100644
  import org.apache.spark.sql.execution.{QueryExecution, SortExec}
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
  import org.apache.spark.sql.internal.SQLConf
-@@ -225,6 +226,7 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
+@@ -243,6 +244,7 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
            // assert the outer most sort in the executed plan
            assert(plan.collectFirst {
              case s: SortExec => s
@@ -1917,7 +2010,7 @@ index ce43edb79c1..4dbb5942bc3 100644
            }.exists {
              case SortExec(Seq(
                SortOrder(AttributeReference("key", IntegerType, _, _), 
Ascending, NullsFirst, _),
-@@ -272,6 +274,7 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
+@@ -290,6 +292,7 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
          // assert the outer most sort in the executed plan
          assert(plan.collectFirst {
            case s: SortExec => s
@@ -1972,8 +2065,124 @@ index 07e2849ce6f..3e73645b638 100644
      val extraOptions = Map[String, String](
        ParquetOutputFormat.WRITER_VERSION -> 
ParquetProperties.WriterVersion.PARQUET_2_0.toString
      )
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
+index 5e01d3f447c..284d6657d4f 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
+@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
+ import scala.collection.JavaConverters._
+ 
+ import org.apache.spark.SparkException
+-import org.apache.spark.sql.{QueryTest, Row}
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest, Row}
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, 
MetadataBuilder, StringType, StructType}
+@@ -30,7 +30,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
+   private def withId(id: Int): Metadata =
+     new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, 
id).build()
+ 
+-  test("Parquet reads infer fields using field ids correctly") {
++  test("Parquet reads infer fields using field ids correctly",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316";))
 {
+     withTempDir { dir =>
+       val readSchema =
+         new StructType()
+@@ -78,7 +79,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
+     }
+   }
+ 
+-  test("absence of field ids") {
++  test("absence of field ids",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316";))
 {
+     withTempDir { dir =>
+       val readSchema =
+         new StructType()
+@@ -107,7 +109,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
+     }
+   }
+ 
+-  test("SPARK-38094: absence of field ids: reading nested schema") {
++  test("SPARK-38094: absence of field ids: reading nested schema",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316";))
 {
+     withTempDir { dir =>
+       // now with nested schema/complex type
+       val readSchema =
+@@ -136,7 +139,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
+     }
+   }
+ 
+-  test("multiple id matches") {
++  test("multiple id matches",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316";))
 {
+     withTempDir { dir =>
+       val readSchema =
+         new StructType()
+@@ -163,7 +167,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
+     }
+   }
+ 
+-  test("read parquet file without ids") {
++  test("read parquet file without ids",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316";))
 {
+     withTempDir { dir =>
+       val readSchema =
+         new StructType()
+@@ -196,7 +201,8 @@ class ParquetFieldIdIOSuite extends QueryTest with 
ParquetTest with SharedSparkS
+     }
+   }
+ 
+-  test("global read/write flag should work correctly") {
++  test("global read/write flag should work correctly",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316";))
 {
+     withTempDir { dir =>
+       val readSchema =
+         new StructType()
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
+index c10e1799702..ba6629abfd9 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
+@@ -16,7 +16,7 @@
+  */
+ package org.apache.spark.sql.execution.datasources.parquet
+ 
+-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
++import org.apache.spark.sql.{AnalysisException, DataFrame, 
IgnoreCometNativeDataFusion, QueryTest}
+ import org.apache.spark.sql.execution.datasources.FileFormat
+ import org.apache.spark.sql.functions.{col, lit}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -154,7 +154,8 @@ class ParquetFileMetadataStructRowIndexSuite extends 
QueryTest with SharedSparkS
+     }
+   }
+ 
+-  test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present in a 
table") {
++  test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present in a table",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317";))
 {
+     // File format supporting row index generation populates the column with 
row indexes.
+     withReadDataFrame("parquet", extraSchemaFields =
+         Seq(StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType))) { df =>
+@@ -172,7 +173,8 @@ class ParquetFileMetadataStructRowIndexSuite extends 
QueryTest with SharedSparkS
+     }
+   }
+ 
+-  test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table") {
++  test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317";))
 {
+     withReadDataFrame("parquet", extraCol = ROW_INDEX_TEMPORARY_COLUMN_NAME) 
{ df =>
+       // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always 
populated with
+       // generated row indexes, rather than read from the file.
+@@ -189,7 +191,8 @@ class ParquetFileMetadataStructRowIndexSuite extends 
QueryTest with SharedSparkS
+     }
+   }
+ 
+-  test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition col") {
++  test(s"reading ${ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition col",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3317";))
 {
+     withReadDataFrame("parquet", partitionCol = 
ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
+       // Column values are set for each partition, rather than populated with 
generated row indexes.
+       assert(df
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 8e88049f51e..8f3cf8a0f80 100644
+index 8e88049f51e..49f2001dc6b 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
@@ -1999,7 +2208,17 @@ index 8e88049f51e..8f3cf8a0f80 100644
      import testImplicits._
  
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-@@ -1580,7 +1585,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+@@ -1548,7 +1553,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+     }
+   }
+ 
+-  test("SPARK-31026: Parquet predicate pushdown for fields having dots in the 
names") {
++  test("SPARK-31026: Parquet predicate pushdown for fields having dots in the 
names",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320";))
 {
+     import testImplicits._
+ 
+     withAllParquetReaders {
+@@ -1580,13 +1586,18 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
            // than the total length but should not be a single record.
            // Note that, if record level filtering is enabled, it should be a 
single record.
            // If no filter is pushed down to Parquet, it should be the total 
length of data.
@@ -2012,7 +2231,15 @@ index 8e88049f51e..8f3cf8a0f80 100644
          }
        }
      }
-@@ -1607,7 +1616,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+   }
+ 
+-  test("Filters should be pushed down for Parquet readers at row group 
level") {
++  test("Filters should be pushed down for Parquet readers at row group level",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320";))
 {
+     import testImplicits._
+ 
+     withSQLConf(
+@@ -1607,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
          // than the total length but should not be a single record.
          // Note that, if record level filtering is enabled, it should be a 
single record.
          // If no filter is pushed down to Parquet, it should be the total 
length of data.
@@ -2025,7 +2252,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
        }
      }
    }
-@@ -1699,7 +1712,7 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1699,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
        (attr, value) => sources.StringContains(attr, value))
    }
  
@@ -2034,7 +2261,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
      import testImplicits._
      // keep() should take effect on StartsWith/EndsWith/Contains
      Seq(
-@@ -1743,7 +1756,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1743,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2044,7 +2271,17 @@ index 8e88049f51e..8f3cf8a0f80 100644
      val schema = StructType(Seq(
        StructField("a", IntegerType, nullable = false)
      ))
-@@ -1984,7 +1998,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+     }
+   }
+ 
+-  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode") {
++  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+     withTempPath { dir =>
+       val count = 10
+       val tableName = "spark_25207"
+@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2054,7 +2291,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
      // block 1:
      //                      null count  min                                   
    max
      // page-0                         0  0                                    
     99
-@@ -2044,7 +2059,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -2044,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2064,7 +2301,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        spark.range(100).selectExpr("id * 2 AS id")
-@@ -2276,7 +2292,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2276,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2077,7 +2314,7 @@ index 8e88049f51e..8f3cf8a0f80 100644
          } else {
            assert(selectedFilters.isEmpty, "There is filter pushed down")
          }
-@@ -2336,7 +2356,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2336,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2091,10 +2328,30 @@ index 8e88049f51e..8f3cf8a0f80 100644
          case _ =>
            throw new AnalysisException("Can not match ParquetTable in the 
query.")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 8ed9ef1630e..eed2a6f5ad5 100644
+index 8ed9ef1630e..f312174b182 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+     }
+   }
+ 
+-  test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error") {
++  test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+     val data = (1 to 4).map(i => Tuple1(i.toString))
+     val readSchema = StructType(Seq(StructField("_1", 
DataTypes.TimestampType)))
+ 
+@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
+     }
+   }
+ 
+-  test("SPARK-35640: int as long should throw schema incompatible error") {
++  test("SPARK-35640: int as long should throw schema incompatible error",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+     val data = (1 to 4).map(i => Tuple1(i))
+     val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
+ 
+@@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
      }
    }
  
@@ -2105,10 +2362,20 @@ index 8ed9ef1630e..eed2a6f5ad5 100644
        checkAnswer(
          // "fruit" column in this file is encoded using 
DELTA_LENGTH_BYTE_ARRAY.
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index f6472ba3d9d..7a8f5317ed7 100644
+index f6472ba3d9d..ce39ebb52e6 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-@@ -998,7 +998,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     }
+   }
+ 
+-  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
++  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+     val data = (1 to 1000).map { i =>
+       val ts = new java.sql.Timestamp(i)
+       Row(ts)
+@@ -998,7 +999,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
      }
    }
  
@@ -2118,7 +2385,17 @@ index f6472ba3d9d..7a8f5317ed7 100644
      withAllParquetReaders {
        withTempPath { path =>
          // Repeated values for dictionary encoding.
-@@ -1067,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1051,7 +1053,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
+   }
+ 
+-  test("SPARK-34212 Parquet should read decimals correctly") {
++  test("SPARK-34212 Parquet should read decimals correctly",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+     def readParquet(schema: String, path: File): DataFrame = {
+       spark.read.schema(schema).parquet(path.toString)
+     }
+@@ -1067,7 +1070,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
          checkAnswer(readParquet(schema, path), df)
        }
  
@@ -2128,7 +2405,7 @@ index f6472ba3d9d..7a8f5317ed7 100644
          val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
          checkAnswer(readParquet(schema1, path), df)
          val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1089,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1089,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
        val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, 
CAST('1.2' AS BINARY) d")
        df.write.parquet(path.toString)
  
@@ -2138,7 +2415,17 @@ index f6472ba3d9d..7a8f5317ed7 100644
          checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
          checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
          checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 
123456.0"))
-@@ -1148,7 +1151,7 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
+     }
+   }
+ 
+-  test("row group skipping doesn't overflow when reading into larger type") {
++  test("row group skipping doesn't overflow when reading into larger type",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+     withTempPath { path =>
+       Seq(0).toDF("a").write.parquet(path.toString)
+       // The vectorized and non-vectorized readers will produce different 
exceptions, we don't need
+@@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
              .where(s"a < ${Long.MaxValue}")
              .collect()
          }
@@ -2243,14 +2530,14 @@ index 5c0b7def039..151184bc98c 100644
      assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
        s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
-index 3f47c5e506f..bc1ee1ec0ba 100644
+index 3f47c5e506f..92a5eafec84 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 @@ -27,6 +27,7 @@ import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
  import org.apache.parquet.schema.Type._
  
  import org.apache.spark.SparkException
-+import org.apache.spark.sql.IgnoreComet
++import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion}
  import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
  import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
  import org.apache.spark.sql.functions.desc
@@ -2264,6 +2551,26 @@ index 3f47c5e506f..bc1ee1ec0ba 100644
      withTempPath { dir =>
        val e = testSchemaMismatch(dir.getCanonicalPath, 
vectorizedReaderEnabled = false)
        val expectedMessage = "Encountered error while reading file"
+@@ -1046,7 +1048,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+     }
+   }
+ 
+-  test("schema mismatch failure error message for parquet vectorized reader") 
{
++  test("schema mismatch failure error message for parquet vectorized reader",
++      
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+     withTempPath { dir =>
+       val e = testSchemaMismatch(dir.getCanonicalPath, 
vectorizedReaderEnabled = true)
+       assert(e.getCause.isInstanceOf[SparkException])
+@@ -1087,7 +1090,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
+     }
+   }
+ 
+-  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to 
array<timestamp_ntz>") {
++  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to 
array<timestamp_ntz>",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311";))
 {
+     import testImplicits._
+ 
+     withTempPath { dir =>
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 index b8f3ea3c6f3..bbd44221288 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -2313,18 +2620,30 @@ index 5cdbdc27b32..307fba16578 100644
        spark.range(10).selectExpr("id", "id % 3 as p")
          .write.partitionBy("p").saveAsTable("testDataForScan")
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..d9125f658ad 100644
+index 0ab8691801d..7b81f3a8f6d 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,9 @@
+ 
  package org.apache.spark.sql.execution.python
  
++import org.apache.spark.sql.IgnoreCometNativeDataFusion
  import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, 
BatchEvalPython, Limit, LocalLimit}
 +import org.apache.spark.sql.comet._
  import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, 
SparkPlanTest}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+     assert(arrowEvalNodes.size == 2)
+   }
+ 
+-  test("Python UDF should not break column pruning/filter pushdown -- Parquet 
V1") {
++  test("Python UDF should not break column pruning/filter pushdown -- Parquet 
V1",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312";))
 {
+     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+       withTempPath { f =>
+         spark.range(10).select($"id".as("a"), $"id".as("b"))
+@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
@@ -2332,7 +2651,7 @@ index 0ab8691801d..d9125f658ad 100644
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: FileSourceScanExec => scan
@@ -2351,7 +2670,7 @@ index 0ab8691801d..d9125f658ad 100644
          }
        }
      }
-@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -2359,7 +2678,7 @@ index 0ab8691801d..d9125f658ad 100644
            }
            assert(scanNodes.length == 1)
            assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
+@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with 
SharedSparkSession {
  
            val scanNodes = query.queryExecution.executedPlan.collect {
              case scan: BatchScanExec => scan
@@ -2384,7 +2703,7 @@ index d083cac48ff..3c11bcde807 100644
    import testImplicits._
  
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..a773971d3c1 100644
+index 746f289c393..5b9e31c1fa6 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
@@ -2505,7 +2824,14 @@ index 746f289c393..a773971d3c1 100644
            s"expected sort in the right child to be $sortRight but 
found\n${joinOperator.right}")
  
          // check the output partitioning
-@@ -836,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -831,16 +869,17 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+     }
+   }
+ 
+-  test("disable bucketing when the output doesn't contain all bucketing 
columns") {
++  test("disable bucketing when the output doesn't contain all bucketing 
columns",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319";))
 {
+     withTable("bucketed_table") {
        df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")
  
        val scanDF = spark.table("bucketed_table").select("j")
@@ -2519,7 +2845,7 @@ index 746f289c393..a773971d3c1 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -895,7 +933,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -895,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") 
{
@@ -2530,7 +2856,7 @@ index 746f289c393..a773971d3c1 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7")  {
        val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -914,7 +955,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -914,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than 
bucket number") {
@@ -2541,7 +2867,7 @@ index 746f289c393..a773971d3c1 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "9",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10")  {
  
-@@ -944,7 +988,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -944,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("bucket coalescing eliminates shuffle") {
@@ -2552,7 +2878,17 @@ index 746f289c393..a773971d3c1 100644
        SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
        // The side with bucketedTableTestSpec1 will be coalesced to have 4 
output partitions.
-@@ -1029,15 +1076,21 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
+@@ -1013,7 +1061,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+     }
+   }
+ 
+-  test("bucket coalescing is applied when join expressions match with 
partitioning expressions") {
++  test("bucket coalescing is applied when join expressions match with 
partitioning expressions",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319";))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
+@@ -1029,15 +1078,21 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
            Seq(true, false).foreach { aqeEnabled =>
              withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> 
aqeEnabled.toString) {
                val plan = sql(query).queryExecution.executedPlan
@@ -2601,13 +2937,15 @@ index 6f897a9c0b7..b0723634f68 100644
  
    protected override lazy val sql = spark.sql _
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index d675503a8ba..659fa686fb7 100644
+index d675503a8ba..c386a8cb686 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-@@ -18,6 +18,7 @@
+@@ -17,7 +17,8 @@
+ 
  package org.apache.spark.sql.sources
  
- import org.apache.spark.sql.QueryTest
+-import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest}
 +import org.apache.spark.sql.comet.CometScanExec
  import org.apache.spark.sql.execution.FileSourceScanExec
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
@@ -2624,11 +2962,61 @@ index d675503a8ba..659fa686fb7 100644
        assert(bucketedScan.length == expectedNumBucketedScan)
      }
  
+@@ -83,7 +87,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("SPARK-32859: disable unnecessary bucketed table scan - basic test") {
++  test("SPARK-32859: disable unnecessary bucketed table scan - basic test",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319";))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -124,7 +129,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins 
test") {
++  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins 
test",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319";))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -167,7 +173,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("SPARK-32859: disable unnecessary bucketed table scan - multiple 
bucketed columns test") {
++  test("SPARK-32859: disable unnecessary bucketed table scan - multiple 
bucketed columns test",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319";))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2")
+@@ -198,7 +205,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("SPARK-32859: disable unnecessary bucketed table scan - other 
operators test") {
++  test("SPARK-32859: disable unnecessary bucketed table scan - other 
operators test",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319";))
 {
+     withTable("t1", "t2", "t3") {
+       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
+       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
+@@ -239,7 +247,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
+     }
+   }
+ 
+-  test("Aggregates with no groupby over tables having 1 BUCKET, return 
multiple rows") {
++  test("Aggregates with no groupby over tables having 1 BUCKET, return 
multiple rows",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319";))
 {
+     withTable("t1") {
+       withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+         sql(
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
-index 1954cce7fdc..73d1464780e 100644
+index 7f6fa2a123e..c778b4e2c48 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
-@@ -34,6 +34,7 @@ import org.apache.spark.paths.SparkPath
+@@ -35,6 +35,7 @@ import org.apache.spark.paths.SparkPath
  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
  import org.apache.spark.sql.{AnalysisException, DataFrame}
  import org.apache.spark.sql.catalyst.util.stringToFile
@@ -2636,7 +3024,7 @@ index 1954cce7fdc..73d1464780e 100644
  import org.apache.spark.sql.execution.DataSourceScanExec
  import org.apache.spark.sql.execution.datasources._
  import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, 
DataSourceV2Relation, FileScan, FileTable}
-@@ -761,6 +762,8 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+@@ -777,6 +778,8 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
        val fileScan = df.queryExecution.executedPlan.collect {
          case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
            batch.scan.asInstanceOf[FileScan]
@@ -2758,6 +3146,72 @@ index aad91601758..201083bd621 100644
        })
    }
  
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+index b5cf13a9c12..ac17603fb7f 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
+ 
+ import org.apache.spark.{SparkException, TestUtils}
+ import org.apache.spark.internal.Logging
+-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, 
Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, 
IgnoreCometNativeDataFusion, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, 
Shuffle, Uuid}
+ import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, 
CTERelationRef, LocalRelation}
+@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
+     )
+   }
+ 
+-  test("SPARK-41198: input row calculation with CTE") {
++  test("SPARK-41198: input row calculation with CTE",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315";))
 {
+     withTable("parquet_tbl", "parquet_streaming_tbl") {
+       spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
+         .write.format("parquet").saveAsTable("parquet_tbl")
+@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
+     }
+   }
+ 
+-  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 
streaming sources") {
++  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 
streaming sources",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315";))
 {
+     withTable("parquet_streaming_tbl") {
+       val streamInput = MemoryStream[Int]
+       val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS 
value_stream")
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
+index 8f099c31e6b..ce4b7ad25b3 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
+@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming
+ import org.scalatest.BeforeAndAfter
+ import org.scalatest.concurrent.PatienceConfiguration.Timeout
+ 
+-import org.apache.spark.sql.SaveMode
++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, SaveMode}
+ import org.apache.spark.sql.connector.catalog.Identifier
+ import org.apache.spark.sql.execution.streaming.MemoryStream
+ import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, 
InMemoryStreamTableCatalog}
+@@ -42,7 +42,8 @@ class StreamingSelfUnionSuite extends StreamTest with 
BeforeAndAfter {
+     sqlContext.streams.active.foreach(_.stop())
+   }
+ 
+-  test("self-union, DSv1, read via DataStreamReader API") {
++  test("self-union, DSv1, read via DataStreamReader API",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401";))
 {
+     withTempPath { dir =>
+       val dataLocation = dir.getAbsolutePath
+       spark.range(1, 4).write.format("parquet").save(dataLocation)
+@@ -66,7 +67,8 @@ class StreamingSelfUnionSuite extends StreamTest with 
BeforeAndAfter {
+     }
+   }
+ 
+-  test("self-union, DSv1, read via table API") {
++  test("self-union, DSv1, read via table API",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401";))
 {
+     withTable("parquet_streaming_tbl") {
+       spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING 
parquet")
+ 
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 index abe606ad9c1..2d930b64cca 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -2782,7 +3236,7 @@ index abe606ad9c1..2d930b64cca 100644
      val tblTargetName = "tbl_target"
      val tblSourceQualified = s"default.$tblSourceName"
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index e937173a590..ca06132102d 100644
+index e937173a590..7d20538bc68 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 @@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2801,37 +3255,42 @@ index e937173a590..ca06132102d 100644
  import org.apache.spark.sql.execution.FilterExec
  import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
  import org.apache.spark.sql.execution.datasources.DataSourceUtils
-@@ -126,7 +128,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
-         }
-       }
-     } else {
--      super.test(testName, testTags: _*)(testFun)
-+      if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
-+        ignore(testName + " (disabled when Comet is on)", testTags: 
_*)(testFun)
-+      } else {
-+        val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
-+        val isNativeIcebergCompat = cometScanImpl == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
-+          cometScanImpl == CometConf.SCAN_AUTO
-+        val isNativeDataFusion = cometScanImpl == 
CometConf.SCAN_NATIVE_DATAFUSION ||
-+          cometScanImpl == CometConf.SCAN_AUTO
-+        if (isCometEnabled && isNativeIcebergCompat &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
-+          ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", 
testTags: _*)(testFun)
-+        } else if (isCometEnabled && isNativeDataFusion &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
-+          ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: 
_*)(testFun)
-+        } else if (isCometEnabled && (isNativeDataFusion || 
isNativeIcebergCompat) &&
-+          testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
-+          ignore(testName + " (disabled for NATIVE_DATAFUSION and 
NATIVE_ICEBERG_COMPAT)",
-+            testTags: _*)(testFun)
-+        } else {
-+          super.test(testName, testTags: _*)(testFun)
-+        }
-+      }
-     }
-   }
+@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite 
with SQLTestUtilsBase with
  
-@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase
+   override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)
+       (implicit pos: Position): Unit = {
++    // Check Comet skip tags first, before DisableAdaptiveExecution handling
++    if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
++      ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++      return
++    }
++    if (isCometEnabled) {
++      val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
++      val isNativeIcebergCompat = cometScanImpl == 
CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
++        cometScanImpl == CometConf.SCAN_AUTO
++      val isNativeDataFusion = cometScanImpl == 
CometConf.SCAN_NATIVE_DATAFUSION ||
++        cometScanImpl == CometConf.SCAN_AUTO
++      if (isNativeIcebergCompat &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
++        ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: 
_*)(testFun)
++        return
++      }
++      if (isNativeDataFusion &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
++        ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: 
_*)(testFun)
++        return
++      }
++      if ((isNativeDataFusion || isNativeIcebergCompat) &&
++        testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
++        ignore(testName + " (disabled for NATIVE_DATAFUSION and 
NATIVE_ICEBERG_COMPAT)",
++          testTags: _*)(testFun)
++        return
++      }
++    }
+     if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
+       super.test(testName, testTags: _*) {
+         withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
      protected override def _sqlContext: SQLContext = self.spark.sqlContext
    }
  
@@ -2861,7 +3320,7 @@ index e937173a590..ca06132102d 100644
    protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): 
Unit = {
      SparkSession.setActiveSession(spark)
      super.withSQLConf(pairs: _*)(f)
-@@ -435,6 +481,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -435,6 +488,8 @@ private[sql] trait SQLTestUtilsBase
      val schema = df.schema
      val withoutFilters = df.queryExecution.executedPlan.transform {
        case FilterExec(_, child) => child
@@ -2963,6 +3422,29 @@ index de3b1ffccf0..2a76d127093 100644
  
    override def beforeEach(): Unit = {
      super.beforeEach()
+diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+index f3be79f9022..b4b1ea8dbc4 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
+ import org.apache.hadoop.io.{LongWritable, Writable}
+ 
+ import org.apache.spark.{SparkException, SparkFiles, TestUtils}
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest, Row}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
+ import org.apache.spark.sql.catalyst.plans.logical.Project
+ import org.apache.spark.sql.execution.WholeStageCodegenExec
+@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
+     }
+   }
+ 
+-  test("SPARK-11522 select input_file_name from non-parquet table") {
++  test("SPARK-11522 select input_file_name from non-parquet table",
++    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
+ 
+     withTempDir { tempDir =>
+ 
 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 index 6160c3e5f6c..0956d7d9edc 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 45faa4d94..4be2fe501 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -50,7 +50,7 @@ import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
 import org.apache.comet.serde.operator.CometNativeScan
-import org.apache.comet.shims.CometTypeShim
+import org.apache.comet.shims.{CometTypeShim, ShimFileFormat}
 
 /**
  * Spark physical optimizer rule for replacing Spark scans with Comet scans.
@@ -193,6 +193,14 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support encryption")
       return None
     }
+    if (scanExec.fileConstantMetadataColumns.nonEmpty) {
+      withInfo(scanExec, "Native DataFusion scan does not support metadata columns")
+      return None
+    }
+    if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
+      withInfo(scanExec, "Native DataFusion scan does not support row index generation")
+      return None
+    }
     if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
       return None
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to