[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958785#comment-15958785 ]
ASF GitHub Bot commented on FLINK-6196:
---------------------------------------

Github user clarkyzl commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3623#discussion_r110141028

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -358,4 +366,120 @@ object UserDefinedFunctionUtils {
         InstantiationUtil
           .deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader)
       }
    +
    +  /**
    +    * Build a TableFunctionCall, a name and a sequence of params will determine a unique
    +    * [[TableFunctionCall]]
    +    *
    +    * @param name function name
    +    * @param implicitResultType If no result type returned, it will use this type.
    +    * @param params The input expressions
    +    * @return A unique [[TableFunctionCall]]
    +    */
    +  private[table] def buildTableFunctionCall(
    +      name: String,
    +      tableFunction: TableFunction[_],
    +      implicitResultType: TypeInformation[_],
    +      params: Expression*): TableFunctionCall = {
    +    val arguments = transformLiteralExpressions(params: _*)
    +    val userDefinedResultType = tableFunction.getResultType(arguments)
    +    val resultType = {
    +      if (userDefinedResultType != null) userDefinedResultType
    +      else implicitResultType
    +    }
    +    TableFunctionCall(name, tableFunction, params, resultType)
    +  }
    +
    +  /**
    +    * Transform the expressions to Objects
    +    * Only literal expressions will be transformed, non-literal expressions will be
    +    * translated to nulls.
    +    *
    +    * @param params actual parameters of the function
    +    * @return A java List of the Objects
    +    */
    +  private[table] def transformLiteralExpressions(params: Expression*): java.util.List[AnyRef] = {
    +    params.map {
    +      case exp: Literal =>
    +        exp.value.asInstanceOf[AnyRef]
    +      case _ =>
    +        null
    +    }
    +  }
    +
    +  /**
    +    * Transform the rex nodes to Objects
    +    * Only literal rex nodes will be transformed, non-literal rex nodes will be
    +    * translated to nulls.
    +    *
    +    * @param rexNodes actual parameters of the function
    +    * @return A java List of the Objects
    +    */
    +  private[table] def transformRexNodes(
    +      rexNodes: java.util.List[RexNode]): java.util.List[AnyRef] = {
    +    rexNodes.map {
    +      case rexNode: RexLiteral =>
    +        val value = rexNode.getValue2
    +        rexNode.getType.getSqlTypeName match {
    +          case SqlTypeName.INTEGER =>
    +            value.asInstanceOf[Long].toInt.asInstanceOf[AnyRef]
    +          case SqlTypeName.SMALLINT =>
    +            value.asInstanceOf[Long].toShort.asInstanceOf[AnyRef]
    +          case SqlTypeName.TINYINT =>
    +            value.asInstanceOf[Long].toByte.asInstanceOf[AnyRef]
    +          case SqlTypeName.FLOAT =>
    +            value.asInstanceOf[Double].toFloat.asInstanceOf[AnyRef]
    +          case SqlTypeName.REAL =>
    +            value.asInstanceOf[Double].toFloat.asInstanceOf[AnyRef]
    +          case _ =>
    --- End diff --

    `Timestamp` works fine: calling `getValue2()` on a timestamp literal returns a `Long`. I will add a test for this.


> Support dynamic schema in Table Function
> ----------------------------------------
>
>                 Key: FLINK-6196
>                 URL: https://issues.apache.org/jira/browse/FLINK-6196
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Zhuoluo Yang
>            Assignee: Zhuoluo Yang
>
> In many of our use cases, we have to decide the schema of a UDTF at run time.
> For example, udtf('c1, c2, c3') will generate three columns for a lateral view.
> Most systems, such as Calcite and Hive, support this feature. However, the
> current implementation in Flink does not support it correctly.
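
For reference, the literal narrowing that `transformRexNodes` performs in the diff can be sketched without Calcite on the classpath. This is only an illustration under stated assumptions, not the code in the pull request: `SqlType` and its case objects are hypothetical stand-ins for `SqlTypeName`, `LiteralNarrowing.narrow` is an invented helper, and the pass-through `OTHER` branch is a guess, since the diff is cut off at `case _ =>`.

```scala
// Hypothetical stand-ins for the SqlTypeName values matched in the diff.
sealed trait SqlType
case object INTEGER extends SqlType
case object SMALLINT extends SqlType
case object TINYINT extends SqlType
case object FLOAT extends SqlType
case object REAL extends SqlType
case object OTHER extends SqlType

object LiteralNarrowing {
  // Narrow a widened literal value (a Long or a Double, as cast in the diff)
  // to the boxed Java type that the SQL type implies.
  def narrow(sqlType: SqlType, value: Any): AnyRef = sqlType match {
    case INTEGER      => Int.box(value.asInstanceOf[Long].toInt)
    case SMALLINT     => Short.box(value.asInstanceOf[Long].toShort)
    case TINYINT      => Byte.box(value.asInstanceOf[Long].toByte)
    case FLOAT | REAL => Float.box(value.asInstanceOf[Double].toFloat)
    // Assumption: every other type is passed through unchanged, so a
    // timestamp represented as a Long would stay a Long.
    case OTHER        => value.asInstanceOf[AnyRef]
  }
}
```

With these stand-ins, `LiteralNarrowing.narrow(INTEGER, 42L)` yields a `java.lang.Integer`, while `LiteralNarrowing.narrow(OTHER, 42L)` keeps the `java.lang.Long`, which matches the behaviour described for `Timestamp` in the comment.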