tillrohrmann commented on a change in pull request #8527:
URL: https://github.com/apache/flink/pull/8527#discussion_r696390658



##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.batch.sql.agg
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.{PlannerConfigOptions, TableConfigOptions, 
Types}
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.util.DateTimeTestUtil.UTCTimestamp
+
+import org.junit.{Before, Test}
+
+import java.sql.Date
+
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
+class AggregateReduceGroupingITCase extends BatchTestBase {
+
+  @Before
+  def before(): Unit = {
+    registerCollection("T1",
+      Seq(row(2, 1, "A", null),
+        row(3, 2, "A", "Hi"),
+        row(5, 2, "B", "Hello"),
+        row(6, 3, "C", "Hello world")),
+      new RowTypeInfo(Types.INT, Types.INT, Types.STRING, Types.STRING),
+      "a1, b1, c1, d1",
+      Array(true, true, true, true),
+      FlinkStatistic.builder().uniqueKeys(Set(Set("a1").asJava).asJava).build()
+    )
+
+    registerCollection("T2",
+      Seq(row(1, 1, "X"),
+        row(1, 2, "Y"),
+        row(2, 3, null),
+        row(2, 4, "Z")),
+      new RowTypeInfo(Types.INT, Types.INT, Types.STRING),
+      "a2, b2, c2",
+      Array(true, true, true),
+      FlinkStatistic.builder()
+        .uniqueKeys(Set(Set("b2").asJava, Set("a2", 
"b2").asJava).asJava).build()
+    )
+
+    registerCollection("T3",
+      Seq(row(1, 10, "Hi", 1L),
+        row(2, 20, "Hello", 1L),
+        row(2, 20, "Hello world", 2L),
+        row(3, 10, "Hello world, how are you?", 1L),
+        row(4, 20, "I am fine.", 2L),
+        row(4, null, "Luke Skywalker", 2L)),
+      new RowTypeInfo(Types.INT, Types.INT, Types.STRING, Types.LONG),
+      "a3, b3, c3, d3",
+      Array(true, true, true, true),
+      FlinkStatistic.builder().uniqueKeys(Set(Set("a1").asJava).asJava).build()
+    )
+
+    registerCollection("T4",
+      Seq(row(1, 1, "A", UTCTimestamp("2018-06-01 10:05:30"), "Hi"),
+        row(2, 1, "B", UTCTimestamp("2018-06-01 10:10:10"), "Hello"),
+        row(3, 2, "B", UTCTimestamp("2018-06-01 10:15:25"), "Hello world"),
+        row(4, 3, "C", UTCTimestamp("2018-06-01 10:36:49"), "I am fine.")),
+      new RowTypeInfo(Types.INT, Types.INT, Types.STRING, Types.SQL_TIMESTAMP, 
Types.STRING),
+      "a4, b4, c4, d4, e4",
+      Array(true, true, true, true, true),
+      FlinkStatistic.builder().uniqueKeys(Set(Set("a4").asJava).asJava).build()
+    )
+
+    registerCollection("T5",
+      Seq(row(2, 1, "A", null),
+        row(3, 2, "B", "Hi"),
+        row(1, null, "C", "Hello"),
+        row(4, 3, "D", "Hello world"),
+        row(3, 1, "E", "Hello world, how are you?"),
+        row(5, null, "F", null),
+        row(7, 2, "I", "hahaha"),
+        row(6, 1, "J", "I am fine.")),
+      new RowTypeInfo(Types.INT, Types.INT, Types.STRING, Types.STRING),
+      "a5, b5, c5, d5",
+      Array(true, true, true, true),
+      FlinkStatistic.builder().uniqueKeys(Set(Set("c5").asJava).asJava).build()
+    )
+
+    registerCollection("T6",
+      (0 until 50000).map(
+        i => row(i, 1L, if (i % 500 == 0) null else s"Hello$i", "Hello world", 
10,
+          new Date(i + 1531820000000L))),
+      new RowTypeInfo(Types.INT, Types.LONG, Types.STRING, Types.STRING, 
Types.INT, Types.SQL_DATE),
+      "a6, b6, c6, d6, e6, f6",
+      Array(true, true, true, true, true, true),
+      FlinkStatistic.builder().uniqueKeys(Set(Set("a6").asJava).asJava).build()
+    )
+    // HashJoin is disabled due to translateToPlanInternal method is not 
implemented yet
+    
tEnv.getConfig.getConf.setString(TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS,
 "HashJoin")
+  }
+
+  @Test
+  def testSingleAggOnTable_SortAgg(): Unit = {
+    
tEnv.getConfig.getConf.setString(TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS,
 "HashAgg")
+    testSingleAggOnTable()
+    checkResult("SELECT a6, b6, max(c6), count(d6), sum(e6) FROM T6 GROUP BY 
a6, b6",
+      (0 until 50000).map(i => row(i, 1L, if (i % 500 == 0) null else 
s"Hello$i", 1L, 10))
+    )
+  }
+
+  @Test
+  def testSingleAggOnTable_HashAgg_WithLocalAgg(): Unit = {
+    
tEnv.getConfig.getConf.setString(TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS,
 "SortAgg")
+    tEnv.getConfig.getConf.setString(
+      PlannerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_ENFORCER, "TWO_PHASE")
+    
tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM,
 2) // 1M
+    testSingleAggOnTable()
+  }
+
+  @Test
+  def testSingleAggOnTable_HashAgg_WithoutLocalAgg(): Unit = {
+    
tEnv.getConfig.getConf.setString(TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS,
 "SortAgg")
+    tEnv.getConfig.getConf.setString(
+      PlannerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_ENFORCER, "ONE_PHASE")
+    
tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM,
 2) // 1M
