[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802703#comment-15802703 ]
ASF GitHub Bot commented on FLINK-5280: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94859631 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +511,92 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** + * Returns field names for a given [[TypeInformation]]. + * + * Field names are automatically extracted for + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. + * The method fails if inputType is not a + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. + * + * @param inputType The TypeInformation extract the field names. + * @tparam A The type of the TypeInformation. + * @return A an array holding the field names + */ + def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = { + validateType(inputType) + + val fieldNames: Array[String] = inputType match { + case t: CompositeType[_] => t.getFieldNames + case a: AtomicType[_] => Array("f0") + case tpe => + throw new TableException(s"Currently only CompositeType and AtomicType are supported. " + + s"Type $tpe lacks explicit field naming") + } + + if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") + } + + fieldNames + } + + /** + * Validate if class represented by the typeInfo is static and globally accessible + * @param typeInfo type to check + * @throws TableException if type does not meet these criteria + */ + def validateType(typeInfo: TypeInformation[_]): Unit = { + val clazz = typeInfo.getTypeClass + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + + s"static and globally accessible.") + } + } + + /** + * Returns field indexes for a given [[TypeInformation]]. + * + * Field indexes are automatically extracted for + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. + * The method fails if inputType is not a --- End diff -- No need to mention this, IMO. > Extend TableSource to support nested data > ----------------------------------------- > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Affects Versions: 1.2.0 > Reporter: Fabian Hueske > Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)