This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 45653e1d22 [Feature][Elasticsearch] Support multi-table sink write #7041 (#7052) 45653e1d22 is described below commit 45653e1d2259b21efbf49fd38e20f38080608a97 Author: CosmosNi <40288034+cosmo...@users.noreply.github.com> AuthorDate: Mon Jun 24 21:38:41 2024 +0800 [Feature][Elasticsearch] Support multi-table sink write #7041 (#7052) --- .../elasticsearch/sink/ElasticsearchSink.java | 2 + .../sink/ElasticsearchSinkFactory.java | 55 +++++++++++-- .../sink/ElasticsearchSinkWriter.java | 8 +- .../connector/elasticsearch/ElasticsearchIT.java | 78 +++++++++++++++++- .../fakesource_to_elasticsearch_multi_sink.conf | 95 ++++++++++++++++++++++ 5 files changed, 226 insertions(+), 12 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java index d2ca6045eb..6325a14e99 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SchemaSaveMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; @@ -47,6 +48,7 @@ public class ElasticsearchSink ElasticsearchSinkState, ElasticsearchCommitInfo, 
ElasticsearchAggregatedCommitInfo>, + SupportMultiTableSink, SupportSaveMode { private ReadonlyConfig config; diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java index ad2c01e47e..63770dd1d7 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java @@ -17,7 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; @@ -28,6 +30,9 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig import com.google.auto.service.AutoService; +import java.util.HashMap; +import java.util.Map; + import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD; @@ -75,11 +80,14 @@ public class ElasticsearchSinkFactory implements TableSinkFactory { @Override public TableSink createSink(TableSinkFactoryContext context) { - String original = context.getOptions().get(INDEX); - original = - 
original.replace( - REPLACE_TABLE_NAME_KEY, - context.getCatalogTable().getTableId().getTableName()); + ReadonlyConfig readonlyConfig = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + + ReadonlyConfig finalReadonlyConfig = + generateCurrentReadonlyConfig(readonlyConfig, catalogTable); + + String original = finalReadonlyConfig.get(INDEX); + CatalogTable newTable = CatalogTable.of( TableIdentifier.of( @@ -87,6 +95,41 @@ public class ElasticsearchSinkFactory implements TableSinkFactory { context.getCatalogTable().getTablePath().getDatabaseName(), original), context.getCatalogTable()); - return () -> new ElasticsearchSink(context.getOptions(), newTable); + return () -> new ElasticsearchSink(finalReadonlyConfig, newTable); + } + + private ReadonlyConfig generateCurrentReadonlyConfig( + ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + + Map<String, String> configMap = readonlyConfig.toMap(); + + readonlyConfig + .getOptional(INDEX) + .ifPresent( + tableName -> { + String replacedPath = + replaceCatalogTableInPath(tableName, catalogTable); + configMap.put(INDEX.key(), replacedPath); + }); + + return ReadonlyConfig.fromMap(new HashMap<>(configMap)); + } + + private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) { + String tableName = originTableName; + TableIdentifier tableIdentifier = catalogTable.getTableId(); + if (tableIdentifier != null) { + if (tableIdentifier.getSchemaName() != null) { + tableName = + tableName.replace( + SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, + tableIdentifier.getSchemaName()); + } + if (tableIdentifier.getTableName() != null) { + tableName = + tableName.replace(REPLACE_TABLE_NAME_KEY, tableIdentifier.getTableName()); + } + } + return tableName; } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java index 6edac760c1..8cb054ca0a 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -47,9 +48,10 @@ import java.util.Optional; */ @Slf4j public class ElasticsearchSinkWriter - implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> { + implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState>, + SupportMultiTableSinkWriter<Void> { - private final SinkWriter.Context context; + private final Context context; private final int maxBatchSize; @@ -60,7 +62,7 @@ public class ElasticsearchSinkWriter private static final long DEFAULT_SLEEP_TIME_MS = 200L; public ElasticsearchSinkWriter( - SinkWriter.Context context, + Context context, CatalogTable catalogTable, ReadonlyConfig config, int maxBatchSize, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java index b754ea425a..3180f386b2 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java @@ -32,7 +32,9 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.apache.commons.io.IOUtils; @@ -50,6 +52,7 @@ import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -176,11 +179,54 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource { Container.ExecResult execResult = container.executeJob("/elasticsearch/elasticsearch_source_and_sink.conf"); Assertions.assertEquals(0, execResult.getExitCode()); - List<String> sinkData = readSinkData(); + List<String> sinkData = readSinkData("st_index2"); // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}} Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData); } + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK/FLINK do not support multiple table read") + @TestTemplate + public void testElasticsearchWithMultiSink(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + 
container.executeJob("/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + List<String> source5 = + Lists.newArrayList( + "id", + "c_bool", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_string"); + List<String> source6 = + Lists.newArrayList( + "id", + "c_bool", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal"); + List<String> sinkIndexData5 = readMultiSinkData("st_index5", source5); + List<String> sinkIndexData6 = readMultiSinkData("st_index6", source6); + String stIndex5 = + "{\"c_smallint\":2,\"c_string\":\"NEW\",\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3,\"id\":1,\"c_int\":3,\"c_bigint\":4,\"c_bool\":true,\"c_tinyint\":1}"; + String stIndex6 = + "{\"c_smallint\":2,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3,\"id\":1,\"c_int\":3,\"c_bigint\":4,\"c_bool\":true,\"c_tinyint\":1}"; + Assertions.assertIterableEquals(Lists.newArrayList(stIndex5), sinkIndexData5); + Assertions.assertIterableEquals(Lists.newArrayList(stIndex6), sinkIndexData6); + } + @TestTemplate public void testElasticsearchWithFullType(TestContainer container) throws IOException, InterruptedException { @@ -262,7 +308,7 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource { return getDocsWithTransformDate(source, "st_index4"); } - private List<String> readSinkData() throws InterruptedException { + private List<String> readSinkData(String index) throws InterruptedException { // wait for index refresh Thread.sleep(2000); List<String> source = @@ -281,7 +327,33 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource { "c_int", "c_date", "c_timestamp"); - return getDocsWithTransformTimestamp(source, "st_index2"); + return getDocsWithTransformTimestamp(source, index); + } + + private List<String> readMultiSinkData(String index, List<String> source) + throws 
InterruptedException { + // wait for index refresh + Thread.sleep(2000); + Map<String, Object> query = new HashMap<>(); + query.put("match_all", Maps.newHashMap()); + + ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000); + scrollResult + .getDocs() + .forEach( + x -> { + x.remove("_index"); + x.remove("_type"); + x.remove("_id"); + }); + List<String> docs = + scrollResult.getDocs().stream() + .sorted( + Comparator.comparingInt( + o -> Integer.valueOf(o.get("c_int").toString()))) + .map(JsonUtils::toJsonString) + .collect(Collectors.toList()); + return docs; } private List<String> getDocsWithTransformTimestamp(List<String> source, String index) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf new file mode 100644 index 0000000000..cbb28545f2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch_multi_sink.conf @@ -0,0 +1,95 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" + #checkpoint.interval = 10000 +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "st_index5" + fields { + id = int + c_bool = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(16, 1)" + c_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "st_index6" + fields { + id = int + c_bool = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] + } + ] + } +} +transform { +} + +sink { + Elasticsearch { + hosts = ["https://elasticsearch:9200"] + username = "elastic" + password = "elasticsearch" + tls_verify_certificate = false + tls_verify_hostname = false + + index = "${table_name}" + index_type = "st" + "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST" + "data_save_mode"="APPEND_DATA" + } +}