+    testSingleAggOnTable()
+  }
+
+  private def testSingleAggOnTable(): Unit = {
+    // group by fix length
+    checkResult("SELECT a1, b1, count(c1) FROM T1 GROUP BY a1, b1",
+      Seq(row(2, 1, 1), row(3, 2, 1), row(5, 2, 1), row(6, 3, 1)))
+    // group by string
+    checkResult("SELECT a1, c1, count(d1), avg(b1) FROM T1 GROUP BY a1, c1",
+      Seq(row(2, "A", 0, 1.0), row(3, "A", 1, 2.0), row(5, "B", 1, 2.0), 
row(6, "C", 1, 3.0)))
+    checkResult("SELECT c5, d5, avg(b5), avg(a5) FROM T5 WHERE d5 IS NOT NULL 
GROUP BY c5, d5",
+      Seq(row("B", "Hi", 2.0, 3.0), row("C", "Hello", null, 1.0),
+        row("D", "Hello world", 3.0, 4.0), row("E", "Hello world, how are 
you?", 1.0, 3.0),
+        row("I", "hahaha", 2.0, 7.0), row("J", "I am fine.", 1.0, 6.0)))
+    // group by string with null
+    checkResult("SELECT a1, d1, count(d1) FROM T1 GROUP BY a1, d1",
+      Seq(row(2, null, 0), row(3, "Hi", 1), row(5, "Hello", 1), row(6, "Hello 
world", 1)))
+    checkResult("SELECT c5, d5, avg(b5), avg(a5) FROM T5 GROUP BY c5, d5",
+      Seq(row("A", null, 1.0, 2.0), row("B", "Hi", 2.0, 3.0), row("C", 
"Hello", null, 1.0),
+        row("D", "Hello world", 3.0, 4.0), row("E", "Hello world, how are 
you?", 1.0, 3.0),
+        row("F", null, null, 5.0), row("I", "hahaha", 2.0, 7.0), row("J", "I 
am fine.", 1.0, 6.0)))
+
+    checkResult("SELECT a3, b3, count(c3) FROM T3 GROUP BY a3, b3",
+      Seq(row(1, 10, 1), row(2, 20, 2), row(3, 10, 1), row(4, 20, 1), row(4, 
null, 1)))
+    checkResult("SELECT a2, b2, count(c2) FROM T2 GROUP BY a2, b2",
+      Seq(row(1, 1, 1), row(1, 2, 1), row(2, 3, 0), row(2, 4, 1)))
+
+    // group by constants
+    checkResult("SELECT a1, b1, count(c1) FROM T1 GROUP BY a1, b1, 1, true",
+      Seq(row(2, 1, 1), row(3, 2, 1), row(5, 2, 1), row(6, 3, 1)))
+    checkResult("SELECT count(c1) FROM T1 GROUP BY 1, true", Seq(row(4)))
+
+    // large data, for hash agg mode it will fallback
+    checkResult("SELECT a6, c6, avg(b6), count(d6), avg(e6) FROM T6 GROUP BY 
a6, c6",
+      (0 until 50000).map(i => row(i, if (i % 500 == 0) null else s"Hello$i", 
1D, 1L, 10D))
+    )
+    checkResult("SELECT a6, d6, avg(b6), count(c6), avg(e6) FROM T6 GROUP BY 
a6, d6",
+      (0 until 50000).map(i => row(i, "Hello world", 1D, if (i % 500 == 0) 0L 
else 1L, 10D))
+    )
+    checkResult("SELECT a6, f6, avg(b6), count(c6), avg(e6) FROM T6 GROUP BY 
a6, f6",
+      (0 until 50000).map(i => row(i, new Date(i + 1531820000000L), 1D,
+        if (i % 500 == 0) 0L else 1L, 10D))
+    )
+  }

Review comment:
       A piece of feedback on running multiple jobs from the same test method: 
it is very hard to figure out what's going on in the logs because multiple 
jobs are submitted from `testSingleAggOnTable`. To make things even worse, all 
jobs have the same name, `"collect"`. I would propose creating a dedicated 
test method with a descriptive name for every query.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to