This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c61342f9 [Improve](case) add customer doris container cluster (#491)
c61342f9 is described below

commit c61342f9651829c72f7828480c831dba52ad8ef3
Author: wudi <676366...@qq.com>
AuthorDate: Fri Sep 20 10:16:09 2024 +0800

    [Improve](case) add customer doris container cluster (#491)
---
 .../flink/container/AbstractContainerTestBase.java |   8 +-
 .../flink/container/instance/ContainerService.java |   2 +
 .../flink/container/instance/DorisContainer.java   |   5 +
 .../container/instance/DorisCustomerContainer.java | 132 +++++++++++++++++++++
 .../flink/container/instance/MySQLContainer.java   |   5 +
 .../apache/doris/flink/sink/DorisSinkITCase.java   |  51 +++++++-
 6 files changed, 196 insertions(+), 7 deletions(-)

diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
index 61e0faac..5c7c151e 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.container;
 
 import org.apache.doris.flink.container.instance.ContainerService;
 import org.apache.doris.flink.container.instance.DorisContainer;
+import org.apache.doris.flink.container.instance.DorisCustomerContainer;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +49,8 @@ public abstract class AbstractContainerTestBase {
             LOG.info("The doris container has been started and is running 
status.");
             return;
         }
-        dorisContainerService = new DorisContainer();
+        Boolean customerEnv = 
Boolean.valueOf(System.getProperty("customer_env", "false"));
+        dorisContainerService = customerEnv ? new DorisCustomerContainer() : 
new DorisContainer();
         dorisContainerService.startContainer();
         LOG.info("Doris container was started.");
     }
@@ -74,9 +76,7 @@ public abstract class AbstractContainerTestBase {
     }
 
     protected String getDorisQueryUrl() {
-        return String.format(
-                "jdbc:mysql://%s:%s",
-                getDorisInstanceHost(), 
dorisContainerService.getMappedPort(9030));
+        return dorisContainerService.getJdbcUrl();
     }
 
     protected String getDorisInstanceHost() {
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
index 6ad1e3cd..684de5a0 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
@@ -29,6 +29,8 @@ public interface ContainerService {
 
     Connection getQueryConnection();
 
+    /** Returns the JDBC URL that clients use to connect to this service. */
+    String getJdbcUrl();
+
     String getInstanceHost();
 
     Integer getMappedPort(int originalPort);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
index 6af827b8..ef399d0d 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
@@ -115,6 +115,11 @@ public class DorisContainer implements ContainerService {
         }
     }
 
+    /**
+     * Builds the JDBC URL for this container by substituting the container host into {@code
+     * JDBC_URL}.
+     *
+     * <p>NOTE(review): only the host is substituted here; {@code JDBC_URL} is declared outside
+     * this hunk, so confirm that the template already carries the query port.
+     */
+    @Override
+    public String getJdbcUrl() {
+        return String.format(JDBC_URL, dorisContainer.getHost());
+    }
+
     @Override
     public String getInstanceHost() {
         return dorisContainer.getHost();
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java
new file mode 100644
index 00000000..3d417303
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.instance;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/** Using a custom Doris environment */
+public class DorisCustomerContainer implements ContainerService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisCustomerContainer.class);
+    private static final String JDBC_URL = "jdbc:mysql://%s:%s";
+
+    @Override
+    public void startContainer() {
+        LOG.info("Using doris customer containers env.");
+        checkParams();
+        if (!isRunning()) {
+            throw new DorisRuntimeException(
+                    "Backend is not alive. Please check the doris cluster.");
+        }
+    }
+
+    private void checkParams() {
+        Preconditions.checkArgument(
+                System.getProperty("doris_host") != null, "doris_host is 
required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_query_port") != null, 
"doris_query_port is required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_http_port") != null, 
"doris_http_port is required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_user") != null, "doris_user is 
required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_passwd") != null, "doris_passwd is 
required.");
+    }
+
+    @Override
+    public boolean isRunning() {
+        try (Connection conn = getQueryConnection();
+                Statement stmt = conn.createStatement()) {
+            ResultSet showBackends = stmt.executeQuery("show backends");
+            while (showBackends.next()) {
+                String isAlive = showBackends.getString("Alive").trim();
+                if (Boolean.toString(true).equalsIgnoreCase(isAlive)) {
+                    return true;
+                }
+            }
+        } catch (SQLException e) {
+            LOG.error("Failed to connect doris cluster.", e);
+            return false;
+        }
+        return false;
+    }
+
+    @Override
+    public Connection getQueryConnection() {
+        LOG.info("Try to get query connection from doris.");
+        String jdbcUrl =
+                String.format(
+                        JDBC_URL,
+                        System.getProperty("doris_host"),
+                        System.getProperty("doris_query_port"));
+        try {
+            return DriverManager.getConnection(jdbcUrl, getUsername(), 
getPassword());
+        } catch (SQLException e) {
+            LOG.info("Failed to get doris query connection. jdbcUrl={}", 
jdbcUrl, e);
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return String.format(
+                JDBC_URL, System.getProperty("doris_host"), 
System.getProperty("doris_query_port"));
+    }
+
+    @Override
+    public String getInstanceHost() {
+        return System.getProperty("doris_host");
+    }
+
+    @Override
+    public Integer getMappedPort(int originalPort) {
+        return originalPort;
+    }
+
+    @Override
+    public String getUsername() {
+        return System.getProperty("doris_user");
+    }
+
+    @Override
+    public String getPassword() {
+        return System.getProperty("doris_passwd");
+    }
+
+    @Override
+    public String getFenodes() {
+        return System.getProperty("doris_host") + ":" + 
System.getProperty("doris_http_port");
+    }
+
+    @Override
+    public String getBenodes() {
+        return null;
+    }
+
+    @Override
+    public void close() {}
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
index 21b30e81..4e50ac64 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
@@ -92,6 +92,11 @@ public class MySQLContainer implements ContainerService {
         }
     }
 
+    /** Returns the JDBC URL by delegating to {@code mysqlcontainer.getJdbcUrl()}. */
+    @Override
+    public String getJdbcUrl() {
+        return mysqlcontainer.getJdbcUrl();
+    }
+
     @Override
     public void close() {
         LOG.info("Stopping MySQL container.");
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 877074ed..80986ea3 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -63,6 +64,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
     static final String TABLE_CSV = "tbl_csv";
     static final String TABLE_JSON = "tbl_json";
     static final String TABLE_JSON_TBL = "tbl_json_tbl";
+    static final String TABLE_TBL_AUTO_REDIRECT = "tbl_tbl_auto_redirect";
     static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
     static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
     static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
@@ -177,8 +179,6 @@ public class DorisSinkITCase extends AbstractITCaseService {
                                 + DorisConfigOptions.IDENTIFIER
                                 + "',"
                                 + " 'fenodes' = '%s',"
-                                + " 'benodes' = '%s',"
-                                + " 'auto-redirect' = 'false',"
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
                                 + " 'password' = '%s',"
@@ -196,7 +196,6 @@ public class DorisSinkITCase extends AbstractITCaseService {
                                 + "'"
                                 + ")",
                         getFenodes(),
-                        getBenodes(),
                         DATABASE + "." + TABLE_JSON_TBL,
                         getDorisUsername(),
                         getDorisPassword());
@@ -210,6 +209,52 @@ public class DorisSinkITCase extends AbstractITCaseService 
{
         ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, 
query, 2);
     }
 
+    /**
+     * Writes two rows through the table sink with {@code auto-redirect} disabled and explicit
+     * {@code benodes}, then verifies them in {@code TABLE_TBL_AUTO_REDIRECT}.
+     *
+     * <p>The test is skipped (returns early) when {@code getBenodes()} is null or blank.
+     */
+    @Test
+    public void testTableSinkAutoRedirectFalse() throws Exception {
+        if (StringUtils.isNullOrWhitespaceOnly(getBenodes())) {
+            LOG.info("benodes is empty, skip the test.");
+            return;
+        }
+        initializeTable(TABLE_TBL_AUTO_REDIRECT);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_sink ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + " 'fenodes' = '%s',"
+                                + " 'benodes' = '%s',"
+                                + " 'auto-redirect' = 'false',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'sink.label-prefix' = 'doris_sink"
+                                + UUID.randomUUID()
+                                + "'"
+                                + ")",
+                        getFenodes(),
+                        getBenodes(),
+                        DATABASE + "." + TABLE_TBL_AUTO_REDIRECT,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sinkDDL);
+        tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
+
+        Thread.sleep(10000);
+        List<String> expected = Arrays.asList("doris,1", "flink,2");
+        // Verify the table this test actually wrote to, not the one used by another test.
+        String query =
+                String.format(
+                        "select name,age from %s.%s order by 1",
+                        DATABASE, TABLE_TBL_AUTO_REDIRECT);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
+    }
+
+
     @Test
     public void testTableBatch() throws Exception {
         initializeTable(TABLE_CSV_BATCH_TBL);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to