[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535678#comment-16535678 ]
ASF GitHub Bot commented on FLINK-8866: --------------------------------------- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200805902 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -56,23 +58,44 @@ public Environment() { return tables; } + private static TableDescriptor create(String name, Map<String, Object> config) { + if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) { + throw new SqlClientException("The 'type' attribute of a table is missing."); + } + final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE()); + if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { + return new Source(name, ConfigUtil.normalizeYaml(config)); + } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) { + return new Sink(name, ConfigUtil.normalizeYaml(config)); + } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) { + return new SourceSink(name, ConfigUtil.normalizeYaml(config)); + } + return null; + } + public void setTables(List<Map<String, Object>> tables) { this.tables = new HashMap<>(tables.size()); tables.forEach(config -> { - if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) { - throw new SqlClientException("The 'type' attribute of a table is missing."); + if (!config.containsKey(NAME)) { + throw new SqlClientException("The 'name' attribute of a table is missing."); } - if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { - config.remove(TableDescriptorValidator.TABLE_TYPE()); - final Source s = Source.create(config); - if (this.tables.containsKey(s.getName())) { - throw new SqlClientException("Duplicate source name '" + s + "'."); - } - this.tables.put(s.getName(), s); - } else { + final Object name = config.get(NAME); + if (name == null || !(name instanceof String) || ((String) name).length() <= 0) { + throw new SqlClientException("Invalid table name '" + name + "'."); + } + final String tableName = (String) name; + final Map<String, Object> properties = new HashMap<>(config); + properties.remove(NAME); + + TableDescriptor tableDescriptor = create(tableName, properties); + if (null == tableDescriptor) { throw new SqlClientException( - "Invalid table 'type' attribute value, only 'source' is supported"); + "Invalid table 'type' attribute value, only 'source' or 'sink' is supported"); + } + if (this.tables.containsKey(tableName)) { + throw new SqlClientException("Duplicate table name '" + tableName + "'."); --- End diff -- the current implementation allow only source and sink in one table. > Create unified interfaces to configure and instatiate TableSinks > ---------------------------------------------------------------- > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Shuyi Chen > Priority: Major > Labels: pull-request-available > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client. Such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) in yaml file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)