twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237450569
########## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ########## @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** + * This query checks: + * + * 1. count(D.price) produces 0, because no rows matched to D + * 2. sum(D.price) produces null, because no rows matched to D + * 3. aggregates that take multiple parameters work + * 4. aggregates with expressions work + */ + @Test + def testCepAggregates(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setMaxGeneratedCodeLength(1) + StreamITCase.clear + + val data = new mutable.MutableList[(Int, String, Long, Double, Int)] + data.+=((1, "a", 1, 0.8, 1)) + data.+=((2, "z", 2, 0.8, 3)) + data.+=((3, "b", 1, 0.8, 2)) + data.+=((4, "c", 1, 0.8, 5)) + data.+=((5, "d", 4, 0.1, 5)) + data.+=((6, "a", 2, 1.5, 2)) + data.+=((7, "b", 2, 0.8, 3)) + data.+=((8, "c", 1, 0.8, 2)) + data.+=((9, "h", 2, 0.8, 3)) + + val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime) + tEnv.registerTable("MyTable", t) + tEnv.registerFunction("weightedAvg", new WeightedAvg) + + val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + | FIRST(id) as startId, + | SUM(A.price) AS sumA, + | COUNT(DISTINCT D.price) AS countD, + | SUM(D.price) as sumD, + | weightedAvg(price, weight) as wAvg, + | AVG(B.price) AS avgB, + | SUM(B.price * B.rate) as sumExprB, + | LAST(id) as endId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ B+ C D? 
E ) + | DEFINE + | A AS SUM(A.price) < 6, + | B AS SUM(B.price * B.rate) < SUM(A.price) AND + | SUM(B.price * B.rate) > 0.2 AND + | SUM(B.price) >= 1 AND + | AVG(B.price) >= 1 AND + | weightedAvg(price, weight) > 1 + |) AS T + |""".stripMargin + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testCepAggregatesWithNullInputs(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setMaxGeneratedCodeLength(1) + StreamITCase.clear + + val data = new mutable.MutableList[Row] + data.+=(Row.of(1:java.lang.Integer, "a", 10:java.lang.Integer)) Review comment: nit: as a hint, you can also use `Int.box()` here instead of the `: java.lang.Integer` type ascription ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services