Carl-Zhou-CN commented on code in PR #7502: URL: https://github.com/apache/seatunnel/pull/7502#discussion_r1739610800
########## seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java: ########## @@ -58,28 +59,51 @@ public class ElasticsearchSource SupportParallelism, SupportColumnProjection { - private final ReadonlyConfig config; + private final List<SourceConfig> sourceConfigList; + private final ReadonlyConfig connectionConfig; - private CatalogTable catalogTable; + public ElasticsearchSource(ReadonlyConfig config) { + this.connectionConfig = config; + if (config.getOptional(SourceConfig.INDEX_LIST).isPresent()) { + this.sourceConfigList = createMultiSource(config); + } else { + this.sourceConfigList = Collections.singletonList(parseOneIndexQueryConfig(config)); + } + } - private List<String> source; + private List<SourceConfig> createMultiSource(ReadonlyConfig config) { + List<Map<String, Object>> configMaps = config.get(SourceConfig.INDEX_LIST); + List<ReadonlyConfig> configList = + configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList()); + List<SourceConfig> sourceConfigList = new ArrayList<>(configList.size()); + for (ReadonlyConfig readonlyConfig : configList) { + SourceConfig sourceConfig = parseOneIndexQueryConfig(readonlyConfig); + sourceConfigList.add(sourceConfig); + } + return sourceConfigList; + } - private Map<String, String> arrayColumn; + private SourceConfig parseOneIndexQueryConfig(ReadonlyConfig readonlyConfig) { - public ElasticsearchSource(ReadonlyConfig config) { - this.config = config; - if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) { + Map<String, Object> query = readonlyConfig.get(SourceConfig.QUERY); + String index = readonlyConfig.get(SourceConfig.INDEX); + + CatalogTable catalogTable; + List<String> source; + Map<String, String> arrayColumn; + + if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) { // todo: We need to remove the schema in ES. log.warn( - "The schema config in ElasticSearch sink is deprecated, please use source config instead!"); - catalogTable = CatalogTableUtil.buildWithConfig(config); + "The schema config in ElasticSearch source/sink is deprecated, please use source config instead!"); + catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig); source = Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames()); } else { - source = config.get(SourceConfig.SOURCE); - arrayColumn = config.get(SourceConfig.ARRAY_COLUMN); - EsRestClient esRestClient = EsRestClient.createInstance(config); + source = readonlyConfig.get(SourceConfig.SOURCE); + arrayColumn = readonlyConfig.get(SourceConfig.ARRAY_COLUMN); + EsRestClient esRestClient = EsRestClient.createInstance(connectionConfig); Map<String, BasicTypeDefine<EsType>> esFieldType = - esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source); + esRestClient.getFieldTypeMapping(index, source); esRestClient.close(); Review Comment: Is there a risk that the connection is not closed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org