[ https://issues.apache.org/jira/browse/FLINK-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529858#comment-15529858 ]
ASF GitHub Bot commented on FLINK-4686: --------------------------------------- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2553#discussion_r80931239 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Schema.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +import org.apache.flink.api.common.typeinfo.TypeInformation + +/** + * A Schema represents a Table's structure + */ +class Schema( + val columnNames: Array[String], + val columnTypes: Array[TypeInformation[_]]) { + + if (columnNames.length != columnTypes.length) { + throw new TableException( + "Number of column indexes and column names must be equal.") + } + + // check uniqueness of field names + if (columnNames.toSet.size != columnTypes.length) { + throw new TableException( + "Table column names must be unique.") + } + + val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap + + /** + * Returns all type informations as an array + */ + def getTypes: Array[TypeInformation[_]] = columnTypes + + /** + * Returns the specified type information for the given column index + * + * @param columnIndex the index of the field + */ + def getType(columnIndex: Int): TypeInformation[_] = columnTypes(columnIndex) + + /** + * Returns the specified type information for the given column name + * + * @param columnName the name of the field + */ + def getType(columnName: String): TypeInformation[_] = { + if (columnNameToIndex.contains(columnName)) { + columnTypes(columnNameToIndex(columnName)) + } else { + throw FieldNotFoundException(s"Table doesn't have the column : $columnName") --- End diff -- You could also return an `Option`. Then we don't need a new exception type. > Add possibility to get column names > ----------------------------------- > > Key: FLINK-4686 > URL: https://issues.apache.org/jira/browse/FLINK-4686 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Jark Wu > Labels: starter > > For debugging and maybe for visualization in future (e.g. in a shell) it > would be good to have the possibilty to get the names of {{Table}} columns. > At the moment the user has no idea how the table columns are named; if they > need to be matched with POJO fields for example. > My suggestion: > {code} > Schema s = table.schema(); > TypeInformation<?> type = s.getType(1); > TypeInformation<?> type = s.getType("col"); > String s = s.getColumnName(1); > String[] s = s.getColumnNames(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)