twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237441497
 
 

 ##########
 File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##########
 @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+    * This query checks:
+    *
+    * 1. count(D.price) produces 0, because no rows matched to D
+    * 2. sum(D.price) produces null, because no rows matched to D
+    * 3. aggregates that take multiple parameters work
+    * 4. aggregates with expressions work
+    */
+  @Test
+  def testCepAggregates(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
+    data.+=((1, "a", 1, 0.8, 1))
+    data.+=((2, "z", 2, 0.8, 3))
+    data.+=((3, "b", 1, 0.8, 2))
+    data.+=((4, "c", 1, 0.8, 5))
+    data.+=((5, "d", 4, 0.1, 5))
+    data.+=((6, "a", 2, 1.5, 2))
+    data.+=((7, "b", 2, 0.8, 3))
+    data.+=((8, "c", 1, 0.8, 2))
+    data.+=((9, "h", 2, 0.8, 3))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+    tEnv.registerFunction("weightedAvg", new WeightedAvg)
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id) as startId,
+         |    SUM(A.price) AS sumA,
+         |    COUNT(DISTINCT D.price) AS countD,
 
 Review comment:
   Remove this if we don't support it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to