[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584302#comment-15584302 ]
ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

GitHub user wuchong opened a pull request:

    https://github.com/apache/flink/pull/2653

[FLINK-4469] [table] Add support for user defined table function in Table API & SQL

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

This PR introduces user-defined table functions for the Table and SQL API. I will add documentation after this proposal is accepted.
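A table function maps one input row to zero or more output rows, and this PR joins that output back to the input row in two ways: cross apply and outer apply. The following plain-Java sketch illustrates the row-level semantics, assuming the SQL Server meaning of CROSS APPLY and OUTER APPLY. It does not use Flink at all: `ApplySketch` and its `split`, `crossApply`, and `outerApply` methods are hypothetical names made up for illustration and are not part of this pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-Java illustration only; none of these names are Flink APIs. */
public class ApplySketch {

    /** Hypothetical table function: one output value per space-separated token, none for null or empty input. */
    static List<String> split(String str) {
        List<String> out = new ArrayList<>();
        if (str == null || str.isEmpty()) {
            return out;
        }
        out.addAll(Arrays.asList(str.split(" ")));
        return out;
    }

    /** Cross apply: one joined row per emitted value; input rows that emit nothing are dropped. */
    static List<String[]> crossApply(List<String> rows, Function<String, List<String>> fn) {
        List<String[]> result = new ArrayList<>();
        for (String c : rows) {
            for (String s : fn.apply(c)) {
                result.add(new String[] {c, s});
            }
        }
        return result;
    }

    /** Outer apply: like cross apply, but an input row that emits nothing is kept once, padded with null. */
    static List<String[]> outerApply(List<String> rows, Function<String, List<String>> fn) {
        List<String[]> result = new ArrayList<>();
        for (String c : rows) {
            List<String> emitted = fn.apply(c);
            if (emitted.isEmpty()) {
                result.add(new String[] {c, null});
            }
            for (String s : emitted) {
                result.add(new String[] {c, s});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed sample column "c" with one empty value.
        List<String> table = Arrays.asList("a b", "", "c");
        for (String[] row : crossApply(table, ApplySketch::split)) {
            System.out.println(Arrays.toString(row));
        }
        System.out.println("--");
        for (String[] row : outerApply(table, ApplySketch::split)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

With the sample input, the cross apply yields three joined rows (the empty string contributes none), while the outer apply yields four, because the empty string is kept with a null in the function's column.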
This is the general syntax so far:

In Java:

```java
public class Split extends TableFunction<String> {
  // Rows are emitted through collect(), so eval returns nothing.
  public void eval(String str) {
    for (String s : str.split(" ")) {
      collect(s);
    }
  }
}

tableEnv.registerFunction("split", new Split());

// cross apply (Table API, then the equivalent SQL)
Table result = table.crossApply("split(c)", "s").select("c, s");
Table result = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable CROSS APPLY split(c) as t(s)");

// outer apply (Table API, then the equivalent SQL)
Table result = table.outerApply("split(c)", "s").select("c, s");
Table result = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable OUTER APPLY split(c) as t(s)");
```

In Scala:

```scala
object Split extends TableFunction[String] {
  def eval(str: String): Unit = {
    str.split(" ").foreach(collect)
  }
}

// cross apply (Table API, then the equivalent SQL)
val result = table.crossApply(Split('c) as ('s)).select('c, 's)
tableEnv.registerFunction("split", Split)
val result = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable CROSS APPLY split(c) as t(s)")

// outer apply (Table API, then the equivalent SQL)
val result = table.outerApply(Split('c) as ('s)).select('c, 's)
tableEnv.registerFunction("split", Split)
val result = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable OUTER APPLY split(c) as t(s)")
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wuchong/flink udtf-FLINK-4469

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2653

----
commit 60812e51156ec9fa6088154d2f6dea8c1ff9ac17
Author: Jark Wu <wuchong...@alibaba-inc.com>
Date:   2016-10-18T03:15:07Z

    [FLINK-4469] [table] Add support for user defined table function in Table API & SQL

----

> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row
> and output a single output row. In contrast, table-generating functions
> transform a single input row to multiple output rows. This is very useful in
> some cases, such as looking up HBase by rowkey and returning one or more rows.
> A user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods.
> NOTE:
> 1. the eval method must be public and non-static.
> 2. eval should always return java.lang.Iterable or scala.collection.Iterable
> with the generic type T.
> 3. the generic type T is the row type returned by the table function. Because of
> Java type erasure, we can’t extract T from the Iterable.
> 4. the eval method can be overloaded. Blink will choose the best-matching eval
> method to call according to the number and types of the parameters.
> {code}
> import java.util.ArrayList;
> import java.util.List;
>
> public class Word {
>   public String word;
>   public Integer length;
>
>   // no-argument constructor, assuming Word is treated as a POJO
>   public Word() {}
>
>   // constructor used by SplitStringUDTF below
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
>
> public class SplitStringUDTF extends UDTF<Word> {
>   public Iterable<Word> eval(String str) {
>     if (str == null) {
>       return new ArrayList<>();
>     } else {
>       List<Word> list = new ArrayList<>();
>       for (String s : str.split(",")) {
>         Word word = new Word(s, s.length());
>         list.add(word);
>       }
>       return list;
>     }
>   }
> }
>
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF());
> tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)");
>
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF());
> // rename split table columns to “w” and “l”
> table.crossApply("split(c)", "w, l")
>   .select("a, b, w, l");
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>   .select("a, b, word, length");
>
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c), 'w, 'l)
>   .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>   .select('a, 'b, 'word, 'length)
> {code}
> Here we introduce the CROSS/OUTER APPLY keywords to join table functions, as
> used in SQL Server. We can discuss the API in the comments.
> Maybe the {{UDTF}} class should be replaced by {{TableFunction}} or something
> else: we have already introduced {{ScalarFunction}} for custom functions, and
> we need to stay consistent. That said, I prefer {{UDTF}} to
> {{TableFunction}}, as the former is more SQL-like and the latter may be
> confused with DataStream functions.
> **This issue is blocked by CALCITE-1309, so we need to wait for Calcite to fix
> this and release.**
> See [1] for more information about the UDTF design.
> [1]
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)