nimesh1601 opened a new pull request, #50360:
URL: https://github.com/apache/spark/pull/50360

   ### What changes were proposed in this pull request?
   
   In this PR we add canonicalization handling for CTE plan nodes so that 
cached plans containing CTEs can be matched and reused.

   To check whether a plan already exists in the cache, CacheManager compares 
the canonicalized version of the plan. Currently, CTE ids are not handled 
during canonicalization, which results in unnecessary cache misses when 
cached queries use CTEs. The issue was introduced by [Avoid inlining 
non-deterministic 
With-CTEs](https://github.com/apache/spark/pull/33671/files), which added 
CTERelationDef and CTERelationRef without handling their canonicalization: 
each analysis of a query allocates fresh CTE ids, so two otherwise identical 
plans never canonicalize to the same tree. To fix this, we add 
canonicalization logic for CTEs as well.
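   
   As a rough illustration of the idea (this is a minimal sketch, not the 
actual Catalyst code): canonicalization can rewrite every CTE id to its 
ordinal position among the definitions, so two plans that differ only in the 
fresh ids allocated at analysis time canonicalize to the same tree. The class 
and field names below are hypothetical stand-ins for CTERelationDef and 
CTERelationRef.
   ```python
   from dataclasses import dataclass
   from typing import List

   # Hypothetical stand-ins for Catalyst's CTERelationDef / CTERelationRef;
   # only the id fields matter for this illustration.
   @dataclass
   class CteDef:
       id: int

   @dataclass
   class CteRef:
       cte_id: int

   @dataclass
   class Plan:
       cte_defs: List[CteDef]
       cte_refs: List[CteRef]

   def canonicalize_cte_ids(plan: Plan) -> Plan:
       # Map each freshly allocated id to its ordinal position among the
       # definitions, then rewrite definitions and references consistently.
       id_map = {d.id: i for i, d in enumerate(plan.cte_defs)}
       for d in plan.cte_defs:
           d.id = id_map[d.id]
       for r in plan.cte_refs:
           r.cte_id = id_map[r.cte_id]
       return plan

   # Two plans that are identical except for the ids handed out at analysis
   # time now canonicalize to the same tree, so the cache lookup can match.
   p1 = canonicalize_cte_ids(Plan([CteDef(7)], [CteRef(7)]))
   p2 = canonicalize_cte_ids(Plan([CteDef(42)], [CteRef(42)]))
   assert p1 == p2
   ```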
   
   ### Why are the changes needed?
   To fix the bug mentioned above.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, cached query plans containing CTEs can now be reused.
   Before this change:
   ```
   >>> spark.sql("CACHE TABLE cached_cte AS WITH cte1 AS ( SELECT 1 AS id, 'Alice' AS name UNION ALL SELECT 2 AS id, 'Bob' AS name ), cte2 AS ( SELECT 1 AS id, 10 AS score UNION ALL SELECT 2 AS id, 20 AS score ) SELECT cte1.id, cte1.name, cte2.score FROM cte1 JOIN cte2 ON cte1.id = cte2.id");
   DataFrame[]
   >>> spark.sql("select count(*) from cached_cte").explain()
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- HashAggregate(keys=[], functions=[count(1)])
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=165]
         +- HashAggregate(keys=[], functions=[partial_count(1)])
            +- Project
               +- BroadcastHashJoin [id#120], [id#124], Inner, BuildRight, false
                  :- Union
                  :  :- Project [1 AS id#120]
                  :  :  +- Scan OneRowRelation[]
                  :  +- Project [2 AS id#122]
                  :     +- Scan OneRowRelation[]
                   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=160]
                     +- Union
                        :- Project [1 AS id#124]
                        :  +- Scan OneRowRelation[]
                        +- Project [2 AS id#126]
                           +- Scan OneRowRelation[]
   ```
   After this change:
   ```
   >>> spark.sql("CACHE TABLE cached_cte AS WITH cte1 AS ( SELECT 1 AS id, 'Alice' AS name UNION ALL SELECT 2 AS id, 'Bob' AS name ), cte2 AS ( SELECT 1 AS id, 10 AS score UNION ALL SELECT 2 AS id, 20 AS score ) SELECT cte1.id, cte1.name, cte2.score FROM cte1 JOIN cte2 ON cte1.id = cte2.id");
   DataFrame[]
   >>> spark.sql("select count(*) from cached_cte").explain()
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- HashAggregate(keys=[], functions=[count(1)])
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=116]
         +- HashAggregate(keys=[], functions=[partial_count(1)])
            +- Scan In-memory table cached_cte
                   +- InMemoryRelation [id#128, name#129, score#130], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(5) Project [id#8, name#9, score#13]
                            +- *(5) BroadcastHashJoin [id#8], [id#12], Inner, BuildRight, false
                              :- Union
                              :  :- *(1) Project [1 AS id#8, Alice AS name#9]
                              :  :  +- *(1) Scan OneRowRelation[]
                              :  +- *(2) Project [2 AS id#10, Bob AS name#11]
                              :     +- *(2) Scan OneRowRelation[]
                               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=60]
                                 +- Union
                                    :- *(3) Project [1 AS id#12, 10 AS score#13]
                                    :  +- *(3) Scan OneRowRelation[]
                                    +- *(4) Project [2 AS id#14, 20 AS score#15]
                                       +- *(4) Scan OneRowRelation[]
   ```
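   
   As a quick programmatic check that the read now hits the cache (note this 
goes through PySpark's internal `_jdf` / `queryExecution()` accessors, so 
treat it as a debugging sketch rather than stable public API):
   ```python
   plan = spark.sql("select count(*) from cached_cte") \
               ._jdf.queryExecution().executedPlan().toString()
   # With this change the physical plan scans the in-memory table instead
   # of recomputing the CTE query.
   assert "Scan In-memory table cached_cte" in plan
   ```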
   
   ### How was this patch tested?
   Unit tests are added.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   

