This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5734fe7195 [CORE] Remove Reducer.java in shim layers (#10418)
5734fe7195 is described below

commit 5734fe71956916803e61014ce09f333b06e4caef
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Aug 12 16:51:49 2025 +0200

    [CORE] Remove Reducer.java in shim layers (#10418)
---
 .../org/apache/gluten/sql/shims/SparkShims.scala   |  4 +-
 .../sql/connector/catalog/functions/Reducer.java   | 45 ----------------------
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |  4 +-
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |  4 +-
 .../gluten/sql/shims/spark40/Spark40Shims.scala    | 21 ++--------
 5 files changed, 7 insertions(+), 71 deletions(-)

diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index e998d321b1..570e3cfef5 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.functions.Reducer
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution._
@@ -252,8 +251,7 @@ trait SparkShims {
       commonPartitionValues: Option[Seq[(InternalRow, Int)]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean,
-      joinKeyPositions: Option[Seq[Int]] = None,
-      reducers: Option[Seq[Option[Reducer[_, _]]]] = None): 
Seq[Seq[InputPartition]] =
+      joinKeyPositions: Option[Seq[Int]] = None): Seq[Seq[InputPartition]] =
     filteredPartitions
 
   def extractExpressionTimestampAddUnit(timestampAdd: Expression): 
Option[Seq[String]] =
diff --git 
a/shims/common/src/main/scala/org/apache/spark/sql/connector/catalog/functions/Reducer.java
 
b/shims/common/src/main/scala/org/apache/spark/sql/connector/catalog/functions/Reducer.java
deleted file mode 100644
index bb5d4557b6..0000000000
--- 
a/shims/common/src/main/scala/org/apache/spark/sql/connector/catalog/functions/Reducer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.connector.catalog.functions;
-
-import org.apache.spark.annotation.Evolving;
-
-// Copied from Spark for Gluten temporary use.
-// TODO: remove this and move code depends on this class to shims.
-
-/**
- * A 'reducer' for output of user-defined functions.
- *
- * @see ReducibleFunction
- *
- * A user defined function f_source(x) is 'reducible' on another user_defined 
function
- * f_target(x) if
- * <ul>
- *   <li> There exists a reducer function r(x) such that r(f_source(x)) = 
f_target(x) for
- *        all input x, or </li>
- *   <li> More generally, there exists reducer functions r1(x) and r2(x) such 
that
- *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
- * </ul>
- *
- * @param <I> reducer input type
- * @param <O> reducer output type
- * @since 4.0.0
- */
-@Evolving
-public interface Reducer<I, O> {
-  O reduce(I arg);
-}
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index bf03f3bfd2..08253d088c 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
InternalRowComparableWrapper, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.functions.Reducer
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
 import org.apache.spark.sql.execution._
@@ -438,8 +437,7 @@ class Spark34Shims extends SparkShims {
       commonPartitionValues: Option[Seq[(InternalRow, Int)]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean,
-      joinKeyPositions: Option[Seq[Int]] = None,
-      reducers: Option[Seq[Option[Reducer[_, _]]]] = None): 
Seq[Seq[InputPartition]] = {
+      joinKeyPositions: Option[Seq[Int]] = None): Seq[Seq[InputPartition]] = {
     scan match {
       case _ if keyGroupedPartitioning.isDefined =>
         var finalPartitions = filteredPartitions
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 19bb97fcaa..643aed59b1 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
InternalRowComparableWrapper, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.functions.Reducer
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
 import org.apache.spark.sql.execution._
@@ -470,8 +469,7 @@ class Spark35Shims extends SparkShims {
       commonPartitionValues: Option[Seq[(InternalRow, Int)]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean,
-      joinKeyPositions: Option[Seq[Int]] = None,
-      reducers: Option[Seq[Option[Reducer[_, _]]]] = None): 
Seq[Seq[InputPartition]] = {
+      joinKeyPositions: Option[Seq[Int]] = None): Seq[Seq[InputPartition]] = {
     scan match {
       case _ if keyGroupedPartitioning.isDefined =>
         var finalPartitions = filteredPartitions
diff --git 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 1cd671f8cf..1c3a2dfdf1 100644
--- 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
InternalRowComparableWrapper, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.functions.Reducer
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
 import org.apache.spark.sql.execution._
@@ -471,8 +470,7 @@ class Spark40Shims extends SparkShims {
       commonPartitionValues: Option[Seq[(InternalRow, Int)]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean,
-      joinKeyPositions: Option[Seq[Int]] = None,
-      reducers: Option[Seq[Option[Reducer[_, _]]]] = None): 
Seq[Seq[InputPartition]] = {
+      joinKeyPositions: Option[Seq[Int]] = None): Seq[Seq[InputPartition]] = {
     scan match {
       case _ if keyGroupedPartitioning.isDefined =>
         var finalPartitions = filteredPartitions
@@ -507,20 +505,8 @@ class Spark40Shims extends SparkShims {
             }
 
             // Also re-group the partitions if we are reducing compatible 
partition expressions
-            val finalGroupedPartitions = reducers match {
-              case Some(reducers) =>
-                val result = groupedPartitions
-                  .groupBy {
-                    case (row, _) =>
-                      KeyGroupedShuffleSpec.reducePartitionValue(row, 
partExpressions, reducers)
-                  }
-                  .map { case (wrapper, splits) => (wrapper.row, 
splits.flatMap(_._2)) }
-                  .toSeq
-                val rowOrdering =
-                  
RowOrdering.createNaturalAscendingOrdering(partExpressions.map(_.dataType))
-                result.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
-              case _ => groupedPartitions
-            }
+            // TODO: Respect Reducer settings?
+            val finalGroupedPartitions = groupedPartitions
 
             // When partially clustered, the input partitions are not grouped 
by partition
             // values. Here we'll need to check `commonPartitionValues` and 
decide how to group
@@ -709,4 +695,5 @@ class Spark40Shims extends SparkShims {
   override def widerDecimalType(d1: DecimalType, d2: DecimalType): DecimalType 
= {
     DecimalPrecisionTypeCoercion.widerDecimalType(d1, d2)
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to