This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 45653e1d22 [Feature][Elasticsearch] Support multi-table sink write #7041 (#7052)
45653e1d22 is described below

commit 45653e1d2259b21efbf49fd38e20f38080608a97
Author: CosmosNi <40288034+cosmo...@users.noreply.github.com>
AuthorDate: Mon Jun 24 21:38:41 2024 +0800

    [Feature][Elasticsearch] Support multi-table sink write #7041 (#7052)
---
 .../elasticsearch/sink/ElasticsearchSink.java      |  2 +
 .../sink/ElasticsearchSinkFactory.java             | 55 +++++++++++--
 .../sink/ElasticsearchSinkWriter.java              |  8 +-
 .../connector/elasticsearch/ElasticsearchIT.java   | 78 +++++++++++++++++-
 .../fakesource_to_elasticsearch_multi_sink.conf    | 95 ++++++++++++++++++++++
 5 files changed, 226 insertions(+), 12 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index d2ca6045eb..6325a14e99 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -47,6 +48,7 @@ public class ElasticsearchSink
                         ElasticsearchSinkState,
                         ElasticsearchCommitInfo,
                         ElasticsearchAggregatedCommitInfo>,
+                SupportMultiTableSink,
                 SupportSaveMode {
 
     private ReadonlyConfig config;
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index ad2c01e47e..63770dd1d7 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
@@ -28,6 +30,9 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig
 
 import com.google.auto.service.AutoService;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
@@ -75,11 +80,14 @@ public class ElasticsearchSinkFactory implements TableSinkFactory {
 
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
-        String original = context.getOptions().get(INDEX);
-        original =
-                original.replace(
-                        REPLACE_TABLE_NAME_KEY,
-                        context.getCatalogTable().getTableId().getTableName());
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+
+        ReadonlyConfig finalReadonlyConfig =
+                generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+
+        String original = finalReadonlyConfig.get(INDEX);
+
         CatalogTable newTable =
                 CatalogTable.of(
                         TableIdentifier.of(
@@ -87,6 +95,41 @@ public class ElasticsearchSinkFactory implements TableSinkFactory {
                                 context.getCatalogTable().getTablePath().getDatabaseName(),
                                 original),
                         context.getCatalogTable());
-        return () -> new ElasticsearchSink(context.getOptions(), newTable);
+        return () -> new ElasticsearchSink(finalReadonlyConfig, newTable);
+    }
+
+    private ReadonlyConfig generateCurrentReadonlyConfig(
+            ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+
+        Map<String, String> configMap = readonlyConfig.toMap();
+
+        readonlyConfig
+                .getOptional(INDEX)
+                .ifPresent(
+                        tableName -> {
+                            String replacedPath =
+                                    replaceCatalogTableInPath(tableName, catalogTable);
+                            configMap.put(INDEX.key(), replacedPath);
+                        });
+
+        return ReadonlyConfig.fromMap(new HashMap<>(configMap));
+    }
+
+    private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) {
+        String tableName = originTableName;
+        TableIdentifier tableIdentifier = catalogTable.getTableId();
+        if (tableIdentifier != null) {
+            if (tableIdentifier.getSchemaName() != null) {
+                tableName =
+                        tableName.replace(
+                                SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+                                tableIdentifier.getSchemaName());
+            }
+            if (tableIdentifier.getTableName() != null) {
+                tableName =
+                        tableName.replace(REPLACE_TABLE_NAME_KEY, tableIdentifier.getTableName());
+            }
+        }
+        return tableName;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 6edac760c1..8cb054ca0a 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -47,9 +48,10 @@ import java.util.Optional;
  */
 @Slf4j
 public class ElasticsearchSinkWriter
-        implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> {
+        implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState>,
+                SupportMultiTableSinkWriter<Void> {
 
-    private final SinkWriter.Context context;
+    private final Context context;
 
     private final int maxBatchSize;
 
@@ -60,7 +62,7 @@ public class ElasticsearchSinkWriter
     private static final long DEFAULT_SLEEP_TIME_MS = 200L;
 
     public ElasticsearchSinkWriter(
-            SinkWriter.Context context,
+            Context context,
             CatalogTable catalogTable,
             ReadonlyConfig config,
             int maxBatchSize,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index b754ea425a..3180f386b2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -32,7 +32,9 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.apache.commons.io.IOUtils;
@@ -50,6 +52,7 @@ import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.DockerLoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -176,11 +179,54 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         Container.ExecResult execResult =
                 container.executeJob("/elasticsearch/elasticsearch_source_and_sink.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
-        List<String> sinkData = readSinkData();
+        List<String> sinkData = readSinkData("st_index2");
         // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
         Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK/FLINK do not support multiple table read")
+    @TestTemplate
+    public void testElasticsearchWithMultiSink(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        List<String> source5 =
+                Lists.newArrayList(
+                        "id",
+                        "c_bool",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_string");
+        List<String> source6 =
+                Lists.newArrayList(
+                        "id",
+                        "c_bool",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal");
+        List<String> sinkIndexData5 = readMultiSinkData("st_index5", source5);
+        List<String> sinkIndexData6 = readMultiSinkData("st_index6", source6);
+        String stIndex5 =
+                "{\"c_smallint\":2,\"c_string\":\"NEW\",\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3,\"id\":1,\"c_int\":3,\"c_bigint\":4,\"c_bool\":true,\"c_tinyint\":1}";
+        String stIndex6 =
+                "{\"c_smallint\":2,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3,\"id\":1,\"c_int\":3,\"c_bigint\":4,\"c_bool\":true,\"c_tinyint\":1}";
+        Assertions.assertIterableEquals(Lists.newArrayList(stIndex5), sinkIndexData5);
+        Assertions.assertIterableEquals(Lists.newArrayList(stIndex6), sinkIndexData6);
+    }
+
     @TestTemplate
     public void testElasticsearchWithFullType(TestContainer container)
             throws IOException, InterruptedException {
@@ -262,7 +308,7 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         return getDocsWithTransformDate(source, "st_index4");
     }
 
-    private List<String> readSinkData() throws InterruptedException {
+    private List<String> readSinkData(String index) throws InterruptedException {
         // wait for index refresh
         Thread.sleep(2000);
         List<String> source =
@@ -281,7 +327,33 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
                         "c_int",
                         "c_date",
                         "c_timestamp");
-        return getDocsWithTransformTimestamp(source, "st_index2");
+        return getDocsWithTransformTimestamp(source, index);
+    }
+
+    private List<String> readMultiSinkData(String index, List<String> source)
+            throws InterruptedException {
+        // wait for index refresh
+        Thread.sleep(2000);
+        Map<String, Object> query = new HashMap<>();
+        query.put("match_all", Maps.newHashMap());
+
+        ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
+        scrollResult
+                .getDocs()
+                .forEach(
+                        x -> {
+                            x.remove("_index");
+                            x.remove("_type");
+                            x.remove("_id");
+                        });
+        List<String> docs =
+                scrollResult.getDocs().stream()
+                        .sorted(
+                                Comparator.comparingInt(
+                                        o -> Integer.valueOf(o.get("c_int").toString())))
+                        .map(JsonUtils::toJsonString)
+                        .collect(Collectors.toList());
+        return docs;
     }
 
     private List<String> getDocsWithTransformTimestamp(List<String> source, String index) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf
new file mode 100644
index 0000000000..cbb28545f2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  #checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "st_index5"
+         fields {
+                id = int
+                c_bool = boolean
+                c_tinyint = tinyint
+                c_smallint = smallint
+                c_int = int
+                c_bigint = bigint
+                c_float = float
+                c_double = double
+                c_decimal = "decimal(16, 1)"
+                c_string = string
+      }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "st_index6"
+              fields {
+               id = int
+               c_bool = boolean
+               c_tinyint = tinyint
+               c_smallint = smallint
+               c_int = int
+               c_bigint = bigint
+               c_float = float
+               c_double = double
+               c_decimal = "decimal(16, 1)"
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+             }
+             ]
+      }
+    ]
+  }
+}
+transform {
+}
+
+sink {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "${table_name}"
+    index_type = "st"
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+  }
+}

Reply via email to