This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5734fe7195 [CORE] Remove Reducer.java in shim layers (#10418)
5734fe7195 is described below
commit 5734fe71956916803e61014ce09f333b06e4caef
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Aug 12 16:51:49 2025 +0200
[CORE] Remove Reducer.java in shim layers (#10418)
---
.../org/apache/gluten/sql/shims/SparkShims.scala | 4 +-
.../sql/connector/catalog/functions/Reducer.java | 45 ----------------------
.../gluten/sql/shims/spark34/Spark34Shims.scala | 4 +-
.../gluten/sql/shims/spark35/Spark35Shims.scala | 4 +-
.../gluten/sql/shims/spark40/Spark40Shims.scala | 21 ++--------
5 files changed, 7 insertions(+), 71 deletions(-)
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index e998d321b1..570e3cfef5 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{Distribution,
Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.functions.Reducer
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution._
@@ -252,8 +251,7 @@ trait SparkShims {
commonPartitionValues: Option[Seq[(InternalRow, Int)]],
applyPartialClustering: Boolean,
replicatePartitions: Boolean,
- joinKeyPositions: Option[Seq[Int]] = None,
- reducers: Option[Seq[Option[Reducer[_, _]]]] = None):
Seq[Seq[InputPartition]] =
+ joinKeyPositions: Option[Seq[Int]] = None): Seq[Seq[InputPartition]] =
filteredPartitions
def extractExpressionTimestampAddUnit(timestampAdd: Expression):
Option[Seq[String]] =
diff --git
a/shims/common/src/main/scala/org/apache/spark/sql/connector/catalog/functions/Reducer.java
b/shims/common/src/main/scala/org/apache/spark/sql/connector/catalog/functions/Reducer.java
deleted file mode 100644
index bb5d4557b6..0000000000
---
a/shims/common/src/main/scala/org/apache/spark/sql/connector/catalog/functions/Reducer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.connector.catalog.functions;
-
-import org.apache.spark.annotation.Evolving;
-
-// Copied from Spark for Gluten temporary use.
-// TODO: remove this and move code depends on this class to shims.
-
-/**
- * A 'reducer' for output of user-defined functions.
- *
- * @see ReducibleFunction
- *
- * A user defined function f_source(x) is 'reducible' on another user_defined
function
- * f_target(x) if
- * <ul>
- * <li> There exists a reducer function r(x) such that r(f_source(x)) =
f_target(x) for
- * all input x, or </li>
- * <li> More generally, there exists reducer functions r1(x) and r2(x) such
that
- * r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
- * </ul>
- *
- * @param <I> reducer input type
- * @param <O> reducer output type
- * @since 4.0.0
- */
-@Evolving
-public interface Reducer<I, O> {
- O reduce(I arg);
-}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index bf03f3bfd2..08253d088c 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
InternalRowComparableWrapper, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.functions.Reducer
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.execution._
@@ -438,8 +437,7 @@ class Spark34Shims extends SparkShims {
commonPartitionValues: Option[Seq[(InternalRow, Int)]],
applyPartialClustering: Boolean,
replicatePartitions: Boolean,
- joinKeyPositions: Option[Seq[Int]] = None,
- reducers: Option[Seq[Option[Reducer[_, _]]]] = None):
Seq[Seq[InputPartition]] = {
+ joinKeyPositions: Option[Seq[Int]] = None): Seq[Seq[InputPartition]] = {
scan match {
case _ if keyGroupedPartitioning.isDefined =>
var finalPartitions = filteredPartitions
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 19bb97fcaa..643aed59b1 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
InternalRowComparableWrapper, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.functions.Reducer
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.execution._
@@ -470,8 +469,7 @@ class Spark35Shims extends SparkShims {
commonPartitionValues: Option[Seq[(InternalRow, Int)]],
applyPartialClustering: Boolean,
replicatePartitions: Boolean,
- joinKeyPositions: Option[Seq[Int]] = None,
- reducers: Option[Seq[Option[Reducer[_, _]]]] = None):
Seq[Seq[InputPartition]] = {
+ joinKeyPositions: Option[Seq[Int]] = None): Seq[Seq[InputPartition]] = {
scan match {
case _ if keyGroupedPartitioning.isDefined =>
var finalPartitions = filteredPartitions
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 1cd671f8cf..1c3a2dfdf1 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
InternalRowComparableWrapper, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.functions.Reducer
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.execution._
@@ -471,8 +470,7 @@ class Spark40Shims extends SparkShims {
commonPartitionValues: Option[Seq[(InternalRow, Int)]],
applyPartialClustering: Boolean,
replicatePartitions: Boolean,
- joinKeyPositions: Option[Seq[Int]] = None,
- reducers: Option[Seq[Option[Reducer[_, _]]]] = None):
Seq[Seq[InputPartition]] = {
+ joinKeyPositions: Option[Seq[Int]] = None): Seq[Seq[InputPartition]] = {
scan match {
case _ if keyGroupedPartitioning.isDefined =>
var finalPartitions = filteredPartitions
@@ -507,20 +505,8 @@ class Spark40Shims extends SparkShims {
}
// Also re-group the partitions if we are reducing compatible
partition expressions
- val finalGroupedPartitions = reducers match {
- case Some(reducers) =>
- val result = groupedPartitions
- .groupBy {
- case (row, _) =>
- KeyGroupedShuffleSpec.reducePartitionValue(row,
partExpressions, reducers)
- }
- .map { case (wrapper, splits) => (wrapper.row,
splits.flatMap(_._2)) }
- .toSeq
- val rowOrdering =
-
RowOrdering.createNaturalAscendingOrdering(partExpressions.map(_.dataType))
- result.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
- case _ => groupedPartitions
- }
+ // TODO: Respect reducers: re-group by reduced partition value and re-sort.
+ val finalGroupedPartitions = groupedPartitions
// When partially clustered, the input partitions are not grouped
by partition
// values. Here we'll need to check `commonPartitionValues` and
decide how to group
@@ -709,4 +695,5 @@ class Spark40Shims extends SparkShims {
override def widerDecimalType(d1: DecimalType, d2: DecimalType): DecimalType
= {
DecimalPrecisionTypeCoercion.widerDecimalType(d1, d2)
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]