This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new d7fa9afdfe [Feature][Connector] Add druid sink connector (#6346) d7fa9afdfe is described below commit d7fa9afdfe95633f73fb66c877f954b8a29f8579 Author: TaoZex <45089228+tao...@users.noreply.github.com> AuthorDate: Wed Jun 19 14:29:42 2024 +0800 [Feature][Connector] Add druid sink connector (#6346) --- config/plugin_config | 1 + docs/en/connector-v2/sink/Druid.md | 67 ++++++ plugin-mapping.properties | 1 + seatunnel-connectors-v2/connector-druid/pom.xml | 57 +++++ .../connectors/druid/config/DruidConfig.java | 44 ++++ .../druid/exception/DruidConnectorException.java | 38 +++ .../seatunnel/connectors/druid/sink/DruidSink.java | 61 +++++ .../connectors/druid/sink/DruidSinkFactory.java | 50 ++++ .../connectors/druid/sink/DruidWriter.java | 255 +++++++++++++++++++++ .../seatunnel/druid/DruidFactoryTest.java | 31 +++ seatunnel-connectors-v2/pom.xml | 1 + seatunnel-dist/pom.xml | 6 + .../connector-druid-e2e/pom.xml | 54 +++++ .../seatunnel/e2e/connector/druid/DruidIT.java | 148 ++++++++++++ .../src/test/resources/docker-compose.yml | 141 ++++++++++++ .../src/test/resources/environment | 53 +++++ .../src/test/resources/fakesource_to_druid.conf | 69 ++++++ seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 + 18 files changed, 1078 insertions(+) diff --git a/config/plugin_config b/config/plugin_config index 42fc280a65..b18d893e67 100644 --- a/config/plugin_config +++ b/config/plugin_config @@ -67,6 +67,7 @@ connector-openmldb connector-pulsar connector-rabbitmq connector-redis +connector-druid connector-s3-redshift connector-sentry connector-slack diff --git a/docs/en/connector-v2/sink/Druid.md b/docs/en/connector-v2/sink/Druid.md new file mode 100644 index 0000000000..0d4783b03a --- /dev/null +++ b/docs/en/connector-v2/sink/Druid.md @@ -0,0 +1,67 @@ +# Druid + +> Druid sink connector + +## Description + +Write data to Druid + +## Key features + +- [ ] 
[exactly-once](../../concept/connector-v2-features.md) + +## Data Type Mapping + +| SeaTunnel Data Type | Druid Data Type | +|---------------------|-----------------| +| TINYINT | LONG | +| SMALLINT | LONG | +| INT | LONG | +| BIGINT | LONG | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| DECIMAL | DOUBLE | +| STRING | STRING | +| BOOLEAN | STRING | +| TIMESTAMP | STRING | + +## Options + +| name | type | required | default value | +|----------------|--------|----------|---------------| +| coordinatorUrl | string | yes | - | +| datasource | string | yes | - | +| batchSize | int | no | 10000 | +| common-options | | no | - | + +### coordinatorUrl [string] + +The host and port of the Druid endpoint that accepts ingestion tasks (for example the router), e.g. "myHost:8888" + +### datasource [string] + +The name of the Druid datasource to write to, e.g. "seatunnel" + +### batchSize [int] + +The number of rows flushed to Druid per batch. Default value is `10000`. + +### common options + +Sink plugin common parameters; please refer to [Sink Common Options](common-options.md) for details. + +## Example + +```hocon +Druid { + coordinatorUrl = "testHost:8888" + datasource = "seatunnel" +} +``` + +## Changelog + +### next version + +- Add Druid sink connector + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 411e42b880..25dc239f99 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -119,6 +119,7 @@ seatunnel.source.AmazonSqs = connector-amazonsqs seatunnel.sink.AmazonSqs = connector-amazonsqs seatunnel.source.Paimon = connector-paimon seatunnel.sink.Paimon = connector-paimon +seatunnel.sink.Druid = connector-druid seatunnel.source.Easysearch = connector-easysearch seatunnel.sink.Easysearch = connector-easysearch seatunnel.source.Postgres-CDC = connector-cdc-postgres diff --git a/seatunnel-connectors-v2/connector-druid/pom.xml b/seatunnel-connectors-v2/connector-druid/pom.xml new file mode 100644 index 0000000000..c53ca12f8f --- /dev/null +++
b/seatunnel-connectors-v2/connector-druid/pom.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connectors-v2</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-druid</artifactId> + <name>SeaTunnel : Connectors V2 : Druid</name> + + <properties> + <druid.version>24.0.1</druid.version> + <httpclient.version>4.5.13</httpclient.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-processing</artifactId> + <version>${druid.version}</version> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-indexing-service</artifactId> + <version>${druid.version}</version> + </dependency> + 
<dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${httpclient.version}</version> + </dependency> + </dependencies> +</project> diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java new file mode 100644 index 0000000000..a2883143a3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.druid.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class DruidConfig { /* Option definitions (keys, types, defaults) of the Druid sink connector. */ + public static final Integer BATCH_SIZE_DEFAULT = 10000; /* default of the batchSize option below */ + /* NOTE(review): the Option fields below are not final and can be reassigned at runtime; consider public static final. */ + public static Option<String> COORDINATOR_URL = /* host:port only; DruidWriter prepends the http scheme and appends the ingestion-task endpoint */ + Options.key("coordinatorUrl") + .stringType() + .noDefaultValue() + .withDescription("The coordinatorUrl host and port of Druid."); + + public static Option<String> DATASOURCE = /* name of the target Druid datasource */ + Options.key("datasource") + .stringType() + .noDefaultValue() + .withDescription("The datasource name need to write."); + + public static Option<Integer> BATCH_SIZE = /* rows DruidWriter buffers before submitting one ingestion task */ + Options.key("batchSize") + .intType() + .defaultValue(BATCH_SIZE_DEFAULT) + .withDescription("The batch size of the druid write."); +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java new file mode 100644 index 0000000000..23c7348f7b --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.druid.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class DruidConnectorException extends SeaTunnelRuntimeException { /* Unchecked exception for Druid connector failures; every constructor forwards its arguments unchanged to the matching SeaTunnelRuntimeException constructor. */ + + public DruidConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public DruidConnectorException( + SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public DruidConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java new file mode 100644 index 0000000000..318f3a1bd0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.druid.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import java.io.IOException; + +import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE; +import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL; +import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE; + +public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> { /* Sink named "Druid"; each writer it creates is a DruidWriter configured from coordinatorUrl, datasource and batchSize. */ + + private ReadonlyConfig config; + private CatalogTable catalogTable; + private SeaTunnelRowType seaTunnelRowType; + + @Override + public String getPluginName() { + return "Druid"; + } + + public DruidSink(ReadonlyConfig config, CatalogTable table) { /* Keeps the options and the catalog table's row type for createWriter(). */ + this.config = config; + this.catalogTable = table; + this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); + } + + @Override + public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) + throws IOException { /* The writer context is not used; batchSize presumably falls back to BATCH_SIZE's declared default when unset. */ + return new DruidWriter( + seaTunnelRowType, + config.get(COORDINATOR_URL), + config.get(DATASOURCE), + config.get(BATCH_SIZE)); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java new file mode 100644 index 0000000000..44e887810e --- /dev/null +++
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.druid.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL; +import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE; + +@AutoService(Factory.class) +public class DruidSinkFactory implements TableSinkFactory { /* Registered through @AutoService(Factory.class) under the identifier "Druid"; creates DruidSink instances. */ + @Override + public String factoryIdentifier() { + return "Druid"; + } + + @Override + public OptionRule optionRule() { /* NOTE(review): batchSize (DruidConfig.BATCH_SIZE) is read by DruidSink but not declared here; consider adding it as an optional option so it is validated and listed. */ + return OptionRule.builder().required(COORDINATOR_URL, DATASOURCE).build(); + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { /* Returns a TableSink lambda that builds the DruidSink from the context's options and catalog table. */ + CatalogTable
catalogTable = context.getCatalogTable(); + return () -> new DruidSink(context.getOptions(), catalogTable); + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java new file mode 100644 index 0000000000..3f7709b51d --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.druid.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.druid.exception.DruidConnectorException; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.DoubleDimensionSchema; +import org.apache.druid.data.input.impl.FloatDimensionSchema; +import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import 
com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.StringJoiner; +import java.util.stream.Collectors; + +public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void> { /* Sink writer that buffers rows as inline CSV text and submits them to Druid as a ParallelIndexSupervisorTask (native batch ingestion) via an HTTP POST. */ + + private static final Logger LOG = LoggerFactory.getLogger(DruidWriter.class); + + private static final String DEFAULT_LINE_DELIMITER = "\n"; + private static final String DEFAULT_FIELD_DELIMITER = ","; + private static final String TIMESTAMP_SPEC_COLUMN_NAME = "timestamp"; + private static final String DRUID_ENDPOINT = "/druid/indexer/v1/task"; + + private int batchSize; /* rows buffered before write() triggers flush() */ + private int currentBatchSize = 0; /* rows written since the last batch-triggered flush */ + + private final DataSchema dataSchema; + + private final long processTime; /* set once in the constructor; used as the timestamp value of every row */ + private final transient StringBuffer data; /* buffered CSV rows, each terminated by DEFAULT_LINE_DELIMITER */ + + private final CloseableHttpClient httpClient; + private final ObjectMapper mapper; + private final String coordinatorUrl; + private final String datasource; + private final SeaTunnelRowType seaTunnelRowType; + + public DruidWriter( + SeaTunnelRowType seaTunnelRowType, + String coordinatorUrl, + String datasource, + int batchSize) { /* seaTunnelRowType and datasource are assigned before provideDruidDataSchema() reads them; client, mapper and schema are built once per writer. */ + this.seaTunnelRowType = seaTunnelRowType; + this.coordinatorUrl = coordinatorUrl; + this.datasource = datasource; + this.batchSize = batchSize; + this.mapper = provideDruidSerializer(); + this.httpClient = HttpClients.createDefault(); + this.dataSchema = provideDruidDataSchema(); + this.processTime = System.currentTimeMillis(); + this.data = new StringBuffer(); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { /* Appends one CSV line (the row's non-null fields, then processTime) and calls flush() once batchSize rows have accumulated. */ + final StringJoiner joiner = new StringJoiner(DEFAULT_FIELD_DELIMITER, "", ""); + for (int i = 0; i < element.getArity(); i++) { + final Object v
= element.getField(i); + if (v != null) { /* NOTE(review): a null field is skipped instead of emitted as an empty cell, so every later value shifts one column left relative to the column list built in provideDruidIOConfig. Values are not quoted or escaped either, so a value containing a comma or a line break splits or corrupts the row. */ + joiner.add(v.toString()); + } + } + // timestamp column is a required field to add in Druid. + // See https://druid.apache.org/docs/24.0.0/ingestion/data-model.html#primary-timestamp + joiner.add(String.valueOf(processTime)); + data.append(joiner); + data.append(DEFAULT_LINE_DELIMITER); + currentBatchSize++; + if (currentBatchSize >= batchSize) { + flush(); + currentBatchSize = 0; + } + } + + public void flush() throws IOException { /* Submits the whole data buffer as one ingestion task (POST to coordinatorUrl + DRUID_ENDPOINT). NOTE(review): data is never cleared, so each task re-sends every row written since construction and the buffer grows without bound; whether re-sending is safe depends on the overwrite setting passed in provideDruidIOConfig - confirm before changing. A task is also sent when data is empty (close() always calls flush()), the HTTP status code is never checked, a failing flush() in close() leaves httpClient open, and response.getEntity().toString() logs the entity object instead of reading the response body (EntityUtils.toString would). */ + final ParallelIndexIOConfig ioConfig = provideDruidIOConfig(data); + final ParallelIndexSupervisorTask indexTask = provideIndexTask(ioConfig); + final String inputJSON = provideInputJSONString(indexTask); + String uri = new String("http://" + this.coordinatorUrl + DRUID_ENDPOINT); + HttpPost post = new HttpPost(uri); + post.setHeader("Content-Type", "application/json"); + post.setHeader("Accept", "application/json, text/plain, */*"); + post.setEntity(new StringEntity(inputJSON)); + + try (CloseableHttpResponse response = httpClient.execute(post)) { + String responseBody = + response.getEntity() != null ?
response.getEntity().toString() : ""; + LOG.info("Druid write task has been sent, and the response is {}", responseBody); + } + } + + @Override + public void close() throws IOException { + flush(); + if (httpClient != null) { + httpClient.close(); + } + } + + private ObjectMapper provideDruidSerializer() { + final ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JodaModule()); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false); + mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false); + mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false); + mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false); + mapper.configure(SerializationFeature.INDENT_OUTPUT, false); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + return mapper; + } + + /** + * One necessary information to provide is DimensionSchema list, which states data type of + * columns. More details in https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html + */ + private DataSchema provideDruidDataSchema() { /* Timestamp is read from the "timestamp" column with format "auto"; the two null arguments are presumably aggregators and transformSpec; UniformGranularitySpec(segment, query, rollup, intervals) gives HOUR segments, MINUTE query granularity and no rollup. */ + final List<DimensionSchema> dimensionSchemas = transformToDimensionSchema(); + return new DataSchema( + datasource, + new TimestampSpec(TIMESTAMP_SPEC_COLUMN_NAME, "auto", null), + new DimensionsSpec(dimensionSchemas), + null, + new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, false, null), + null); + } + + private List<DimensionSchema> transformToDimensionSchema() { /* One dimension per field, in field order: BOOLEAN, TIMESTAMP and STRING map to string, FLOAT to float, DECIMAL and DOUBLE to double, TINYINT through BIGINT to long; any other type throws DruidConnectorException. */ + List<DimensionSchema> dimensionSchemas = new ArrayList<>(); + String[] fieldNames = seaTunnelRowType.getFieldNames(); + SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes(); + for (int i = 0; i < fieldNames.length; i++) { + String columnName = fieldNames[i]; + switch (fieldTypes[i].getSqlType()) { + case BOOLEAN: + case TIMESTAMP: + case STRING: + dimensionSchemas.add(new StringDimensionSchema(columnName)); + break; + case FLOAT: +
dimensionSchemas.add(new FloatDimensionSchema(columnName)); + break; + case DECIMAL: + case DOUBLE: + dimensionSchemas.add(new DoubleDimensionSchema(columnName)); + break; + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + dimensionSchemas.add(new LongDimensionSchema(columnName)); + break; + default: + throw new DruidConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, + "Unsupported data type " + seaTunnelRowType.getFieldType(i)); + } + } + return dimensionSchemas; + } + + ParallelIndexIOConfig provideDruidIOConfig(final StringBuffer data) { /* Inline CSV input over the whole buffer; columns are the field names followed by the timestamp column, matching write() for rows without null fields. NOTE(review): in Druid's CsvInputFormat the second argument is the multi-value listDelimiter, not a row delimiter - confirm passing DEFAULT_LINE_DELIMITER there is intended. The fourth ParallelIndexIOConfig argument (false) is presumably appendToExisting, i.e. each task overwrites the time interval it covers. */ + List<String> formatList = + Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList()); + formatList.add(TIMESTAMP_SPEC_COLUMN_NAME); + return new ParallelIndexIOConfig( + null, + new InlineInputSource(data.toString()), + new CsvInputFormat(formatList, DEFAULT_LINE_DELIMITER, null, false, 0), + false, + null); + } + + /** + * Provide ParallelIndexSupervisorTask that can run multiple indexing tasks concurrently. See + * more information in https://druid.apache.org/docs/latest/ingestion/native-batch.html + */ + @VisibleForTesting + ParallelIndexSupervisorTask provideIndexTask(final ParallelIndexIOConfig ioConfig) { + return new ParallelIndexSupervisorTask( + null, null, null, new ParallelIndexIngestionSpec(dataSchema, ioConfig, null), null); + } + + /** + * Provide JSON to be sent via HTTP request.
Please see payload example in + * https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html + */ + String provideInputJSONString(final ParallelIndexSupervisorTask indexTask) + throws JsonProcessingException { /* Drops id, groupId, resource and spec.tuningConfig from the serialized task (presumably so Druid applies its own id and default tuning - confirm). jsonObject.put("spec", spec) re-inserts a node that was already modified in place. */ + String taskJSON = mapper.writeValueAsString(indexTask); + final ObjectNode jsonObject = (ObjectNode) mapper.readTree(taskJSON); + jsonObject.remove("id"); + jsonObject.remove("groupId"); + jsonObject.remove("resource"); + + final ObjectNode spec = (ObjectNode) jsonObject.get("spec"); + spec.remove("tuningConfig"); + jsonObject.put("spec", spec); + taskJSON = jsonObject.toString(); + return taskJSON; + } +} diff --git a/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java b/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java new file mode 100644 index 0000000000..b5b40bb3ec --- /dev/null +++ b/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.druid; + +import org.apache.seatunnel.connectors.druid.sink.DruidSinkFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DruidFactoryTest { /* Smoke test: DruidSinkFactory.optionRule() returns a non-null OptionRule. */ + @Test + public void optionRuleTest() { + Assertions.assertNotNull((new DruidSinkFactory()).optionRule()); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index d1e5af9ee6..0498ff4539 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -67,6 +67,7 @@ <module>connector-rabbitmq</module> <module>connector-openmldb</module> <module>connector-doris</module> + <module>connector-druid</module> <module>connector-maxcompute</module> <module>connector-tdengine</module> <module>connector-selectdb-cloud</module> diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 59ce612230..fe75ace87a 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -227,6 +227,12 @@ <version>${project.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-druid</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>connector-jdbc</artifactId> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml new file mode 100644 index 0000000000..2531a70f48 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connector-v2-e2e</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-druid-e2e</artifactId> + <name>SeaTunnel : E2E : Connector V2 : Druid</name> + + <properties> + <druid.version>24.0.1</druid.version> + <httpclient.version>4.5.13</httpclient.version> + </properties> + + <dependencies> + <!-- SeaTunnel connectors --> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-fake</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-druid</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${httpclient.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java new file mode 100644 index 0000000000..1639636b85 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.druid; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.container.TestContainerId; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@DisabledOnContainer( + value = {TestContainerId.SPARK_2_4}, + disabledReason = "The RoaringBitmap version is not compatible in docker container") +public class DruidIT extends TestSuiteBase implements TestResource { + + private static final String datasource = "testDataSource"; + private static final String sqlQuery = "SELECT * FROM " + datasource; + private static final String DRUID_SERVICE_NAME = "router"; + private static final int DRUID_SERVICE_PORT = 8888; + private DockerComposeContainer environment; + private String coordinatorURL; + + @BeforeAll + @Override + public void startUp() throws Exception { + environment = + new DockerComposeContainer(new 
File("src/test/resources/docker-compose.yml")) + .withExposedService( + DRUID_SERVICE_NAME, + DRUID_SERVICE_PORT, + Wait.forListeningPort() + .withStartupTimeout(Duration.ofSeconds(360))); + environment.start(); + changeCoordinatorURLConf(); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + environment.close(); + } + + @TestTemplate + public void testDruidSink(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/fakesource_to_druid.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + while (true) { + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpPost request = new HttpPost("http://" + coordinatorURL + "/druid/v2/sql"); + String jsonRequest = "{\"query\": \"" + sqlQuery + "\"}"; + StringEntity entity = new StringEntity(jsonRequest); + entity.setContentType("application/json"); + request.setEntity(entity); + HttpResponse response = client.execute(request); + String responseBody = EntityUtils.toString(response.getEntity()); + String expectedDataRow1 = + "\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3"; + String expectedDataRow2 = + "\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999"; + String expectedDataRow3 = + "\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489"; + String expectedDataRow4 = + 
"\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012"; + + if (!responseBody.contains("errorMessage")) { + // Check sink data + Assertions.assertEquals(responseBody.contains(expectedDataRow1), true); + Assertions.assertEquals(responseBody.contains(expectedDataRow2), true); + Assertions.assertEquals(responseBody.contains(expectedDataRow3), true); + Assertions.assertEquals(responseBody.contains(expectedDataRow4), true); + break; + } + Thread.sleep(1000); + } + } + } + + private void changeCoordinatorURLConf() throws UnknownHostException { + coordinatorURL = InetAddress.getLocalHost().getHostAddress() + ":8888"; + String resourceFilePath = "src/test/resources/fakesource_to_druid.conf"; + Path path = Paths.get(resourceFilePath); + try { + List<String> lines = Files.readAllLines(path); + List<String> newLines = + lines.stream() + .map( + line -> { + if (line.contains("coordinatorUrl")) { + return " coordinatorUrl = " + + "\"" + + coordinatorURL + + "\""; + } + return line; + }) + .collect(Collectors.toList()); + Files.write(path, newLines); + log.info("Conf has been updated successfully."); + } catch (IOException e) { + throw new RuntimeException("Change conf error", e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml new file mode 100644 index 0000000000..a59b23014e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version: "2.2" + +volumes: + metadata_data: {} + middle_var: {} + historical_var: {} + broker_var: {} + coordinator_var: {} + router_var: {} + druid_shared: {} + + +services: + chmod-service: + image: ubuntu:latest + user: "0" + command: sh -c "mkdir -p /opt/druid/shared && chmod -R a+rwx /opt/druid/shared" + volumes: + - druid_shared:/opt/druid/shared + + postgres: + image: postgres:latest + ports: + - "5432:5432" + volumes: + - metadata_data:/var/lib/postgresql/data + environment: + - POSTGRES_PASSWORD=FoolishPassword + - POSTGRES_USER=druid + - POSTGRES_DB=druid + depends_on: + - chmod-service + + # Need 3.5 or later for container nodes + zookeeper: + image: zookeeper:3.5.10 + ports: + - "2181:2181" + environment: + - ZOO_MY_ID=1 + depends_on: + - chmod-service + + coordinator: + image: apache/druid:24.0.1 + volumes: + - druid_shared:/opt/druid/shared + - coordinator_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - chmod-service + ports: + - "8032:8081" + command: + - coordinator + env_file: + - environment + + broker: + image: apache/druid:24.0.1 + volumes: + - broker_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + - chmod-service + ports: + - "8082:8082" + command: + - broker + env_file: + - environment + + historical: + image: apache/druid:24.0.1 + volumes: + - druid_shared:/opt/druid/shared + - historical_var:/opt/druid/var + depends_on: + - 
zookeeper + - postgres + - coordinator + - chmod-service + ports: + - "8083:8083" + command: + - historical + env_file: + - environment + + middlemanager: + image: apache/druid:24.0.1 + volumes: + - druid_shared:/opt/druid/shared + - middle_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + - chmod-service + ports: + - "8091:8091" + - "8100-8105:8100-8105" + command: + - middleManager + env_file: + - environment + + router: + image: apache/druid:24.0.1 + volumes: + - router_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + - chmod-service + ports: + - "8888:8888" + command: + - router + env_file: + - environment diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment new file mode 100644 index 0000000000..55737fcbcd --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Java tuning +#DRUID_XMX=1g +#DRUID_XMS=1g +#DRUID_MAXNEWSIZE=250m +#DRUID_NEWSIZE=250m +#DRUID_MAXDIRECTMEMORYSIZE=6172m +DRUID_SINGLE_NODE_CONF=nano-quickstart + +druid_emitter_logging_logLevel=debug + +druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-multi-stage-query"] + +druid_zk_service_host=zookeeper + +druid_metadata_storage_host= +druid_metadata_storage_type=postgresql +druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid +druid_metadata_storage_connector_user=druid +druid_metadata_storage_connector_password=FoolishPassword + +druid_coordinator_balancer_strategy=cachingCost + +druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=2g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"] +druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB + +druid_storage_type=local +druid_storage_storageDirectory=/opt/druid/shared/segments +druid_indexer_logs_type=file +druid_indexer_logs_directory=/opt/druid/shared/indexing-logs + +druid_processing_numThreads=1 +druid_processing_numMergeBuffers=1 +druid_worker_capacity=1 + +DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration> + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf new file mode 100644 index 0000000000..7944c7c990 --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + c_boolean = boolean + c_timestamp = timestamp + c_string = string + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [true, "2020-02-02T02:02:02", "NEW", 1, 2, 3, 4, 4.3, 5.3, 6.3] + }, + { + kind = INSERT + fields = [false, "2012-12-21T12:34:56", "AAA", 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999] + }, + { + kind = INSERT + fields = [true, "2016-03-12T11:29:33", "BBB", 1, 2, 672, 546782, 7.9, 6.88888, 88888.45623489] + }, + { + kind = INSERT + fields = [false, "2014-04-28T09:13:27", "CCC", 1, 1, 271, 683221, 4.8, 4.45271, 79277.68219012] + } + ] + } +} + +transform { +} + +sink { + Druid { + coordinatorUrl = "localhost:8888" + datasource = "testDataSource" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 35a002fc4b..9f452425af 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -61,6 +61,7 @@ <module>connector-hbase-e2e</module> <module>connector-web3j-e2e</module> <module>connector-maxcompute-e2e</module> + <module>connector-druid-e2e</module> <module>connector-google-firestore-e2e</module> <module>connector-rocketmq-e2e</module> <module>connector-file-obs-e2e</module>