cloud-fan commented on PR #54330:
URL: https://github.com/apache/spark/pull/54330#issuecomment-3989510809
*This review was generated by an AI assistant and reviewed by @cloud-fan.*
Overall the architecture looks great — decoupling partition grouping from
the scan into a dedicated `GroupPartitionsExec` operator is the right design.
Here are some issues I noticed:
### 1. `isGrouped` recomputes on every `.copy()`
`isGrouped` is a `@transient lazy val` on a case class. Every `.copy()`
(e.g., in `GroupPartitionsExec.outputPartitioning` via `k.copy(...)`) creates a
new instance, discarding the cache. Then the next `satisfies0` call recomputes
`distinctBy` over all partition keys — O(n) with wrapper allocations.
**Suggestion:** Make `isGrouped` a constructor parameter with a default:
```scala
case class KeyedPartitioning(
    expressions: Seq[Expression],
    partitionKeys: Seq[InternalRow],
    isGrouped: Boolean = false) ...
```
Then `toGrouped` and `GroupPartitionsExec` set it explicitly to `true`.
### 2. `toGrouped` may not preserve sorted order
`toGrouped` uses `distinctBy`, which preserves first-occurrence order. The
class documentation says keys may be unsorted in `KeyGroupedShuffleSpec` after
projection. If `toGrouped` is called on unsorted keys, the result is grouped
but unsorted, violating the invariant for `outputPartitioning`.
Currently `GroupedPartitions.unapply` only calls it on
`plan.outputPartitioning` (which should be sorted), so this works in practice.
But a defensive sort would prevent future surprises:
```scala
def toGrouped: KeyedPartitioning = {
  val groupedKeys =
    partitionKeys.distinctBy(comparableKeyWrapperFactory).sorted(keyOrdering)
  KeyedPartitioning(expressions, groupedKeys)
}
```
### 3. `GroupPartitionsExec.outputPartitioning` applies `transform` without verifying the invariant
The `transform` replaces **all** `KeyedPartitioning` nodes in the child's
partitioning tree with the same `groupedPartitions.map(_._1)` keys. The comment
says "they can only differ in `expressions`" but nothing enforces this. If a
`PartitioningCollection` ever contains `KeyedPartitioning`s with different key
sets, this would silently assign wrong keys.
**Suggestion:** Add an assertion:
```scala
p.transform {
  case k: KeyedPartitioning =>
    // Cheap sanity check; verifying full key equality would need the
    // wrapper-based comparison discussed in issue 6.
    assert(k.numPartitions == keyedPartitioning.numPartitions,
      "All KeyedPartitionings in the output must share the same partition keys")
    k.copy(
      expressions = projectedExpressions,
      partitionKeys = groupedPartitions.map(_._1))
}
### 4. `GroupPartitionsExec` has no `doCanonicalize` override
`GroupPartitionsExec` holds
`commonPartitionKeys: Option[Seq[(InternalRow, Int)]]` and `reducers`. In AQE,
plans are canonicalized to detect equivalent
stages. Without `doCanonicalize`, two nodes with semantically equal but
referentially different `InternalRow` keys may not be recognized as equivalent,
potentially missing stage reuse.
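A minimal sketch of such an override, assuming `commonPartitionKeys` is the only instance-specific state that differs between semantically equal nodes (the field names follow this PR, but the exact shape is an assumption):

```scala
override protected def doCanonicalize(): SparkPlan =
  // Normalize instance-specific state so AQE recognizes semantically
  // equal GroupPartitionsExec nodes as the same exchange/stage.
  copy(
    commonPartitionKeys = None,
    child = child.canonicalized)
```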
### 5. `GroupPartitionsExec` doesn't handle columnar execution
If the child is a columnar `BatchScanExec`, `doExecute()` calls
`child.execute()` which forces row-based execution. There's no
`supportsColumnar` override or `doExecuteColumnar`, so the columnar chain
breaks silently. At minimum, an explicit override documenting the intent would
help:
```scala
override def supportsColumnar: Boolean = false
```
Or better, propagate columnar support via `CoalescedRDD` on
`child.executeColumnar()`.
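The latter could look roughly like this, assuming the partition-coalescing logic in `doExecute()` can be factored into a shared helper (`coalesce` below is a hypothetical name, not an existing method):

```scala
override def supportsColumnar: Boolean = child.supportsColumnar

override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
  // `coalesce` stands in for the same CoalescedRDD construction that
  // doExecute() applies to child.execute().
  coalesce(child.executeColumnar())
```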
### 6. `equals`/`hashCode` on `KeyedPartitioning` allocate wrappers per key on every call
Every `equals`/`hashCode` wraps each key in `InternalRowComparableWrapper`.
The wrapping is needed for correctness (since
`UnsafeRow.equals(GenericInternalRow)` returns false even for identical data —
they only compare within their own type). But allocating wrappers per call is
wasteful.
**Suggestion:** Pre-wrap keys lazily so `equals`/`hashCode` reuse them:
```scala
@transient lazy val wrappedKeys: Seq[InternalRowComparableWrapper] =
partitionKeys.map(comparableKeyWrapperFactory)
override def equals(that: Any): Boolean = that match {
case k: KeyedPartitioning if this.expressions == k.expressions =>
wrappedKeys == k.wrappedKeys
case _ => false
}
override lazy val hashCode: Int = Objects.hash(expressions, wrappedKeys)
```
### 7. Dead variable in `ShuffleExchangeExec`
```scala
case p => throw SparkException.internalError(
  s"Exchange not implemented for $newPartitioning")
```
`p` is bound but unused — should be `case _`.
### 8. `applyGroupPartitions` doesn't handle `GroupPartitionsExec` nested under unary nodes
The old `populateCommonPartitionInfo` recursively walked the tree via
`mapChildren`. The new `applyGroupPartitions` only checks the top level — if a
`ProjectExec` or `FilterExec` sits between the join and an existing
`GroupPartitionsExec`, the match falls to `_` and creates a second
`GroupPartitionsExec` on top, leaving the original orphaned. Either add
recursive walking for unary nodes, or document the invariant that
`GroupPartitionsExec` is always the immediate child at this point.
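The recursive option could be sketched as follows; `newCommonPartitionKeys` and the `GroupPartitionsExec` constructor arguments are placeholders, since the actual signature in this PR may differ:

```scala
private def applyGroupPartitions(plan: SparkPlan): SparkPlan = plan match {
  case g: GroupPartitionsExec =>
    // Update the existing node instead of stacking a second one on top.
    g.copy(commonPartitionKeys = newCommonPartitionKeys)
  case u: UnaryExecNode if u.exists(_.isInstanceOf[GroupPartitionsExec]) =>
    // Walk through intervening Project/Filter nodes to reach the nested node.
    u.withNewChildren(Seq(applyGroupPartitions(u.child)))
  case other =>
    GroupPartitionsExec(newCommonPartitionKeys, other)
}
```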
---
_This comment was generated with [GitHub MCP](http://go/mcps)._