This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 1d4907474 chore: Add Comet writer nested types test assertion (#3480)
1d4907474 is described below
commit 1d49074742f0df147982d0af3c8cbad105fb0c31
Author: Oleks V <[email protected]>
AuthorDate: Wed Feb 11 08:16:11 2026 -0800
chore: Add Comet writer nested types test assertion (#3480)
---
.../comet/parquet/CometParquetWriterSuite.scala | 30 +++++++++++++---------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index b691039f1..e4c405c00 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -447,17 +447,7 @@ class CometParquetWriterSuite extends CometTestBase {
}
}
- private def writeWithCometNativeWriteExec(
- inputPath: String,
- outputPath: String,
- num_partitions: Option[Int] = None): Option[SparkPlan] = {
- val df = spark.read.parquet(inputPath)
-
- val plan = captureWritePlan(
- path => num_partitions.fold(df)(n => df.repartition(n)).write.parquet(path),
- outputPath)
-
- // Count CometNativeWriteExec instances in the plan
+ private def assertHasCometNativeWriteExec(plan: SparkPlan): Unit = {
var nativeWriteCount = 0
plan.foreach {
case _: CometNativeWriteExec =>
@@ -474,6 +464,19 @@ class CometParquetWriterSuite extends CometTestBase {
assert(
nativeWriteCount == 1,
s"Expected exactly one CometNativeWriteExec in the plan, but found $nativeWriteCount:\n${plan.treeString}")
+ }
+
+ private def writeWithCometNativeWriteExec(
+ inputPath: String,
+ outputPath: String,
+ num_partitions: Option[Int] = None): Option[SparkPlan] = {
+ val df = spark.read.parquet(inputPath)
+
+ val plan = captureWritePlan(
+ path => num_partitions.fold(df)(n => df.repartition(n)).write.parquet(path),
+ outputPath)
+
+ assertHasCometNativeWriteExec(plan)
Some(plan)
}
@@ -524,7 +527,10 @@ class CometParquetWriterSuite extends CometTestBase {
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {
val parquetDf = spark.read.parquet(inputPath)
- parquetDf.write.parquet(outputPath)
+
+ // Capture plan and verify CometNativeWriteExec is used
+ val plan = captureWritePlan(path => parquetDf.write.parquet(path), outputPath)
+ assertHasCometNativeWriteExec(plan)
}
// Verify round-trip: read with Spark and Comet, compare results
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]