[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772768#comment-15772768 ]
ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93759469

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +    val fieldIndexes = fieldNames.indices.toArray
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    (fieldNames, fieldIndexes)
    +  }
    +
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  /**
    +    * Returns field types for a given [[TypeInformation]].
    +    *
    +    * Field types are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    --- End diff --

    I think we should not restrict the input type to `CompositeType`, because `TableSource.getReturnType` could return any type, including an `AtomicType`.

> Extend TableSource to support nested data
> -----------------------------------------
>
>                 Key: FLINK-5280
>                 URL: https://issues.apache.org/jira/browse/FLINK-5280
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently supports only the definition of flat rows.
> However, there are several storage formats for nested data that should be supported, such as Avro, JSON, Parquet, and ORC. The Table API and SQL can also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in Calcite's schema need to be extended to support nested data.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
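
To illustrate the review comment on the diff, here is a minimal sketch of how field extraction could accept atomic types as well as composite ones. It does not use Flink's classes: `TypeInfo`, `AtomicInfo`, `CompositeInfo`, and `FieldInfoSketch` are hypothetical stand-ins for the `TypeInformation` hierarchy, and the default field name `"f0"` for an atomic type is an assumption, not something taken from the pull request.

```scala
// Hypothetical stand-ins for Flink's TypeInformation hierarchy (not the real classes).
sealed trait TypeInfo
final case class AtomicInfo(name: String) extends TypeInfo
final case class CompositeInfo(fieldNames: Array[String], fieldTypes: Array[TypeInfo])
  extends TypeInfo

object FieldInfoSketch {
  // Assumed name for the single field exposed by an atomic type.
  val DefaultAtomicFieldName = "f0"

  // Returns field names and positions; an atomic type is treated as one field.
  def getFieldInfo(inputType: TypeInfo): (Array[String], Array[Int]) = {
    val fieldNames: Array[String] = inputType match {
      case CompositeInfo(names, _) => names
      case AtomicInfo(_)           => Array(DefaultAtomicFieldName)
    }
    if (fieldNames.contains("*")) {
      throw new IllegalArgumentException("Field name can not be '*'.")
    }
    (fieldNames, fieldNames.indices.toArray)
  }

  // Returns field types; an atomic type is its own single field type.
  def getFieldTypes(inputType: TypeInfo): Array[TypeInfo] = inputType match {
    case CompositeInfo(_, types) => types
    case atomic: AtomicInfo      => Array[TypeInfo](atomic)
  }
}
```

With this shape, a source whose return type is atomic yields a one-column table instead of failing, while composite types keep their declared field names and order.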