This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f5f5a5  [test](itcase) Add itcase for connector (#175)
6f5f5a5 is described below

commit 6f5f5a5de4b62516e019580d71883255020369c4
Author: wudi <676366...@qq.com>
AuthorDate: Thu Dec 28 16:45:08 2023 +0800

    [test](itcase) Add itcase for connector (#175)
---
 .github/workflows/run-itcase-12.yml                |  44 ++++++
 .github/workflows/run-itcase-20.yml                |  44 ++++++
 spark-doris-connector/pom.xml                      |  13 ++
 .../java/org/apache/doris/spark/DorisTestBase.java | 137 +++++++++++++++++++
 .../apache/doris/spark/sql/DorisReaderITCase.scala | 124 +++++++++++++++++
 .../apache/doris/spark/sql/DorisWriterITCase.scala | 151 +++++++++++++++++++++
 6 files changed, 513 insertions(+)

diff --git a/.github/workflows/run-itcase-12.yml b/.github/workflows/run-itcase-12.yml
new file mode 100644
index 0000000..87a9ba8
--- /dev/null
+++ b/.github/workflows/run-itcase-12.yml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Run ITCases 1.2
+on:
+  pull_request:
+  push:
+
+jobs:
+  build-extension:
+    name: "Run ITCases 1.2"
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        shell: bash
+    steps:
+    # Pin to a released tag rather than the moving master branch so CI is reproducible.
+    - name: Checkout
+      uses: actions/checkout@v3
+
+    # "adopt" is deprecated in setup-java; Eclipse Temurin is its successor.
+    - name: Setup java
+      uses: actions/setup-java@v3
+      with:
+        distribution: temurin
+        java-version: '8'
+
+    # Runs every *ITCase against the Doris 1.2 all-in-one image (read by DorisTestBase via -Dimage).
+    - name: Run ITCases
+      run: |
+        cd spark-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
+
diff --git a/.github/workflows/run-itcase-20.yml b/.github/workflows/run-itcase-20.yml
new file mode 100644
index 0000000..f972c36
--- /dev/null
+++ b/.github/workflows/run-itcase-20.yml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Run ITCases 2.0
+on:
+  pull_request:
+  push:
+
+jobs:
+  build-extension:
+    name: "Run ITCases 2.0"
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        shell: bash
+    steps:
+    # Pin to a released tag rather than the moving master branch so CI is reproducible.
+    - name: Checkout
+      uses: actions/checkout@v3
+
+    # "adopt" is deprecated in setup-java; Eclipse Temurin is its successor.
+    - name: Setup java
+      uses: actions/setup-java@v3
+      with:
+        distribution: temurin
+        java-version: '8'
+
+    # Runs every *ITCase against the Doris 2.0 all-in-one image (read by DorisTestBase via -Dimage).
+    - name: Run ITCases
+      run: |
+        cd spark-doris-connector && mvn test -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
+
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 89e1716..1dd5ade 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -78,6 +78,7 @@
         <netty.version>4.1.77.Final</netty.version>
         <fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>
         <thrift-service.version>1.0.0</thrift-service.version>
+        <testcontainers.version>1.17.6</testcontainers.version>
     </properties>
 
     <dependencies>
@@ -199,6 +200,18 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>4.2.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/DorisTestBase.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/DorisTestBase.java
new file mode 100644
index 0000000..9b3989a
--- /dev/null
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/DorisTestBase.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark;
+
+import com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+import static org.awaitility.Durations.ONE_SECOND;
+
+public abstract class DorisTestBase {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(DorisTestBase.class);
+    protected static final String DORIS_DOCKER_IMAGE = 
System.getProperty("image");
+    private static final String DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";;
+    protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    protected static final String URL = "jdbc:mysql://%s:9030";
+    protected static final String USERNAME = "root";
+    public static final String PASSWORD = "";
+    protected static final GenericContainer DORIS_CONTAINER = 
createDorisContainer();
+    protected static Connection connection;
+
+    protected static String getFenodes() {
+        return DORIS_CONTAINER.getHost() + ":8030";
+    }
+
+    @BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
+        given().ignoreExceptions()
+                .await()
+                .atMost(300, TimeUnit.SECONDS)
+                .pollInterval(ONE_SECOND)
+                .untilAsserted(DorisTestBase::initializeJdbcConnection);
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        DORIS_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    public static GenericContainer createDorisContainer() {
+        GenericContainer container =
+                new GenericContainer<>(DORIS_DOCKER_IMAGE)
+                        .withNetwork(Network.newNetwork())
+                        .withNetworkAliases("DorisContainer")
+                        .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
+                        .withEnv("FE_ID", "1")
+                        .withEnv("CURRENT_BE_IP", "127.0.0.1")
+                        .withEnv("CURRENT_BE_PORT", "9050")
+                        .withCommand("ulimit -n 65536")
+                        .withCreateContainerCmdModifier(
+                                cmd -> cmd.getHostConfig().withMemorySwap(0L))
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));
+
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", "8030", "8030"),
+                        String.format("%s:%s", "9030", "9030"),
+                        String.format("%s:%s", "9060", "9060"),
+                        String.format("%s:%s", "8040", "8040")));
+
+        return container;
+    }
+
+    protected static void initializeJdbcConnection() throws SQLException, 
MalformedURLException {
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(DRIVER_JAR)}, 
DorisTestBase.class.getClassLoader());
+        LOG.info("Try to connect to Doris...");
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+        connection =
+                DriverManager.getConnection(
+                        String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
+        try (Statement statement = connection.createStatement()) {
+            ResultSet resultSet;
+            do {
+                LOG.info("Wait for the Backend to start successfully...");
+                resultSet = statement.executeQuery("show backends");
+            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+        }
+        LOG.info("Connected to Doris successfully...");
+    }
+
+    private static boolean isBeReady(ResultSet rs, Duration duration) throws 
SQLException {
+        LockSupport.parkNanos(duration.toNanos());
+        if (rs.next()) {
+            String isAlive = rs.getString("Alive").trim();
+            String totalCap = rs.getString("TotalCapacity").trim();
+            return "true".equalsIgnoreCase(isAlive) && 
!"0.000".equalsIgnoreCase(totalCap);
+        }
+        return false;
+    }
+}
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisReaderITCase.scala
new file mode 100644
index 0000000..a4a288a
--- /dev/null
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.DorisTestBase
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Test
+
+import java.sql.Statement
+
+/** Integration tests reading a two-row Doris table through the RDD, DataFrame and SQL APIs. */
+class DorisReaderITCase extends DorisTestBase {
+
+  val DATABASE: String = "test"
+  val TABLE_READ: String = "tbl_read"
+  val TABLE_READ_TBL: String = "tbl_read_tbl"
+
+  /** Reads the table with `sc.dorisRDD` and expects both rows back as Java lists. */
+  @Test
+  @throws[Exception]
+  def testRddSource(): Unit = {
+    initializeTable(TABLE_READ)
+
+    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource")
+    val sc = new SparkContext(sparkConf)
+    import org.apache.doris.spark._
+    import scala.collection.JavaConverters._
+    // Stop the context even if the read fails: only one SparkContext may be active per JVM,
+    // so a leaked one would break every later test in the run.
+    val result = try {
+      val dorisSparkRDD = sc.dorisRDD(
+        tableIdentifier = Some(DATABASE + "." + TABLE_READ),
+        cfg = Some(Map(
+          "doris.fenodes" -> DorisTestBase.getFenodes,
+          "doris.request.auth.user" -> DorisTestBase.USERNAME,
+          "doris.request.auth.password" -> DorisTestBase.PASSWORD
+        ))
+      )
+      dorisSparkRDD.collect().toList.asJava
+    } finally {
+      sc.stop()
+    }
+
+    org.junit.Assert.assertEquals(
+      List(List("doris", 18).asJava, List("spark", 10).asJava).asJava, result)
+  }
+
+  /** Reads the table with the `doris` DataFrame source. */
+  @Test
+  @throws[Exception]
+  def testDataFrameSource(): Unit = {
+    initializeTable(TABLE_READ_TBL)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val result = try {
+      session.read
+        .format("doris")
+        .option("doris.fenodes", DorisTestBase.getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL)
+        .option("user", DorisTestBase.USERNAME)
+        .option("password", DorisTestBase.PASSWORD)
+        .load()
+        .collect().toList.toString()
+    } finally {
+      session.stop()
+    }
+    org.junit.Assert.assertEquals("List([doris,18], [spark,10])", result)
+  }
+
+  /** Reads the table through a `USING doris` temporary view queried with Spark SQL. */
+  @Test
+  @throws[Exception]
+  def testSQLSource(): Unit = {
+    initializeTable(TABLE_READ_TBL)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val result = try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
+           | "fenodes"="${DorisTestBase.getFenodes}",
+           | "user"="${DorisTestBase.USERNAME}",
+           | "password"="${DorisTestBase.PASSWORD}"
+           |);
+           |""".stripMargin)
+
+      session.sql(
+        """
+          |select  name,age from test_source;
+          |""".stripMargin).collect().toList.toString()
+    } finally {
+      session.stop()
+    }
+
+    org.junit.Assert.assertEquals("List([doris,18], [spark,10])", result)
+  }
+
+  /**
+   * (Re)creates `DATABASE.table` as a single-bucket, single-replica table and inserts the rows
+   * ('doris', 18) and ('spark', 10).
+   */
+  @throws[Exception]
+  private def initializeTable(table: String): Unit = {
+    val statement: Statement = DorisTestBase.connection.createStatement
+    try {
+      statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+      statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table))
+      statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
+        "`name` varchar(256),\n" +
+        "`age` int\n" +
+        ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
+        "PROPERTIES (\n" +
+        "\"replication_num\" = \"1\"\n" +
+        ")\n", DATABASE, table))
+      statement.execute(String.format("insert into %s.%s  values ('doris',18)", DATABASE, table))
+      statement.execute(String.format("insert into %s.%s  values ('spark',10)", DATABASE, table))
+    } finally {
+      statement.close()
+    }
+  }
+
+}
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisWriterITCase.scala
new file mode 100644
index 0000000..747c0bd
--- /dev/null
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.DorisTestBase
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+import java.sql.{ResultSet, Statement}
+import scala.collection.mutable.ListBuffer
+
+/** Integration tests writing two rows to Doris via the DataFrame sink (CSV, JSON) and Spark SQL. */
+class DorisWriterITCase extends DorisTestBase {
+
+  val DATABASE: String = "test"
+  val TABLE_CSV: String = "tbl_csv"
+  val TABLE_JSON: String = "tbl_json"
+  val TABLE_JSON_TBL: String = "tbl_json_tbl"
+
+  /** Writes with the CSV stream-load format and a ',' column separator. */
+  @Test
+  @throws[Exception]
+  def testSinkCsvFormat(): Unit = {
+    initializeTable(TABLE_CSV)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_csv", 1),
+        ("spark_csv", 2)
+      )).toDF("name", "age")
+      // The keys were previously misspelled "sink.properteis.*" and were silently ignored.
+      // line_delimiter is left at its default: a raw "\n" is not a legal HTTP header value, and
+      // stream-load properties are presumably sent as headers - confirm before re-adding it.
+      df.write
+        .format("doris")
+        .option("doris.fenodes", DorisTestBase.getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_CSV)
+        .option("user", DorisTestBase.USERNAME)
+        .option("password", DorisTestBase.PASSWORD)
+        .option("sink.properties.column_separator", ",")
+        .option("sink.properties.format", "csv")
+        .save()
+    } finally {
+      session.stop()
+    }
+
+    assertTableContent(TABLE_CSV, Seq(List("doris_csv", 1), List("spark_csv", 2)))
+  }
+
+  /** Writes with the JSON stream-load format, one JSON object per line. */
+  @Test
+  @throws[Exception]
+  def testSinkJsonFormat(): Unit = {
+    initializeTable(TABLE_JSON)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_json", 1),
+        ("spark_json", 2)
+      )).toDF("name", "age")
+      // Correctly spelled keys; with the old "properteis" typo this test silently used CSV.
+      df.write
+        .format("doris")
+        .option("doris.fenodes", DorisTestBase.getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_JSON)
+        .option("user", DorisTestBase.USERNAME)
+        .option("password", DorisTestBase.PASSWORD)
+        .option("sink.properties.read_json_by_line", "true")
+        .option("sink.properties.format", "json")
+        .save()
+    } finally {
+      session.stop()
+    }
+
+    assertTableContent(TABLE_JSON, Seq(List("doris_json", 1), List("spark_json", 2)))
+  }
+
+  /** Writes through a `USING doris` temporary view with `INSERT INTO ... SELECT`. */
+  @Test
+  @throws[Exception]
+  def testSQLSinkFormat(): Unit = {
+    initializeTable(TABLE_JSON_TBL)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_tbl", 1),
+        ("spark_tbl", 2)
+      )).toDF("name", "age")
+      df.createTempView("mock_source")
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_sink
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL}",
+           | "fenodes"="${DorisTestBase.getFenodes}",
+           | "user"="${DorisTestBase.USERNAME}",
+           | "password"="${DorisTestBase.PASSWORD}"
+           |);
+           |""".stripMargin)
+      session.sql(
+        """
+          |insert into test_sink select  name,age from mock_source ;
+          |""".stripMargin)
+    } finally {
+      session.stop()
+    }
+
+    assertTableContent(TABLE_JSON_TBL, Seq(List("doris_tbl", 1), List("spark_tbl", 2)))
+  }
+
+  /**
+   * Polls `table` every 500 ms until its rows equal `expected` or `timeoutMs` elapses, then
+   * asserts equality. This replaces a fixed 10 s sleep: the test continues as soon as the rows
+   * are visible, and a slow load gets more headroom.
+   */
+  private def assertTableContent(table: String, expected: Seq[List[Any]],
+                                 timeoutMs: Long = 30000L): Unit = {
+    val deadline = System.currentTimeMillis() + timeoutMs
+    var actual = queryResult(table)
+    while (actual != expected && System.currentTimeMillis() < deadline) {
+      Thread.sleep(500L)
+      actual = queryResult(table)
+    }
+    org.junit.Assert.assertEquals(expected, actual)
+  }
+
+  /** Returns `List(name, age)` for every row of `DATABASE.table`, ordered by name. */
+  private def queryResult(table: String): ListBuffer[Any] = {
+    val actual = new ListBuffer[Any]
+    val sinkStatement: Statement = DorisTestBase.connection.createStatement
+    try {
+      val sinkResultSet: ResultSet = sinkStatement.executeQuery(
+        String.format("select name,age from %s.%s order by 1", DATABASE, table))
+      while (sinkResultSet.next) {
+        actual += List(sinkResultSet.getString("name"), sinkResultSet.getInt("age"))
+      }
+    } finally {
+      sinkStatement.close()
+    }
+    actual
+  }
+
+  /** (Re)creates an empty single-bucket, single-replica `DATABASE.table` (name, age). */
+  @throws[Exception]
+  private def initializeTable(table: String): Unit = {
+    val statement: Statement = DorisTestBase.connection.createStatement
+    try {
+      statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+      statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table))
+      statement.execute(String.format(
+        "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" + ") " +
+          "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
+          "PROPERTIES (\n" +
+          "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table))
+    } finally {
+      statement.close()
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to