This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new d3ea9fdd5 feat: pass spark.comet.datafusion.* configs through to DataFusion session (#3455)
d3ea9fdd5 is described below
commit d3ea9fdd505fdda71c8de4b3d2021a4ba1f99ae1
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 13 14:00:49 2026 -0700
feat: pass spark.comet.datafusion.* configs through to DataFusion session (#3455)
---
common/src/main/scala/org/apache/comet/CometConf.scala | 10 ++++++++++
native/core/src/execution/jni_api.rs | 18 ++++++++++++++----
.../scala/org/apache/comet/CometExecIterator.scala | 8 +++++++-
3 files changed, 31 insertions(+), 5 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5ab81ba8d..49eb55479 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -828,6 +828,16 @@ object CometConf extends ShimCometConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB
+ val COMET_RESPECT_DATAFUSION_CONFIGS: ConfigEntry[Boolean] =
+ conf(s"$COMET_EXEC_CONFIG_PREFIX.respectDataFusionConfigs")
+ .category(CATEGORY_TESTING)
+ .doc(
+ "Development and testing configuration option to allow DataFusion
configs set in " +
+ "Spark configuration settings starting with
`spark.comet.datafusion.` to be passed " +
+ "into native execution.")
+ .booleanConf
+ .createWithDefault(false)
+
val COMET_STRICT_TESTING: ConfigEntry[Boolean] =
conf(s"$COMET_PREFIX.testing.strict")
.category(CATEGORY_TESTING)
.doc("Experimental option to enable strict testing, which will fail tests
that could be " +
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index ed68de5d9..c2cb1d910 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -255,6 +255,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
local_dirs_vec,
max_temp_directory_size,
task_cpus as usize,
+ &spark_config,
)?;
let plan_creation_time = start.elapsed();
@@ -309,6 +310,7 @@ fn prepare_datafusion_session_context(
local_dirs: Vec<String>,
max_temp_directory_size: u64,
task_cpus: usize,
+ spark_config: &HashMap<String, String>,
) -> CometResult<SessionContext> {
let paths = local_dirs.into_iter().map(PathBuf::from).collect();
let disk_manager = DiskManagerBuilder::default()
@@ -317,10 +319,7 @@ fn prepare_datafusion_session_context(
let mut rt_config =
RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager);
rt_config = rt_config.with_memory_pool(memory_pool);
- // Get Datafusion configuration from Spark Execution context
- // can be configured in Comet Spark JVM using Spark --conf parameters
- // e.g: spark-shell --conf spark.datafusion.sql_parser.parse_float_as_decimal=true
- let session_config = SessionConfig::new()
+ let mut session_config = SessionConfig::new()
.with_target_partitions(task_cpus)
// This DataFusion context is within the scope of an executing Spark Task. We want to set
// its internal parallelism to the number of CPUs allocated to Spark Tasks. This can be
@@ -337,6 +336,17 @@ fn prepare_datafusion_session_context(
&ScalarValue::Float64(Some(1.1)),
);
+ // Pass through DataFusion configs from Spark.
+ // e.g: spark-shell --conf spark.comet.datafusion.sql_parser.parse_float_as_decimal=true
+ // becomes datafusion.sql_parser.parse_float_as_decimal=true
+ const SPARK_COMET_DF_PREFIX: &str = "spark.comet.datafusion.";
+ for (key, value) in spark_config {
+ if let Some(df_key) = key.strip_prefix(SPARK_COMET_DF_PREFIX) {
+ let df_key = format!("datafusion.{df_key}");
+ session_config = session_config.set_str(&df_key, value);
+ }
+ }
+
let runtime = rt_config.build()?;
let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
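
The native hunk above strips the "spark.comet.datafusion." prefix and re-prefixes the key with "datafusion." before applying it to the session config. A standalone Scala sketch of that same key mapping, for illustration only (the shipped logic is the Rust loop above):

    // Illustrative re-implementation of the prefix rewrite done in jni_api.rs; not part of this commit.
    object DataFusionKeyMapping {
      private val SparkCometDfPrefix = "spark.comet.datafusion."

      // Returns the DataFusion key for a forwarded Spark key, or None for any other key.
      def toDataFusionKey(sparkKey: String): Option[String] =
        if (sparkKey.startsWith(SparkCometDfPrefix)) {
          Some("datafusion." + sparkKey.stripPrefix(SparkCometDfPrefix))
        } else {
          None
        }

      def main(args: Array[String]): Unit = {
        // Prints Some(datafusion.sql_parser.parse_float_as_decimal)
        println(toDataFusionKey("spark.comet.datafusion.sql_parser.parse_float_as_decimal"))
        // Prints None: keys outside the prefix are left alone.
        println(toDataFusionKey("spark.comet.exec.enabled"))
      }
    }
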
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 3156eb387..d27f88b49 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -270,7 +270,13 @@ object CometExecIterator extends Logging {
def serializeCometSQLConfs(): Array[Byte] = {
val builder = ConfigMap.newBuilder()
cometSqlConfs.foreach { case (k, v) =>
- builder.putEntries(k, v)
+ if (k.startsWith(s"${CometConf.COMET_PREFIX}.datafusion.")) {
+ if (CometConf.COMET_RESPECT_DATAFUSION_CONFIGS.get(SQLConf.get)) {
+ builder.putEntries(k, v)
+ }
+ } else {
+ builder.putEntries(k, v)
+ }
}
builder.build().toByteArray
}
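
The JVM-side change reduces to a filter over the serialized config map: spark.comet.datafusion.* entries are only included when the new flag is enabled, and all other entries pass through unchanged. A simplified, self-contained sketch of that behavior with hypothetical names (the real code uses CometConf, SQLConf, and the protobuf ConfigMap builder):

    // Simplified model of the gating in serializeCometSQLConfs; not the actual implementation.
    object ConfigGate {
      private val DataFusionPrefix = "spark.comet.datafusion."

      def selectConfigs(
          cometSqlConfs: Map[String, String],
          respectDataFusionConfigs: Boolean): Map[String, String] =
        cometSqlConfs.filter { case (k, _) =>
          !k.startsWith(DataFusionPrefix) || respectDataFusionConfigs
        }

      def main(args: Array[String]): Unit = {
        val confs = Map(
          "spark.comet.exec.enabled" -> "true",
          "spark.comet.datafusion.sql_parser.parse_float_as_decimal" -> "true")
        // With the flag off, the spark.comet.datafusion.* entry is dropped.
        println(selectConfigs(confs, respectDataFusionConfigs = false).keySet)
        // With the flag on, both entries are serialized.
        println(selectConfigs(confs, respectDataFusionConfigs = true).keySet)
      }
    }
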
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]