Naveen Kumar Kumar Puppala created SPARK-55848:
--------------------------------------------------

             Summary: [SQL] SPJ partial clustering produces incorrect results 
for post-join dropDuplicates and Window dedup operations
                 Key: SPARK-55848
                 URL: https://issues.apache.org/jira/browse/SPARK-55848
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 4.0.1, 4.0.0, 3.5.0, 3.4.0
            Reporter: Naveen Kumar Kumar Puppala


When using Storage-Partitioned Join (SPJ) with
{{spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true}},
both {{dropDuplicates()}} and Window-based dedup ({{row_number() OVER
(PARTITION BY ...)}}) produce incorrect results: duplicate rows that should have
been removed survive in the output.

*Root cause:* Partial clustering (SPARK-42038) splits a partition with many
files across multiple tasks. After the join, each split task holds only a subset
of the rows for the same partition key. The join's output nonetheless inherits
{{KeyGroupedPartitioning}}, which {{EnsureRequirements}} treats as satisfying
{{ClusteredDistribution}}. As a result, no Exchange (shuffle) is inserted before
downstream dedup operators, and each split task deduplicates its own subset
independently. A key whose rows are spread over several split tasks survives
once per task, so the output contains more rows than it should.
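The following pure-Python simulation sketches the failure mode described above. It does not use Spark. The data, the round-robin file split, and all function names are hypothetical stand-ins, and the Window variant assumes an {{ORDER BY}} on the value column. One partition key ("p1") has two files that both contain id 1. Partial clustering is modeled by giving each file to a different task and replicating the smaller join side to both tasks.

```python
from collections import defaultdict

# Hypothetical data for ONE storage partition, key "p1".
# Left side: two files that both contain id=1 (a duplicate to remove later).
left_files = [
    [("p1", 1, "a"), ("p1", 2, "b")],  # file 1
    [("p1", 1, "a"), ("p1", 3, "c")],  # file 2
]
# Right side: the smaller side, replicated to every split task (simplified).
right_rows = [("p1", 1, "x"), ("p1", 2, "y"), ("p1", 3, "z")]


def split_files(files, num_tasks):
    """Simplified partial clustering: spread one partition's files over tasks."""
    tasks = [[] for _ in range(num_tasks)]
    for i, f in enumerate(files):
        tasks[i % num_tasks].extend(f)
    return tasks


def join(left, right):
    """Inner join on (partition_key, id); output is (part, id, lval, rval)."""
    return [(l[0], l[1], l[2], r[2]) for l in left for r in right
            if (l[0], l[1]) == (r[0], r[1])]


def drop_duplicates(rows):
    """Models dropDuplicates(["part", "id"]) within a single task."""
    seen, out = set(), []
    for row in rows:
        if (row[0], row[1]) not in seen:
            seen.add((row[0], row[1]))
            out.append(row)
    return out


def row_number_dedup(rows):
    """Models row_number() OVER (PARTITION BY part, id ORDER BY lval) = 1, per task."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row[0], row[1])].append(row)
    return [min(g, key=lambda r: r[2]) for g in groups.values()]


# Buggy plan: no Exchange, so each split task deduplicates only its own subset.
tasks = split_files(left_files, num_tasks=2)
joined_per_task = [join(t, right_rows) for t in tasks]
buggy_drop = [r for t in joined_per_task for r in drop_duplicates(t)]
buggy_window = [r for t in joined_per_task for r in row_number_dedup(t)]

# Correct plan: all rows for key "p1" co-located, as after an Exchange.
all_joined = [r for t in joined_per_task for r in t]
correct = drop_duplicates(all_joined)

print(sorted(r[1] for r in buggy_drop))    # [1, 1, 2, 3]
print(sorted(r[1] for r in buggy_window))  # [1, 1, 2, 3]
print(sorted(r[1] for r in correct))       # [1, 2, 3]
```

In this model, id 1 appears once in each split task, so per-task dedup keeps it twice. Only co-locating every row of the key before deduplicating removes the duplicate.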

SPARK-53074 fixed the pre-join case (operators between the scan and the join),
but the post-join case remains unfixed.

*Proposed fix:* Add an {{isPartiallyClustered}} flag to
{{KeyGroupedPartitioning}}. Override {{satisfies0(ClusteredDistribution)}} to
return false when partially clustered, causing {{EnsureRequirements}} to
insert an Exchange. Plain SPJ joins without dedup remain unaffected.
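The proposed check can be sketched as a small Python model of the Scala classes. This is a hypothetical illustration, not Spark's code. The class and field names mirror the proposal, the subset rule for the non-partially-clustered case is a simplification of Spark's real {{satisfies0}} logic, and {{needs_exchange}} is an invented stand-in for the {{EnsureRequirements}} decision.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class ClusteredDistribution:
    clustering: Tuple[str, ...]  # columns required to be co-located


@dataclass(frozen=True)
class KeyGroupedPartitioning:
    expressions: Tuple[str, ...]         # partition key columns
    is_partially_clustered: bool = False  # hypothetical flag from the proposal

    def satisfies0(self, required: ClusteredDistribution) -> bool:
        if self.is_partially_clustered:
            # Proposed fix: rows of one key may be split across tasks,
            # so co-location cannot be guaranteed.
            return False
        # Simplified rule: partition keys must be a subset of the clustering keys.
        return set(self.expressions) <= set(required.clustering)


def needs_exchange(child: KeyGroupedPartitioning,
                   required: ClusteredDistribution) -> bool:
    """Hypothetical stand-in for EnsureRequirements: shuffle if unsatisfied."""
    return not child.satisfies0(required)


dedup_req = ClusteredDistribution(("part", "id"))
print(needs_exchange(KeyGroupedPartitioning(("part",)), dedup_req))        # False
print(needs_exchange(KeyGroupedPartitioning(("part",), True), dedup_req))  # True
```

The design choice in this sketch is to leave the partitioning itself unchanged and only make it refuse to vouch for co-location. The shuffle is therefore added only where a downstream operator actually requires {{ClusteredDistribution}}.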

GitHub issue: https://github.com/apache/spark/issues/54378



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
