rishvin opened a new issue, #1831:
URL: https://github.com/apache/datafusion-comet/issues/1831

   ### Describe the bug
   
   The following test cases fail when enabled (relates to #1739):
   
   - Runtime bloom filter join: do not add bloom filter if dpp filter exists on 
the same column 
   - Runtime bloom filter join: add bloom filter if dpp filter exists on a 
different column
   
   These two tests fail with the following stack trace:
   ```
   Custom columnar rules cannot transform shuffle node to something else.
   java.lang.IllegalStateException: Custom columnar rules cannot transform 
shuffle node to something else.
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:554)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:506)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:536)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:536)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:536)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:536)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:247)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387)
        at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:360)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218)
        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3459)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4208)
        at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4206)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:3459)
        at 
org.apache.spark.sql.InjectRuntimeFilterSuite.$anonfun$checkWithAndWithoutFeatureEnabled$1(InjectRuntimeFilterSuite.scala:246)
        at 
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
        at 
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
        at 
org.apache.spark.sql.InjectRuntimeFilterSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(InjectRuntimeFilterSuite.scala:31)
        at 
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:275)
        at 
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:273)
        at 
org.apache.spark.sql.InjectRuntimeFilterSuite.withSQLConf(InjectRuntimeFilterSuite.scala:31)
        at 
org.apache.spark.sql.InjectRuntimeFilterSuite.checkWithAndWithoutFeatureEnabled(InjectRuntimeFilterSuite.scala:244)
        at 
org.apache.spark.sql.InjectRuntimeFilterSuite.assertDidNotRewriteWithBloomFilter(InjectRuntimeFilterSuite.scala:335)
        at 
org.apache.spark.sql.InjectRuntimeFilterSuite.$anonfun$new$29(InjectRuntimeFilterSuite.scala:474)
        at 
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
        at 
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
        at 
org.apache.spark.sql.InjectRuntimeFilterSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(InjectRuntimeFilterSuite.scala:31)
   ```
   
   ---
   **Reason**
   
   These tests fail when AQE is enabled. The physical plan before AQE looks like this:
   ```
   SortMergeJoin [c5#XXX, f5#XXX], [c2#XXX, f2#XXX], Inner
     :- Sort [c5#XXX ASC NULLS FIRST, f5#XXX ASC NULLS FIRST], false, 0
     :  +- Exchange hashpartitioning(c5#XXX, f5#XXX, 5), ENSURE_REQUIREMENTS, 
[plan_id=XXX]
     :     +- Filter isnotnull(c5#XXX)
     :        +- FileScan parquet 
spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX] 
Batched: true, DataFilters: [isnotnull(c5#XXX)], Format: Parquet, Location: 
InMemoryFileIndex(14 paths)[file:///.../table1/...,
      PartitionFilters: [isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN 
dynamicpruning#XXX)], PushedFilters: [IsNotNull(c5)], ReadSchema: 
struct<a5:int,b5:int,c5:int,d5:int,e5:int>
     :              +- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 1, true, 
Filter ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND (isnotnull(c2#XXX) AND 
isnotnull(f2#XXX))), [c2#XXX, f2#XXX]
     :                 +- AdaptiveSparkPlan isFinalPlan=false
     :                    +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, 
e2#XXX, f2#XXX], (((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)) 
AND isnotnull(f2#XXX))
     :                       +- CometScan parquet 
spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX] 
Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62), 
isnotnull(c2#XXX), isnotnull(f2#XXX)], Format:
     CometParquet, Location: InMemoryFileIndex(1 paths)[file:///.../table2/..., 
PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), 
IsNotNull(c2), IsNotNull(f2)], ReadSchema:
     struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
     +- CometSort [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], [c2#XXX ASC 
NULLS FIRST, f2#XXX ASC NULLS FIRST]
        +- CometExchange hashpartitioning(c2#XXX, f2#XXX, 5), 
ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=XXX]
           +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], 
(((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)) AND 
isnotnull(f2#XXX))
              +- CometScan parquet 
spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX] 
Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62), 
isnotnull(c2#XXX), isnotnull(f2#XXX)], Format: CometParquet,
     Location: InMemoryFileIndex(1 paths)[file:///.../table2/..., 
PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), 
IsNotNull(c2), IsNotNull(f2)], ReadSchema: 
struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
   ```
   
   The stack trace is thrown while AQE is creating query stages, inside the [newQueryStage](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L552) method.
   
   The `newQueryStage` method is called after all child stages of the current physical node have been materialized. At this point AQE has the opportunity to apply rewrite rules to optimize the physical plan.
   
   Since the following sub-plans don't have any child stages, they are already materialized:
   ```
    CreateStageResult(FileScan parquet 
spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX] 
Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(14 
paths)[file:///.../table1/..., 
     PartitionFilters: [isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN 
dynamicpruning#XXX)], PushedFilters: [], ReadSchema: 
struct<a5:int,b5:int,c5:int,d5:int,e5:int>
        +- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 0, true, Filter 
((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)), [c2#XXX]
           +- AdaptiveSparkPlan isFinalPlan=false
              +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, f2#XXX], 
((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX))
                 +- CometScan parquet 
spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX] 
Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62), 
isnotnull(c2#XXX)], Format: CometParquet, Location:
     InMemoryFileIndex(1 paths)[file:///.../table2/..., PartitionFilters: [], 
PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2)], ReadSchema: 
struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
     ,true,List())
   ```
   
   
   Then `newQueryStage` is called with the exchange that sits on top of the above sub-plan, like so:
   ```
   Exchange hashpartitioning(f5#XXX, 5), ENSURE_REQUIREMENTS, [plan_id=XXX]
     +- FileScan parquet 
spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX] 
Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(14 
paths)[file:///.../table1/..., PartitionFilters: 
     [isnotnull(f5#XXX), dynamicpruningexpression(f5#XXX IN 
dynamicpruning#XXX)], PushedFilters: [], ReadSchema: 
struct<a5:int,b5:int,c5:int,d5:int,e5:int>
           +- SubqueryAdaptiveBroadcast dynamicpruning#XXX, 0, true, Filter 
((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX)), [c2#XXX]
              +- AdaptiveSparkPlan isFinalPlan=false
                 +- CometFilter [a2#XXX, b2#XXX, c2#XXX, d2#XXX, e2#XXX, 
f2#XXX], ((isnotnull(a2#XXX) AND (a2#XXX = 62)) AND isnotnull(c2#XXX))
                    +- CometScan parquet 
spark_catalog.default.table2[a2#XXX,b2#XXX,c2#XXX,d2#XXX,e2#XXX,f2#XXX] 
Batched: true, DataFilters: [isnotnull(a2#XXX), (a2#XXX = 62), 
isnotnull(c2#XXX)], Format: CometParquet, Location:
     InMemoryFileIndex(1 paths)[file:///.../table2/..., PartitionFilters: [], 
PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2)], ReadSchema: 
struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
   ``` 
   
   Further down the stack inside this method, the columnar rule [ApplyColumnarRulesAndInsertTransitions](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L566) is applied.
   
   Before the post-columnar rules are applied, the plan is wrapped in [ColumnarToRowExec](https://github.com/apache/spark/blob/d3d84e045cc484cf7b70d36410a554238d7aef0e/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L545). The plan is then passed to the Comet rewriter, where `EliminateRedundantTransitions` is applied.
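
   To make that concrete, here is a minimal, self-contained sketch of the idea (toy types only; these are stand-ins, not Spark's or Comet's actual classes): because the Comet-converted exchange is columnar while the plan at this point is expected to produce rows, a row transition is inserted on top of it.
   ```scala
   // Toy illustration only: these types are stand-ins, not Spark's or Comet's actual classes.
   sealed trait Plan { def supportsColumnar: Boolean = false }
   case class CometExchange(child: Plan) extends Plan { override def supportsColumnar: Boolean = true }
   case class ColumnarToRow(child: Plan) extends Plan
   case class Scan(table: String) extends Plan { override def supportsColumnar: Boolean = true }

   // Roughly what the transition-insertion step does when the consumer of this plan
   // expects rows: a columnar plan gets a row transition placed on top of it.
   def toRowsIfNeeded(plan: Plan): Plan =
     if (plan.supportsColumnar) ColumnarToRow(plan) else plan

   // The exchange in this issue is columnar, so it gets wrapped:
   val wrapped = toRowsIfNeeded(CometExchange(Scan("table1")))
   // wrapped == ColumnarToRow(CometExchange(Scan("table1")))
   ```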
   
   This rule has the following case:
   ```
    case c @ ColumnarToRowExec(child) if hasCometNativeChild(child) =>
      val op = CometColumnarToRowExec(child)
      if (c.logicalLink.isEmpty) {
        op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG)
        op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG)
      } else {
        c.logicalLink.foreach(op.setLogicalLink)
      }
      op
   ```
   We can see that it replaces `ColumnarToRowExec` with `CometColumnarToRowExec`.
   
   All of this happens inside the `newQueryStage` method. After the rewrite, the top-level physical node of the plan is `CometColumnarToRowExec`. Since this node is not an `Exchange` physical node, `newQueryStage` throws.
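
   For illustration, here is a minimal sketch of the shape check that fails (toy types again; this paraphrases the behavior described above and is not Spark's actual code):
   ```scala
   // Toy illustration only; stand-in types, not Spark's actual implementation.
   sealed trait Plan
   trait ShuffleExchangeLike extends Plan
   case class CometExchange(child: Plan) extends ShuffleExchangeLike
   case class CometColumnarToRow(child: Plan) extends Plan
   case object Scan extends Plan

   // AQE expects the root of a new shuffle query stage to still be a shuffle exchange
   // after the custom columnar rules have run; anything else triggers the exception
   // seen in the stack trace above.
   def checkStageRoot(planAfterColumnarRules: Plan): Plan = planAfterColumnarRules match {
     case s: ShuffleExchangeLike => s
     case _ =>
       throw new IllegalStateException(
         "Custom columnar rules cannot transform shuffle node to something else.")
   }

   // The rewritten plan shown below has CometColumnarToRow on top of the exchange, so this throws:
   // checkStageRoot(CometColumnarToRow(CometExchange(Scan)))
   ```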
   
   In this test case, the top-level node was expected to be of type `ShuffleExchangeLike` but was actually `CometColumnarToRowExec`. Here is the rewritten plan that triggered the exception:
   ```
   CometColumnarToRow
     +- CometExchange hashpartitioning(c5#XXX, f5#XXX, 5), ENSURE_REQUIREMENTS, 
CometNativeShuffle, [plan_id=XXX]
        +- CometFilter [a5#XXX, b5#XXX, c5#XXX, d5#XXX, e5#XXX, f5#XXX], 
isnotnull(c5#XXX)
           +- CometScan parquet 
spark_catalog.default.table1[a5#XXX,b5#XXX,c5#XXX,d5#XXX,e5#XXX,f5#XXX] 
Batched: true, DataFilters: [isnotnull(c5#XXX)], Format: CometParquet, 
Location: InMemoryFileIndex(14
     paths)[file:///.../table1/..., PartitionFilters: [isnotnull(f5#XXX), 
dynamicpruningexpression(true)], PushedFilters: [IsNotNull(c5)], ReadSchema: 
struct<a5:int,b5:int,c5:int,d5:int,e5:int>
   ```
   
   We can see that `CometColumnarToRow` sits on top of `CometExchange`, which is an instance of `ShuffleExchangeLike`. Ideally, the top-level operator should be `CometExchange`, but it is not.

   It looks like this case in `EliminateRedundantTransitions` needs to handle this situation.
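
   I have not verified this, but one purely hypothetical direction (sketch only; I am assuming the exchange node's class is `CometShuffleExchangeExec`, the actual name may differ) would be to keep the exchange itself as the top-level node when the transition sits directly on top of it, so that AQE still sees a `ShuffleExchangeLike`:
   ```scala
   // Hypothetical sketch only, not a tested fix; it would go before the existing case above.
   // If the transition sits directly on top of a Comet shuffle exchange, keep the exchange
   // as the top-level node so AQE can turn it into a shuffle query stage.
   case ColumnarToRowExec(exchange: CometShuffleExchangeExec) => exchange
   ```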
   
   I'm not sure, but this may be related to issue #1798.
   
   cc @andygrove 
   
   ### Steps to reproduce
   
   Followed the steps mentioned here 
https://datafusion.apache.org/comet/contributor-guide/spark-sql-tests.html 
   
   Enabled and ran the following tests in `InjectRuntimeFilterSuite.scala`:
   
   - Runtime bloom filter join: do not add bloom filter if dpp filter exists on 
the same column
   - Runtime bloom filter join: add bloom filter if dpp filter exists on a 
different column
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_

