peter-toth commented on code in PR #54330:
URL: https://github.com/apache/spark/pull/54330#discussion_r2883865467


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -346,43 +348,110 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
 }
 
 /**
- * Represents a partitioning where rows are split across partitions based on transforms defined
- * by `expressions`. `partitionValues`, if defined, should contain value of partition key(s) in
- * ascending order, after evaluated by the transforms in `expressions`, for each input partition.
- * In addition, its length must be the same as the number of Spark partitions (and thus is a 1-1
- * mapping), and each row in `partitionValues` must be unique.
+ * Represents a partitioning where rows are split across partitions based on transforms defined by
+ * `expressions`.
+ *
+ * == Usage Forms ==
+ * `KeyedPartitioning` is used in two distinct forms:
+ *
+ * 1. '''As outputPartitioning''': When used as a node's output partitioning (e.g., in
+ *    `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are always in sorted order.
+ *    This is how leaf data source nodes produce partition keys originally, and this ordering is
+ *    preserved through `GroupPartitionsExec`. The sorted order is critical for storage-partitioned
+ *    join compatibility.
+ *
+ * 2. '''In KeyGroupedShuffleSpec''': When used within `KeyGroupedShuffleSpec`, the `partitionKeys`
+ *    may not be in sorted order. This occurs because `KeyGroupedShuffleSpec` can project the
+ *    partition keys by join key positions. The `EnsureRequirements` rule ensures that either the
+ *    unordered keys from both sides of a join match exactly, or it builds a common ordered set of
+ *    keys and pushes them down to `GroupPartitionsExec` on both sides to establish a compatible
+ *    ordering.
+ *
+ * == Partition Keys ==
+ * - `partitionKeys`: The partition keys, one per partition. May contain duplicates initially
+ *   (ungrouped state), but becomes unique after `GroupPartitionsExec` applies grouping.
+ *
+ * == Grouping State ==
+ * A KeyedPartitioning can be in two states:
+ *
+ * - '''Ungrouped''' (when `isGrouped == false`): `partitionKeys` contains duplicates, meaning

Review Comment:
   Are you referring to the fact that the number of ungrouped keys in a `KeyedPartitioning` can be larger than the number of grouped (distinct) keys in the previous `KeyGroupedPartitioning`?
   There can be more keys in some cases and the same number in others (e.g. when partial clustering was allowed).
   Maybe we could pack the keys as `partitionKeys: Seq[(InternalRowComparableWrapper, Int)]`, storing each key together with its number of occurrences, to reduce the size of `KeyedPartitioning` and minimize the difference from the previous `KeyGroupedPartitioning` size.
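
A minimal sketch of that packing idea, to make the size argument concrete. It is not code from this PR: `PackedKeys`, `pack` and `unpack` are hypothetical names, and the generic type `K` stands in for `InternalRowComparableWrapper` so the example runs without Spark on the classpath. It collapses each run of adjacent equal keys into a `(key, occurrences)` pair and can expand the pairs back to one key per partition.

```scala
// Hypothetical helper, not part of Spark. K stands in for InternalRowComparableWrapper.
object PackedKeys {

  // Collapse each run of adjacent equal keys into a (key, occurrences) pair.
  // Only adjacent duplicates are merged, so the original order is preserved.
  def pack[K](keys: Seq[K]): Seq[(K, Int)] =
    keys.foldLeft(List.empty[(K, Int)]) {
      case ((last, n) :: rest, k) if last == k => (last, n + 1) :: rest
      case (acc, k)                            => (k, 1) :: acc
    }.reverse

  // Expand the packed form back to one key per partition.
  def unpack[K](packed: Seq[(K, Int)]): Seq[K] =
    packed.flatMap { case (k, n) => Seq.fill(n)(k) }

  def main(args: Array[String]): Unit = {
    val keys = Seq("a", "a", "b", "c", "c", "c") // one key per partition, sorted
    val packed = pack(keys)
    println(packed)
    assert(unpack(packed) == keys) // the packing round-trips
  }
}
```

When the keys are sorted, equal keys are adjacent, so the packed sequence holds one entry per distinct key. For unsorted keys this sketch still round-trips, but the same key may appear in more than one pair.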
   
   Disabling `KeyedPartitioning` / `KeyGroupedPartitioning` in scans is probably an improvement orthogonal to this refactor, but we can't detect all the cases where grouping is not needed. That was a limitation of https://github.com/apache/spark/pull/53859.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

