This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 49cc46e2992edb0179b5ff14e47939752939162a
Author: wudi <676366...@qq.com>
AuthorDate: Wed Dec 15 16:21:29 2021 +0800

    [improvement](flink-connector) DorisSourceFunction supports reading Doris in parallel (#7232)
    
    The previous DorisSourceFunction extended RichSourceFunction, so no matter
    how high Flink's parallelism was set, the source always ran with an
    effective parallelism of 1. It now extends RichParallelSourceFunction
    instead.
    
    When Flink runs with multiple parallel subtasks, the Doris partitions are
    distributed round-robin across the subtasks (see the sketch after the list).
    For example, with dorisPartitions.size() = 10 and flink.parallelism = 4,
    the partitions are assigned as follows:
    task0: dorisPartitions[0],[4],[8]
    task1: dorisPartitions[1],[5],[9]
    task2: dorisPartitions[2],[6]
    task3: dorisPartitions[3],[7]
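    
    A standalone sketch of the same modulo split (illustrative only, not part of
    this patch; the class name is made up and the counts mirror the example above):
    
        import java.util.ArrayList;
        import java.util.List;
        
        public class PartitionSplitSketch {
            public static void main(String[] args) {
                int totalTasks = 4;       // flink.parallelism in the example above
                int partitionCount = 10;  // dorisPartitions.size() in the example above
                for (int taskIndex = 0; taskIndex < totalTasks; taskIndex++) {
                    List<Integer> assigned = new ArrayList<>();
                    for (int i = 0; i < partitionCount; i++) {
                        if (i % totalTasks == taskIndex) {  // same check as assignTaskPartitions()
                            assigned.add(i);
                        }
                    }
                    System.out.println("task" + taskIndex + ": dorisPartitions" + assigned);
                }
                // prints:
                // task0: dorisPartitions[0, 4, 8]
                // task1: dorisPartitions[1, 5, 9]
                // task2: dorisPartitions[2, 6]
                // task3: dorisPartitions[3, 7]
            }
        }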
---
 .../flink/datastream/DorisSourceFunction.java      | 32 +++++++++++++++++++---
 .../org/apache/doris/flink/rest/RestService.java   |  4 +--
 .../doris/flink/table/DorisDynamicTableSource.java |  2 +-
 .../org/apache/doris/flink/DorisSourceExample.java |  2 +-
 4 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 85f5f6b..edde953 100644
--- a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -20,12 +20,14 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.cfg.DorisStreamOptions;
 import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.PartitionDefinition;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +38,7 @@ import java.util.List;
  * DorisSource
  **/
 
-public class DorisSourceFunction extends RichSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> {
+public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> {
 
     private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
 
@@ -45,23 +47,45 @@ public class DorisSourceFunction extends RichSourceFunction<List<?>> implements
     private final DorisReadOptions readOptions;
     private transient volatile boolean isRunning;
     private List<PartitionDefinition> dorisPartitions;
+    private List<PartitionDefinition> taskDorisPartitions = Lists.newArrayList();
 
     public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) {
         this.deserializer = deserializer;
         this.options = streamOptions.getOptions();
         this.readOptions = streamOptions.getReadOptions();
+        try {
+            this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
+            logger.info("Doris partitions size {}", dorisPartitions.size());
+        } catch (DorisException e) {
+            throw new RuntimeException("Failed to fetch doris partitions");
+        }
     }
 
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         this.isRunning = true;
-        this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
+        assignTaskPartitions();
+    }
+
+    /**
+     * Assign partitions to each task.
+     */
+    private void assignTaskPartitions() {
+        int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        int totalTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+        for (int i = 0; i < dorisPartitions.size(); i++) {
+            if (i % totalTasks == taskIndex) {
+                taskDorisPartitions.add(dorisPartitions.get(i));
+            }
+        }
+        logger.info("subtask {} process {} partitions ", taskIndex, taskDorisPartitions.size());
     }
 
     @Override
     public void run(SourceContext<List<?>> sourceContext) {
-        for (PartitionDefinition partitions : dorisPartitions) {
+        for (PartitionDefinition partitions : taskDorisPartitions) {
             try (ScalaValueReader scalaValueReader = new ScalaValueReader(partitions, options, readOptions)) {
                 while (isRunning && scalaValueReader.hasNext()) {
                     List<?> next = scalaValueReader.next();
diff --git a/src/main/java/org/apache/doris/flink/rest/RestService.java b/src/main/java/org/apache/doris/flink/rest/RestService.java
index 1e6310c..82e01e0 100644
--- a/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -444,11 +444,11 @@ public class RestService implements Serializable {
     }
 
     /**
-     * find Doris RDD partitions from Doris FE.
+     * find Doris partitions from Doris FE.
      *
      * @param options configuration of request
      * @param logger  {@link Logger}
-     * @return an list of Doris RDD partitions
+     * @return a list of Doris partitions
      * @throws DorisException throw when find partition failed
      */
     public static List<PartitionDefinition> findPartitions(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException {
diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 43d9e5f..0262677 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -72,7 +72,7 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
         try {
             dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
         } catch (DorisException e) {
-            throw new RuntimeException("can not fetch partitions");
+            throw new RuntimeException("Failed to fetch doris partitions");
         }
         DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
                 .setFenodes(options.getFenodes())
diff --git a/src/test/java/org/apache/doris/flink/DorisSourceExample.java b/src/test/java/org/apache/doris/flink/DorisSourceExample.java
index eb1d819..35857dc 100644
--- a/src/test/java/org/apache/doris/flink/DorisSourceExample.java
+++ b/src/test/java/org/apache/doris/flink/DorisSourceExample.java
@@ -26,7 +26,7 @@ public class DorisSourceExample {
     public static void main(String[] args) throws Exception {
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1); // source only supports parallelism of 1
+        env.setParallelism(1);
 
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 

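With the source now parallel, a DataStream job can read Doris with parallelism
greater than 1. A minimal sketch (not part of this commit; the class name and
connection values are placeholders, and it assumes DorisStreamOptions(Properties)
and SimpleListDeserializationSchema as used in the connector's other examples):

    import java.util.Properties;
    import org.apache.doris.flink.cfg.DorisStreamOptions;
    import org.apache.doris.flink.datastream.DorisSourceFunction;
    import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DorisParallelSourceSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("fenodes", "FE_HOST:8030");      // placeholder FE address
            props.setProperty("username", "root");
            props.setProperty("password", "");
            props.setProperty("table.identifier", "db.table"); // placeholder table

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4); // each subtask reads its share of the Doris partitions

            env.addSource(new DorisSourceFunction(
                            new DorisStreamOptions(props),
                            new SimpleListDeserializationSchema()))
               .print();

            env.execute("doris parallel source sketch");
        }
    }
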
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
