twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237441497
########## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ########## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** + * This query checks: + * + * 1. count(D.price) produces 0, because no rows matched to D + * 2. sum(D.price) produces null, because no rows matched to D + * 3. aggregates that take multiple parameters work + * 4. aggregates with expressions work + */ + @Test + def testCepAggregates(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setMaxGeneratedCodeLength(1) + StreamITCase.clear + + val data = new mutable.MutableList[(Int, String, Long, Double, Int)] + data.+=((1, "a", 1, 0.8, 1)) + data.+=((2, "z", 2, 0.8, 3)) + data.+=((3, "b", 1, 0.8, 2)) + data.+=((4, "c", 1, 0.8, 5)) + data.+=((5, "d", 4, 0.1, 5)) + data.+=((6, "a", 2, 1.5, 2)) + data.+=((7, "b", 2, 0.8, 3)) + data.+=((8, "c", 1, 0.8, 2)) + data.+=((9, "h", 2, 0.8, 3)) + + val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) + tEnv.registerTable("MyTable", t) + tEnv.registerFunction("weightedAvg", new WeightedAvg) + + val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + | FIRST(id) as startId, + | SUM(A.price) AS sumA, + | COUNT(DISTINCT D.price) AS countD, Review comment: Remove this if we don't support it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services