This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9c8641139e [GLUTEN-11550][UT] Remove
GlutenSingleLevelAggregateHashMapSuite, GlutenTwoLevelAggregateHashMapSuite,
GlutenTwoLevelAggregateHashMapWithVectorizedMapSuite (#11571)
9c8641139e is described below
commit 9c8641139ee2ef14f698e7fa827d38c15ceaa374
Author: Kapil Kumar Singh <[email protected]>
AuthorDate: Thu Feb 26 19:33:20 2026 +0530
[GLUTEN-11550][UT] Remove GlutenSingleLevelAggregateHashMapSuite,
GlutenTwoLevelAggregateHashMapSuite,
GlutenTwoLevelAggregateHashMapWithVectorizedMapSuite (#11571)
* enable suites
* Update trait
* Fix for single suite
* restore
* remove suites
* update dataframeaggregationsuite
* Add docs
---
gluten-ut/excluded-spark-uts.md | 6 +
.../gluten/utils/velox/VeloxTestSettings.scala | 5 +-
.../spark/sql/GlutenAggregateHashMapSuite.scala | 29 -----
.../spark/sql/GlutenDataFrameAggregateSuite.scala | 124 ++++++++++++++++++---
.../gluten/utils/velox/VeloxTestSettings.scala | 5 +-
.../spark/sql/GlutenAggregateHashMapSuite.scala | 29 -----
.../spark/sql/GlutenDataFrameAggregateSuite.scala | 123 +++++++++++++++++---
7 files changed, 220 insertions(+), 101 deletions(-)
diff --git a/gluten-ut/excluded-spark-uts.md b/gluten-ut/excluded-spark-uts.md
new file mode 100644
index 0000000000..598eca6493
--- /dev/null
+++ b/gluten-ut/excluded-spark-uts.md
@@ -0,0 +1,6 @@
+# List of excluded Spark UTs
+## This doc contains a list of Spark UTs which are chosen not to run with Gluten,
because they check for Spark-specific behavior.
+
+| Suites
| Versions | Reason to exclude
[...]
+|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+|
org.apache.spark.sql.SingleLevelAggregateHashMapSuite<br/>org.apache.spark.sql.TwoLevelAggregateHashMapSuite<br/>org.apache.spark.sql.TwoLevelAggregateHashMapWithVectorizedMapSuite
| 4.0, 4.1 | These UTs are similar to `DataFrameAggregateSuite`.<br/>The only
difference being that it contains variation for Spark codegen enabled and
disabled and enabling single level or two level aggregate hash-maps which is
also specific to Spark Aggregate implementation.<br/>We already run
`GlutenDataFra [...]
\ No newline at end of file
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a2deb7740f..01db251852 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -770,7 +770,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSSBQuerySuite]
enableSuite[GlutenSessionStateSuite]
// TODO: 4.x enableSuite[GlutenSetCommandSuite] // 1 failure
- // TODO: 4.x enableSuite[GlutenSingleLevelAggregateHashMapSuite] // 1
failure
enableSuite[GlutenSparkSessionBuilderSuite]
// TODO: 4.x enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite]
// 1 failure
enableSuite[GlutenTPCDSCollationQueryTestSuite]
@@ -786,8 +785,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenTPCDSV2_7_PlanStabilityWithStatsSuite]
enableSuite[GlutenTPCHPlanStabilitySuite]
enableSuite[GlutenTPCHQuerySuite]
- // TODO: 4.x enableSuite[GlutenTwoLevelAggregateHashMapSuite] // 1 failure
- // TODO: 4.x
enableSuite[GlutenTwoLevelAggregateHashMapWithVectorizedMapSuite] // 1 failure
enableSuite[GlutenUDFSuite]
enableSuite[GlutenUDTRegistrationSuite]
enableSuite[GlutenUnsafeRowSuite]
@@ -832,7 +829,7 @@ class VeloxTestSettings extends BackendTestSettings {
"SPARK-19471: AggregationIterator does not initialize the generated
result projection" +
" before using it",
// Velox's collect_list / collect_set are by design declarative
aggregate so plan check
- // for ObjectHashAggregateExec will fail.
+ // for ObjectHashAggregateExec will fail. Overridden
"SPARK-22223: ObjectHashAggregate should not introduce unnecessary
shuffle",
"SPARK-31620: agg with subquery (whole-stage-codegen = true)",
"SPARK-31620: agg with subquery (whole-stage-codegen = false)"
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenAggregateHashMapSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenAggregateHashMapSuite.scala
deleted file mode 100644
index 3f576c9c1a..0000000000
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenAggregateHashMapSuite.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql
-
-class GlutenSingleLevelAggregateHashMapSuite
- extends SingleLevelAggregateHashMapSuite
- with GlutenTestsCommonTrait {}
-
-class GlutenTwoLevelAggregateHashMapSuite
- extends TwoLevelAggregateHashMapSuite
- with GlutenTestsCommonTrait {}
-
-class GlutenTwoLevelAggregateHashMapWithVectorizedMapSuite
- extends TwoLevelAggregateHashMapWithVectorizedMapSuite
- with GlutenTestsCommonTrait {}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDataFrameAggregateSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDataFrameAggregateSuite.scala
index 2f3777caa1..7e76463129 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDataFrameAggregateSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDataFrameAggregateSuite.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.HashAggregateExecBaseTransformer
+import org.apache.gluten.execution.{HashAggregateExecBaseTransformer,
HashAggregateExecTransformer}
import org.apache.spark.sql.execution.WholeStageCodegenExec
-import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
SortAggregateExec}
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -187,23 +188,24 @@ class GlutenDataFrameAggregateSuite extends
DataFrameAggregateSuite with GlutenS
// This test is applicable to velox backend. For CH backend, the replacement
is disabled.
testGluten("use gluten hash agg to replace vanilla spark sort agg") {
+ withTempView("t1") {
+ withSQLConf((GlutenConfig.COLUMNAR_FORCE_HASHAGG_ENABLED.key, "false")) {
+ Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
+ // SortAggregateExec is expected to be used for string type input.
+ val df = spark.sql("select max(col1) from t1")
+ checkAnswer(df, Row("D") :: Nil)
+
assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[SortAggregateExec]).isDefined)
+ }
- withSQLConf((GlutenConfig.COLUMNAR_FORCE_HASHAGG_ENABLED.key, "false")) {
- Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
- // SortAggregateExec is expected to be used for string type input.
- val df = spark.sql("select max(col1) from t1")
- checkAnswer(df, Row("D") :: Nil)
-
assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[SortAggregateExec]).isDefined)
- }
-
- withSQLConf((GlutenConfig.COLUMNAR_FORCE_HASHAGG_ENABLED.key, "true")) {
- Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
- val df = spark.sql("select max(col1) from t1")
- checkAnswer(df, Row("D") :: Nil)
- // Sort agg is expected to be replaced by gluten's hash agg.
- assert(
- find(df.queryExecution.executedPlan)(
- _.isInstanceOf[HashAggregateExecBaseTransformer]).isDefined)
+ withSQLConf((GlutenConfig.COLUMNAR_FORCE_HASHAGG_ENABLED.key, "true")) {
+ Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
+ val df = spark.sql("select max(col1) from t1")
+ checkAnswer(df, Row("D") :: Nil)
+ // Sort agg is expected to be replaced by gluten's hash agg.
+ assert(
+ find(df.queryExecution.executedPlan)(
+ _.isInstanceOf[HashAggregateExecBaseTransformer]).isDefined)
+ }
}
}
@@ -280,4 +282,90 @@ class GlutenDataFrameAggregateSuite extends
DataFrameAggregateSuite with GlutenS
randn(Random.nextLong())
).foreach(assertNoExceptions)
}
+
+ testGluten("SPARK-22223: ObjectHashAggregate should not introduce
unnecessary shuffle") {
+ withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+ val df = Seq(("1", "2", 1), ("1", "2", 2), ("2", "3", 3), ("2", "3", 4))
+ .toDF("a", "b", "c")
+ .repartition(col("a"))
+
+ val objHashAggDF = df
+ .withColumn("d", expr("(a, b, c)"))
+ .groupBy("a", "b")
+ .agg(collect_list("d").as("e"))
+ .withColumn("f", expr("(b, e)"))
+ .groupBy("a")
+ .agg(collect_list("f").as("g"))
+ val aggPlan = objHashAggDF.queryExecution.executedPlan
+
+ val sortAggPlans = collect(aggPlan) { case sortAgg: SortAggregateExec =>
sortAgg }
+      // SortAggregate will be retained due to velox_collect_list
+ assert(sortAggPlans.size == 4)
+
+ val objHashAggPlans = collect(aggPlan) {
+ case objHashAgg: ObjectHashAggregateExec => objHashAgg
+ }
+ assert(objHashAggPlans.isEmpty)
+
+ val exchangePlans = collect(aggPlan) { case shuffle: ShuffleExchangeExec
=> shuffle }
+ assert(exchangePlans.length == 1)
+ }
+ }
+
+ Seq(true, false).foreach {
+ value =>
+ testGluten(s"SPARK-31620: agg with subquery (whole-stage-codegen =
$value)") {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) {
+ withTempView("t1", "t2") {
+ sql("create temporary view t1 as select * from values (1, 2) as
t1(a, b)")
+ sql("create temporary view t2 as select * from values (3, 4) as
t2(c, d)")
+
+ // test without grouping keys
+ checkAnswer(
+ sql("select sum(if(c > (select a from t1), d, 0)) as csum from
t2"),
+ Row(4) :: Nil)
+
+ // test with grouping keys
+ checkAnswer(
+ sql(
+ "select c, sum(if(c > (select a from t1), d, 0)) as csum from
" +
+ "t2 group by c"),
+ Row(3, 4) :: Nil)
+
+ // test with distinct
+ checkAnswer(
+ sql(
+ "select avg(distinct(d)), sum(distinct(if(c > (select a from
t1)," +
+ " d, 0))) as csum from t2 group by c"),
+ Row(4, 4) :: Nil)
+
+ // test subquery with agg
+ checkAnswer(
+ sql(
+ "select sum(distinct(if(c > (select sum(distinct(a)) from
t1)," +
+ " d, 0))) as csum from t2 group by c"),
+ Row(4) :: Nil)
+
+ // test SortAggregateExec
+ // Converts to HashAggregateExecTransformer
+ var df = sql("select max(if(c > (select a from t1), 'str1',
'str2')) as csum from t2")
+ df.collect()
+ assert(
+ find(df.queryExecution.executedPlan)(
+ _.isInstanceOf[HashAggregateExecTransformer]).isDefined)
+ checkAnswer(df, Row("str1") :: Nil)
+
+ // test ObjectHashAggregateExec
+ // Converts to HashAggregateExecTransformer
+ df =
+ sql("select collect_list(d), sum(if(c > (select a from t1), d,
0)) as csum from t2")
+ df.collect()
+ assert(
+ find(df.queryExecution.executedPlan)(
+ _.isInstanceOf[HashAggregateExecTransformer]).isDefined)
+ checkAnswer(df, Row(Array(4), 4) :: Nil)
+ }
+ }
+ }
+ }
}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index b247659b0d..df4f5766f6 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -735,7 +735,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSSBQuerySuite]
enableSuite[GlutenSessionStateSuite]
// TODO: 4.x enableSuite[GlutenSetCommandSuite] // 1 failure
- // TODO: 4.x enableSuite[GlutenSingleLevelAggregateHashMapSuite] // 1
failure
enableSuite[GlutenSparkSessionBuilderSuite]
// TODO: 4.x enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite]
// 1 failure
enableSuite[GlutenTPCDSCollationQueryTestSuite]
@@ -751,8 +750,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenTPCDSV2_7_PlanStabilityWithStatsSuite]
enableSuite[GlutenTPCHPlanStabilitySuite]
enableSuite[GlutenTPCHQuerySuite]
- // TODO: 4.x enableSuite[GlutenTwoLevelAggregateHashMapSuite] // 1 failure
- // TODO: 4.x
enableSuite[GlutenTwoLevelAggregateHashMapWithVectorizedMapSuite] // 1 failure
enableSuite[GlutenUDFSuite]
enableSuite[GlutenUDTRegistrationSuite]
enableSuite[GlutenUnsafeRowSuite]
@@ -801,7 +798,7 @@ class VeloxTestSettings extends BackendTestSettings {
"SPARK-19471: AggregationIterator does not initialize the generated
result projection" +
" before using it",
// Velox's collect_list / collect_set are by design declarative
aggregate so plan check
- // for ObjectHashAggregateExec will fail.
+      // for ObjectHashAggregateExec will fail. Overridden
"SPARK-22223: ObjectHashAggregate should not introduce unnecessary
shuffle",
"SPARK-31620: agg with subquery (whole-stage-codegen = true)",
"SPARK-31620: agg with subquery (whole-stage-codegen = false)"
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenAggregateHashMapSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenAggregateHashMapSuite.scala
deleted file mode 100644
index 3f576c9c1a..0000000000
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenAggregateHashMapSuite.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql
-
-class GlutenSingleLevelAggregateHashMapSuite
- extends SingleLevelAggregateHashMapSuite
- with GlutenTestsCommonTrait {}
-
-class GlutenTwoLevelAggregateHashMapSuite
- extends TwoLevelAggregateHashMapSuite
- with GlutenTestsCommonTrait {}
-
-class GlutenTwoLevelAggregateHashMapWithVectorizedMapSuite
- extends TwoLevelAggregateHashMapWithVectorizedMapSuite
- with GlutenTestsCommonTrait {}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameAggregateSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameAggregateSuite.scala
index 2f3777caa1..a0338f10bc 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameAggregateSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameAggregateSuite.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.HashAggregateExecBaseTransformer
+import org.apache.gluten.execution.{HashAggregateExecBaseTransformer,
HashAggregateExecTransformer}
import org.apache.spark.sql.execution.WholeStageCodegenExec
-import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
SortAggregateExec}
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -188,22 +189,24 @@ class GlutenDataFrameAggregateSuite extends
DataFrameAggregateSuite with GlutenS
// This test is applicable to velox backend. For CH backend, the replacement
is disabled.
testGluten("use gluten hash agg to replace vanilla spark sort agg") {
- withSQLConf((GlutenConfig.COLUMNAR_FORCE_HASHAGG_ENABLED.key, "false")) {
- Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
- // SortAggregateExec is expected to be used for string type input.
- val df = spark.sql("select max(col1) from t1")
- checkAnswer(df, Row("D") :: Nil)
-
assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[SortAggregateExec]).isDefined)
- }
+ withTempView("t1") {
+ withSQLConf((GlutenConfig.COLUMNAR_FORCE_HASHAGG_ENABLED.key, "false")) {
+ Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
+ // SortAggregateExec is expected to be used for string type input.
+ val df = spark.sql("select max(col1) from t1")
+ checkAnswer(df, Row("D") :: Nil)
+
assert(find(df.queryExecution.executedPlan)(_.isInstanceOf[SortAggregateExec]).isDefined)
+ }
- withSQLConf((GlutenConfig.COLUMNAR_FORCE_HASHAGG_ENABLED.key, "true")) {
- Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
- val df = spark.sql("select max(col1) from t1")
- checkAnswer(df, Row("D") :: Nil)
- // Sort agg is expected to be replaced by gluten's hash agg.
- assert(
- find(df.queryExecution.executedPlan)(
- _.isInstanceOf[HashAggregateExecBaseTransformer]).isDefined)
+ withSQLConf((GlutenConfig.COLUMNAR_FORCE_HASHAGG_ENABLED.key, "true")) {
+ Seq("A", "B", "C", "D").toDF("col1").createOrReplaceTempView("t1")
+ val df = spark.sql("select max(col1) from t1")
+ checkAnswer(df, Row("D") :: Nil)
+ // Sort agg is expected to be replaced by gluten's hash agg.
+ assert(
+ find(df.queryExecution.executedPlan)(
+ _.isInstanceOf[HashAggregateExecBaseTransformer]).isDefined)
+ }
}
}
@@ -280,4 +283,90 @@ class GlutenDataFrameAggregateSuite extends
DataFrameAggregateSuite with GlutenS
randn(Random.nextLong())
).foreach(assertNoExceptions)
}
+
+ testGluten("SPARK-22223: ObjectHashAggregate should not introduce
unnecessary shuffle") {
+ withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+ val df = Seq(("1", "2", 1), ("1", "2", 2), ("2", "3", 3), ("2", "3", 4))
+ .toDF("a", "b", "c")
+ .repartition(col("a"))
+
+ val objHashAggDF = df
+ .withColumn("d", expr("(a, b, c)"))
+ .groupBy("a", "b")
+ .agg(collect_list("d").as("e"))
+ .withColumn("f", expr("(b, e)"))
+ .groupBy("a")
+ .agg(collect_list("f").as("g"))
+ val aggPlan = objHashAggDF.queryExecution.executedPlan
+
+ val sortAggPlans = collect(aggPlan) { case sortAgg: SortAggregateExec =>
sortAgg }
+      // SortAggregate will be retained due to velox_collect_list
+ assert(sortAggPlans.size == 4)
+
+ val objHashAggPlans = collect(aggPlan) {
+ case objHashAgg: ObjectHashAggregateExec => objHashAgg
+ }
+ assert(objHashAggPlans.isEmpty)
+
+ val exchangePlans = collect(aggPlan) { case shuffle: ShuffleExchangeExec
=> shuffle }
+ assert(exchangePlans.length == 1)
+ }
+ }
+
+ Seq(true, false).foreach {
+ value =>
+ testGluten(s"SPARK-31620: agg with subquery (whole-stage-codegen =
$value)") {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) {
+ withTempView("t1", "t2") {
+ sql("create temporary view t1 as select * from values (1, 2) as
t1(a, b)")
+ sql("create temporary view t2 as select * from values (3, 4) as
t2(c, d)")
+
+ // test without grouping keys
+ checkAnswer(
+ sql("select sum(if(c > (select a from t1), d, 0)) as csum from
t2"),
+ Row(4) :: Nil)
+
+ // test with grouping keys
+ checkAnswer(
+ sql(
+ "select c, sum(if(c > (select a from t1), d, 0)) as csum from
" +
+ "t2 group by c"),
+ Row(3, 4) :: Nil)
+
+ // test with distinct
+ checkAnswer(
+ sql(
+ "select avg(distinct(d)), sum(distinct(if(c > (select a from
t1)," +
+ " d, 0))) as csum from t2 group by c"),
+ Row(4, 4) :: Nil)
+
+ // test subquery with agg
+ checkAnswer(
+ sql(
+ "select sum(distinct(if(c > (select sum(distinct(a)) from
t1)," +
+ " d, 0))) as csum from t2 group by c"),
+ Row(4) :: Nil)
+
+ // test SortAggregateExec
+ // Converts to HashAggregateExecTransformer
+ var df = sql("select max(if(c > (select a from t1), 'str1',
'str2')) as csum from t2")
+ df.collect()
+ assert(
+ find(df.queryExecution.executedPlan)(
+ _.isInstanceOf[HashAggregateExecTransformer]).isDefined)
+ checkAnswer(df, Row("str1") :: Nil)
+
+ // test ObjectHashAggregateExec
+ // Converts to HashAggregateExecTransformer
+ df =
+ sql("select collect_list(d), sum(if(c > (select a from t1), d,
0)) as csum from t2")
+ df.collect()
+ assert(
+ find(df.queryExecution.executedPlan)(
+ _.isInstanceOf[HashAggregateExecTransformer]).isDefined)
+ checkAnswer(df, Row(Array(4), 4) :: Nil)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]