FuYouJ commented on code in PR #7502:
URL: https://github.com/apache/seatunnel/pull/7502#discussion_r1739630810


##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java:
##########
@@ -58,28 +59,51 @@ public class ElasticsearchSource
                 SupportParallelism,
                 SupportColumnProjection {
 
-    private final ReadonlyConfig config;
+    private final List<SourceConfig> sourceConfigList;
+    private final ReadonlyConfig connectionConfig;
 
-    private CatalogTable catalogTable;
+    public ElasticsearchSource(ReadonlyConfig config) {
+        this.connectionConfig = config;
+        if (config.getOptional(SourceConfig.INDEX_LIST).isPresent()) {
+            this.sourceConfigList = createMultiSource(config);
+        } else {
+            this.sourceConfigList = 
Collections.singletonList(parseOneIndexQueryConfig(config));
+        }
+    }
 
-    private List<String> source;
+    private List<SourceConfig> createMultiSource(ReadonlyConfig config) {
+        List<Map<String, Object>> configMaps = 
config.get(SourceConfig.INDEX_LIST);
+        List<ReadonlyConfig> configList =
+                
configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList());
+        List<SourceConfig> sourceConfigList = new 
ArrayList<>(configList.size());
+        for (ReadonlyConfig readonlyConfig : configList) {
+            SourceConfig sourceConfig = 
parseOneIndexQueryConfig(readonlyConfig);
+            sourceConfigList.add(sourceConfig);
+        }
+        return sourceConfigList;
+    }
 
-    private Map<String, String> arrayColumn;
+    private SourceConfig parseOneIndexQueryConfig(ReadonlyConfig 
readonlyConfig) {
 
-    public ElasticsearchSource(ReadonlyConfig config) {
-        this.config = config;
-        if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+        Map<String, Object> query = readonlyConfig.get(SourceConfig.QUERY);
+        String index = readonlyConfig.get(SourceConfig.INDEX);
+
+        CatalogTable catalogTable;
+        List<String> source;
+        Map<String, String> arrayColumn;
+
+        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) 
{
             // todo: We need to remove the schema in ES.
             log.warn(
-                    "The schema config in ElasticSearch sink is deprecated, 
please use source config instead!");
-            catalogTable = CatalogTableUtil.buildWithConfig(config);
+                    "The schema config in ElasticSearch source/sink is 
deprecated, please use source config instead!");
+            catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
             source = 
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
         } else {
-            source = config.get(SourceConfig.SOURCE);
-            arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
-            EsRestClient esRestClient = EsRestClient.createInstance(config);
+            source = readonlyConfig.get(SourceConfig.SOURCE);
+            arrayColumn = readonlyConfig.get(SourceConfig.ARRAY_COLUMN);
+            EsRestClient esRestClient = 
EsRestClient.createInstance(connectionConfig);
             Map<String, BasicTypeDefine<EsType>> esFieldType =
-                    
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
+                    esRestClient.getFieldTypeMapping(index, source);
             esRestClient.close();

Review Comment:
   I made EsRestClient implement Closeable and extracted the lookup into a 
separate method, so that the client is closed even if an exception occurs:
   ```
       private Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping(
               String index, List<String> source) {
           // EsRestClient#getFieldTypeMapping may throw a runtime exception,
           // so we use try-with-resources to make sure the client is closed
           try (EsRestClient esRestClient = 
EsRestClient.createInstance(connectionConfig)) {
               return esRestClient.getFieldTypeMapping(index, source);
           }
       }
   ```



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java:
##########
@@ -394,6 +434,12 @@ private List<String> 
getDocsWithTransformTimestamp(List<String> source, String i
     }
 
     private List<String> getDocsWithTransformDate(List<String> source, String 
index) {
+        return getDocsWithTransformDate(source, index, 
Collections.emptyList());
+    }
+
+    //

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to