This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new d3ea9fdd5 feat: pass spark.comet.datafusion.* configs through to DataFusion session (#3455)
d3ea9fdd5 is described below

commit d3ea9fdd505fdda71c8de4b3d2021a4ba1f99ae1
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 13 14:00:49 2026 -0700

    feat: pass spark.comet.datafusion.* configs through to DataFusion session (#3455)
---
 common/src/main/scala/org/apache/comet/CometConf.scala | 10 ++++++++++
 native/core/src/execution/jni_api.rs                   | 18 ++++++++++++++----
 .../scala/org/apache/comet/CometExecIterator.scala     |  8 +++++++-
 3 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5ab81ba8d..49eb55479 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -828,6 +828,16 @@ object CometConf extends ShimCometConf {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB
 
+  val COMET_RESPECT_DATAFUSION_CONFIGS: ConfigEntry[Boolean] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.respectDataFusionConfigs")
+      .category(CATEGORY_TESTING)
+      .doc(
+        "Development and testing configuration option to allow DataFusion 
configs set in " +
+          "Spark configuration settings starting with 
`spark.comet.datafusion.` to be passed " +
+          "into native execution.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_STRICT_TESTING: ConfigEntry[Boolean] = 
conf(s"$COMET_PREFIX.testing.strict")
     .category(CATEGORY_TESTING)
     .doc("Experimental option to enable strict testing, which will fail tests 
that could be " +
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index ed68de5d9..c2cb1d910 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -255,6 +255,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
                 local_dirs_vec,
                 max_temp_directory_size,
                 task_cpus as usize,
+                &spark_config,
             )?;
 
             let plan_creation_time = start.elapsed();
@@ -309,6 +310,7 @@ fn prepare_datafusion_session_context(
     local_dirs: Vec<String>,
     max_temp_directory_size: u64,
     task_cpus: usize,
+    spark_config: &HashMap<String, String>,
 ) -> CometResult<SessionContext> {
     let paths = local_dirs.into_iter().map(PathBuf::from).collect();
     let disk_manager = DiskManagerBuilder::default()
@@ -317,10 +319,7 @@ fn prepare_datafusion_session_context(
     let mut rt_config = 
RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager);
     rt_config = rt_config.with_memory_pool(memory_pool);
 
-    // Get Datafusion configuration from Spark Execution context
-    // can be configured in Comet Spark JVM using Spark --conf parameters
-    // e.g: spark-shell --conf 
spark.datafusion.sql_parser.parse_float_as_decimal=true
-    let session_config = SessionConfig::new()
+    let mut session_config = SessionConfig::new()
         .with_target_partitions(task_cpus)
         // This DataFusion context is within the scope of an executing Spark 
Task. We want to set
         // its internal parallelism to the number of CPUs allocated to Spark 
Tasks. This can be
@@ -337,6 +336,17 @@ fn prepare_datafusion_session_context(
             &ScalarValue::Float64(Some(1.1)),
         );
 
+    // Pass through DataFusion configs from Spark.
+    // e.g: spark-shell --conf 
spark.comet.datafusion.sql_parser.parse_float_as_decimal=true
+    // becomes datafusion.sql_parser.parse_float_as_decimal=true
+    const SPARK_COMET_DF_PREFIX: &str = "spark.comet.datafusion.";
+    for (key, value) in spark_config {
+        if let Some(df_key) = key.strip_prefix(SPARK_COMET_DF_PREFIX) {
+            let df_key = format!("datafusion.{df_key}");
+            session_config = session_config.set_str(&df_key, value);
+        }
+    }
+
     let runtime = rt_config.build()?;
 
     let mut session_ctx = SessionContext::new_with_config_rt(session_config, 
Arc::new(runtime));
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 3156eb387..d27f88b49 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -270,7 +270,13 @@ object CometExecIterator extends Logging {
   def serializeCometSQLConfs(): Array[Byte] = {
     val builder = ConfigMap.newBuilder()
     cometSqlConfs.foreach { case (k, v) =>
-      builder.putEntries(k, v)
+      if (k.startsWith(s"${CometConf.COMET_PREFIX}.datafusion.")) {
+        if (CometConf.COMET_RESPECT_DATAFUSION_CONFIGS.get(SQLConf.get)) {
+          builder.putEntries(k, v)
+        }
+      } else {
+        builder.putEntries(k, v)
+      }
     }
     builder.build().toByteArray
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to