cloud-fan commented on PR #54330:
URL: https://github.com/apache/spark/pull/54330#issuecomment-3989510809

   *This review was generated by an AI assistant and reviewed by @cloud-fan.*
   
   Overall the architecture looks great — decoupling partition grouping from 
the scan into a dedicated `GroupPartitionsExec` operator is the right design. 
Here are some issues I noticed:
   
   ### 1. `isGrouped` recomputes on every `.copy()`
   
   `isGrouped` is a `@transient lazy val` on a case class. Every `.copy()` 
(e.g., in `GroupPartitionsExec.outputPartitioning` via `k.copy(...)`) creates a 
new instance, discarding the cache. Then the next `satisfies0` call recomputes 
`distinctBy` over all partition keys — O(n) with wrapper allocations.
   
   **Suggestion:** Make `isGrouped` a constructor parameter with a default:
   ```scala
   case class KeyedPartitioning(
       expressions: Seq[Expression],
       partitionKeys: Seq[InternalRow],
       isGrouped: Boolean = false) ...
   ```
   Then `toGrouped` and `GroupPartitionsExec` set it explicitly to `true`.
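   
   The cache loss can be reproduced without Spark. The sketch below is a 
   minimal illustration, assuming a hypothetical `Keyed` case class standing in 
   for `KeyedPartitioning` and a hypothetical `computations` counter added only 
   to make the recomputation visible:
   ```scala
   // Hypothetical stand-in for KeyedPartitioning; not Spark code.
   object LazyValCopyDemo {
     // Counts how often the lazy val body runs, across all instances.
     var computations = 0

     case class Keyed(keys: Seq[Int]) {
       // Cached per instance only; the result of copy() starts uncached.
       @transient lazy val isGrouped: Boolean = {
         computations += 1
         keys.distinct.size == keys.size
       }
     }

     // Returns (all results were true, number of times the body ran).
     def run(): (Boolean, Int) = {
       computations = 0
       val k = Keyed(Seq(1, 2, 3))
       val first = k.isGrouped          // computed
       val second = k.isGrouped         // served from the instance cache
       val third = k.copy().isGrouped   // new instance, computed again
       (first && second && third, computations)
     }
   }
   ```
   With a constructor parameter instead, `copy()` would carry the value over 
   unless the caller overrides it explicitly.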
   
   ### 2. `toGrouped` may not preserve sorted order
   
   `toGrouped` uses `distinctBy`, which preserves first-occurrence order. The 
class documentation says keys may be unsorted in `KeyGroupedShuffleSpec` after 
projection. If `toGrouped` is called on unsorted keys, the result is grouped 
but unsorted, violating the invariant for `outputPartitioning`.
   
   Currently `GroupedPartitions.unapply` only calls it on 
`plan.outputPartitioning` (which should be sorted), so this works in practice. 
But a defensive sort would prevent future surprises:
   ```scala
   def toGrouped: KeyedPartitioning = {
       val groupedKeys =
           partitionKeys.distinctBy(comparableKeyWrapperFactory).sorted(keyOrdering)
       KeyedPartitioning(expressions, groupedKeys)
   }
   ```
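   
   As a small standalone illustration of the ordering concern (plain integers 
   stand in for partition keys here, which is an assumption made for brevity; 
   `distinctBy` requires Scala 2.13 or later):
   ```scala
   object GroupThenSortDemo {
     // Hypothetical partition keys: unsorted, with duplicates.
     val keys: Seq[Int] = Seq(3, 1, 3, 2, 1)

     // distinctBy keeps the first occurrence of each key in encounter order,
     // so the result is grouped but still unsorted.
     val grouped: Seq[Int] = keys.distinctBy(identity)

     // Sorting after deduplication restores the ordering.
     val groupedAndSorted: Seq[Int] = grouped.sorted(Ordering.Int)
   }
   ```
   Sorting after `distinctBy` rather than before means only the deduplicated 
   keys are sorted.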
   
   ### 3. `GroupPartitionsExec.outputPartitioning` applies `transform` without verifying invariant
   
   The `transform` replaces **all** `KeyedPartitioning` nodes in the child's 
partitioning tree with the same `groupedPartitions.map(_._1)` keys. The comment 
says "they can only differ in `expressions`" but nothing enforces this. If a 
`PartitioningCollection` ever contains `KeyedPartitioning`s with different key 
sets, this would silently assign wrong keys.
   
   **Suggestion:** Add an assertion:
   ```scala
   p.transform {
     case k: KeyedPartitioning =>
        // Cheap proxy: equal partition counts do not prove equal key sets.
        assert(k.numPartitions == keyedPartitioning.numPartitions,
          "All KeyedPartitionings in output must have the same number of partitions")
        k.copy(expressions = projectedExpressions,
          partitionKeys = groupedPartitions.map(_._1))
   }
   ```
   
   ### 4. `GroupPartitionsExec` has no `doCanonicalize` override
   
   `GroupPartitionsExec` holds `commonPartitionKeys: Option[Seq[(InternalRow, 
Int)]]` and `reducers`. In AQE, plans are canonicalized to detect equivalent 
stages. Without `doCanonicalize`, two nodes with semantically equal but 
referentially different `InternalRow` keys may not be recognized as equivalent, 
potentially missing stage reuse.
   
   ### 5. `GroupPartitionsExec` doesn't handle columnar execution
   
   If the child is a columnar `BatchScanExec`, `doExecute()` calls 
`child.execute()` which forces row-based execution. There's no 
`supportsColumnar` override or `doExecuteColumnar`, so the columnar chain 
breaks silently. At minimum, an explicit override documenting the intent would 
help:
   ```scala
   override def supportsColumnar: Boolean = false
   ```
   Or better, propagate columnar support via `CoalescedRDD` on 
`child.executeColumnar()`.
   
   ### 6. `equals`/`hashCode` on `KeyedPartitioning` allocates wrappers per key on every call
   
   Every `equals`/`hashCode` wraps each key in `InternalRowComparableWrapper`. 
The wrapping is needed for correctness (since 
`UnsafeRow.equals(GenericInternalRow)` returns false even for identical data — 
they only compare within their own type). But allocating wrappers per call is 
wasteful.
   
   **Suggestion:** Pre-wrap keys lazily so `equals`/`hashCode` reuse them:
   ```scala
   @transient lazy val wrappedKeys: Seq[InternalRowComparableWrapper] =
       partitionKeys.map(comparableKeyWrapperFactory)
   
   override def equals(that: Any): Boolean = that match {
       case k: KeyedPartitioning if this.expressions == k.expressions =>
           wrappedKeys == k.wrappedKeys
       case _ => false
   }
   
   // Objects here is java.util.Objects and needs to be imported.
   override lazy val hashCode: Int = Objects.hash(expressions, wrappedKeys)
   ```
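   
   The effect of pre-wrapping can be sketched without Spark. Everything below 
   is hypothetical: `RowA` and `RowB` stand in for two row implementations whose 
   `equals` only matches their own class, `Wrapper` stands in for 
   `InternalRowComparableWrapper`, and `wrapperAllocations` is a counter added 
   purely for illustration:
   ```scala
   object PreWrappedKeysDemo {
     sealed trait Row { def values: Seq[Int] }
     // Each row type only considers itself equal to rows of the same class.
     final class RowA(val values: Seq[Int]) extends Row {
       override def equals(o: Any): Boolean = o match {
         case r: RowA => r.values == values
         case _ => false
       }
       override def hashCode: Int = values.hashCode
     }
     final class RowB(val values: Seq[Int]) extends Row {
       override def equals(o: Any): Boolean = o match {
         case r: RowB => r.values == values
         case _ => false
       }
       override def hashCode: Int = values.hashCode
     }

     var wrapperAllocations = 0
     // Compares rows by content, regardless of the concrete row class.
     final class Wrapper(val row: Row) {
       wrapperAllocations += 1
       override def equals(o: Any): Boolean = o match {
         case w: Wrapper => w.row.values == row.values
         case _ => false
       }
       override def hashCode: Int = row.values.hashCode
     }

     final class Keyed(val keys: Seq[Row]) {
       // Wrapped once per instance, then reused by equals and hashCode.
       lazy val wrappedKeys: Seq[Wrapper] = keys.map(new Wrapper(_))
       override def equals(o: Any): Boolean = o match {
         case k: Keyed => wrappedKeys == k.wrappedKeys
         case _ => false
       }
       override def hashCode: Int = wrappedKeys.hashCode
     }

     // Returns (raw rows equal, wrapped comparisons equal, wrappers allocated).
     def run(): (Boolean, Boolean, Int) = {
       wrapperAllocations = 0
       val rowA: Row = new RowA(Seq(1))
       val rowB: Row = new RowB(Seq(1))
       val left = new Keyed(Seq(new RowA(Seq(1)), new RowA(Seq(2))))
       val right = new Keyed(Seq(new RowB(Seq(1)), new RowB(Seq(2))))
       val first = left == right
       val second = left == right // reuses the wrappers from the first call
       (rowA == rowB, first && second, wrapperAllocations)
     }
   }
   ```
   In this sketch the second comparison allocates nothing, because both sides 
   already hold their wrapped keys.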
   
   ### 7. Dead variable in `ShuffleExchangeExec`
   
   ```scala
   case p => throw SparkException.internalError(
     s"Exchange not implemented for $newPartitioning")
   ```
   `p` is bound but unused — should be `case _`.
   
   ### 8. `applyGroupPartitions` doesn't handle `GroupPartitionsExec` nested under unary nodes
   
   The old `populateCommonPartitionInfo` recursively walked the tree via 
`mapChildren`. The new `applyGroupPartitions` only checks the top level — if a 
`ProjectExec` or `FilterExec` sits between the join and an existing 
`GroupPartitionsExec`, the match falls to `_` and creates a second 
`GroupPartitionsExec` on top, leaving the original orphaned. Either add 
recursive walking for unary nodes, or document the invariant that 
`GroupPartitionsExec` is always the immediate child at this point.
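   
   The difference between the two strategies can be sketched on a toy plan 
   tree. None of the types below are Spark classes: `Group` stands in for 
   `GroupPartitionsExec`, and `Project` and `Filter` stand in for the unary 
   nodes. A real fix would presumably also need to update the existing node 
   rather than just reuse it, which this sketch leaves out:
   ```scala
   object NestedGroupPartitionsDemo {
     // Hypothetical miniature plan nodes.
     sealed trait Plan
     case class Scan(name: String) extends Plan
     case class Group(child: Plan) extends Plan
     case class Project(child: Plan) extends Plan
     case class Filter(child: Plan) extends Plan

     // Top-level-only check: anything that is not a Group gets wrapped.
     def applyTopLevel(plan: Plan): Plan = plan match {
       case g: Group => g
       case other => Group(other)
     }

     // Walks through unary nodes first, so an existing Group is found.
     def applyRecursive(plan: Plan): Plan = plan match {
       case g: Group => g
       case Project(c) => Project(applyRecursive(c))
       case Filter(c) => Filter(applyRecursive(c))
       case other => Group(other)
     }

     // Counts Group nodes along the single-child chain.
     def countGroups(plan: Plan): Int = plan match {
       case Group(c) => 1 + countGroups(c)
       case Project(c) => countGroups(c)
       case Filter(c) => countGroups(c)
       case Scan(_) => 0
     }

     // A Group hidden under a Project, as in the scenario described above.
     val plan: Plan = Project(Group(Scan("t")))
   }
   ```
   On `plan`, the top-level variant yields two `Group` nodes, while the 
   recursive variant keeps the single existing one.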
   
   ---
   _This comment was generated with [GitHub MCP](http://go/mcps)._


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

