[
https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957214#comment-15957214
]
ASF GitHub Bot commented on FLINK-6196:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3623#discussion_r109965960
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
---
@@ -109,6 +110,133 @@ class TableFunc3(data: String, conf: Map[String,
String]) extends TableFunction[
}
}
/**
 * Test table function with a runtime-determined result schema: the number of
 * result columns is driven by the `column` argument.
 *
 * For each '#'-separated token in `str`, emits a [[Row]] whose field 0 is the
 * token itself and whose fields 1..column-1 all hold the token's length.
 * Inputs without a '#' produce no rows.
 */
class DynamicSchema extends TableFunction[Row] {

  def eval(str: String, column: Int): Unit = {
    if (str.contains("#")) {
      // Fix: dropped the unused `var i = 0` — the for-comprehension below
      // introduces its own `i` binding, so the var was dead code.
      str.split("#").foreach { s =>
        val row = new Row(column)
        row.setField(0, s)
        for (i <- 1 until column) {
          row.setField(i, s.length)
        }
        collect(row)
      }
    }
  }

  override def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[Row] = {
    // The second call argument carries the column count; the schema is
    // STRING for field 0 and INT for every remaining field, mirroring eval().
    val column = arguments.get(1).asInstanceOf[Int]
    val basicTypeInfos = new Array[TypeInformation[_]](column)
    basicTypeInfos(0) = BasicTypeInfo.STRING_TYPE_INFO
    for (i <- 1 until column) {
      basicTypeInfos(i) = BasicTypeInfo.INT_TYPE_INFO
    }
    new RowTypeInfo(basicTypeInfos: _*)
  }
}
+
/**
 * Test table function whose result schema is described by a comma-separated
 * list of type names (e.g. "string,int,int") passed as the second argument.
 *
 * For each '#'-separated token of `str`, emits a [[Row]]: field 0 is the
 * token; each further field is the token's length, rendered as a String or
 * an Int according to the declared type name for that position.
 */
class DynamicSchema0 extends TableFunction[Row] {

  def eval(str: String, cols: String): Unit = {
    val columnTypes = cols.split(",")

    if (str.contains("#")) {
      for (token <- str.split("#")) {
        val row = new Row(columnTypes.length)
        row.setField(0, token)
        for (idx <- 1 until columnTypes.length) {
          columnTypes(idx) match {
            case "string" => row.setField(idx, token.length.toString)
            case "int"    => row.setField(idx, token.length)
            case _        => // unknown type name: leave the field unset, as before
          }
        }
        collect(row)
      }
    }
  }

  override def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[Row] = {
    // Re-parse the declared type names to build a matching RowTypeInfo.
    val columnTypes = arguments.get(1).asInstanceOf[String].split(",")
    val fieldTypes = new Array[TypeInformation[_]](columnTypes.length)
    for (idx <- columnTypes.indices) {
      columnTypes(idx) match {
        case "string" => fieldTypes(idx) = BasicTypeInfo.STRING_TYPE_INFO
        case "int"    => fieldTypes(idx) = BasicTypeInfo.INT_TYPE_INFO
        case _        => // unknown type name: slot stays null, matching original behavior
      }
    }
    new RowTypeInfo(fieldTypes: _*)
  }
}
+
+class DynamicSchemaWithRexNodes extends TableFunction[Row] {
+
+ def eval(str: String, i: Int, si: Int, bi: Int, flt: Double, real:
Double, d: Double, b: Boolean):
+ Unit = {
+ val row = new Row(8)
+ row.setField(0, str)
+ row.setField(1, i)
+ row.setField(2, si)
+ row.setField(3, bi)
+ row.setField(4, flt)
+ row.setField(5, real)
+ row.setField(6, d)
+ row.setField(7, b)
+ collect(row)
+ }
+
+ override def getResultType(arguments: util.List[AnyRef]):
TypeInformation[Row] = {
--- End diff --
Can we add a few more non-primitive types, like `Timestamp`?
> Support dynamic schema in Table Function
> ----------------------------------------
>
> Key: FLINK-6196
> URL: https://issues.apache.org/jira/browse/FLINK-6196
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Zhuoluo Yang
> Assignee: Zhuoluo Yang
>
> In many of our use cases, we have to decide the schema of a UDTF at run
> time. For example, udtf('c1, c2, c3') will generate three columns for a
> lateral view.
> Most systems, such as Calcite and Hive, support this feature. However, the
> current implementation of Flink doesn't implement the feature correctly.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)