This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 46d4567b [Improve](test) improve some ut and itcase (#486)
46d4567b is described below

commit 46d4567baeeb525acbb5590e26727b646880ab20
Author: wudi <676366...@qq.com>
AuthorDate: Wed Sep 11 10:37:28 2024 +0800

    [Improve](test) improve some ut and itcase (#486)
---
 flink-doris-connector/pom.xml                      |   6 +
 .../doris/flink/catalog/DorisCatalogITCase.java    |   2 +-
 .../flink/container/AbstractContainerTestBase.java |  23 ++
 .../doris/flink/container/AbstractE2EService.java  |   1 +
 .../flink/container/AbstractITCaseService.java     |  21 +-
 .../flink/container/e2e/Doris2DorisE2ECase.java    |   2 +-
 .../apache/doris/flink/sink/DorisSinkITCase.java   |  35 +--
 .../doris/flink/source/DorisSourceITCase.java      | 240 ++++++++++++++++++---
 .../enumerator/DorisSourceEnumeratorTest.java      | 111 ++++++++++
 9 files changed, 388 insertions(+), 53 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 64ca7392..d773339b 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -423,6 +423,12 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.github.jsqlparser</groupId>
             <artifactId>jsqlparser</artifactId>
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
index b3a3ce04..099f6ebd 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
@@ -146,7 +146,7 @@ public class DorisCatalogITCase extends AbstractITCaseService {
         props.put("sink.enable-2pc", "false");
         catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props);
         this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM);
         // Use doris catalog.
         tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
         tEnv.useCatalog(TEST_CATALOG_NAME);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
index 967e6f36..61e0faac 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -24,11 +24,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
+import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public abstract class AbstractContainerTestBase {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
     private static ContainerService dorisContainerService;
+    public static final int DEFAULT_PARALLELISM = 2;
 
     @BeforeClass
     public static void initContainers() {
@@ -88,4 +95,20 @@ public abstract class AbstractContainerTestBase {
         dorisContainerService.close();
         LOG.info("Doris container was closed.");
     }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+    public static void assertEqualsInAnyOrder(List<Object> expected, List<Object> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEqualsInOrder(
+                expected.stream().sorted().collect(Collectors.toList()),
+                actual.stream().sorted().collect(Collectors.toList()));
+    }
+
+    public static void assertEqualsInOrder(List<Object> expected, List<Object> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0]));
+    }
 }
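
For context, a minimal sketch of how a subclass could call the new assertEqualsInAnyOrder helper; the test class and row values here are hypothetical, not part of this commit:

    // Hypothetical subclass of AbstractContainerTestBase (JUnit 4 and java.util imports assumed).
    public class ExampleOrderingITCase extends AbstractContainerTestBase {
        @Test
        public void testRowsRegardlessOfOrder() {
            // With parallelism > 1, rows from a parallel source can arrive in any order,
            // so the helper sorts both sides before comparing them element by element.
            List<Object> expected = Arrays.asList("+I[doris, 18]", "+I[flink, 10]");
            List<Object> actual = Arrays.asList("+I[flink, 10]", "+I[doris, 18]");
            assertEqualsInAnyOrder(expected, actual);
        }
    }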
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
index 527f82cc..ec536ee6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
@@ -113,6 +113,7 @@ public abstract class AbstractE2EService extends AbstractContainerTestBase {
 
     private StreamExecutionEnvironment configFlinkEnvironment() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
         Map<String, String> flinkMap = new HashMap<>();
         flinkMap.put("execution.checkpointing.interval", "10s");
         flinkMap.put("pipeline.operator-chaining", "false");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
index 956b8be6..6628933c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
@@ -23,12 +23,8 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.junit.Rule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,16 +81,6 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
         }
     }
 
-    @Rule
-    public final MiniClusterWithClientResource miniClusterResource =
-            new MiniClusterWithClientResource(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(2)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
     /** The type of failover. */
     protected enum FailoverType {
         TM,
@@ -138,4 +124,11 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
         LOG.info("flink cluster will grant job master leadership. jobId={}", jobId);
         haLeadershipControl.grantJobMasterLeadership(jobId).get();
     }
+
+    protected void sleepMs(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException ignored) {
+        }
+    }
 }
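
For orientation, a sketch of the call pattern these helpers support in the failover ITCases later in this patch; the variable names mirror that code, and the fragment is illustrative rather than a complete test:

    // Inside a test method of a concrete ITCase that declares a
    // MiniClusterWithClientResource @Rule (see DorisSinkITCase and DorisSourceITCase below).
    JobID jobId = tableResult.getJobClient().get().getJobID();
    triggerFailover(
            FailoverType.TM,                      // or FailoverType.JM
            jobId,
            miniClusterResource.getMiniCluster(), // the MiniCluster backing the @Rule
            () -> sleepMs(100));                  // brief pause between failover steps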
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
index fcb4858a..4b4e3b26 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
@@ -55,7 +55,7 @@ public class Doris2DorisE2ECase extends AbstractE2EService {
         LOG.info("Start executing the test case of doris to doris.");
         initializeDorisTable();
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(2);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 50bcf6be..96562fa4 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -22,8 +22,11 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -35,6 +38,7 @@ import org.apache.doris.flink.sink.DorisSink.Builder;
 import org.apache.doris.flink.sink.batch.DorisBatchSink;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.doris.flink.utils.MockSource;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +69,16 @@ public class DorisSinkITCase extends AbstractITCaseService {
     static final String TABLE_CSV_JM = "tbl_csv_jm";
     static final String TABLE_CSV_TM = "tbl_csv_tm";
 
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(2)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
     @Test
     public void testSinkCsvFormat() throws Exception {
         initializeTable(TABLE_CSV);
@@ -131,6 +145,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
             throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(DEFAULT_PARALLELISM);
         Builder<String> builder = DorisSink.builder();
         final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
 
@@ -147,7 +162,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
     public void testTableSinkJsonFormat() throws Exception {
         initializeTable(TABLE_JSON_TBL);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
@@ -196,7 +211,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
     public void testTableBatch() throws Exception {
         initializeTable(TABLE_CSV_BATCH_TBL);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
@@ -244,6 +259,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
         initializeTable(TABLE_CSV_BATCH_DS);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(DEFAULT_PARALLELISM);
         DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
 
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
@@ -283,7 +299,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
     public void testTableGroupCommit() throws Exception {
         initializeTable(TABLE_GROUP_COMMIT);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
@@ -332,7 +348,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
     public void testTableGzFormat() throws Exception {
         initializeTable(TABLE_GZ_FORMAT);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
@@ -374,7 +390,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
         LOG.info("start to test JobManagerFailoverSink.");
         initializeFailoverTable(TABLE_CSV_JM);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
+        env.setParallelism(DEFAULT_PARALLELISM);
         env.enableCheckpointing(10000);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
 
@@ -434,7 +450,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
         LOG.info("start to test TaskManagerFailoverSink.");
         initializeFailoverTable(TABLE_CSV_TM);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
+        env.setParallelism(DEFAULT_PARALLELISM);
         env.enableCheckpointing(10000);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
 
@@ -486,13 +502,6 @@ public class DorisSinkITCase extends AbstractITCaseService {
         ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
     }
 
-    private void sleepMs(long millis) {
-        try {
-            Thread.sleep(millis);
-        } catch (InterruptedException ignored) {
-        }
-    }
-
     private void initializeTable(String table) {
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 783e6bda..96a08d1c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -17,11 +17,16 @@
 
 package org.apache.doris.flink.source;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
@@ -31,18 +36,17 @@ import org.apache.doris.flink.container.AbstractITCaseService;
 import org.apache.doris.flink.container.ContainerUtils;
 import org.apache.doris.flink.datastream.DorisSourceFunction;
 import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
-import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 
 /** DorisSource ITCase. */
 public class DorisSourceITCase extends AbstractITCaseService {
@@ -56,13 +60,25 @@ public class DorisSourceITCase extends AbstractITCaseService {
     private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
     private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
             "tbl_read_tbl_push_down_with_union_all";
+    static final String TABLE_CSV_JM = "tbl_csv_jm_source";
+    static final String TABLE_CSV_TM = "tbl_csv_tm_source";
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(2)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
 
     @Test
     public void testSource() throws Exception {
         initializeTable(TABLE_READ);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-
+        env.setParallelism(DEFAULT_PARALLELISM);
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
         dorisBuilder
                 .setFenodes(getFenodes())
@@ -84,13 +100,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
             }
         }
         List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
-        checkResult("testSource", expected.toArray(), actual.toArray());
+        checkResultInAnyOrder("testSource", expected.toArray(), actual.toArray());
     }
 
     @Test
     public void testOldSourceApi() throws Exception {
         initializeTable(TABLE_READ_OLD_API);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
         Properties properties = new Properties();
         properties.put("fenodes", getFenodes());
         properties.put("username", getDorisUsername());
@@ -109,14 +126,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
             }
         }
         List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
-        checkResult("testOldSourceApi", expected.toArray(), actual.toArray());
+        checkResultInAnyOrder("testOldSourceApi", expected.toArray(), actual.toArray());
     }
 
     @Test
     public void testTableSource() throws Exception {
         initializeTable(TABLE_READ_TBL);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
@@ -146,7 +163,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
             }
         }
         String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
-        Assert.assertArrayEquals(expected, actual.toArray());
+        assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray()));
 
         // filter query
         List<String> actualFilter = new ArrayList<>();
@@ -158,14 +175,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
             }
         }
         String[] expectedFilter = new String[] {"+I[doris, 18]"};
-        checkResult("testTableSource", expectedFilter, actualFilter.toArray());
+        checkResultInAnyOrder("testTableSource", expectedFilter, actualFilter.toArray());
     }
 
     @Test
     public void testTableSourceOldApi() throws Exception {
         initializeTable(TABLE_READ_TBL_OLD_API);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         String sourceDDL =
@@ -195,14 +212,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
             }
         }
         String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
-        checkResult("testTableSourceOldApi", expected, actual.toArray());
+        checkResultInAnyOrder("testTableSourceOldApi", expected, actual.toArray());
     }
 
     @Test
     public void testTableSourceAllOptions() throws Exception {
         initializeTable(TABLE_READ_TBL_ALL_OPTIONS);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         String sourceDDL =
@@ -241,14 +258,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
             }
         }
         String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
-        checkResult("testTableSourceAllOptions", expected, actual.toArray());
+        checkResultInAnyOrder("testTableSourceAllOptions", expected, actual.toArray());
     }
 
     @Test
     public void testTableSourceFilterAndProjectionPushDown() throws Exception {
         initializeTable(TABLE_READ_TBL_PUSH_DOWN);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         String sourceDDL =
@@ -279,15 +296,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
             }
         }
         String[] expected = new String[] {"+I[18]"};
-        checkResult("testTableSourceFilterAndProjectionPushDown", expected, 
actual.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceFilterAndProjectionPushDown", expected, 
actual.toArray());
     }
 
     @Test
-    public void testTableSourceFilterWithUnionAll() {
+    public void testTableSourceFilterWithUnionAll() throws Exception {
         LOG.info("starting to execute testTableSourceFilterWithUnionAll 
case.");
         initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
+        env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         String sourceDDL =
@@ -318,14 +336,134 @@ public class DorisSourceITCase extends AbstractITCaseService {
             while (iterator.hasNext()) {
                 actual.add(iterator.next().toString());
             }
-        } catch (Exception e) {
-            LOG.error("Failed to execute sql. sql={}", querySql, e);
-            throw new DorisRuntimeException(e);
         }
-        Set<String> expected = new HashSet<>(Arrays.asList("+I[flink, 10]", "+I[doris, 18]"));
-        for (String a : actual) {
-            Assert.assertTrue(expected.contains(a));
+
+        String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"};
+        checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, 
actual.toArray());
+    }
+
+    @Test
+    public void testJobManagerFailoverSource() throws Exception {
+        LOG.info("start to test JobManagerFailoverSource.");
+        initializeTableWithData(TABLE_CSV_JM);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE doris_source_jm ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_CSV_JM,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from doris_source_jm");
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+
+        List<String> expectedData = getExpectedData();
+        if (iterator.hasNext()) {
+            LOG.info("trigger jobmanager failover...");
+            triggerFailover(
+                    FailoverType.JM,
+                    jobId,
+                    miniClusterResource.getMiniCluster(),
+                    () -> sleepMs(100));
+        }
+        List<String> actual = fetchRows(iterator);
+        LOG.info("actual data: {}, expected: {}", actual, expectedData);
+        Assert.assertTrue(actual.size() >= expectedData.size());
+        Assert.assertTrue(actual.containsAll(expectedData));
+    }
+
+    private static List<String> getExpectedData() {
+        String[] expected =
+                new String[] {
+                    "+I[101, 1]",
+                    "+I[102, 1]",
+                    "+I[103, 1]",
+                    "+I[201, 2]",
+                    "+I[202, 2]",
+                    "+I[203, 2]",
+                    "+I[301, 3]",
+                    "+I[302, 3]",
+                    "+I[303, 3]",
+                    "+I[401, 4]",
+                    "+I[402, 4]",
+                    "+I[403, 4]",
+                    "+I[501, 5]",
+                    "+I[502, 5]",
+                    "+I[503, 5]",
+                    "+I[601, 6]",
+                    "+I[602, 6]",
+                    "+I[603, 6]",
+                    "+I[701, 7]",
+                    "+I[702, 7]",
+                    "+I[703, 7]",
+                    "+I[801, 8]",
+                    "+I[802, 8]",
+                    "+I[803, 8]",
+                    "+I[901, 9]",
+                    "+I[902, 9]",
+                    "+I[903, 9]"
+                };
+        return Arrays.asList(expected);
+    }
+
+    @Test
+    public void testTaskManagerFailoverSource() throws Exception {
+        LOG.info("start to test TaskManagerFailoverSource.");
+        initializeTableWithData(TABLE_CSV_TM);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE doris_source_tm ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_CSV_TM,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from doris_source_tm");
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+        List<String> expectedData = getExpectedData();
+        if (iterator.hasNext()) {
+            LOG.info("trigger taskmanager failover...");
+            triggerFailover(
+                    FailoverType.TM,
+                    jobId,
+                    miniClusterResource.getMiniCluster(),
+                    () -> sleepMs(100));
         }
+
+        List<String> actual = fetchRows(iterator);
+        LOG.info("actual data: {}, expected: {}", actual, expectedData);
+        Assert.assertTrue(actual.size() >= expectedData.size());
+        Assert.assertTrue(actual.containsAll(expectedData));
     }
 
     private void checkResult(String testName, Object[] expected, Object[] actual) {
@@ -337,6 +475,15 @@ public class DorisSourceITCase extends AbstractITCaseService {
         Assert.assertArrayEquals(expected, actual);
     }
 
+    private void checkResultInAnyOrder(String testName, Object[] expected, Object[] actual) {
+        LOG.info(
+                "Checking DorisSourceITCase result. testName={}, actual={}, expected={}",
+                testName,
+                actual,
+                expected);
+        assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual));
+    }
+
     private void initializeTable(String table) {
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
@@ -347,7 +494,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
                         "CREATE TABLE %s.%s ( \n"
                                 + "`name` varchar(256),\n"
                                 + "`age` int\n"
-                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
                                 + "PROPERTIES (\n"
                                 + "\"replication_num\" = \"1\"\n"
                                 + ")\n",
@@ -356,4 +503,49 @@ public class DorisSourceITCase extends AbstractITCaseService {
                 String.format("insert into %s.%s  values ('flink',10)", DATABASE, table),
                 String.format("insert into %s.%s  values ('apache',12)", DATABASE, table));
     }
+
+    private void initializeTableWithData(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`name` varchar(256),\n"
+                                + "`age` int\n"
+                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('101',1),('102',1),('103',1)", DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('201',2),('202',2),('203',2)", DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('301',3),('302',3),('303',3)", DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('401',4),('402',4),('403',4)", DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('501',5),('502',5),('503',5)", DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('601',6),('602',6),('603',6)", DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('701',7),('702',7),('703',7)", DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('801',8),('802',8),('803',8)", DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values ('901',9),('902',9),('903',9)",
+                        DATABASE, table));
+    }
+
+    private static List<String> fetchRows(Iterator<Row> iter) {
+        List<String> rows = new ArrayList<>();
+        while (iter.hasNext()) {
+            Row row = iter.next();
+            rows.add(row.toString());
+        }
+        return rows;
+    }
 }
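
A note on the assertions in the two failover tests above: after a restart the collected stream may contain duplicate rows, so the tests assert a size lower bound plus containsAll rather than exact equality. A stricter variant, assuming restarts can only introduce duplicates (a sketch, not part of this commit), could de-duplicate first:

    // Hypothetical stricter check (requires java.util.stream.Collectors): collapse
    // duplicates introduced by the restart before comparing against the expected rows.
    List<String> dedupedActual = actual.stream().distinct().sorted().collect(Collectors.toList());
    List<String> sortedExpected = expectedData.stream().sorted().collect(Collectors.toList());
    Assert.assertEquals(sortedExpected, dedupedActual);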
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java
new file mode 100644
index 00000000..68789015
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.source.DorisSource;
+import org.apache.doris.flink.source.assigners.SimpleSplitAssigner;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link DorisSourceEnumerator}. */
+public class DorisSourceEnumeratorTest {
+    private static long splitId = 1L;
+    private TestingSplitEnumeratorContext<DorisSourceSplit> context;
+    private DorisSourceSplit split;
+    private DorisSourceEnumerator enumerator;
+
+    @BeforeEach
+    void setup() {
+        this.context = new TestingSplitEnumeratorContext<>(2);
+        this.split = createRandomSplit();
+        this.enumerator = createEnumerator(context, split);
+    }
+
+    @Test
+    void testCheckpointNoSplitRequested() throws Exception {
+        PendingSplitsCheckpoint state = enumerator.snapshotState(1L);
+        assertThat(state.getSplits()).contains(split);
+    }
+
+    @Test
+    void testRestoreEnumerator() throws Exception {
+        PendingSplitsCheckpoint state = enumerator.snapshotState(1L);
+
+        DorisSource<String> source = DorisSource.<String>builder().build();
+        SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> restoreEnumerator =
+                source.restoreEnumerator(context, state);
+        PendingSplitsCheckpoint pendingSplitsCheckpoint = restoreEnumerator.snapshotState(1L);
+        assertThat(pendingSplitsCheckpoint.getSplits()).contains(split);
+    }
+
+    @Test
+    void testSplitRequestForRegisteredReader() throws Exception {
+        context.registerReader(1, "somehost");
+        enumerator.addReader(1);
+        enumerator.handleSplitRequest(1, "somehost");
+        assertThat(enumerator.snapshotState(1L).getSplits()).isEmpty();
+        assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
+    }
+
+    @Test
+    void testSplitRequestForNonRegisteredReader() throws Exception {
+        enumerator.handleSplitRequest(1, "somehost");
+        assertThat(context.getSplitAssignments()).doesNotContainKey(1);
+        assertThat(enumerator.snapshotState(1L).getSplits()).contains(split);
+    }
+
+    @Test
+    void testNoMoreSplits() {
+        // first split assignment
+        context.registerReader(1, "somehost");
+        enumerator.addReader(1);
+        enumerator.handleSplitRequest(1, "somehost");
+
+        // second request has no more split
+        enumerator.handleSplitRequest(1, "somehost");
+
+        assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
+        assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
+    }
+
+    private static DorisSourceSplit createRandomSplit() {
+        Set<Long> tabletIds = new HashSet<>();
+        tabletIds.add(1001L);
+        return new DorisSourceSplit(
+                String.valueOf(splitId),
+                new PartitionDefinition("db", "tbl", "127.0.0.1", tabletIds, "queryPlan"));
+    }
+
+    private static DorisSourceEnumerator createEnumerator(
+            final SplitEnumeratorContext<DorisSourceSplit> context,
+            final DorisSourceSplit... splits) {
+        return new DorisSourceEnumerator(context, new SimpleSplitAssigner(Arrays.asList(splits)));
+    }
+}

