This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e3761e013 fix: Fall back on dynamicpruning expressions for 
CometIcebergNativeScan (#3335)
e3761e013 is described below

commit e3761e0139a3fef46f7f43e254143b8a97400f63
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Jan 30 09:17:14 2026 -0500

    fix: Fall back on dynamicpruning expressions for CometIcebergNativeScan 
(#3335)
---
 .../org/apache/comet/rules/CometScanRule.scala     |  4 ++
 .../serde/operator/CometIcebergNativeScan.scala    | 13 ++---
 .../org/apache/comet/CometIcebergNativeSuite.scala | 66 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 4291e3fb6..68a63b6ae 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -327,6 +327,10 @@ case class CometScanRule(session: SparkSession) extends 
Rule[SparkPlan] with Com
       case _
           if scanExec.scan.getClass.getName ==
             "org.apache.iceberg.spark.source.SparkBatchQueryScan" =>
+        if (scanExec.runtimeFilters.exists(isDynamicPruningFilter)) {
+          return withInfo(scanExec, "Dynamic Partition Pruning is not 
supported")
+        }
+
         val fallbackReasons = new ListBuffer[String]()
 
         // Native Iceberg scan requires both configs to be enabled
diff --git 
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
 
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index 7238f8ae8..0ad82af8f 100644
--- 
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ 
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -700,10 +700,9 @@ object CometIcebergNativeScan extends 
CometOperatorSerde[CometBatchScanExec] wit
     // If metadata is None, this is a programming error - metadata should have 
been extracted
     // in CometScanRule before creating CometBatchScanExec
     val metadata = scan.nativeIcebergScanMetadata.getOrElse {
-      logError(
+      throw new IllegalStateException(
         "Programming error: CometBatchScanExec.nativeIcebergScanMetadata is 
None. " +
           "Metadata should have been extracted in CometScanRule.")
-      return None
     }
 
     // Use pre-extracted metadata (no reflection needed)
@@ -979,11 +978,11 @@ object CometIcebergNativeScan extends 
CometOperatorSerde[CometBatchScanExec] wit
       }
     } catch {
       case e: Exception =>
-        val msg =
-          "Iceberg reflection failure: Failed to extract FileScanTasks from 
Iceberg scan RDD: " +
-            s"${e.getMessage}"
-        logError(msg, e)
-        return None
+        // CometScanRule already validated this scan should use native 
execution.
+        // Failure here is a programming error, not a graceful fallback 
scenario.
+        throw new IllegalStateException(
+          s"Native Iceberg scan serialization failed unexpectedly: 
${e.getMessage}",
+          e)
     }
 
     // Log deduplication summary
diff --git 
a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
index 7b58e69c4..f3c8a8b2a 100644
--- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -2294,4 +2294,70 @@ class CometIcebergNativeSuite extends CometTestBase with 
RESTCatalogHelper {
       deleteRecursively(dir)
     }
   }
+
+  test("runtime filtering - join with dynamic partition pruning") {
+    assume(icebergAvailable, "Iceberg not available")
+    withTempIcebergDir { warehouseDir =>
+      val dimDir = new File(warehouseDir, "dim_parquet")
+      withSQLConf(
+        "spark.sql.catalog.runtime_cat" -> 
"org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.runtime_cat.type" -> "hadoop",
+        "spark.sql.catalog.runtime_cat.warehouse" -> 
warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        // Create partitioned Iceberg table (fact table)
+        spark.sql("""
+          CREATE TABLE runtime_cat.db.fact_table (
+            id BIGINT,
+            data STRING,
+            date DATE
+          ) USING iceberg
+          PARTITIONED BY (date)
+        """)
+
+        // Insert data across multiple partitions
+        spark.sql("""
+          INSERT INTO runtime_cat.db.fact_table VALUES
+          (1, 'a', DATE '1970-01-01'),
+          (2, 'b', DATE '1970-01-02'),
+          (3, 'c', DATE '1970-01-02'),
+          (4, 'd', DATE '1970-01-03')
+        """)
+
+        // Create dimension table (Parquet) in temp directory
+        spark
+          .createDataFrame(Seq((1L, java.sql.Date.valueOf("1970-01-02"))))
+          .toDF("id", "date")
+          .write
+          .parquet(dimDir.getAbsolutePath)
+        
spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("dim")
+
+        // This join should trigger dynamic partition pruning
+        val query =
+          """SELECT f.* FROM runtime_cat.db.fact_table f
+            |JOIN dim d ON f.date = d.date AND d.id = 1
+            |ORDER BY f.id""".stripMargin
+
+        // Verify the initial plan contains dynamic pruning expression
+        val df = spark.sql(query)
+        val initialPlan = df.queryExecution.executedPlan
+        val planStr = initialPlan.toString
+        assert(
+          planStr.contains("dynamicpruning"),
+          s"Expected dynamic pruning in plan but got:\n$planStr")
+
+        // Check results match Spark
+        // Note: AQE re-plans after subquery executes, converting 
dynamicpruningexpression(...)
+        // to dynamicpruningexpression(true), which allows native Iceberg scan 
to proceed.
+        // This is correct behavior - no actual subquery to wait for after AQE 
re-planning.
+        // However, the rest of the plan still contains non-native operators 
because CometExecRule
+        // doesn't run again.
+        checkSparkAnswer(df)
+
+        spark.sql("DROP TABLE runtime_cat.db.fact_table")
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to