[ https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546407#comment-16546407 ]
ASF GitHub Bot commented on FLINK-9852:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202978881

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@

     package org.apache.flink.table.catalog

    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats

    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    --- End diff --

    I also thought about that but actually descriptors don't "build" something. The only final representation would be the properties but we don't expose them to the user.


> Expose descriptor-based sink creation in table environments
> -----------------------------------------------------------
>
>                 Key: FLINK-9852
>                 URL: https://issues.apache.org/jira/browse/FLINK-9852
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table
> descriptors with {{tableEnv.from(...)}}. A similar approach should be
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)