rishvin opened a new issue, #1831: URL: https://github.com/apache/datafusion-comet/issues/1831
### Describe the bug

The following test cases fail when enabled (relates to #1739):

- Runtime bloom filter join: do not add bloom filter if dpp filter exists on the same column
- Runtime bloom filter join: add bloom filter if dpp filter exists on a different column

Both tests fail with the following stacktrace:

```
Custom columnar rules cannot transform shuffle node to something else.
java.lang.IllegalStateException: Custom columnar rules cannot transform shuffle node to something else.
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:554)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:506)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:536)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:536)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:536)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:536)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:247)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:360)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3459)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4208)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4206)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3459)
    at org.apache.spark.sql.InjectRuntimeFilterSuite.$anonfun$checkWithAndWithoutFeatureEnabled$1(InjectRuntimeFilterSuite.scala:246)
    at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
    at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
    at org.apache.spark.sql.InjectRuntimeFilterSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(InjectRuntimeFilterSuite.scala:31)
    at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:275)
    at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:273)
    at org.apache.spark.sql.InjectRuntimeFilterSuite.withSQLConf(InjectRuntimeFilterSuite.scala:31)
    at org.apache.spark.sql.InjectRuntimeFilterSuite.checkWithAndWithoutFeatureEnabled(InjectRuntimeFilterSuite.scala:244)
    at org.apache.spark.sql.InjectRuntimeFilterSuite.assertDidNotRewriteWithBloomFilter(InjectRuntimeFilterSuite.scala:335)
    at org.apache.spark.sql.InjectRuntimeFilterSuite.$anonfun$new$29(InjectRuntimeFilterSuite.scala:474)
    at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
    at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
    at org.apache.spark.sql.InjectRuntimeFilterSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(InjectRuntimeFilterSuite.scala:31)
```

---

**Reason**

These tests fail only when AQE is enabled.
The physical plan before AQE looks like this:

```
SortMergeJoin [c5#XXX, f5#XXX], [c2#XXX, f2#XXX], Inner
:- Sort [c5#XXX ASC NULLS FIRST, f5#XXX ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(c5#XXX, f5#XXX, 5), ENSURE_REQUIREMENTS, [plan_id=XXX]
:     +- Filter isnotnull(c5#XXX)
:        +- FileScan parquet spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX] Batched: true, DataFilters: [isnotnull(c5#XXX)], Format: Parquet, Location: InMemoryFileIndex(14 paths)[file:///.../table1/..., PartitionFilters: [isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN dynamicpruning#XXX)], PushedFilters: [IsNotNull(c5)], ReadSchema: struct<a5:int,b5:int,c5:int,d5:int,e5:int>
:              +- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 1, true, Filter ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND (isnotnull(c2#XXX) AND isnotnull(f2#XXX))), [c2#XXX, f2#XXX]
:                 +- AdaptiveSparkPlan isFinalPlan=false
:                    +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], (((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)) AND isnotnull(f2#XXX))
:                       +- CometScan parquet spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX] Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62), isnotnull(c2#XXX), isnotnull(f2#XXX)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:///.../table2/..., PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(f2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
+- CometSort [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], [c2#XXX ASC NULLS FIRST, f2#XXX ASC NULLS FIRST]
   +- CometExchange hashpartitioning(c2#XXX, f2#XXX, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=XXX]
      +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], (((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)) AND isnotnull(f2#XXX))
         +- CometScan parquet spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX] Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62), isnotnull(c2#XXX), isnotnull(f2#XXX)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:///.../table2/..., PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(f2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
```

The exception is thrown while AQE is creating query stages, inside the method [newQueryStage](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L552). `newQueryStage` is called after all child stages of the current physical node have been materialized; at this point AQE has the opportunity to apply rewrite rules to optimize the physical plan.
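For context, here is a minimal, self-contained model of the check that produces this exception. The types below (`PlanNode`, `ShuffleExchange`, `CometColumnarToRow`, ...) are simplified stand-ins invented for illustration, not the real Spark/Comet classes, and the hypothetical `newShuffleQueryStage` only paraphrases the relevant part of `newQueryStage`:

```scala
// Simplified stand-ins for physical plan nodes (illustrative only, not the real classes).
// ColumnarToRow / CometColumnarToRow / Scan are reused in the second sketch below.
sealed trait PlanNode
case class ShuffleExchange(child: PlanNode) extends PlanNode    // stands in for ShuffleExchangeLike / CometExchange
case class ColumnarToRow(child: PlanNode) extends PlanNode      // stands in for ColumnarToRowExec
case class CometColumnarToRow(child: PlanNode) extends PlanNode // stands in for CometColumnarToRowExec
case class Scan(table: String) extends PlanNode

// Paraphrases the check in AdaptiveSparkPlanExec.newQueryStage: after the
// post-stage-creation (custom columnar) rules have rewritten the stage, its
// root must still be a shuffle exchange, otherwise AQE throws.
def newShuffleQueryStage(rewritten: PlanNode): PlanNode = rewritten match {
  case s: ShuffleExchange => s // OK: a ShuffleQueryStageExec would be built from this
  case _ =>
    throw new IllegalStateException(
      "Custom columnar rules cannot transform shuffle node to something else.")
}
```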
Since the following sub-plan doesn't have any child stages, it is considered already materialized:

```
CreateStageResult(FileScan parquet spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(14 paths)[file:///.../table1/..., PartitionFilters: [isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN dynamicpruning#XXX)], PushedFilters: [], ReadSchema: struct<a5:int,b5:int,c5:int,d5:int,e5:int>
      +- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 0, true, Filter ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)), [c2#XXX]
         +- AdaptiveSparkPlan isFinalPlan=false
            +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX))
               +- CometScan parquet spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX] Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62), isnotnull(c2#XXX)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:///.../table2/..., PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
,true,List())
```

`newQueryStage` is then called with the above sub-plan, like so:

```
Exchange hashpartitioning(f5#XXX, 5), ENSURE_REQUIREMENTS, [plan_id=XXX]
+- FileScan parquet spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(14 paths)[file:///.../table1/..., PartitionFilters: [isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN dynamicpruning#XXX)], PushedFilters: [], ReadSchema: struct<a5:int,b5:int,c5:int,d5:int,e5:int>
      +- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 0, true, Filter ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)), [c2#XXX]
         +- AdaptiveSparkPlan isFinalPlan=false
            +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX))
               +- CometScan parquet spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX] Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62), isnotnull(c2#XXX)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:///.../table2/..., PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
```

Further down the stack in this method, the columnar rule [ApplyColumnarRulesAndInsertTransitions](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L566) is applied. Before the post-columnar rules run, the plan is wrapped in a [ColumnarToRowExec](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L545). The plan is then passed to the Comet rewriter, where `EliminateRedundantTransitions` is applied. This rule has the following case:

```
case c @ ColumnarToRowExec(child) if hasCometNativeChild(child) =>
  val op = CometColumnarToRowExec(child)
  if (c.logicalLink.isEmpty) {
    op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
    op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
  } else {
    c.logicalLink.foreach(op.setLogicalLink)
  }
  op
```

We can see that it replaces the `ColumnarToRowExec` with a `CometColumnarToRowExec`. All of this happens within the `newQueryStage` method.
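Using the same stand-in types from the sketch above, the effect of this case on the stage that AQE is building can be modelled like this (again purely illustrative; the hypothetical `eliminateRedundantTransitions` below only mimics the single case quoted above):

```scala
// Mimics the quoted EliminateRedundantTransitions case for this scenario:
// the ColumnarToRowExec that Spark inserted on top of the Comet exchange is
// replaced by CometColumnarToRowExec, so the type of the root node changes.
def eliminateRedundantTransitions(plan: PlanNode): PlanNode = plan match {
  case ColumnarToRow(child: ShuffleExchange) => CometColumnarToRow(child)
  case other                                 => other
}

// The stage handed to the columnar rules: ColumnarToRowExec on top of the shuffle.
val stage = ColumnarToRow(ShuffleExchange(Scan("table1")))

// After the rewrite the root is CometColumnarToRow rather than a shuffle exchange,
// so the newShuffleQueryStage check above throws the IllegalStateException.
newShuffleQueryStage(eliminateRedundantTransitions(stage))
```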
`newQueryStage` now receives a rewritten plan whose top-level physical node is `CometColumnarToRowExec`. Since this node is not an `Exchange` node, `newQueryStage` throws: in this test case the top-level node was expected to be of type `ShuffleExchangeLike`, but it was a `CometColumnarToRowExec`. Here is the rewritten plan that caused the failure:

```
CometColumnarToRow
+- CometExchange hashpartitioning(c5#XXX, f5#XXX, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=XXX]
   +- CometFilter [a5#XXX, b5#XXX, c5#XXX, d5#XXX, e5#XXX, f5#XXX], isnotnull(c5#XXX)
      +- CometScan parquet spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX] Batched: true, DataFilters: [isnotnull(c5#XXX)], Format: CometParquet, Location: InMemoryFileIndex(14 paths)[file:///.../table1/..., PartitionFilters: [isnotnull(f5#XXX), dynamicpruningexpression(true)], PushedFilters: [IsNotNull(c5)], ReadSchema: struct<a5:int,b5:int,c5:int,d5:int,e5:int>
```

We can see that `CometColumnarToRow` sits on top of `CometExchange`, which is the node that is an instance of `ShuffleExchangeLike`. Ideally, the top-level operator should be `CometExchange`, but it is not. It looks like `EliminateRedundantTransitions` needs to handle this case. It is also possible that this is related to #1798.

cc @andygrove

### Steps to reproduce

Followed the steps mentioned here: https://datafusion.apache.org/comet/contributor-guide/spark-sql-tests.html

Enabled and ran the following tests in `InjectRuntimeFilterSuite.scala`:

- Runtime bloom filter join: do not add bloom filter if dpp filter exists on the same column
- Runtime bloom filter join: add bloom filter if dpp filter exists on a different column

### Expected behavior

_No response_

### Additional context

_No response_