[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944394#comment-15944394 ]
ASF GitHub Bot commented on FLINK-6196:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3623#discussion_r108319900

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
@@ -27,52 +27,73 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{TableFunction => FlinkUDTF}

 /**
   * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]].
   * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]].
   * The main difference is that we override the [[getRowType()]] and [[getElementType()]].
+  *
+  * @param tableFunction The Table Function instance
+  * @param implicitResultType The implicit result type.
+  * @param evalMethod The eval() method of the [[tableFunction]]
   */
-class FlinkTableFunctionImpl[T](
-    val typeInfo: TypeInformation[T],
-    val fieldIndexes: Array[Int],
-    val fieldNames: Array[String],
-    val evalMethod: Method)
+class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_],
+    val implicitResultType: TypeInformation[_],
+    val evalMethod: Method)
   extends ReflectiveFunctionBase(evalMethod)
   with TableFunction {

-  if (fieldIndexes.length != fieldNames.length) {
-    throw new TableException(
-      "Number of field indexes and field names must be equal.")
-  }
+  override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]

-  // check uniqueness of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-    throw new TableException(
-      "Table field names must be unique.")
+  override def getRowType(typeFactory: RelDataTypeFactory,
+      arguments: util.List[AnyRef]): RelDataType = {
+
+    // Get the result type from table function. If it is not null, the implicitResultType may
+    // already be generated by Table API's apply() method.
+    val resultType = if (tableFunction.getResultType(arguments) != null) {
+      tableFunction.getResultType(arguments)
+    } else {
+      implicitResultType
+    }
+    val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
+    buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes)
   }

-  val fieldTypes: Array[TypeInformation[_]] =
-    typeInfo match {
-      case cType: CompositeType[T] =>
-        if (fieldNames.length != cType.getArity) {
-          throw new TableException(
-            s"Arity of type (" + cType.getFieldNames.deep + ") " +
-              "not equal to number of field names " + fieldNames.deep + ".")
-        }
-        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
-      case aType: AtomicType[T] =>
-        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-          throw new TableException(
-            "Non-composite input type may have only a single field and its index must be 0.")
-        }
-        Array(aType)
+  private[table] def buildRelDataType(typeFactory: RelDataTypeFactory,
--- End diff --

I think this method can go into `UserDefinedFunctionUtils`.

> Support dynamic schema in Table Function
> ----------------------------------------
>
>                 Key: FLINK-6196
>                 URL: https://issues.apache.org/jira/browse/FLINK-6196
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Zhuoluo Yang
>            Assignee: Zhuoluo Yang
>
> In many of our use cases, we have to decide the schema of a UDTF at run time. For example,
> udtf('c1, c2, c3') will generate three columns for a lateral view.
> Most systems, such as Calcite and Hive, support this feature. However, the current Flink
> implementation does not support it correctly.
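To illustrate KurtYoung's suggestion above, here is a rough sketch of what moving `buildRelDataType` into `UserDefinedFunctionUtils` could look like. This is not the PR's actual code: the validation and field-type resolution below merely mirror the checks removed from `FlinkTableFunctionImpl` in the diff, and the sketch assumes `FlinkTypeFactory.createTypeFromTypeInfo` is available for converting a `TypeInformation` into a `RelDataType` (the real conversion method may differ).

    import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
    import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    import org.apache.flink.api.common.typeutils.CompositeType
    import org.apache.flink.table.api.TableException
    import org.apache.flink.table.calcite.FlinkTypeFactory

    object UserDefinedFunctionUtils {

      /**
        * Sketch of the suggested helper: builds the Calcite row type for a table
        * function result from its Flink TypeInformation. The validation mirrors
        * the checks the PR removes from FlinkTableFunctionImpl.
        */
      def buildRelDataType(
          typeFactory: RelDataTypeFactory,
          resultType: TypeInformation[_],
          fieldNames: Array[String],
          fieldIndexes: Array[Int]): RelDataType = {

        if (fieldIndexes.length != fieldNames.length) {
          throw new TableException("Number of field indexes and field names must be equal.")
        }
        if (fieldNames.length != fieldNames.toSet.size) {
          throw new TableException("Table field names must be unique.")
        }

        // Resolve the per-field types from the result type.
        val fieldTypes: Array[TypeInformation[_]] = resultType match {
          case cType: CompositeType[_] =>
            if (fieldNames.length != cType.getArity) {
              throw new TableException(
                s"Arity of type (${cType.getFieldNames.deep}) does not match " +
                  s"the number of field names ${fieldNames.deep}.")
            }
            fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
          case aType: AtomicType[_] =>
            if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
              throw new TableException(
                "Non-composite input type may have only a single field and its index must be 0.")
            }
            Array[TypeInformation[_]](aType)
        }

        // Assumption: FlinkTypeFactory can convert each TypeInformation into a
        // RelDataType; the exact method name may differ in the actual code base.
        val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
        val builder = flinkTypeFactory.builder
        fieldNames.zip(fieldTypes).foreach { case (name, tpe) =>
          builder.add(name, flinkTypeFactory.createTypeFromTypeInfo(tpe))
        }
        builder.build()
      }
    }

Centralizing the conversion in a utility like this would let every caller that turns a table function's result type into a Calcite row type share the same validation, which appears to be the point of the review comment.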