[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15978695#comment-15978695 ]
ASF GitHub Bot commented on FLINK-6196: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r112679780 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DeferredTypeFlinkTableFunction.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import java.lang.reflect.Method +import java.util + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} + +import scala.collection.JavaConversions._ + +/** + * A Deferred Type is a Table Function which the result type hasn't been determined yet. + * It will determine the result type after the arguments are passed. + * + * @param tableFunction The Table Function instance + * @param evalMethod The eval() method of the [[tableFunction]] + * @param implicitResultType Implicit result type. + */ +class DeferredTypeFlinkTableFunction( + val tableFunction: FlinkUDTF[_], + val evalMethod: Method, + val implicitResultType: TypeInformation[_]) + extends FlinkTableFunction(tableFunction, evalMethod) { + + val paramTypeInfos = evalMethod.getParameterTypes.toList + + override def getResultType(arguments: util.List[AnyRef]): TypeInformation[_] = { + determineResultType(arguments) + } + + override def getRowType( + typeFactory: RelDataTypeFactory, + arguments: util.List[AnyRef]): RelDataType = { + val resultType = determineResultType(arguments) + val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType) + UserDefinedFunctionUtils.buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes) + } + + private def determineResultType(arguments: util.List[AnyRef]): TypeInformation[_] = { + val resultType = tableFunction.getResultType(arguments, paramTypeInfos) --- End diff -- Here we use the parameter types of `eval()`. This is different from the logic in `UserDefinedFunctionUtils`. I think we should make this consistent because users should expect consistent behavior. > Support dynamic schema in Table Function > ---------------------------------------- > > Key: FLINK-6196 > URL: https://issues.apache.org/jira/browse/FLINK-6196 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Zhuoluo Yang > Assignee: Zhuoluo Yang > > In many of our use cases. We have to decide the schema of a UDTF at the run > time. For example. udtf('c1, c2, c3') will generate three columns for a > lateral view. > Most systems such as calcite and hive support this feature. However, the > current implementation of flink didn't implement the feature correctly. -- This message was sent by Atlassian JIRA (v6.3.15#6346)