FuYouJ commented on code in PR #7502: URL: https://github.com/apache/seatunnel/pull/7502#discussion_r1739630810
########## seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java: ########## @@ -58,28 +59,51 @@ public class ElasticsearchSource SupportParallelism, SupportColumnProjection { - private final ReadonlyConfig config; + private final List<SourceConfig> sourceConfigList; + private final ReadonlyConfig connectionConfig; - private CatalogTable catalogTable; + public ElasticsearchSource(ReadonlyConfig config) { + this.connectionConfig = config; + if (config.getOptional(SourceConfig.INDEX_LIST).isPresent()) { + this.sourceConfigList = createMultiSource(config); + } else { + this.sourceConfigList = Collections.singletonList(parseOneIndexQueryConfig(config)); + } + } - private List<String> source; + private List<SourceConfig> createMultiSource(ReadonlyConfig config) { + List<Map<String, Object>> configMaps = config.get(SourceConfig.INDEX_LIST); + List<ReadonlyConfig> configList = + configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList()); + List<SourceConfig> sourceConfigList = new ArrayList<>(configList.size()); + for (ReadonlyConfig readonlyConfig : configList) { + SourceConfig sourceConfig = parseOneIndexQueryConfig(readonlyConfig); + sourceConfigList.add(sourceConfig); + } + return sourceConfigList; + } - private Map<String, String> arrayColumn; + private SourceConfig parseOneIndexQueryConfig(ReadonlyConfig readonlyConfig) { - public ElasticsearchSource(ReadonlyConfig config) { - this.config = config; - if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) { + Map<String, Object> query = readonlyConfig.get(SourceConfig.QUERY); + String index = readonlyConfig.get(SourceConfig.INDEX); + + CatalogTable catalogTable; + List<String> source; + Map<String, String> arrayColumn; + + if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) { // todo: We need to remove the schema in ES. 
log.warn( - "The schema config in ElasticSearch sink is deprecated, please use source config instead!"); - catalogTable = CatalogTableUtil.buildWithConfig(config); + "The schema config in ElasticSearch source/sink is deprecated, please use source config instead!"); + catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig); source = Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames()); } else { - source = config.get(SourceConfig.SOURCE); - arrayColumn = config.get(SourceConfig.ARRAY_COLUMN); - EsRestClient esRestClient = EsRestClient.createInstance(config); + source = readonlyConfig.get(SourceConfig.SOURCE); + arrayColumn = readonlyConfig.get(SourceConfig.ARRAY_COLUMN); + EsRestClient esRestClient = EsRestClient.createInstance(connectionConfig); Map<String, BasicTypeDefine<EsType>> esFieldType = - esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source); + esRestClient.getFieldTypeMapping(index, source); esRestClient.close(); Review Comment: I implemented Closeable for EsRestClient and extracted the code into a separate method to ensure that resources are closed even if an exception occurs: ``` private Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping( String index, List<String> source) { // EsRestClient#getFieldTypeMapping may throw a runtime exception, // so here we use try-with-resources to close the resource try (EsRestClient esRestClient = EsRestClient.createInstance(connectionConfig)) { return esRestClient.getFieldTypeMapping(index, source); } } ``` ########## seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java: ########## @@ -394,6 +434,12 @@ private List<String> getDocsWithTransformTimestamp(List<String> source, String i } private List<String> getDocsWithTransformDate(List<String> source, String index) { + return getDocsWithTransformDate(source, index, Collections.emptyList()); + } + + // Review Comment: done -- This 
is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org