cloud-fan commented on code in PR #49144:
URL: https://github.com/apache/spark/pull/49144#discussion_r1905208603


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InsertMapSortExpression.scala:
##########
@@ -77,15 +71,56 @@ object InsertMapSortInGroupingExpressions extends 
Rule[LogicalPlan] {
             }.asInstanceOf[NamedExpression]
         }
         val newChild = Project(child.output ++ exprToMapSort.values, child)
-        val newAgg = Aggregate(newGroupingKeys, newAggregateExprs, newChild)
+        val newAgg = Aggregate(newGroupingKeys, newAggregateExprs, newChild, 
hint)
         newAgg -> agg.output.zip(newAgg.output)
     }
   }
+}
+
+/**
+ * Adds [[MapSort]] to [[RepartitionByExpression]] expressions containing map 
columns,
+ * as the key/value pairs need to be in the correct order before 
repartitioning:
+ *
+ * SELECT * FROM TABLE DISTRIBUTE BY map_column =>
+ * SELECT * FROM TABLE DISTRIBUTE BY map_sort(map_column)
+ */
+object InsertMapSortInRepartitionExpressions extends Rule[LogicalPlan] {
+  import InsertMapSortExpression._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformUpWithPruning(_.containsPattern(REPARTITION_OPERATION)) {
+      case rep @
+        RepartitionByExpression(partitionExprs, child, optNumPartitions, 
optAdvisoryPartitionSize)

Review Comment:
   nit:
   ```
   case rep: RepartitionByExpression =>
     ...
     rep.copy(partitionExprs = newPartitionExprs)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to