Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203349277

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     package org.apache.flink.table.catalog

    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats

    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL"))
    +  *   .asTableSource()
    +  * }}}
    +  *
    +  * Note: For backwards-compatibility, the table is declared as a table source for batch and
    +  * streaming environments by default.
    +  *
    +  * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
    +  * to target suitable factories.
    +  *
    +  * @param connectorDescriptor describes the system to connect to
       */
    -class ExternalCatalogTable(
    -    connectorDesc: ConnectorDescriptor,
    -    formatDesc: Option[FormatDescriptor],
    -    schemaDesc: Option[Schema],
    -    statisticsDesc: Option[Statistics],
    -    metadataDesc: Option[Metadata])
    -  extends TableSourceDescriptor {
    -
    -  this.connectorDescriptor = Some(connectorDesc)
    -  this.formatDescriptor = formatDesc
    -  this.schemaDescriptor = schemaDesc
    -  this.statisticsDescriptor = statisticsDesc
    -  this.metaDescriptor = metadataDesc
    -
    -  // expose statistics for external table source util
    -  override def getTableStats: Option[TableStats] = super.getTableStats
    +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    --- End diff --

    This is the code style that we should all comply with.
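
    To make the fluent style above concrete, here is a minimal sketch of how
    such builder methods could be wired up. The class name, internal fields,
    and method bodies are illustrative assumptions for this sketch, not the
    actual Flink implementation (the real ExternalCatalogTable extends
    TableDescriptor):

        import org.apache.flink.table.descriptors.{ConnectorDescriptor, FormatDescriptor, Schema}

        // Hypothetical sketch: the connector is fixed at construction time,
        // while format, schema, and source/sink role are set via builder calls.
        class ExternalCatalogTableSketch(val connectorDescriptor: ConnectorDescriptor) {

          // assumed internal state, mirroring the optional descriptors of the old constructor
          private var formatDescriptor: Option[FormatDescriptor] = None
          private var schemaDescriptor: Option[Schema] = None
          private var isSource: Boolean = false

          // each builder method records its descriptor and returns `this` for chaining
          def withFormat(format: FormatDescriptor): this.type = {
            formatDescriptor = Some(format)
            this
          }

          def withSchema(schema: Schema): this.type = {
            schemaDescriptor = Some(schema)
            this
          }

          // declares the configured table as a table source
          def asTableSource(): this.type = {
            isSource = true
            this
          }
        }

    The upside of this style is that optional parts of the declaration become
    explicit, discoverable method calls instead of a positional parameter list
    of Options.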
---