This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d7fa9afdfe [Feature][Connector] Add druid sink connector (#6346)
d7fa9afdfe is described below

commit d7fa9afdfe95633f73fb66c877f954b8a29f8579
Author: TaoZex <45089228+tao...@users.noreply.github.com>
AuthorDate: Wed Jun 19 14:29:42 2024 +0800

    [Feature][Connector] Add druid sink connector (#6346)
---
 config/plugin_config                               |   1 +
 docs/en/connector-v2/sink/Druid.md                 |  67 ++++++
 plugin-mapping.properties                          |   1 +
 seatunnel-connectors-v2/connector-druid/pom.xml    |  57 +++++
 .../connectors/druid/config/DruidConfig.java       |  44 ++++
 .../druid/exception/DruidConnectorException.java   |  38 +++
 .../seatunnel/connectors/druid/sink/DruidSink.java |  61 +++++
 .../connectors/druid/sink/DruidSinkFactory.java    |  50 ++++
 .../connectors/druid/sink/DruidWriter.java         | 255 +++++++++++++++++++++
 .../seatunnel/druid/DruidFactoryTest.java          |  31 +++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   6 +
 .../connector-druid-e2e/pom.xml                    |  54 +++++
 .../seatunnel/e2e/connector/druid/DruidIT.java     | 148 ++++++++++++
 .../src/test/resources/docker-compose.yml          | 141 ++++++++++++
 .../src/test/resources/environment                 |  53 +++++
 .../src/test/resources/fakesource_to_druid.conf    |  69 ++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 18 files changed, 1078 insertions(+)

diff --git a/config/plugin_config b/config/plugin_config
index 42fc280a65..b18d893e67 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -67,6 +67,7 @@ connector-openmldb
 connector-pulsar
 connector-rabbitmq
 connector-redis
+connector-druid
 connector-s3-redshift
 connector-sentry
 connector-slack
diff --git a/docs/en/connector-v2/sink/Druid.md 
b/docs/en/connector-v2/sink/Druid.md
new file mode 100644
index 0000000000..0d4783b03a
--- /dev/null
+++ b/docs/en/connector-v2/sink/Druid.md
@@ -0,0 +1,67 @@
+# Druid
+
+> Druid sink connector
+
+## Description
+
+Write data to Druid
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+
+## Data Type Mapping
+
+| SeaTunnel Data Type | Druid Data Type |
+|---------------------|-----------------|
+| TINYINT             | LONG            |
+| SMALLINT            | LONG            |
+| INT                 | LONG            |
+| BIGINT              | LONG            |
+| FLOAT               | FLOAT           |
+| DOUBLE              | DOUBLE          |
+| DECIMAL             | DOUBLE          |
+| STRING              | STRING          |
+| BOOLEAN             | STRING          |
+| TIMESTAMP           | STRING          |
+
+## Options
+
+|      name      |  type  | required | default value |
+|----------------|--------|----------|---------------|
+| coordinatorUrl | string | yes      | -             |
+| datasource     | string | yes      | -             |
+| batchSize      | int    | no       | 10000         |
+| common-options |        | no       | -             |
+
+### coordinatorUrl [string]
+
+The coordinatorUrl host and port of Druid, example: "myHost:8888"
+
+### datasource [string]
+
+The datasource name you want to write, example: "seatunnel"
+
+### batchSize [int]
+
+The number of rows flushed to Druid per batch. Default value is `10000`.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
+
+## Example
+
+```hocon
+Druid {
+  coordinatorUrl = "testHost:8888"
+  datasource = "seatunnel"
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Druid sink connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 411e42b880..25dc239f99 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -119,6 +119,7 @@ seatunnel.source.AmazonSqs = connector-amazonsqs
 seatunnel.sink.AmazonSqs = connector-amazonsqs
 seatunnel.source.Paimon = connector-paimon
 seatunnel.sink.Paimon = connector-paimon
+seatunnel.sink.Druid = connector-druid
 seatunnel.source.Easysearch = connector-easysearch
 seatunnel.sink.Easysearch = connector-easysearch
 seatunnel.source.Postgres-CDC = connector-cdc-postgres
diff --git a/seatunnel-connectors-v2/connector-druid/pom.xml 
b/seatunnel-connectors-v2/connector-druid/pom.xml
new file mode 100644
index 0000000000..c53ca12f8f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-druid/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-druid</artifactId>
+    <name>SeaTunnel : Connectors V2 : Druid</name>
+
+    <properties>
+        <druid.version>24.0.1</druid.version>
+        <httpclient.version>4.5.13</httpclient.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${druid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-indexing-service</artifactId>
+            <version>${druid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java
 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java
new file mode 100644
index 0000000000..a2883143a3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/config/DruidConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/** Configuration options accepted by the Druid sink connector. */
+public class DruidConfig {
+    /** Batch size used when {@code batchSize} is not set in the job configuration. */
+    public static final Integer BATCH_SIZE_DEFAULT = 10000;
+
+    // The option constants are declared final so that no other class can reassign
+    // these shared, globally visible definitions at runtime.
+
+    /** Required: host and port of Druid, used to build the task submission URL. */
+    public static final Option<String> COORDINATOR_URL =
+            Options.key("coordinatorUrl")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The coordinatorUrl host and port of Druid.");
+
+    /** Required: name of the Druid datasource that receives the rows. */
+    public static final Option<String> DATASOURCE =
+            Options.key("datasource")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The datasource name need to write.");
+
+    /** Optional: number of rows buffered before a batch is flushed to Druid. */
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batchSize")
+                    .intType()
+                    .defaultValue(BATCH_SIZE_DEFAULT)
+                    .withDescription("The batch size of the druid write.");
+}
diff --git 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java
 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java
new file mode 100644
index 0000000000..23c7348f7b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/exception/DruidConnectorException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Runtime exception raised by the Druid connector. Every constructor simply forwards its
+ * arguments to the matching {@link SeaTunnelRuntimeException} constructor.
+ */
+public class DruidConnectorException extends SeaTunnelRuntimeException {
+
+    /**
+     * @param seaTunnelErrorCode error code describing the failure category
+     * @param cause underlying exception
+     */
+    public DruidConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+
+    /**
+     * @param seaTunnelErrorCode error code describing the failure category
+     * @param errorMessage human readable detail message
+     */
+    public DruidConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    /**
+     * @param seaTunnelErrorCode error code describing the failure category
+     * @param errorMessage human readable detail message
+     * @param cause underlying exception
+     */
+    public DruidConnectorException(
+            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
new file mode 100644
index 0000000000..318f3a1bd0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import java.io.IOException;
+
+import static 
org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
+import static 
org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
+
+/**
+ * Sink entry point for Druid: keeps the job options and the upstream row type, and hands
+ * them to a new {@link DruidWriter} whenever the engine asks for a writer.
+ */
+public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private ReadonlyConfig config;
+    private CatalogTable catalogTable;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    /**
+     * @param config sink options supplied by the job configuration
+     * @param table catalog table whose row type describes the incoming rows
+     */
+    public DruidSink(ReadonlyConfig config, CatalogTable table) {
+        this.config = config;
+        this.catalogTable = table;
+        this.seaTunnelRowType = table.getSeaTunnelRowType();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Druid";
+    }
+
+    /** Builds a writer from the coordinator URL, datasource and batch size options. */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
+            throws IOException {
+        final String coordinatorUrl = config.get(COORDINATOR_URL);
+        final String datasource = config.get(DATASOURCE);
+        final int batchSize = config.get(BATCH_SIZE);
+        return new DruidWriter(seaTunnelRowType, coordinatorUrl, datasource, batchSize);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
new file mode 100644
index 0000000000..44e887810e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import static org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
+import static 
org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
+
+/** Factory that exposes the Druid sink under the identifier {@code "Druid"}. */
+@AutoService(Factory.class)
+public class DruidSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Druid";
+    }
+
+    /**
+     * Declares the options understood by the sink: {@code coordinatorUrl} and {@code
+     * datasource} are required, {@code batchSize} is optional. The optional entry is listed
+     * explicitly so the rule matches the options that {@link DruidSink} actually reads.
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(COORDINATOR_URL, DATASOURCE)
+                .optional(BATCH_SIZE)
+                .build();
+    }
+
+    /** Creates the sink from the context's options and catalog table. */
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new DruidSink(context.getOptions(), catalogTable);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
new file mode 100644
index 0000000000..3f7709b51d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.druid.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.druid.exception.DruidConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+/**
+ * Sink writer that buffers incoming rows as CSV text and, once {@code batchSize} rows have
+ * been collected (or on close), posts them to Druid as an inline {@link
+ * ParallelIndexSupervisorTask} through the {@code /druid/indexer/v1/task} HTTP endpoint.
+ */
+public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DruidWriter.class);
+
+    private static final String DEFAULT_LINE_DELIMITER = "\n";
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+    private static final String TIMESTAMP_SPEC_COLUMN_NAME = "timestamp";
+    private static final String DRUID_ENDPOINT = "/druid/indexer/v1/task";
+    // Charset used for both the request payload and for decoding the response body.
+    private static final String HTTP_CHARSET = "UTF-8";
+
+    private final int batchSize;
+    private int currentBatchSize = 0;
+
+    private final DataSchema dataSchema;
+
+    // Captured once at construction; every row written by this writer gets this timestamp.
+    private final long processTime;
+    // Pending rows, one CSV line per row, that have not been sent to Druid yet.
+    private final transient StringBuffer data;
+
+    private final CloseableHttpClient httpClient;
+    private final ObjectMapper mapper;
+    private final String coordinatorUrl;
+    private final String datasource;
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    /**
+     * @param seaTunnelRowType row type of the incoming data, used to derive Druid dimensions
+     * @param coordinatorUrl host and port that the ingestion task is posted to
+     * @param datasource name of the target Druid datasource
+     * @param batchSize number of rows buffered before a batch is sent
+     */
+    public DruidWriter(
+            SeaTunnelRowType seaTunnelRowType,
+            String coordinatorUrl,
+            String datasource,
+            int batchSize) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.coordinatorUrl = coordinatorUrl;
+        this.datasource = datasource;
+        this.batchSize = batchSize;
+        this.mapper = provideDruidSerializer();
+        this.httpClient = HttpClients.createDefault();
+        // Must run after datasource and seaTunnelRowType are assigned: it reads both.
+        this.dataSchema = provideDruidDataSchema();
+        this.processTime = System.currentTimeMillis();
+        this.data = new StringBuffer();
+    }
+
+    /**
+     * Appends one row to the pending batch as a CSV line and flushes when the batch is full.
+     *
+     * @param element row to buffer
+     * @throws IOException if a triggered flush fails
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        final StringJoiner joiner = new StringJoiner(DEFAULT_FIELD_DELIMITER, "", "");
+        for (int i = 0; i < element.getArity(); i++) {
+            final Object v = element.getField(i);
+            // A null value must still occupy its column as an empty field; skipping it
+            // would shift every following value (including the timestamp) to the left.
+            joiner.add(v == null ? "" : escapeCsvField(v.toString()));
+        }
+        // timestamp column is a required field to add in Druid.
+        // See https://druid.apache.org/docs/24.0.0/ingestion/data-model.html#primary-timestamp
+        joiner.add(String.valueOf(processTime));
+        data.append(joiner.toString());
+        data.append(DEFAULT_LINE_DELIMITER);
+        currentBatchSize++;
+        if (currentBatchSize >= batchSize) {
+            flush();
+            currentBatchSize = 0;
+        }
+    }
+
+    /**
+     * Sends all pending rows to Druid as one ingestion task and empties the buffer. Does
+     * nothing when no rows are pending. If the request fails the buffer is left untouched.
+     *
+     * @throws IOException if the request cannot be executed or Druid answers with a
+     *     non-2xx status
+     */
+    public void flush() throws IOException {
+        if (data.length() == 0) {
+            // Nothing buffered: do not submit an ingestion task without any rows.
+            return;
+        }
+        final ParallelIndexIOConfig ioConfig = provideDruidIOConfig(data);
+        final ParallelIndexSupervisorTask indexTask = provideIndexTask(ioConfig);
+        final String inputJSON = provideInputJSONString(indexTask);
+        final String uri = "http://" + this.coordinatorUrl + DRUID_ENDPOINT;
+        HttpPost post = new HttpPost(uri);
+        post.setHeader("Content-Type", "application/json");
+        post.setHeader("Accept", "application/json, text/plain, */*");
+        // Encode explicitly as UTF-8; the single-argument StringEntity constructor falls
+        // back to ISO-8859-1 and would mangle non-Latin characters in the row data.
+        post.setEntity(new StringEntity(inputJSON, HTTP_CHARSET));
+
+        try (CloseableHttpResponse response = httpClient.execute(post)) {
+            final int statusCode = response.getStatusLine().getStatusCode();
+            String responseBody = "";
+            if (response.getEntity() != null) {
+                // EntityUtils reads the actual body text (HttpEntity#toString() does not).
+                // Referenced by its fully qualified name to leave the import block as is.
+                responseBody =
+                        org.apache.http.util.EntityUtils.toString(
+                                response.getEntity(), HTTP_CHARSET);
+            }
+            if (statusCode < 200 || statusCode >= 300) {
+                throw new IOException(
+                        "Druid rejected the write task, HTTP status "
+                                + statusCode
+                                + ", response: "
+                                + responseBody);
+            }
+            LOG.info("Druid write task has been sent, and the response is {}", responseBody);
+        }
+        // The batch was accepted: drop it so the next flush does not resend the same rows.
+        data.setLength(0);
+        currentBatchSize = 0;
+    }
+
+    /**
+     * Flushes any remaining rows and releases the HTTP client.
+     *
+     * @throws IOException if the final flush or closing the client fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            flush();
+        } finally {
+            // Always release the client, even when the final flush fails.
+            httpClient.close();
+        }
+    }
+
+    /**
+     * Quotes a CSV field when it contains the field delimiter, a double quote or a line
+     * break, doubling embedded quotes; other values are returned unchanged.
+     */
+    private static String escapeCsvField(String value) {
+        final boolean needsQuoting =
+                value.contains(DEFAULT_FIELD_DELIMITER)
+                        || value.contains("\"")
+                        || value.contains("\n")
+                        || value.contains("\r");
+        if (!needsQuoting) {
+            return value;
+        }
+        // NOTE(review): values with embedded line breaks may still be split by a
+        // line-oriented reader on the Druid side - confirm before relying on them.
+        return "\"" + value.replace("\"", "\"\"") + "\"";
+    }
+
+    /** Builds the Jackson mapper (with Joda support) used to serialize the index task. */
+    private ObjectMapper provideDruidSerializer() {
+        final ObjectMapper mapper = new ObjectMapper();
+        mapper.registerModule(new JodaModule());
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+        mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+        mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+        mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+        mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+        return mapper;
+    }
+
+    /**
+     * One necessary information to provide is DimensionSchema list, which states data type of
+     * columns. More details in https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html
+     */
+    private DataSchema provideDruidDataSchema() {
+        final List<DimensionSchema> dimensionSchemas = transformToDimensionSchema();
+        return new DataSchema(
+                datasource,
+                new TimestampSpec(TIMESTAMP_SPEC_COLUMN_NAME, "auto", null),
+                new DimensionsSpec(dimensionSchemas),
+                null,
+                new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, false, null),
+                null);
+    }
+
+    /**
+     * Maps every SeaTunnel field to a Druid dimension of a matching type.
+     *
+     * @throws DruidConnectorException if a field has a type without a Druid mapping
+     */
+    private List<DimensionSchema> transformToDimensionSchema() {
+        List<DimensionSchema> dimensionSchemas = new ArrayList<>();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        for (int i = 0; i < fieldNames.length; i++) {
+            String columnName = fieldNames[i];
+            switch (fieldTypes[i].getSqlType()) {
+                case BOOLEAN:
+                case TIMESTAMP:
+                case STRING:
+                    dimensionSchemas.add(new StringDimensionSchema(columnName));
+                    break;
+                case FLOAT:
+                    dimensionSchemas.add(new FloatDimensionSchema(columnName));
+                    break;
+                case DECIMAL:
+                case DOUBLE:
+                    dimensionSchemas.add(new DoubleDimensionSchema(columnName));
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INT:
+                case BIGINT:
+                    dimensionSchemas.add(new LongDimensionSchema(columnName));
+                    break;
+                default:
+                    throw new DruidConnectorException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                            "Unsupported data type " + seaTunnelRowType.getFieldType(i));
+            }
+        }
+        return dimensionSchemas;
+    }
+
+    /**
+     * Wraps the buffered CSV text in an inline input source whose columns are the row's
+     * field names followed by the timestamp column.
+     */
+    ParallelIndexIOConfig provideDruidIOConfig(final StringBuffer data) {
+        // Collect into an ArrayList explicitly: the list is extended below, and
+        // Collectors.toList() gives no guarantee that its result is mutable.
+        List<String> formatList =
+                Arrays.stream(seaTunnelRowType.getFieldNames())
+                        .collect(Collectors.toCollection(ArrayList::new));
+        formatList.add(TIMESTAMP_SPEC_COLUMN_NAME);
+        return new ParallelIndexIOConfig(
+                null,
+                new InlineInputSource(data.toString()),
+                new CsvInputFormat(formatList, DEFAULT_LINE_DELIMITER, null, false, 0),
+                false,
+                null);
+    }
+
+    /**
+     * Provide ParallelIndexSupervisorTask that can run multiple indexing tasks concurrently. See
+     * more information in https://druid.apache.org/docs/latest/ingestion/native-batch.html
+     */
+    @VisibleForTesting
+    ParallelIndexSupervisorTask provideIndexTask(final ParallelIndexIOConfig ioConfig) {
+        return new ParallelIndexSupervisorTask(
+                null, null, null, new ParallelIndexIngestionSpec(dataSchema, ioConfig, null), null);
+    }
+
+    /**
+     * Provide JSON to be sent via HTTP request. Please see payload example in
+     * https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html
+     */
+    String provideInputJSONString(final ParallelIndexSupervisorTask indexTask)
+            throws JsonProcessingException {
+        final String taskJSON = mapper.writeValueAsString(indexTask);
+        final ObjectNode jsonObject = (ObjectNode) mapper.readTree(taskJSON);
+        jsonObject.remove("id");
+        jsonObject.remove("groupId");
+        jsonObject.remove("resource");
+
+        // "spec" is modified in place, so it does not have to be put back into the parent.
+        final ObjectNode spec = (ObjectNode) jsonObject.get("spec");
+        spec.remove("tuningConfig");
+        return jsonObject.toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java
 
b/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java
new file mode 100644
index 0000000000..b5b40bb3ec
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-druid/src/test/java/org/apache/seatunnel/connectors/seatunnel/druid/DruidFactoryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid;
+
+import org.apache.seatunnel.connectors.druid.sink.DruidSinkFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Smoke test for {@link DruidSinkFactory}. */
+public class DruidFactoryTest {
+    /** The factory must expose a non-null option rule. */
+    @Test
+    public void optionRuleTest() {
+        final DruidSinkFactory factory = new DruidSinkFactory();
+        Assertions.assertNotNull(factory.optionRule());
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index d1e5af9ee6..0498ff4539 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -67,6 +67,7 @@
         <module>connector-rabbitmq</module>
         <module>connector-openmldb</module>
         <module>connector-doris</module>
+        <module>connector-druid</module>
         <module>connector-maxcompute</module>
         <module>connector-tdengine</module>
         <module>connector-selectdb-cloud</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 59ce612230..fe75ace87a 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -227,6 +227,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-druid</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>connector-jdbc</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml
new file mode 100644
index 0000000000..2531a70f48
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-druid-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Druid</name>
+
+    <properties>
+        <!-- Same value as the apache/druid image tag in src/test/resources/docker-compose.yml;
+             not referenced by any dependency in this POM. -->
+        <druid.version>24.0.1</druid.version>
+        <httpclient.version>4.5.13</httpclient.version>
+    </properties>
+
+    <dependencies>
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-druid</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- HTTP client used by DruidIT to query Druid's SQL endpoint -->
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
new file mode 100644
index 0000000000..1639636b85
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.druid;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {TestContainerId.SPARK_2_4},
+        disabledReason = "The RoaringBitmap version is not compatible in 
docker container")
+public class DruidIT extends TestSuiteBase implements TestResource {
+
+    private static final String datasource = "testDataSource";
+    private static final String sqlQuery = "SELECT * FROM " + datasource;
+    private static final String DRUID_SERVICE_NAME = "router";
+    private static final int DRUID_SERVICE_PORT = 8888;
+    private DockerComposeContainer environment;
+    private String coordinatorURL;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        environment =
+                new DockerComposeContainer(new 
File("src/test/resources/docker-compose.yml"))
+                        .withExposedService(
+                                DRUID_SERVICE_NAME,
+                                DRUID_SERVICE_PORT,
+                                Wait.forListeningPort()
+                                        
.withStartupTimeout(Duration.ofSeconds(360)));
+        environment.start();
+        changeCoordinatorURLConf();
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        environment.close();
+    }
+
+    @TestTemplate
+    public void testDruidSink(TestContainer container) throws Exception {
+        Container.ExecResult execResult = 
container.executeJob("/fakesource_to_druid.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        while (true) {
+            try (CloseableHttpClient client = HttpClients.createDefault()) {
+                HttpPost request = new HttpPost("http://"; + coordinatorURL + 
"/druid/v2/sql");
+                String jsonRequest = "{\"query\": \"" + sqlQuery + "\"}";
+                StringEntity entity = new StringEntity(jsonRequest);
+                entity.setContentType("application/json");
+                request.setEntity(entity);
+                HttpResponse response = client.execute(request);
+                String responseBody = 
EntityUtils.toString(response.getEntity());
+                String expectedDataRow1 =
+                        
"\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
+                String expectedDataRow2 =
+                        
"\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
+                String expectedDataRow3 =
+                        
"\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
+                String expectedDataRow4 =
+                        
"\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
+
+                if (!responseBody.contains("errorMessage")) {
+                    // Check sink data
+                    
Assertions.assertEquals(responseBody.contains(expectedDataRow1), true);
+                    
Assertions.assertEquals(responseBody.contains(expectedDataRow2), true);
+                    
Assertions.assertEquals(responseBody.contains(expectedDataRow3), true);
+                    
Assertions.assertEquals(responseBody.contains(expectedDataRow4), true);
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+        }
+    }
+
+    private void changeCoordinatorURLConf() throws UnknownHostException {
+        coordinatorURL = InetAddress.getLocalHost().getHostAddress() + ":8888";
+        String resourceFilePath = 
"src/test/resources/fakesource_to_druid.conf";
+        Path path = Paths.get(resourceFilePath);
+        try {
+            List<String> lines = Files.readAllLines(path);
+            List<String> newLines =
+                    lines.stream()
+                            .map(
+                                    line -> {
+                                        if (line.contains("coordinatorUrl")) {
+                                            return "    coordinatorUrl = "
+                                                    + "\""
+                                                    + coordinatorURL
+                                                    + "\"";
+                                        }
+                                        return line;
+                                    })
+                            .collect(Collectors.toList());
+            Files.write(path, newLines);
+            log.info("Conf has been updated successfully.");
+        } catch (IOException e) {
+            throw new RuntimeException("Change conf error", e);
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml
new file mode 100644
index 0000000000..a59b23014e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/docker-compose.yml
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+version: "2.2"
+
+# Named volumes. druid_shared is mounted at /opt/druid/shared by chmod-service, coordinator,
+# historical and middlemanager; metadata_data holds the Postgres data directory; each *_var
+# volume is mounted at /opt/druid/var of the matching Druid service.
+volumes:
+  metadata_data: {}
+  middle_var: {}
+  historical_var: {}
+  broker_var: {}
+  coordinator_var: {}
+  router_var: {}
+  druid_shared: {}
+
+
+services:
+  # One-shot helper run as root (user "0"): creates /opt/druid/shared on the druid_shared
+  # volume and makes it readable, writable and executable by all users.
+  chmod-service:
+    image: ubuntu:latest
+    user: "0"
+    command: sh -c "mkdir -p /opt/druid/shared && chmod -R a+rwx /opt/druid/shared"
+    volumes:
+      - druid_shared:/opt/druid/shared
+
+  # Postgres instance; user, password and database match the
+  # druid_metadata_storage_connector_* settings in the `environment` file.
+  postgres:
+    image: postgres:latest
+    ports:
+      - "5432:5432"
+    volumes:
+      - metadata_data:/var/lib/postgresql/data
+    environment:
+      - POSTGRES_PASSWORD=FoolishPassword
+      - POSTGRES_USER=druid
+      - POSTGRES_DB=druid
+    depends_on:
+      - chmod-service
+
+  # Need 3.5 or later for container nodes
+  # The service name is the host that druid_zk_service_host in the `environment` file refers to.
+  zookeeper:
+    image: zookeeper:3.5.10
+    ports:
+      - "2181:2181"
+    environment:
+      - ZOO_MY_ID=1
+    depends_on:
+      - chmod-service
+
+  # Druid coordinator process; container port 8081 is published on host port 8032.
+  coordinator:
+    image: apache/druid:24.0.1
+    volumes:
+      - druid_shared:/opt/druid/shared
+      - coordinator_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - chmod-service
+    ports:
+      - "8032:8081"
+    command:
+      - coordinator
+    env_file:
+      - environment
+
+  # Druid broker process, published on host port 8082. It does not mount druid_shared.
+  broker:
+    image: apache/druid:24.0.1
+    volumes:
+      - broker_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+      - chmod-service
+    ports:
+      - "8082:8082"
+    command:
+      - broker
+    env_file:
+      - environment
+
+  # Druid historical process, published on host port 8083; mounts druid_shared, where the
+  # `environment` file places the segment storage directory.
+  historical:
+    image: apache/druid:24.0.1
+    volumes:
+      - druid_shared:/opt/druid/shared
+      - historical_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+      - chmod-service
+    ports:
+      - "8083:8083"
+    command:
+      - historical
+    env_file:
+      - environment
+
+  # Druid middleManager process, published on host port 8091, with the additional port range
+  # 8100-8105 also published; mounts druid_shared for segments and indexing logs.
+  middlemanager:
+    image: apache/druid:24.0.1
+    volumes:
+      - druid_shared:/opt/druid/shared
+      - middle_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+      - chmod-service
+    ports:
+      - "8091:8091"
+      - "8100-8105:8100-8105"
+    command:
+      - middleManager
+    env_file:
+      - environment
+
+  # Druid router process, published on host port 8888. DruidIT waits for this service/port to
+  # listen and sends its SQL queries to it.
+  router:
+    image: apache/druid:24.0.1
+    volumes:
+      - router_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+      - chmod-service
+    ports:
+      - "8888:8888"
+    command:
+      - router
+    env_file:
+      - environment
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment
new file mode 100644
index 0000000000..55737fcbcd
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/environment
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Java tuning
+#DRUID_XMX=1g
+#DRUID_XMS=1g
+#DRUID_MAXNEWSIZE=250m
+#DRUID_NEWSIZE=250m
+#DRUID_MAXDIRECTMEMORYSIZE=6172m
+DRUID_SINGLE_NODE_CONF=nano-quickstart
+
+druid_emitter_logging_logLevel=debug
+
+druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-multi-stage-query"]
+
+# Host name of the zookeeper service in docker-compose.yml
+druid_zk_service_host=zookeeper
+
+# Metadata store: the postgres service in docker-compose.yml, with the same user, password
+# and database
+druid_metadata_storage_host=
+druid_metadata_storage_type=postgresql
+druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
+druid_metadata_storage_connector_user=druid
+druid_metadata_storage_connector_password=FoolishPassword
+
+druid_coordinator_balancer_strategy=cachingCost
+
+druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=2g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
+druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
+
+# Segments and indexing logs live under /opt/druid/shared, the mount point of the
+# druid_shared volume
+druid_storage_type=local
+druid_storage_storageDirectory=/opt/druid/shared/segments
+druid_indexer_logs_type=file
+druid_indexer_logs_directory=/opt/druid/shared/indexing-logs
+
+druid_processing_numThreads=1
+druid_processing_numMergeBuffers=1
+druid_worker_capacity=1
+
+DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf
new file mode 100644
index 0000000000..7944c7c990
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Job-level settings: a batch job with parallelism 1.
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+# Fake source emitting four fixed INSERT rows. DruidIT asserts on exactly these values, so the
+# rows and the expected strings in that test must be changed together.
+source {
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_boolean = boolean
+        c_timestamp = timestamp
+        c_string = string
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(16, 1)"
+      }
+    }
+    # Values are listed in the same order as the schema fields above.
+    rows = [
+      {
+        kind = INSERT
+        fields = [true, "2020-02-02T02:02:02", "NEW", 1, 2, 3, 4, 4.3, 5.3, 6.3]
+      },
+      {
+        kind = INSERT
+        fields = [false, "2012-12-21T12:34:56", "AAA",  1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999]
+      },
+      {
+        kind = INSERT
+        fields = [true, "2016-03-12T11:29:33", "BBB",  1, 2, 672, 546782, 7.9, 6.88888, 88888.45623489]
+      },
+      {
+        kind = INSERT
+        fields = [false, "2014-04-28T09:13:27", "CCC",  1, 1, 271, 683221, 4.8, 4.45271, 79277.68219012]
+      }
+    ]
+  }
+}
+
+# No transforms are configured for this job.
+transform {
+}
+
+# Druid sink under test. The URL value below is only a placeholder: DruidIT rewrites that line
+# with the local host address and port 8888 during start-up. The datasource name is the one
+# DruidIT queries.
+sink {
+  Druid {
+    coordinatorUrl = "localhost:8888"
+    datasource = "testDataSource"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 35a002fc4b..9f452425af 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -61,6 +61,7 @@
         <module>connector-hbase-e2e</module>
         <module>connector-web3j-e2e</module>
         <module>connector-maxcompute-e2e</module>
+        <module>connector-druid-e2e</module>
         <module>connector-google-firestore-e2e</module>
         <module>connector-rocketmq-e2e</module>
         <module>connector-file-obs-e2e</module>

Reply via email to