This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7be09a4222 [INLONG-11495][Sort] Add end-to-end test for Kafka connector v1.18 (#11554)
7be09a4222 is described below

commit 7be09a42226a251efcdd2db8f86c84715ddb8293
Author: Hengyuan <zhanghengyuan1...@outlook.com>
AuthorDate: Mon Dec 2 12:48:05 2024 +0800

    [INLONG-11495][Sort] Add end-to-end test for Kafka connector v1.18 (#11554)
---
 .../sort-end-to-end-tests-v1.18/pom.xml            |  46 +++-
 .../sort/tests/Kafka2Elasticsearch7Test.java       | 248 +++++++++++++++++++++
 .../test/resources/env/kafka_test_kafka_init.txt   |   1 +
 .../resources/flinkSql/kafka_to_elasticsearch.sql  |  24 ++
 .../org.apache.flink.table.factories.Factory       |   4 +-
 5 files changed, 314 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml
index 78e19a16eb..850d5dddfd 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/pom.xml
@@ -31,8 +31,8 @@
     <properties>
         <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
         <flink.version>1.18.1</flink.version>
-        <elasticsearch.version>6.8.17</elasticsearch.version>
         <flink.shaded.jackson.version>2.15.3-18.0</flink.shaded.jackson.version>
+        <kafka.clients.version>3.7.1</kafka.clients.version>
     </properties>
 
     <dependencies>
@@ -51,6 +51,10 @@
             <artifactId>postgresql</artifactId>
             <version>${testcontainers.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
@@ -61,16 +65,11 @@
             <artifactId>elasticsearch</artifactId>
             <version>${testcontainers.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-high-level-client</artifactId>
-            <version>${elasticsearch.version}</version>
-        </dependency>
         <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-client -->
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-client</artifactId>
-            <version>${elasticsearch.version}</version>
+            <version>${elasticsearch7.version}</version>
         </dependency>
 
         <dependency>
@@ -142,6 +141,17 @@
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.clients.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>co.elastic.clients</groupId>
+            <artifactId>elasticsearch-java</artifactId>
+            <version>${elasticsearch7.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
@@ -158,6 +168,23 @@
                             <type>jar</type>
                             <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-kafka-v1.18</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-connector-kafka.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-elasticsearch7-v1.18</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-connector-elasticsearch7.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+
                     </artifactItems>
                 </configuration>
                 <executions>
@@ -203,6 +230,11 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>${plugin.surefire.version}</version>
+                <configuration>
+                    <systemPropertyVariables>
+                        <log4j.configurationFile>src/test/resources/log4j2-test.properties</log4j.configurationFile>
+                    </systemPropertyVariables>
+                </configuration>
             </plugin>
         </plugins>
     </build>
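
Context on the pom change above: the two new artifactItems stage the InLong connector uber-jars under target/dependencies, and the new test resolves them by the destFileName chosen here. A minimal sketch of that lookup, assuming TestUtils.getResource searches the staged dependencies by file name (the class name ConnectorJarLookup is illustrative, not part of the commit):

    import java.nio.file.Path;

    import org.apache.inlong.sort.tests.utils.TestUtils;

    // Illustrative only; mirrors the lookups made in Kafka2Elasticsearch7Test below.
    public class ConnectorJarLookup {
        public static void main(String[] args) {
            // Jars copied by maven-dependency-plugin into target/dependencies.
            Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
            Path elasticsearchJar = TestUtils.getResource("sort-connector-elasticsearch7.jar");
            System.out.println(kafkaJar + "\n" + elasticsearchJar);
        }
    }
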
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java
new file mode 100644
index 0000000000..0cf94a0663
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
+import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class Kafka2Elasticsearch7Test extends FlinkContainerTestEnvJRE8 {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Kafka2Elasticsearch7Test.class);
+    public static final Logger KAFKA_LOG = LoggerFactory.getLogger(KafkaContainer.class);
+    public static final Logger ELASTICSEARCH_LOGGER = LoggerFactory.getLogger(ElasticsearchContainer.class);
+
+    private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
+    private static final Path elasticsearchJar = TestUtils.getResource("sort-connector-elasticsearch7.jar");
+
+    private static final int ELASTICSEARCH_DEFAULT_PORT = 9200;
+
+    private static final String FIRST_KAFKA_MESSAGE = "{\"message\":\"Hello From Kafka\"}";
+    private static final String SECOND_KAFKA_MESSAGE = "{\"message\":\"Goodbye From ElasticSearch\"}";
+
+    private static final String FIRST_EXPECTED_MESSAGE = "Hello From Kafka";
+    private static final String SECOND_EXPECTED_MESSAGE = "Goodbye From ElasticSearch";
+
+    private static final String sqlFile;
+
+    static {
+        try {
+            sqlFile = Paths
+                    .get(Kafka2Elasticsearch7Test.class.getResource("/flinkSql/kafka_to_elasticsearch.sql").toURI())
+                    .toString();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClassRule
+    public static final KafkaContainer KAFKA =
+            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("kafka")
+                    .withEmbeddedZookeeper()
+                    .withLogConsumer(new Slf4jLogConsumer(KAFKA_LOG));
+
+    @ClassRule
+    public static final ElasticsearchContainer ELASTICSEARCH =
+            new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:7.17.13"))
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("elasticsearch")
+                    .withLogConsumer(new Slf4jLogConsumer(ELASTICSEARCH_LOGGER));
+
+    @Before
+    public void setup() throws IOException {
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        initializeKafkaTopic("test-topic");
+        initializeElasticsearchIndex();
+    }
+
+    private void initializeKafkaTopic(String topic) {
+        String fileName = "kafka_test_kafka_init.txt";
+        int port = KafkaContainer.ZOOKEEPER_PORT;
+
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("TOPIC", topic);
+        properties.put("ZOOKEEPER_PORT", port);
+
+        try {
+            String createKafkaStatement = getCreateStatement(fileName, properties);
+            ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement);
+            LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout());
+            if (result.getExitCode() != 0) {
+                throw new RuntimeException("Init kafka topic failed. Exit code:" + result.getExitCode());
+            }
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String getCreateStatement(String fileName, Map<String, Object> properties) {
+        URL url = Objects.requireNonNull(Kafka2Elasticsearch7Test.class.getResource("/env/" + fileName));
+
+        try {
+            Path file = Paths.get(url.toURI());
+            return PlaceholderResolver.getDefaultResolver().resolveByMap(
+                    new String(Files.readAllBytes(file), StandardCharsets.UTF_8),
+                    properties);
+        } catch (IOException | URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initializeElasticsearchIndex() throws IOException {
+        RestClient restClient = RestClient.builder(
+                new HttpHost("localhost", ELASTICSEARCH.getMappedPort(ELASTICSEARCH_DEFAULT_PORT), "http"))
+                .build();
+        RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+        ElasticsearchClient client = new ElasticsearchClient(transport);
+
+        client.indices().create(c -> c.index("test-index"));
+        LOG.info("Created Elasticsearch index: test-index");
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if (KAFKA != null) {
+            KAFKA.stop();
+        }
+        if (ELASTICSEARCH != null) {
+            ELASTICSEARCH.stop();
+        }
+    }
+
+    @Test
+    public void testKafkaToElasticsearch() throws Exception {
+        submitSQLJob(sqlFile, kafkaJar, elasticsearchJar);
+        waitUntilJobRunning(Duration.ofSeconds(10));
+
+        // Produce messages to Kafka
+        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
+                new org.apache.kafka.clients.producer.KafkaProducer<>(getKafkaProducerConfig());
+        producer.send(
+                new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key1", FIRST_KAFKA_MESSAGE));
+        producer.send(
+                new org.apache.kafka.clients.producer.ProducerRecord<>("test-topic", "key2", SECOND_KAFKA_MESSAGE));
+
+        // Query Elasticsearch to verify data is ingested
+        RestClient restClient = RestClient.builder(
+                new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http"))
+                .build();
+        RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+        ElasticsearchClient client = new ElasticsearchClient(transport);
+
+        List<String> messages = new ArrayList<>();
+        int maxRetries = 10; // Maximum number of retries (10 seconds)
+        int retryCount = 0;
+
+        while (retryCount < maxRetries) {
+            co.elastic.clients.elasticsearch.core.SearchRequest searchRequest =
+                    new co.elastic.clients.elasticsearch.core.SearchRequest.Builder()
+                            .index("test-index")
+                            .query(q -> q.matchAll(m -> m))
+                            .build();
+
+            co.elastic.clients.elasticsearch.core.SearchResponse<Map> response =
+                    client.search(searchRequest, Map.class);
+
+            // Extract `message` fields using Elasticsearch Java API
+            messages = response.hits().hits().stream()
+                    .map(hit -> {
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> source = hit.source();
+                        if (source != null && source.containsKey("message")) {
+                            return (String) source.get("message");
+                        }
+                        return null;
+                    })
+                    .filter(Objects::nonNull) // Remove null values
+                    .collect(Collectors.toList());
+
+            if (!messages.isEmpty()) {
+                // Stop polling if data is found
+                break;
+            }
+
+            // Wait for 1 second before retrying
+            Thread.sleep(1000);
+            retryCount++;
+        }
+
+        if (messages.isEmpty()) {
+            throw new AssertionError("Elasticsearch validation failed: No messages found after polling.");
+        }
+
+        LOG.info("Extracted messages from Elasticsearch: {}", messages);
+
+        // Create expected messages list
+        List<String> expectedMessages = new ArrayList<>();
+        expectedMessages.add(FIRST_EXPECTED_MESSAGE);
+        expectedMessages.add(SECOND_EXPECTED_MESSAGE);
+
+        // Validate messages against the expected messages
+        if (new HashSet<>(messages).equals(new HashSet<>(expectedMessages))) {
+            LOG.info("Elasticsearch contains all expected messages: {}", expectedMessages);
+        } else {
+            throw new AssertionError(
+                    String.format("Elasticsearch validation failed. Expected: %s, Found: %s", expectedMessages,
+                            messages));
+        }
+    }
+
+    private java.util.Properties getKafkaProducerConfig() {
+        java.util.Properties props = new java.util.Properties();
+        String bootstrapServers = KAFKA.getBootstrapServers();
+        props.put("bootstrap.servers", bootstrapServers);
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        return props;
+    }
+}
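
The core of the new test is the poll-and-verify loop against Elasticsearch. It distills to the following standalone sketch, assuming an Elasticsearch 7.x node reachable on localhost:9200 with documents already written to test-index (the class name EsPollSketch is illustrative, not part of the commit):

    import java.util.Map;

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;

    import co.elastic.clients.elasticsearch.ElasticsearchClient;
    import co.elastic.clients.elasticsearch.core.SearchResponse;
    import co.elastic.clients.json.jackson.JacksonJsonpMapper;
    import co.elastic.clients.transport.rest_client.RestClientTransport;

    public class EsPollSketch {
        public static void main(String[] args) throws Exception {
            // Low-level REST client wrapped by the new elasticsearch-java API client.
            RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
            ElasticsearchClient client =
                    new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));

            // Poll up to ~10s for the sink to flush at least one document.
            for (int attempt = 0; attempt < 10; attempt++) {
                SearchResponse<Map> response = client.search(
                        s -> s.index("test-index").query(q -> q.matchAll(m -> m)), Map.class);
                if (!response.hits().hits().isEmpty()) {
                    response.hits().hits().forEach(hit -> System.out.println(hit.source()));
                    break;
                }
                Thread.sleep(1000);
            }
            restClient.close();
        }
    }
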
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt
new file mode 100644
index 0000000000..b2f31d78fa
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt
@@ -0,0 +1 @@
+kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT}
\ No newline at end of file
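
The ${TOPIC} and ${ZOOKEEPER_PORT} placeholders in this one-line script are filled in by the test before the command runs inside the Kafka container; Testcontainers' KafkaContainer.ZOOKEEPER_PORT is 2181. A minimal sketch of that substitution, reusing the resolveByMap call the test's getCreateStatement() makes (the class name TopicCommandSketch is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.inlong.sort.tests.utils.PlaceholderResolver;

    public class TopicCommandSketch {
        public static void main(String[] args) {
            // Template matches kafka_test_kafka_init.txt above.
            String template = "kafka-topics --create --topic ${TOPIC} --replication-factor 1 "
                    + "--partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT}";
            Map<String, Object> properties = new HashMap<>();
            properties.put("TOPIC", "test-topic");
            properties.put("ZOOKEEPER_PORT", 2181);
            System.out.println(PlaceholderResolver.getDefaultResolver().resolveByMap(template, properties));
        }
    }
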
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql
new file mode 100644
index 0000000000..77cdeb8cae
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql
@@ -0,0 +1,24 @@
+CREATE TABLE kafka_source (
+    `message` STRING
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'test-topic',
+    'properties.bootstrap.servers' = 'kafka:9092',
+    'properties.group.id' = 'flink-group',
+    'scan.startup.mode' = 'earliest-offset',
+    'format' = 'json'
+);
+
+
+CREATE TABLE elasticsearch_sink (
+    `message` STRING
+) WITH (
+    'connector' = 'elasticsearch7-inlong',
+    'hosts' = 'http://elasticsearch:9200',
+    'index' = 'test-index',
+    'format' = 'json'
+);
+
+
+INSERT INTO elasticsearch_sink
+SELECT * FROM kafka_source;
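
This SQL wires a JSON Kafka source ('kafka-inlong') to an Elasticsearch 7 sink ('elasticsearch7-inlong'); each record with a single message field flows through unchanged. A hedged sketch of running the same statements in a plain Flink 1.18 TableEnvironment, assuming the two connector jars are on the classpath and the kafka/elasticsearch hostnames resolve (the class name is illustrative; the e2e test instead submits the .sql file to a Flink container):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaToEsSqlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Same DDL as kafka_to_elasticsearch.sql, condensed into strings.
            tEnv.executeSql("CREATE TABLE kafka_source (`message` STRING) WITH ("
                    + "'connector' = 'kafka-inlong', 'topic' = 'test-topic',"
                    + "'properties.bootstrap.servers' = 'kafka:9092',"
                    + "'properties.group.id' = 'flink-group',"
                    + "'scan.startup.mode' = 'earliest-offset', 'format' = 'json')");
            tEnv.executeSql("CREATE TABLE elasticsearch_sink (`message` STRING) WITH ("
                    + "'connector' = 'elasticsearch7-inlong',"
                    + "'hosts' = 'http://elasticsearch:9200',"
                    + "'index' = 'test-index', 'format' = 'json')");
            tEnv.executeSql("INSERT INTO elasticsearch_sink SELECT * FROM kafka_source");
        }
    }
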
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 9b8bf8e042..5f3c21a9d4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
-org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.UpsertKafkaDynamicTableFactory
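
This swap matters because Flink discovers table factories through Java's ServiceLoader: the META-INF/services entry now advertises the InLong wrappers, which is what lets the SQL above resolve 'kafka-inlong'. A small sketch for inspecting what a classpath registers (illustrative only):

    import java.util.ServiceLoader;

    import org.apache.flink.table.factories.Factory;

    public class FactoryScan {
        public static void main(String[] args) {
            // Lists every table factory advertised through META-INF/services entries.
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
            }
        }
    }
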
