nimesh1601 opened a new pull request, #50360: URL: https://github.com/apache/spark/pull/50360
### What changes were proposed in this pull request?
To check whether a plan already exists in the cache, `CacheManager` compares the canonicalized version of the plan. Currently, CTE ids are not normalized during canonicalization, which leads to unnecessary cache misses when cached queries use CTEs. The issue dates back to [Avoid inlining non-deterministic With-CTEs](https://github.com/apache/spark/pull/33671/files), which introduced `CTERelationDef` and `CTERelationRef` without handling their canonicalization. This PR adds canonicalization logic for CTEs as well (an illustrative sketch of the idea appears at the end of this description).

### Why are the changes needed?
To fix the bug described above.

### Does this PR introduce _any_ user-facing change?
Yes: cached CTE query results can now be reused.

Before this change:
```
>>> spark.sql("CACHE TABLE cached_cte AS WITH cte1 AS ( SELECT 1 AS id, 'Alice' AS name UNION ALL SELECT 2 AS id, 'Bob' AS name ), cte2 AS ( SELECT 1 AS id, 10 AS score UNION ALL SELECT 2 AS id, 20 AS score ) SELECT cte1.id, cte1.name, cte2.score FROM cte1 JOIN cte2 ON cte1.id = cte2.id");
DataFrame[]
>>> spark.sql("select count(*) from cached_cte").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=165]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Project
            +- BroadcastHashJoin [id#120], [id#124], Inner, BuildRight, false
               :- Union
               :  :- Project [1 AS id#120]
               :  :  +- Scan OneRowRelation[]
               :  +- Project [2 AS id#122]
               :     +- Scan OneRowRelation[]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=160]
                  +- Union
                     :- Project [1 AS id#124]
                     :  +- Scan OneRowRelation[]
                     +- Project [2 AS id#126]
                        +- Scan OneRowRelation[]
```

After this change:
```
>>> spark.sql("CACHE TABLE cached_cte AS WITH cte1 AS ( SELECT 1 AS id, 'Alice' AS name UNION ALL SELECT 2 AS id, 'Bob' AS name ), cte2 AS ( SELECT 1 AS id, 10 AS score UNION ALL SELECT 2 AS id, 20 AS score ) SELECT cte1.id, cte1.name, cte2.score FROM cte1 JOIN cte2 ON cte1.id = cte2.id");
DataFrame[]
>>> spark.sql("select count(*) from cached_cte").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=116]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Scan In-memory table cached_cte
               +- InMemoryRelation [id#128, name#129, score#130], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(5) Project [id#8, name#9, score#13]
                        +- *(5) BroadcastHashJoin [id#8], [id#12], Inner, BuildRight, false
                           :- Union
                           :  :- *(1) Project [1 AS id#8, Alice AS name#9]
                           :  :  +- *(1) Scan OneRowRelation[]
                           :  +- *(2) Project [2 AS id#10, Bob AS name#11]
                           :     +- *(2) Scan OneRowRelation[]
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=60]
                              +- Union
                                 :- *(3) Project [1 AS id#12, 10 AS score#13]
                                 :  +- *(3) Scan OneRowRelation[]
                                 +- *(4) Project [2 AS id#14, 20 AS score#15]
                                    +- *(4) Scan OneRowRelation[]
```

### How was this patch tested?
UTs are added (a rough sketch of the kind of check they perform also appears at the end of this description).

### Was this patch authored or co-authored using generative AI tooling?
No
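The standalone sketch below illustrates the idea behind the fix, outside of Spark's code base: raw CTE ids come from a global counter, so re-analyzing the same query yields different ids, and canonicalization can remap each id to the ordinal of its first appearance before plans are compared for cache lookup. The names here (`CteIdCanonicalization`, `canonicalizeIds`) are hypothetical and are not the code added by this PR.

```scala
// Illustrative sketch only: remap raw CTE ids to ordinals so that two
// analyses of the same query canonicalize to the same form.
import scala.collection.mutable

object CteIdCanonicalization {

  // Toy stand-in for the CTE ids referenced by a plan, listed in a
  // deterministic traversal order.
  def canonicalizeIds(cteIdsInPlanOrder: Seq[Long]): Seq[Long] = {
    val firstSeen = mutable.LinkedHashMap.empty[Long, Long]
    cteIdsInPlanOrder.map(id => firstSeen.getOrElseUpdate(id, firstSeen.size.toLong))
  }

  def main(args: Array[String]): Unit = {
    // The same query analyzed twice gets different raw ids (e.g. 4/5 vs. 10/11),
    // but both normalize to the same sequence, so the cached plan can be matched.
    val firstAnalysis  = Seq(4L, 5L, 4L, 5L)
    val secondAnalysis = Seq(10L, 11L, 10L, 11L)
    assert(canonicalizeIds(firstAnalysis) == canonicalizeIds(secondAnalysis))
    println(canonicalizeIds(firstAnalysis)) // List(0, 1, 0, 1)
  }
}
```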
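For reference, this is roughly the kind of check the added UTs could perform, written here as a self-contained sketch against a local `SparkSession`. The object name and assertion message are illustrative and not the PR's actual test code; the check relies on `InMemoryRelation` showing up in the optimized plan once the cached CTE query is matched.

```scala
// Hypothetical reproduction of the cache-reuse check; not the PR's test code.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

object CteCacheReuseCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("cte-cache-reuse-check")
      .getOrCreate()

    // Cache a query that is built from CTEs.
    spark.sql(
      """CACHE TABLE cached_cte AS
        |WITH cte1 AS (SELECT 1 AS id, 'Alice' AS name UNION ALL SELECT 2 AS id, 'Bob' AS name),
        |     cte2 AS (SELECT 1 AS id, 10 AS score UNION ALL SELECT 2 AS id, 20 AS score)
        |SELECT cte1.id, cte1.name, cte2.score FROM cte1 JOIN cte2 ON cte1.id = cte2.id""".stripMargin)

    // With CTE ids canonicalized, the lookup should resolve to the in-memory
    // relation instead of recomputing the join of the two CTEs.
    val df = spark.sql("SELECT count(*) FROM cached_cte")
    val cachedRelations = df.queryExecution.optimizedPlan.collect {
      case r: InMemoryRelation => r
    }
    assert(cachedRelations.nonEmpty, "expected the cached CTE query to be reused")

    spark.stop()
  }
}
```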