This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 476d492165 [Feature][Druid]Support multi table for druid sink (#7023) 476d492165 is described below commit 476d492165ab2747aa0bb1a5f8a6ed74bfe55f20 Author: TaoZex <45089228+tao...@users.noreply.github.com> AuthorDate: Fri Jun 21 10:06:02 2024 +0800 [Feature][Druid]Support multi table for druid sink (#7023) --- .../seatunnel/connectors/druid/sink/DruidSink.java | 4 +- .../connectors/druid/sink/DruidSinkFactory.java | 49 +++++++++- .../connectors/druid/sink/DruidWriter.java | 4 +- .../seatunnel/e2e/connector/druid/DruidIT.java | 105 +++++++++++++++------ .../resources/fakesource_to_druid_with_multi.conf | 83 ++++++++++++++++ 5 files changed, 211 insertions(+), 34 deletions(-) diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java index 318f3a1bd0..99758c76f3 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.druid.sink; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -32,7 +33,8 @@ import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZ import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL; import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE; -public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> { +public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> + implements SupportMultiTableSink { private ReadonlyConfig config; private CatalogTable catalogTable; diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java index 44e887810e..0f78ba0a58 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java @@ -18,8 +18,11 @@ package org.apache.seatunnel.connectors.druid.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -27,6 +30,9 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import com.google.auto.service.AutoService; +import java.util.HashMap; +import java.util.Map; + import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL; import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE; @@ -44,7 +50,48 @@ public class DruidSinkFactory implements TableSinkFactory { @Override public TableSink createSink(TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); - return () -> new DruidSink(context.getOptions(), catalogTable); + + ReadonlyConfig finalReadonlyConfig = + generateCurrentReadonlyConfig(readonlyConfig, catalogTable); + return () -> new DruidSink(finalReadonlyConfig, catalogTable); + } + + private ReadonlyConfig generateCurrentReadonlyConfig( + ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + + Map<String, String> configMap = readonlyConfig.toMap(); + + readonlyConfig + .getOptional(DATASOURCE) + .ifPresent( + tableName -> { + String replacedPath = + replaceCatalogTableInPath(tableName, catalogTable); + configMap.put(DATASOURCE.key(), replacedPath); + }); + + return ReadonlyConfig.fromMap(new HashMap<>(configMap)); + } + + private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) { + String tableName = originTableName; + TableIdentifier tableIdentifier = catalogTable.getTableId(); + if (tableIdentifier != null) { + if (tableIdentifier.getSchemaName() != null) { + tableName = + tableName.replace( + SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, + tableIdentifier.getSchemaName()); + } + if (tableIdentifier.getTableName() != null) { + tableName = + tableName.replace( + SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, + tableIdentifier.getTableName()); + } + } + return tableName; } } diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java index 3f7709b51d..0ebb1c49ad 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.druid.sink; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -65,7 +66,8 @@ import java.util.List; import java.util.StringJoiner; import java.util.stream.Collectors; -public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void> { +public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void> + implements SupportMultiTableSinkWriter<Void> { private static final Logger LOG = LoggerFactory.getLogger(DruidWriter.class); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java index 1639636b85..21bf3c5d5c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.e2e.connector.druid; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.container.TestContainerId; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; @@ -58,8 +59,11 @@ import java.util.stream.Collectors; disabledReason = "The RoaringBitmap version is not compatible in docker container") public class DruidIT extends TestSuiteBase implements TestResource { - private static final String datasource = "testDataSource"; - private static final String sqlQuery = "SELECT * FROM " + datasource; + private static final String DATASOURCE = "testDataSource"; + private static final String MULTI_DATASOURCE_1 = "druid_sink_1"; + private static final String MULTI_DATASOURCE_2 = "druid_sink_2"; + private static final String SQL_QUERY_TEMPLATE = "SELECT * FROM "; + private static final String CONF_PREFIX = "src/test/resources"; private static final String DRUID_SERVICE_NAME = "router"; private static final int DRUID_SERVICE_PORT = 8888; private DockerComposeContainer environment; @@ -76,7 +80,8 @@ public class DruidIT extends TestSuiteBase implements TestResource { Wait.forListeningPort() .withStartupTimeout(Duration.ofSeconds(360))); environment.start(); - changeCoordinatorURLConf(); + changeCoordinatorURLConf(CONF_PREFIX + "/fakesource_to_druid.conf"); + changeCoordinatorURLConf(CONF_PREFIX + "/fakesource_to_druid_with_multi.conf"); } @AfterAll @@ -90,39 +95,64 @@ public class DruidIT extends TestSuiteBase implements TestResource { Container.ExecResult execResult = container.executeJob("/fakesource_to_druid.conf"); Assertions.assertEquals(0, execResult.getExitCode()); while (true) { - try (CloseableHttpClient client = HttpClients.createDefault()) { - HttpPost request = new HttpPost("http://" + coordinatorURL + "/druid/v2/sql"); - String jsonRequest = "{\"query\": \"" + sqlQuery + "\"}"; - StringEntity entity = new StringEntity(jsonRequest); - entity.setContentType("application/json"); - request.setEntity(entity); - HttpResponse response = client.execute(request); - String responseBody = EntityUtils.toString(response.getEntity()); - String expectedDataRow1 = - "\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3"; - String expectedDataRow2 = - "\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999"; - String expectedDataRow3 = - "\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489"; - String expectedDataRow4 = - "\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012"; - - if (!responseBody.contains("errorMessage")) { - // Check sink data - Assertions.assertEquals(responseBody.contains(expectedDataRow1), true); - Assertions.assertEquals(responseBody.contains(expectedDataRow2), true); - Assertions.assertEquals(responseBody.contains(expectedDataRow3), true); - Assertions.assertEquals(responseBody.contains(expectedDataRow4), true); - break; - } - Thread.sleep(1000); + String responseBody = getSelectResponse(DATASOURCE); + String expectedDataRow1 = + "\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3"; + String expectedDataRow2 = + "\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999"; + String expectedDataRow3 = + "\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489"; + String expectedDataRow4 = + "\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012"; + + if (!responseBody.contains("errorMessage")) { + // Check sink data + Assertions.assertEquals(responseBody.contains(expectedDataRow1), true); + Assertions.assertEquals(responseBody.contains(expectedDataRow2), true); + Assertions.assertEquals(responseBody.contains(expectedDataRow3), true); + Assertions.assertEquals(responseBody.contains(expectedDataRow4), true); + break; } + Thread.sleep(1000); } } - private void changeCoordinatorURLConf() throws UnknownHostException { + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK/FLINK do not support multiple table read") + @TestTemplate + public void testDruidMultiSink(TestContainer container) throws Exception { + Container.ExecResult execResult = + container.executeJob("/fakesource_to_druid_with_multi.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + // Check multi sink table 1 + while (true) { + String responseBody = getSelectResponse(MULTI_DATASOURCE_1); + String expectedDataRow = + "\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3,\"val_string\":\"NEW\""; + + if (!responseBody.contains("errorMessage")) { + Assertions.assertEquals(responseBody.contains(expectedDataRow), true); + break; + } + Thread.sleep(1000); + } + // Check multi sink table 2 + while (true) { + String responseBody = getSelectResponse(MULTI_DATASOURCE_2); + String expectedDataRow = + "\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3"; + if (!responseBody.contains("errorMessage")) { + Assertions.assertEquals(responseBody.contains(expectedDataRow), true); + break; + } + Thread.sleep(1000); + } + } + + private void changeCoordinatorURLConf(String resourceFilePath) throws UnknownHostException { coordinatorURL = InetAddress.getLocalHost().getHostAddress() + ":8888"; - String resourceFilePath = "src/test/resources/fakesource_to_druid.conf"; Path path = Paths.get(resourceFilePath); try { List<String> lines = Files.readAllLines(path); @@ -145,4 +175,17 @@ public class DruidIT extends TestSuiteBase implements TestResource { throw new RuntimeException("Change conf error", e); } } + + private String getSelectResponse(String datasource) throws IOException { + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpPost request = new HttpPost("http://" + coordinatorURL + "/druid/v2/sql"); + String jsonRequest = "{\"query\": \"" + SQL_QUERY_TEMPLATE + datasource + "\"}"; + StringEntity entity = new StringEntity(jsonRequest); + entity.setContentType("application/json"); + request.setEntity(entity); + HttpResponse response = client.execute(request); + String responseBody = EntityUtils.toString(response.getEntity()); + return responseBody; + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf new file mode 100644 index 0000000000..a66cde8d7e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "druid_sink_1" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "druid_sink_2" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] + } + ] + } +} + +transform { +} + +sink { + Druid { + coordinatorUrl = "localhost:8888" + datasource = "${table_name}" + } +}