Carl-Zhou-CN commented on code in PR #7502:
URL: https://github.com/apache/seatunnel/pull/7502#discussion_r1739610800


##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java:
##########
@@ -58,28 +59,51 @@ public class ElasticsearchSource
                 SupportParallelism,
                 SupportColumnProjection {
 
-    private final ReadonlyConfig config;
+    private final List<SourceConfig> sourceConfigList;
+    private final ReadonlyConfig connectionConfig;
 
-    private CatalogTable catalogTable;
+    public ElasticsearchSource(ReadonlyConfig config) {
+        this.connectionConfig = config;
+        if (config.getOptional(SourceConfig.INDEX_LIST).isPresent()) {
+            this.sourceConfigList = createMultiSource(config);
+        } else {
+            this.sourceConfigList = 
Collections.singletonList(parseOneIndexQueryConfig(config));
+        }
+    }
 
-    private List<String> source;
+    private List<SourceConfig> createMultiSource(ReadonlyConfig config) {
+        List<Map<String, Object>> configMaps = 
config.get(SourceConfig.INDEX_LIST);
+        List<ReadonlyConfig> configList =
+                
configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList());
+        List<SourceConfig> sourceConfigList = new 
ArrayList<>(configList.size());
+        for (ReadonlyConfig readonlyConfig : configList) {
+            SourceConfig sourceConfig = 
parseOneIndexQueryConfig(readonlyConfig);
+            sourceConfigList.add(sourceConfig);
+        }
+        return sourceConfigList;
+    }
 
-    private Map<String, String> arrayColumn;
+    private SourceConfig parseOneIndexQueryConfig(ReadonlyConfig 
readonlyConfig) {
 
-    public ElasticsearchSource(ReadonlyConfig config) {
-        this.config = config;
-        if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+        Map<String, Object> query = readonlyConfig.get(SourceConfig.QUERY);
+        String index = readonlyConfig.get(SourceConfig.INDEX);
+
+        CatalogTable catalogTable;
+        List<String> source;
+        Map<String, String> arrayColumn;
+
+        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) 
{
             // todo: We need to remove the schema in ES.
             log.warn(
-                    "The schema config in ElasticSearch sink is deprecated, 
please use source config instead!");
-            catalogTable = CatalogTableUtil.buildWithConfig(config);
+                    "The schema config in ElasticSearch source/sink is 
deprecated, please use source config instead!");
+            catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
             source = 
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
         } else {
-            source = config.get(SourceConfig.SOURCE);
-            arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
-            EsRestClient esRestClient = EsRestClient.createInstance(config);
+            source = readonlyConfig.get(SourceConfig.SOURCE);
+            arrayColumn = readonlyConfig.get(SourceConfig.ARRAY_COLUMN);
+            EsRestClient esRestClient = 
EsRestClient.createInstance(connectionConfig);
             Map<String, BasicTypeDefine<EsType>> esFieldType =
-                    
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
+                    esRestClient.getFieldTypeMapping(index, source);
             esRestClient.close();

Review Comment:
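   Is there a risk that the connection is not closed? If `esRestClient.getFieldTypeMapping(index, source)` throws, `esRestClient.close()` is never reached; consider closing the client in a `finally` block (or with try-with-resources, if `EsRestClient` implements `AutoCloseable`).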



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to