peter-toth commented on code in PR #54330:
URL: https://github.com/apache/spark/pull/54330#discussion_r2894678169
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -346,43 +348,110 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
 }
 
 /**
- * Represents a partitioning where rows are split across partitions based on transforms defined
- * by `expressions`. `partitionValues`, if defined, should contain value of partition key(s) in
- * ascending order, after evaluated by the transforms in `expressions`, for each input partition.
- * In addition, its length must be the same as the number of Spark partitions (and thus is a 1-1
- * mapping), and each row in `partitionValues` must be unique.
+ * Represents a partitioning where rows are split across partitions based on transforms defined by
+ * `expressions`.
+ *
+ * == Usage Forms ==
+ * `KeyedPartitioning` is used in two distinct forms:
+ *
+ * 1. '''As outputPartitioning''': When used as a node's output partitioning (e.g., in
+ *    `BatchScanExec` or `GroupPartitionsExec`), the `partitionKeys` are always in sorted order.
+ *    This is how leaf data source nodes produce partition keys originally, and this ordering is
+ *    preserved through `GroupPartitionsExec`. The sorted order is critical for storage-partitioned
+ *    join compatibility.
+ *
+ * 2. '''In KeyGroupedShuffleSpec''': When used within `KeyGroupedShuffleSpec`, the `partitionKeys`
+ *    may not be in sorted order. This occurs because `KeyGroupedShuffleSpec` can project the
+ *    partition keys by join key positions. The `EnsureRequirements` rule ensures that either the
+ *    unordered keys from both sides of a join match exactly, or it builds a common ordered set of
+ *    keys and pushes them down to `GroupPartitionsExec` on both sides to establish a compatible
+ *    ordering.
+ *
+ * == Partition Keys ==
+ * - `partitionKeys`: The partition keys, one per partition. May contain duplicates initially
+ *   (ungrouped state), but becomes unique after `GroupPartitionsExec` applies grouping.
+ *
+ * == Grouping State ==
+ * A KeyedPartitioning can be in two states:
+ *
+ * - '''Ungrouped''' (when `isGrouped == false`): `partitionKeys` contains duplicates, meaning
+ *   multiple input partitions share the same key. This occurs when a data source has multiple
+ *   splits for the same partition value.
  *
- * The `originalPartitionValues`, on the other hand, are partition values from the original input
- * splits returned by data sources. It may contain duplicated values.
+ * - '''Grouped''' (when `isGrouped == true`): `partitionKeys` contains only unique values, with
+ *   each partition having a distinct key. This occurs when: (1) a data source natively produces
+ *   unique partition keys, or (2) `GroupPartitionsExec` coalesces partitions with duplicate keys.

Review Comment:
   I got that, but the current behaviour of v2 scans is to produce `KeyGroupedPartitioning` with grouped partitions when the scan reports `org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning`. We could change the scans of a plan in `EnsureRequirements`, or in a new rule, once we realize that we can't utilize that grouping because we need a shuffle to fulfill a distribution requirement of a join or aggregate. But when we have cached/checkpointed data whose output partitioning is `KeyGroupedPartitioning` and we use it in a second query, we can no longer ungroup the cached/checkpointed partitions.

   IMO extracting the grouping logic from the v2 scan and inserting a grouping operator when needed (maybe on top of a cached/checkpointed plan with ungrouped data) not only simplifies the scan but also makes the whole v2 partitioning more flexible.
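
   To make the ungrouped vs. grouped distinction concrete, here is a minimal sketch in plain Scala with no Spark dependency. `KeyedPartitions`, `GroupingSketch` and `groupPartitions` are hypothetical names invented for this illustration; they are not the classes in this PR and only approximate what a separate grouping operator might do, assuming grouping means merging input partitions that share a key.

```scala
// Hypothetical, simplified model of keyed partitions; NOT Spark's actual classes.
final case class KeyedPartitions[K](partitionKeys: Seq[K]) {
  // "Grouped" here means every partition key appears exactly once.
  def isGrouped: Boolean = partitionKeys.distinct.size == partitionKeys.size
}

object GroupingSketch {
  // Stand-in for a separate grouping operator: merges input partitions that share a key.
  // Returns the sorted unique keys and, for each output partition, the indices of the
  // input partitions that were merged into it.
  def groupPartitions[K: Ordering](
      in: KeyedPartitions[K]): (KeyedPartitions[K], Seq[Seq[Int]]) = {
    val grouped = in.partitionKeys.zipWithIndex
      .groupBy(_._1)
      .toSeq
      .sortBy(_._1)
      .map { case (key, entries) => key -> entries.map(_._2).sorted }
    (KeyedPartitions(grouped.map(_._1)), grouped.map(_._2))
  }

  def main(args: Array[String]): Unit = {
    // An ungrouped input: keys 1 and 3 each have two splits.
    val ungrouped = KeyedPartitions(Seq(1, 1, 2, 3, 3))
    val (groupedOut, inputsPerPartition) = groupPartitions(ungrouped)
    println(s"input grouped:  ${ungrouped.isGrouped}")
    println(s"output grouped: ${groupedOut.isGrouped}")
    println(s"output keys:    ${groupedOut.partitionKeys.mkString(", ")}")
    println(s"merged inputs:  ${inputsPerPartition.map(_.mkString("[", ",", "]")).mkString(" ")}")
  }
}
```

   In this sketch the input with keys 1, 1, 2, 3, 3 stays ungrouped until `groupPartitions` is applied, after which the keys are 1, 2, 3. The point is only that the ungrouped form is still available to a later query, because grouping is a separate step rather than something baked into the source.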
