This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f83f51c06 chore: Invert usingDataSourceExec test helper to usingLegacyNativeCometScan (#3310)
f83f51c06 is described below

commit f83f51c06e8466b186f2a1d093a184f107f35af5
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jan 30 08:33:36 2026 -0700

    chore: Invert usingDataSourceExec test helper to usingLegacyNativeCometScan (#3310)
    
    * Invert usingDataSourceExec test helper to usingLegacyNativeCometScan (#3309)
    
    With native_datafusion enabled in auto scan mode, test helpers that
    check for specific scan config values fail because auto resolves at
    plan time, not config time. Invert the logic so tests check for the
    legacy native_comet mode instead, which is forward-compatible with
    auto and any future scan implementations.
    
    - Rename usingDataSourceExec → usingLegacyNativeCometScan (inverted)
    - Rename usingDataSourceExecWithIncompatTypes → hasUnsignedSmallIntSafetyCheck
    - Update all call sites across 11 test files
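
    As a sketch of the inversion (the real definitions are in the
    CometTestBase hunk below): the old helper compared the configured scan
    impl against the two concrete native implementations, so it returned
    false while the config still read "auto", even though a native scan is
    chosen at plan time; the new helper matches only the legacy value, so
    its negation stays correct under auto and any future scan impls:

        // old: config-time membership check; false under "auto"
        def usingDataSourceExec(conf: SQLConf): Boolean =
          Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION)
            .contains(CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))

        // new: true only for legacy native_comet; tests check the negation
        def usingLegacyNativeCometScan(conf: SQLConf): Boolean =
          CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_COMET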
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    * Fix schema evolution test to only expect success for native_datafusion
    
    The inversion from usingDataSourceExec to !usingLegacyNativeCometScan
    incorrectly broadened the condition to include native_iceberg_compat and
    auto modes, which do not support schema evolution (INT32 to bigint).
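
    As an illustrative sketch only (the updated test is not among the hunks
    below, and expectSchemaEvolutionSuccess is a hypothetical stand-in for
    the test's assertion):

        // too broad: also true for native_iceberg_compat and auto
        if (!usingLegacyNativeCometScan(conf)) { expectSchemaEvolutionSuccess() }

        // narrowed: only native_datafusion supports the INT32 -> bigint read
        if (CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
          expectSchemaEvolutionSuccess()
        }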
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 .../test/scala/org/apache/comet/CometCastSuite.scala   |  4 ++--
 .../scala/org/apache/comet/CometExpressionSuite.scala  |  8 ++++----
 .../org/apache/comet/CometFuzzAggregateSuite.scala     | 18 +++++++++---------
 .../scala/org/apache/comet/CometFuzzTestSuite.scala    | 14 +++++++-------
 .../org/apache/comet/CometMapExpressionSuite.scala     |  4 ++--
 .../apache/comet/exec/CometColumnarShuffleSuite.scala  |  2 +-
 .../scala/org/apache/comet/exec/CometJoinSuite.scala   |  2 +-
 .../org/apache/comet/parquet/ParquetReadSuite.scala    | 12 ++++++------
 .../scala/org/apache/spark/sql/CometTestBase.scala     | 13 ++++++-------
 .../spark/sql/comet/ParquetDatetimeRebaseSuite.scala   | 10 +++++-----
 10 files changed, 43 insertions(+), 44 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 8a68df382..26bb810b7 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -64,7 +64,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   private val timestampPattern = "0123456789/:T" + whitespaceChars
 
   lazy val usingParquetExecWithIncompatTypes: Boolean =
-    usingDataSourceExecWithIncompatTypes(conf)
+    hasUnsignedSmallIntSafetyCheck(conf)
 
   test("all valid cast combinations covered") {
     val names = testNames
@@ -1087,7 +1087,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
            |USING parquet
          """.stripMargin)
       sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         checkSparkAnswerAndOperator(
           "SELECT CAST(s AS struct<field1:string, field2:string>) AS 
new_struct FROM tab1")
       } else {
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index fe5ea77a8..1bab18a1a 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1509,7 +1509,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("round") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1573,7 +1573,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("hex") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "hex.parquet")
@@ -2607,7 +2607,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("get_struct_field with DataFusion ParquetExec - read entire struct") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2644,7 +2644,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("read array[int] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
 
     withTempPath { dir =>
 // create input file with Comet disabled
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
index 19812f38c..191ebd908 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
@@ -29,7 +29,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
 
@@ -45,7 +45,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -57,7 +57,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
 
@@ -73,7 +73,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -87,7 +87,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col, c4, c5) FROM t1 
group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -99,7 +99,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns) {
       val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -112,7 +112,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns.drop(1)) {
       val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol 
ORDER BY $groupCol"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -126,7 +126,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     val sql = s"SELECT $groupCol, count(${otherCol.mkString(", ")}) FROM t1 " +
       s"GROUP BY $groupCol ORDER BY $groupCol"
     val (_, cometPlan) = checkSparkAnswer(sql)
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       assert(1 == collectNativeScans(cometPlan).length)
     }
   }
@@ -138,7 +138,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT min($col), max($col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 833314a5c..02d13c841 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -37,7 +37,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -59,7 +59,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1 LIMIT 500"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -112,7 +112,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
             s"alter table t2 add column col2 $defaultValueType default 
$defaultValueString")
           // Verify that our default value matches Spark's answer
           val sql = "select col2 from t2"
-          if (usingDataSourceExec) {
+          if (!usingLegacyNativeCometScan) {
             checkSparkAnswerAndOperator(sql)
           } else {
             checkSparkAnswer(sql)
@@ -139,7 +139,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       val sql = s"SELECT $col FROM t1 ORDER BY $col"
       // cannot run fully natively due to range partitioning and sort
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -152,7 +152,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols"
     // cannot run fully natively due to range partitioning and sort
     val (_, cometPlan) = checkSparkAnswer(sql)
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       assert(1 == collectNativeScans(cometPlan).length)
     }
   }
@@ -207,7 +207,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     val df2 = df.repartition(8, df.col("c0")).sort("c1")
     df2.collect()
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan)
       val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match {
         case CometConf.SCAN_NATIVE_COMET =>
@@ -233,7 +233,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(2 == collectNativeScans(cometPlan).length)
       }
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 9276a2034..ee77bb80f 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti
 class CometMapExpressionSuite extends CometTestBase {
 
   test("read map[int, int] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
 
     withTempPath { dir =>
       // create input file with Comet disabled
@@ -63,7 +63,7 @@ class CometMapExpressionSuite extends CometTestBase {
 
   // repro for https://github.com/apache/datafusion-comet/issues/1754
   test("read map[struct, struct] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
 
     withTempPath { dir =>
       // create input file with Comet disabled
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 70479f0e3..ed204ef77 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -618,7 +618,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
         // TODO: revisit this when we have resolution of https://github.com/apache/arrow-rs/issues/7040
         // and https://github.com/apache/arrow-rs/issues/7097
         val fieldsToTest =
-          if (usingDataSourceExec(conf)) {
+          if (!usingLegacyNativeCometScan(conf)) {
             Seq(
               $"_1",
               $"_4",
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index d47b4e0c1..6111b9c0d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -199,7 +199,7 @@ class CometJoinSuite extends CometTestBase {
 
   test("HashJoin struct key") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     withSQLConf(
       "spark.sql.join.forceApplyShuffledHashJoin" -> "true",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index a05bb7c39..e4486e940 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -92,7 +92,7 @@ abstract class ParquetReadSuite extends CometTestBase {
       // for native iceberg compat, CometScanExec supports some types that native_comet does not.
       // note that native_datafusion does not use CometScanExec so we need not include that in
       // the check
-      val isDataFusionScan = usingDataSourceExec(conf)
+      val isDataFusionScan = !usingLegacyNativeCometScan(conf)
       Seq(
         NullType -> false,
         BooleanType -> true,
@@ -143,7 +143,7 @@ abstract class ParquetReadSuite extends CometTestBase {
 
       // Arrays support for iceberg compat native and for Parquet V1
       val cometScanExecSupported =
-        if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
+        if (!usingLegacyNativeCometScan(conf) && this.isInstanceOf[ParquetReadV1Suite])
           Seq(true, true, true)
         else Seq(true, false, false)
 
@@ -185,7 +185,7 @@ abstract class ParquetReadSuite extends CometTestBase {
             i.toDouble,
             DateTimeUtils.toJavaDate(i))
         }
-        if (!usingDataSourceExecWithIncompatTypes(conf)) {
+        if (!hasUnsignedSmallIntSafetyCheck(conf)) {
           checkParquetScan(data)
         }
         checkParquetFile(data)
@@ -207,7 +207,7 @@ abstract class ParquetReadSuite extends CometTestBase {
             i.toDouble,
             DateTimeUtils.toJavaDate(i))
         }
-        if (!usingDataSourceExecWithIncompatTypes(conf)) {
+        if (!hasUnsignedSmallIntSafetyCheck(conf)) {
           checkParquetScan(data)
         }
         checkParquetFile(data)
@@ -228,7 +228,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         DateTimeUtils.toJavaDate(i))
     }
     val filter = (row: Row) => row.getBoolean(0)
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data, filter)
     }
     checkParquetFile(data, filter)
@@ -1515,7 +1515,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   test("row group skipping doesn't overflow when reading into larger type") {
     // Spark 4.0 no longer fails for widening types SPARK-40876
     // https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
-    assume(!isSpark40Plus && !usingDataSourceExec(conf))
+    assume(!isSpark40Plus && usingLegacyNativeCometScan(conf))
     withTempPath { path =>
       Seq(0).toDF("a").write.parquet(path.toString)
       // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 8a2f8af5c..e612d72cc 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -591,7 +591,7 @@ abstract class CometTestBase
   }
 
   def getPrimitiveTypesParquetSchema: String = {
-    if (usingDataSourceExecWithIncompatTypes(conf)) {
+    if (hasUnsignedSmallIntSafetyCheck(conf)) {
      // Comet complex type reader has different behavior for uint_8, uint_16 types.
      // The issue stems from undefined behavior in the parquet spec and is tracked
       // here: https://github.com/apache/parquet-java/issues/3142
@@ -1268,14 +1268,13 @@ abstract class CometTestBase
     writer.close()
   }
 
-  def usingDataSourceExec: Boolean = usingDataSourceExec(SQLConf.get)
+  def usingLegacyNativeCometScan: Boolean = usingLegacyNativeCometScan(SQLConf.get)
 
-  def usingDataSourceExec(conf: SQLConf): Boolean =
-    Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION).contains(
-      CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
+  def usingLegacyNativeCometScan(conf: SQLConf): Boolean =
+    CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_COMET
 
-  def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
-    usingDataSourceExec(conf) &&
+  def hasUnsignedSmallIntSafetyCheck(conf: SQLConf): Boolean = {
+    !usingLegacyNativeCometScan(conf) &&
     CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
   }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
index bdb4a9d4b..131423dde 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
@@ -52,7 +52,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
           // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
           // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
           if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5") &&
-            !usingDataSourceExec(conf)) {
+            usingLegacyNativeCometScan(conf)) {
             intercept[SparkException](df.collect())
           } else {
             checkSparkNoRebaseAnswer(df)
@@ -63,7 +63,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
   }
 
   test("reading ancient timestamps before 1582") {
-    assume(!usingDataSourceExec(conf))
+    assume(usingLegacyNativeCometScan(conf))
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -78,7 +78,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
             // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
             // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
             if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5")
-              && !usingDataSourceExec(conf)) {
+              && usingLegacyNativeCometScan(conf)) {
               intercept[SparkException](df.collect())
             } else {
               checkSparkNoRebaseAnswer(df)
@@ -90,7 +90,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
   }
 
   test("reading ancient int96 timestamps before 1582") {
-    assume(!usingDataSourceExec(conf))
+    assume(usingLegacyNativeCometScan(conf))
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -105,7 +105,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
             // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
             // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
             if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5")
-              && !usingDataSourceExec(conf)) {
+              && usingLegacyNativeCometScan(conf)) {
               intercept[SparkException](df.collect())
             } else {
               checkSparkNoRebaseAnswer(df)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
