This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 49cc46e2992edb0179b5ff14e47939752939162a
Author: wudi <676366...@qq.com>
AuthorDate: Wed Dec 15 16:21:29 2021 +0800

    [improvement](flink-connector) DataSourceFunction read doris supports parallel (#7232)

    The previous DorisSourceFunction extended RichSourceFunction, so no matter what
    parallelism Flink was configured with, the source always ran with a parallelism of 1.
    It now extends RichParallelSourceFunction, and when Flink runs with multiple parallel
    subtasks the Doris partitions are distributed across them.

    For example, with dorisPartitions.size = 10 and flink.parallelism = 4, the work is
    split as follows:
    task0: dorisPartitions[0],[4],[8]
    task1: dorisPartitions[1],[5],[9]
    task2: dorisPartitions[2],[6]
    task3: dorisPartitions[3],[7]
---
 .../flink/datastream/DorisSourceFunction.java      | 32 +++++++++++++++++++---
 .../org/apache/doris/flink/rest/RestService.java   |  4 +--
 .../doris/flink/table/DorisDynamicTableSource.java |  2 +-
 .../org/apache/doris/flink/DorisSourceExample.java |  2 +-
 4 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 85f5f6b..edde953 100644
--- a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -20,12 +20,14 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.cfg.DorisStreamOptions;
 import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.PartitionDefinition;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -36,7 +38,7 @@ import java.util.List;

  * DorisSource
  **/
-public class DorisSourceFunction extends RichSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> {
+public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> {

     private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);

@@ -45,23 +47,45 @@ public class DorisSourceFunction extends RichSourceFunction<List<?>> implements
     private final DorisReadOptions readOptions;
     private transient volatile boolean isRunning;
     private List<PartitionDefinition> dorisPartitions;
+    private List<PartitionDefinition> taskDorisPartitions = Lists.newArrayList();

     public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) {
         this.deserializer = deserializer;
         this.options = streamOptions.getOptions();
         this.readOptions = streamOptions.getReadOptions();
+        try {
+            this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
+            logger.info("Doris partitions size {}", dorisPartitions.size());
+        } catch (DorisException e) {
+            throw new RuntimeException("Failed to fetch doris partitions");
+        }
     }

     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         this.isRunning = true;
-        this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
+        assignTaskPartitions();
+    }
+
+    /**
+     * Assign partitions to each task.
+     */
+    private void assignTaskPartitions() {
+        int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        int totalTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+        for (int i = 0; i < dorisPartitions.size(); i++) {
+            if (i % totalTasks == taskIndex) {
+                taskDorisPartitions.add(dorisPartitions.get(i));
+            }
+        }
+        logger.info("subtask {} processes {} partitions", taskIndex, taskDorisPartitions.size());
     }

     @Override
     public void run(SourceContext<List<?>> sourceContext) {
-        for (PartitionDefinition partitions : dorisPartitions) {
+        for (PartitionDefinition partitions : taskDorisPartitions) {
             try (ScalaValueReader scalaValueReader = new ScalaValueReader(partitions, options, readOptions)) {
                 while (isRunning && scalaValueReader.hasNext()) {
                     List<?> next = scalaValueReader.next();
diff --git a/src/main/java/org/apache/doris/flink/rest/RestService.java b/src/main/java/org/apache/doris/flink/rest/RestService.java
index 1e6310c..82e01e0 100644
--- a/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -444,11 +444,11 @@ public class RestService implements Serializable {
     }

     /**
-     * find Doris RDD partitions from Doris FE.
+     * find Doris partitions from Doris FE.
      *
      * @param options configuration of request
      * @param logger  {@link Logger}
-     * @return an list of Doris RDD partitions
+     * @return a list of Doris partitions
      * @throws DorisException throw when find partition failed
      */
     public static List<PartitionDefinition> findPartitions(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException {
diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 43d9e5f..0262677 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -72,7 +72,7 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
         try {
             dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
         } catch (DorisException e) {
-            throw new RuntimeException("can not fetch partitions");
+            throw new RuntimeException("Failed to fetch doris partitions");
         }
         DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
                 .setFenodes(options.getFenodes())
diff --git a/src/test/java/org/apache/doris/flink/DorisSourceExample.java b/src/test/java/org/apache/doris/flink/DorisSourceExample.java
index eb1d819..35857dc 100644
--- a/src/test/java/org/apache/doris/flink/DorisSourceExample.java
+++ b/src/test/java/org/apache/doris/flink/DorisSourceExample.java
@@ -26,7 +26,7 @@ public class DorisSourceExample {
     public static void main(String[] args) throws Exception {

         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1); // source only supports parallelism of 1
+        env.setParallelism(1);

         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
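To make the task split above concrete, below is a minimal standalone sketch (not part of the commit) of the modulo rule that assignTaskPartitions() applies. The class name PartitionAssignmentDemo is made up for illustration, and plain strings stand in for PartitionDefinition, so it needs neither Flink nor Doris on the classpath.

import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentDemo {

    // Partition i is handled by subtask (i % totalTasks), mirroring the rule
    // used in DorisSourceFunction#assignTaskPartitions().
    static List<String> assign(List<String> allPartitions, int taskIndex, int totalTasks) {
        List<String> taskPartitions = new ArrayList<>();
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % totalTasks == taskIndex) {
                taskPartitions.add(allPartitions.get(i));
            }
        }
        return taskPartitions;
    }

    public static void main(String[] args) {
        // 10 partitions and 4 parallel subtasks, matching the example in the commit message.
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            partitions.add("dorisPartitions[" + i + "]");
        }
        int parallelism = 4;
        for (int task = 0; task < parallelism; task++) {
            // Prints task0 -> [0, 4, 8], task1 -> [1, 5, 9], task2 -> [2, 6], task3 -> [3, 7]
            System.out.println("task" + task + ": " + assign(partitions, task, parallelism));
        }
    }
}

With this change, a DataStream job that calls env.addSource(new DorisSourceFunction(streamOptions, deserializer)).setParallelism(4) actually fans the read out over 4 subtasks instead of reading every partition in a single one.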