Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200805902 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -56,23 +58,44 @@ public Environment() { return tables; } + private static TableDescriptor create(String name, Map<String, Object> config) { + if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) { + throw new SqlClientException("The 'type' attribute of a table is missing."); + } + final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE()); + if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { + return new Source(name, ConfigUtil.normalizeYaml(config)); + } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) { + return new Sink(name, ConfigUtil.normalizeYaml(config)); + } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) { + return new SourceSink(name, ConfigUtil.normalizeYaml(config)); + } + return null; + } + public void setTables(List<Map<String, Object>> tables) { this.tables = new HashMap<>(tables.size()); tables.forEach(config -> { - if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) { - throw new SqlClientException("The 'type' attribute of a table is missing."); + if (!config.containsKey(NAME)) { + throw new SqlClientException("The 'name' attribute of a table is missing."); } - if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { - config.remove(TableDescriptorValidator.TABLE_TYPE()); - final Source s = Source.create(config); - if (this.tables.containsKey(s.getName())) { - throw new SqlClientException("Duplicate source name '" + s + "'."); - } - this.tables.put(s.getName(), s); - } else { + final Object name = config.get(NAME); + if (name == null || !(name instanceof String) || ((String) name).length() <= 0) { + throw new 
SqlClientException("Invalid table name '" + name + "'."); + } + final String tableName = (String) name; + final Map<String, Object> properties = new HashMap<>(config); + properties.remove(NAME); + + TableDescriptor tableDescriptor = create(tableName, properties); + if (null == tableDescriptor) { throw new SqlClientException( - "Invalid table 'type' attribute value, only 'source' is supported"); + "Invalid table 'type' attribute value, only 'source' or 'sink' is supported"); + } + if (this.tables.containsKey(tableName)) { + throw new SqlClientException("Duplicate table name '" + tableName + "'."); --- End diff -- The current implementation allows only source and sink in one table.
---