This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e97615d56d [INLONG-11266][Sort] Add end-to-end test case for sort-connector-pulsar-v1.15 (#11268)
e97615d56d is described below

commit e97615d56da6e3e2c2de75e02d72e47324cc6838
Author: PeterZh6 <zhanghengyuan1...@outlook.com>
AuthorDate: Wed Oct 9 19:30:35 2024 +0800

    [INLONG-11266][Sort] Add end-to-end test case for sort-connector-pulsar-v1.15 (#11268)
---
 .../sort-end-to-end-tests-v1.15/pom.xml            |  20 +++
 .../inlong/sort/tests/Pulsar2StarRocksTest.java    | 149 +++++++++++++++++++++
 .../src/test/resources/flinkSql/pulsar_test.sql    |  39 ++++++
 pom.xml                                            |   6 +
 4 files changed, 214 insertions(+)

diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index cfaaee3f65..57d8d16d7a 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -51,6 +51,11 @@
             <groupId>org.testcontainers</groupId>
             <artifactId>kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mongodb</artifactId>
@@ -157,6 +162,13 @@
             <artifactId>jedis</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>${pulsar.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -246,6 +258,14 @@
                             <type>jar</type>
                             <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            <artifactId>sort-connector-pulsar-v1.15</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>sort-connector-pulsar.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
                     </artifactItems>
                 </configuration>
                 <executions>
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
new file mode 100644
index 0000000000..e5252d0b4a
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
+import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.StarRocksContainer;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+
+public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 {
+
+    private static final String PULSAR_TEST_FIRST_MESSAGE =
+            "{\"id\": 1, \"name\": \"Alice\", \"description\": \"Hello, Pulsar\"}";
+    private static final String PULSAR_TEST_SECOND_MESSAGE =
+            "{\"id\": 2, \"name\": \"Bob\", \"description\": \"Goodbye, Pulsar\"}";
+
+    private static final Logger LOG = LoggerFactory.getLogger(Pulsar2StarRocksTest.class);
+
+    public static final Logger PULSAR_LOG = LoggerFactory.getLogger(PulsarContainer.class);
+    private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
+    private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
+
+    private static final Path pulsarJar = TestUtils.getResource("sort-connector-pulsar.jar");
+
+    private static final String sqlFile;
+
+    static {
+        try {
+            URI pulsarSqlFile = Objects
+                    .requireNonNull(Pulsar2StarRocksTest.class.getResource("/flinkSql/pulsar_test.sql")).toURI();
+            sqlFile = Paths.get(pulsarSqlFile).toString();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClassRule
+    public static final PulsarContainer PULSAR = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.8.2"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("pulsar")
+            .withLogConsumer(new Slf4jLogConsumer(PULSAR_LOG));
+
+    @ClassRule
+    public static final StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(
+            getNewStarRocksImageName())
+                    .withExposedPorts(9030, 8030, 8040)
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
+                    .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
+
+    @Before
+    public void setup() {
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        initializePulsarTopic();
+        initializeStarRocksTable(STAR_ROCKS);
+    }
+
+    private void initializePulsarTopic() {
+        try {
+            Container.ExecResult result = PULSAR.execInContainer("bin/pulsar-admin", "topics", "create",
+                    "persistent://public/default/test-topic");
+            LOG.info("Create Pulsar topic: test-topic, std: {}", result.getStdout());
+            if (result.getExitCode() != 0) {
+                throw new RuntimeException("Init Pulsar topic failed. Exit code:" + result.getExitCode());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if (PULSAR != null) {
+            PULSAR.stop();
+        }
+        if (STAR_ROCKS != null) {
+            STAR_ROCKS.stop();
+        }
+    }
+
+    @Test
+    public void testPulsarToStarRocks() throws Exception {
+        submitSQLJob(sqlFile, pulsarJar, jdbcJar, mysqlJdbcJar);
+        waitUntilJobRunning(Duration.ofSeconds(10));
+
+        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PULSAR.getPulsarBrokerUrl()).build()) {
+            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                    .topic("persistent://public/default/test-topic")
+                    .create();
+
+            producer.send(PULSAR_TEST_FIRST_MESSAGE);
+            producer.send(PULSAR_TEST_SECOND_MESSAGE);
+
+            producer.close();
+        }
+
+        JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
+                STAR_ROCKS.getPassword(), STAR_ROCKS.getDriverClassName());
+
+        List<String> expectedResult = Arrays.asList(
+                "1,Alice,Hello, Pulsar",
+                "2,Bob,Goodbye, Pulsar");
+        proxy.checkResultWithTimeout(expectedResult, "test_output1", 3, 60000L);
+    }
+
+}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/pulsar_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/pulsar_test.sql
new file mode 100644
index 0000000000..1e9ac0e1c2
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/pulsar_test.sql
@@ -0,0 +1,39 @@
+CREATE TABLE test_input1
+(
+    `id` INT,
+    name STRING,
+    description STRING
+)
+WITH (
+    'connector' = 'pulsar-inlong',
+    'topic' = 'persistent://public/default/test-topic',
+    'service-url' = 'pulsar://pulsar:6650',
+    'admin-url' = 'http://pulsar:8080',
+    'format' = 'json',
+    'scan.startup.mode' = 'earliest'
+);
+
+
+CREATE TABLE test_output1
+(
+    id INT,
+    name STRING,
+    description STRING
+)
+     WITH (
+    'connector' = 'starrocks-inlong',
+    'jdbc-url' = 'jdbc:mysql://starrocks:9030',
+    'load-url'='starrocks:8030',
+    'database-name'='test',
+    'table-name' = 'test_output1',
+    'username' = 'inlong',
+    'password' = 'inlong',
+    'sink.properties.format' = 'json',
+    'sink.properties.strip_outer_array' = 'true',
+    'sink.buffer-flush.interval-ms' = '1000'
+);
+
+
+INSERT INTO test_output1
+SELECT id, name, description
+FROM test_input1;
diff --git a/pom.xml b/pom.xml
index a313eb73ee..21f4f74613 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1200,6 +1200,12 @@
                 <version>${testcontainers.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>pulsar</artifactId>
+                <version>${testcontainers.version}</version>
+                <scope>test</scope>
+            </dependency>
 
             <dependency>
                 <groupId>org.hamcrest</groupId>

Reply via email to