[ https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702894#comment-16702894 ]
ASF GitHub Bot commented on FLINK-10597: ---------------------------------------- twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237402448 ########## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ########## @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { // We do not assert the proctime in the result, cause it is currently // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime } + + @Test + def testRichUdfs(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setMaxGeneratedCodeLength(1) + StreamITCase.clear + + val data = new mutable.MutableList[(Int, String, Long)] + data.+=((1, "a", 1)) + data.+=((2, "a", 1)) + data.+=((3, "a", 1)) + data.+=((4, "a", 1)) + data.+=((5, "a", 1)) + data.+=((6, "b", 1)) + data.+=((7, "a", 1)) + data.+=((8, "a", 1)) + data.+=((9, "f", 1)) + + val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime) + tEnv.registerTable("MyTable", t) + tEnv.registerFunction("prefix", new RichScalarFunc) + val prefix = "PREF" + UserDefinedFunctionTestUtils + .setJobParameters(env, Map("prefix" -> prefix)) + + val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + | FIRST(id) as firstId, + | prefix(A.name) as prefixedNameA, + | LAST(id) as lastId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ C) + | DEFINE + | A AS prefix(A.name) = '$prefix:a' + |) AS T + |""".stripMargin + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9") + 
assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } class ToMillis extends ScalarFunction { def eval(t: Timestamp): Long = { t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli) } } + +private class RichScalarFunc extends ScalarFunction { Review comment: Give this class a more meaningful name. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable UDFs support in MATCH_RECOGNIZE > -------------------------------------- > > Key: FLINK-10597 > URL: https://issues.apache.org/jira/browse/FLINK-10597 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL > Reporter: Dawid Wysakowicz > Assignee: Dawid Wysakowicz > Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)