and124578963 opened a new issue, #2389: URL: https://github.com/apache/datafusion-comet/issues/2389
### Describe the bug

When running [TPC-DS query 88](https://github.com/apache/doris/blob/master/tools/tpcds-tools/queries/sf1000/query88.sql) with `"spark.sql.adaptive.enabled": "false"`, the query fails with the following error:

**java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: ArrayBuffer(6958, 6975, 6958, 6958)**

The issue is resolved either by setting `"spark.comet.exec.broadcastHashJoin.enabled": "false"` or by re-enabling AQE. My hypothesis is that the query plan is built incorrectly, so that RDDs from different subqueries, with varying partition counts, end up being zipped together. The query also runs successfully if it is rewritten to use a single common SELECT statement, or if all subqueries are forced to have identical conditions, which results in RDDs of the same size.

Stack trace:

```
Problem with SQL: An error occurred while calling o384.showString.
: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: ArrayBuffer(6958, 6975, 6958, 6958)
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:212)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:517)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:208)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:204)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.prepareBroadcast(BroadcastNestedLoopJoinExec.scala:444)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.codegenInner(BroadcastNestedLoopJoinExec.scala:454)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:428)
    at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:196)
    at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.consume(BroadcastNestedLoopJoinExec.scala:32)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.codegenInner(BroadcastNestedLoopJoinExec.scala:469)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:428)
    at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:196)
    at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
    at org.apache.spark.sql.comet.CometColumnarToRowExec.consume(CometColumnarToRowExec.scala:54)
    at org.apache.spark.sql.comet.CometColumnarToRowExec.doProduce(CometColumnarToRowExec.scala:277)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.comet.CometColumnarToRowExec.produce(CometColumnarToRowExec.scala:54)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doProduce(BroadcastNestedLoopJoinExec.scala:423)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.produce(BroadcastNestedLoopJoinExec.scala:32)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doProduce(BroadcastNestedLoopJoinExec.scala:423)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.produce(BroadcastNestedLoopJoinExec.scala:32)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doProduce(BroadcastNestedLoopJoinExec.scala:423)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.produce(BroadcastNestedLoopJoinExec.scala:32)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doProduce(BroadcastNestedLoopJoinExec.scala:423)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.produce(BroadcastNestedLoopJoinExec.scala:32)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doProduce(BroadcastNestedLoopJoinExec.scala:423)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.produce(BroadcastNestedLoopJoinExec.scala:32)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doProduce(BroadcastNestedLoopJoinExec.scala:423)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.produce(BroadcastNestedLoopJoinExec.scala:32)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:660)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: ArrayBuffer(6958, 6975, 6958, 6958)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
    at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:310)
    at org.apache.spark.sql.comet.ZippedPartitionsRDD.<init>(ZippedPartitionsRDD.scala:41)
    at org.apache.spark.sql.comet.ZippedPartitionsRDD$.$anonfun$apply$1(ZippedPartitionsRDD.scala:62)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.sql.comet.ZippedPartitionsRDD$.withScope(ZippedPartitionsRDD.scala:66)
    at org.apache.spark.sql.comet.ZippedPartitionsRDD$.apply(ZippedPartitionsRDD.scala:62)
    at org.apache.spark.sql.comet.CometNativeExec.doExecuteColumnar(operators.scala:374)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.inputRDD$lzycompute(CometShuffleExchangeExec.scala:88)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.inputRDD(CometShuffleExchangeExec.scala:86)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.shuffleDependency$lzycompute(CometShuffleExchangeExec.scala:135)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.shuffleDependency(CometShuffleExchangeExec.scala:132)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.doExecuteColumnar(CometShuffleExchangeExec.scala:186)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
    at org.apache.spark.sql.comet.CometNativeExec.doExecuteColumnar(operators.scala:300)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:521)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
    at org.apache.spark.sql.comet.CometColumnarToRowExec.inputRDDs(CometColumnarToRowExec.scala:306)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:455)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:140)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:224)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:219)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    ... 1 more
```

### Steps to reproduce

Set up the environment:

- Spark: 3.5.4
- Iceberg: 1.8.1
- Comet: 0.9.1

Use the following Spark configuration:

```
spark_conf = {
    "spark.dynamicAllocation.enabled": "false",
    "spark.driver.memory": "20g",
    "spark.driver.cores": "4",
    "spark.driver.maxResultSize": "18g",
    "spark.executor.instances": "16",
    "spark.executor.cores": "5",
    "spark.executor.memory": "15g",
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "30g",
    "spark.executor.memoryOverhead": "5g",
    "spark.sql.shuffle.partitions": "248",
    "spark.sql.broadcastTimeout": "1200s",
    "spark.sql.autoBroadcastJoinThreshold": "200MB",
    "spark.sql.catalog.spark_catalog.cache.expiration-interval-ms": "-1",
    "spark.comet.enabled": "true",
    "spark.plugins": "org.apache.spark.CometPlugin",
    "spark.shuffle.manager": "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
    "spark.sql.iceberg.parquet.reader-type": "COMET",
    "spark.sql.iceberg.vectorization.enabled": "true",
    "spark.comet.exec.replaceSortMergeJoin": "true",
    "spark.comet.exec.sortMergeJoinWithJoinFilter.enabled": "true",
    "spark.sql.parquet.filterPushdown": "false",
    "spark.sql.adaptive.enabled": "false",
    "spark.shuffle.spill.compress": "true",
    "spark.comet.explain.native.enabled": "true",
    "spark.comet.exec.broadcastHashJoin.enabled": "true",
}
```

Run TPC-DS query 88: [Link to query](https://github.com/apache/doris/blob/master/tools/tpcds-tools/queries/sf1000/query88.sql)

### Expected behavior

The query should execute successfully without errors, even with AQE disabled.

### Additional context

I conducted an experiment by adding logging to CometNativeExec.doExecuteColumnar() ([commit link](https://github.com/and124578963/datafusion-comet/commit/2b239190aa3225b3a3c975f7c941ff38c1d73a82#diff-2f8452022fd33221b3d78d5c0289e097f2fa9088e10b361eb6cd8878c0584164)). I can provide logs for the following scenarios:

1. Failure with AQE disabled: [1_withProblem.log](https://github.com/user-attachments/files/22301357/1_withProblem.log)
2. Successful run with spark.comet.exec.broadcastHashJoin.enabled disabled: [2_noHashNoProblem.log](https://github.com/user-attachments/files/22301362/2_noHashNoProblem.log)
3. Successful run when all subqueries have identical conditions: [3_noProblemSameParts.log](https://github.com/user-attachments/files/22301363/3_noProblemSameParts.log)

Please let me know if any additional information is required.
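For context on why the plan contains a chain of BroadcastNestedLoopJoinExec operators: query 88 cross-joins eight scalar subqueries that count rows from the same tables under different time-window predicates, so each branch becomes its own scan. Below is a simplified two-branch sketch of that shape (my own abbreviation, assuming a `spark` session is in scope; the real query text also joins household_demographics and store):

```scala
// Simplified shape of TPC-DS query 88: independent COUNT(*) subqueries over
// the same fact table with different predicates, cross-joined into one row.
// Only two of the eight branches are shown; dimension joins are abbreviated.
val df = spark.sql("""
  SELECT *
  FROM (SELECT count(*) AS h8_30_to_9
        FROM store_sales, time_dim
        WHERE ss_sold_time_sk = t_time_sk AND t_hour = 8 AND t_minute >= 30) s1,
       (SELECT count(*) AS h9_to_9_30
        FROM store_sales, time_dim
        WHERE ss_sold_time_sk = t_time_sk AND t_hour = 9 AND t_minute < 30) s2
""")
df.show()
```

Each branch produces a single-row result that Spark broadcasts and cross-joins, while each branch's scan becomes an independent RDD with its own partition count.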
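To make the failing invariant concrete: the check that throws lives in Spark's ZippedPartitionsBaseRDD and can be tripped without Comet at all. Here is a minimal Scala sketch (an illustration of the invariant, not Comet code; partition counts 4 and 5 stand in for 6958 and 6975):

```scala
import org.apache.spark.sql.SparkSession

object ZipMismatchDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("zip-mismatch").getOrCreate()
    val sc = spark.sparkContext

    // Two RDDs over the same data but with different partition counts,
    // analogous to two subquery scans producing different numbers of splits.
    val a = sc.parallelize(1 to 100, numSlices = 4)
    val b = sc.parallelize(1 to 100, numSlices = 5)

    // RDD.zip builds a ZippedPartitionsRDD2; evaluating its partitions at
    // action time throws roughly:
    //   java.lang.IllegalArgumentException:
    //     Can't zip RDDs with unequal numbers of partitions: List(4, 5)
    a.zip(b).count()

    spark.stop()
  }
}
```

Judging by the "Caused by" frames, CometNativeExec.doExecuteColumnar appears to zip the columnar RDDs of its native children in the same way, so with AQE disabled nothing re-aligns the partitioning, and a single branch scanning 6975 partitions instead of 6958 is enough to hit this exception.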