This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e3761e013 fix: Fall back on dynamicpruning expressions for
CometIcebergNativeScan (#3335)
e3761e013 is described below
commit e3761e0139a3fef46f7f43e254143b8a97400f63
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Jan 30 09:17:14 2026 -0500
fix: Fall back on dynamicpruning expressions for CometIcebergNativeScan
(#3335)
---
.../org/apache/comet/rules/CometScanRule.scala | 4 ++
.../serde/operator/CometIcebergNativeScan.scala | 13 ++---
.../org/apache/comet/CometIcebergNativeSuite.scala | 66 ++++++++++++++++++++++
3 files changed, 76 insertions(+), 7 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 4291e3fb6..68a63b6ae 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -327,6 +327,10 @@ case class CometScanRule(session: SparkSession) extends
Rule[SparkPlan] with Com
case _
if scanExec.scan.getClass.getName ==
"org.apache.iceberg.spark.source.SparkBatchQueryScan" =>
+ if (scanExec.runtimeFilters.exists(isDynamicPruningFilter)) {
+ return withInfo(scanExec, "Dynamic Partition Pruning is not
supported")
+ }
+
val fallbackReasons = new ListBuffer[String]()
// Native Iceberg scan requires both configs to be enabled
diff --git
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index 7238f8ae8..0ad82af8f 100644
---
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -700,10 +700,9 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
// If metadata is None, this is a programming error - metadata should have
been extracted
// in CometScanRule before creating CometBatchScanExec
val metadata = scan.nativeIcebergScanMetadata.getOrElse {
- logError(
+ throw new IllegalStateException(
"Programming error: CometBatchScanExec.nativeIcebergScanMetadata is
None. " +
"Metadata should have been extracted in CometScanRule.")
- return None
}
// Use pre-extracted metadata (no reflection needed)
@@ -979,11 +978,11 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
}
} catch {
case e: Exception =>
- val msg =
- "Iceberg reflection failure: Failed to extract FileScanTasks from
Iceberg scan RDD: " +
- s"${e.getMessage}"
- logError(msg, e)
- return None
+ // CometScanRule already validated this scan should use native
execution.
+ // Failure here is a programming error, not a graceful fallback
scenario.
+ throw new IllegalStateException(
+ s"Native Iceberg scan serialization failed unexpectedly:
${e.getMessage}",
+ e)
}
// Log deduplication summary
diff --git
a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
index 7b58e69c4..f3c8a8b2a 100644
--- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -2294,4 +2294,70 @@ class CometIcebergNativeSuite extends CometTestBase with
RESTCatalogHelper {
deleteRecursively(dir)
}
}
+
+ test("runtime filtering - join with dynamic partition pruning") {
+ assume(icebergAvailable, "Iceberg not available")
+ withTempIcebergDir { warehouseDir =>
+ val dimDir = new File(warehouseDir, "dim_parquet")
+ withSQLConf(
+ "spark.sql.catalog.runtime_cat" ->
"org.apache.iceberg.spark.SparkCatalog",
+ "spark.sql.catalog.runtime_cat.type" -> "hadoop",
+ "spark.sql.catalog.runtime_cat.warehouse" ->
warehouseDir.getAbsolutePath,
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+ // Create partitioned Iceberg table (fact table)
+ spark.sql("""
+ CREATE TABLE runtime_cat.db.fact_table (
+ id BIGINT,
+ data STRING,
+ date DATE
+ ) USING iceberg
+ PARTITIONED BY (date)
+ """)
+
+ // Insert data across multiple partitions
+ spark.sql("""
+ INSERT INTO runtime_cat.db.fact_table VALUES
+ (1, 'a', DATE '1970-01-01'),
+ (2, 'b', DATE '1970-01-02'),
+ (3, 'c', DATE '1970-01-02'),
+ (4, 'd', DATE '1970-01-03')
+ """)
+
+ // Create dimension table (Parquet) in temp directory
+ spark
+ .createDataFrame(Seq((1L, java.sql.Date.valueOf("1970-01-02"))))
+ .toDF("id", "date")
+ .write
+ .parquet(dimDir.getAbsolutePath)
+
spark.read.parquet(dimDir.getAbsolutePath).createOrReplaceTempView("dim")
+
+ // This join should trigger dynamic partition pruning
+ val query =
+ """SELECT f.* FROM runtime_cat.db.fact_table f
+ |JOIN dim d ON f.date = d.date AND d.id = 1
+ |ORDER BY f.id""".stripMargin
+
+ // Verify the initial plan contains dynamic pruning expression
+ val df = spark.sql(query)
+ val initialPlan = df.queryExecution.executedPlan
+ val planStr = initialPlan.toString
+ assert(
+ planStr.contains("dynamicpruning"),
+ s"Expected dynamic pruning in plan but got:\n$planStr")
+
+ // Check results match Spark
+ // Note: AQE re-plans after subquery executes, converting
dynamicpruningexpression(...)
+ // to dynamicpruningexpression(true), which allows native Iceberg scan
to proceed.
+ // This is correct behavior - no actual subquery to wait for after AQE
re-planning.
+      // However, the rest of the plan still contains non-native operators
because CometExecRule
+      // doesn't run again.
+ checkSparkAnswer(df)
+
+ spark.sql("DROP TABLE runtime_cat.db.fact_table")
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]