This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7bba8aa [Feature]Support 'sink.parallelism' configuration (#72) 7bba8aa is described below commit 7bba8aa4e69650ff61752c06a8aadaa26b0f12ad Author: huyuanfeng2018 <731030...@qq.com> AuthorDate: Mon Oct 17 10:10:54 2022 +0800 [Feature]Support 'sink.parallelism' configuration (#72) Co-authored-by: huyuanfeng <huyuanf...@huya.com> --- .../org/apache/doris/flink/catalog/DorisCatalogFactory.java | 2 ++ .../org/apache/doris/flink/table/DorisConfigOptions.java | 3 +++ .../apache/doris/flink/table/DorisDynamicTableFactory.java | 7 ++++++- .../org/apache/doris/flink/table/DorisDynamicTableSink.java | 13 ++++++++----- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java index f23f0dc..9a80370 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java @@ -48,6 +48,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM; import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; @@ -102,6 +103,7 @@ public class DorisCatalogFactory implements CatalogFactory { options.add(SINK_LABEL_PREFIX); options.add(SINK_BUFFER_SIZE); options.add(SINK_BUFFER_COUNT); + options.add(SINK_PARALLELISM); options.add(SOURCE_USE_OLD_API); return options; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 02a4d22..391df7d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -19,6 +19,7 @@ package org.apache.doris.flink.table; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.factories.FactoryUtil; import java.time.Duration; @@ -163,6 +164,8 @@ public class DorisConfigOptions { .defaultValue(true) .withDescription("whether to enable the delete function"); + public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; + // Prefix for Doris StreamLoad specific properties. public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 5a11605..521724b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -60,6 +60,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM; import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; @@ -117,6 +118,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(SINK_LABEL_PREFIX); options.add(SINK_BUFFER_SIZE); options.add(SINK_BUFFER_COUNT); + options.add(SINK_PARALLELISM); options.add(SOURCE_USE_OLD_API); return options; @@ -213,6 +215,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory // validate all options helper.validateExcept(STREAM_LOAD_PROP_PREFIX); + // sink parallelism + final Integer parallelism = context.getConfiguration().getOptional(SINK_PARALLELISM).orElse(null); Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions()); TableSchema physicalSchema = @@ -222,7 +226,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory getDorisOptions(helper.getOptions()), getDorisReadOptions(helper.getOptions()), getDorisExecutionOptions(helper.getOptions(), streamLoadProp), - physicalSchema + physicalSchema, + parallelism ); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index 572e09c..2bba98b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + package org.apache.doris.flink.table; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -21,15 +22,14 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.DorisSink; - import org.apache.doris.flink.sink.writer.RowDataSerializer; + import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; - import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,15 +54,18 @@ public class DorisDynamicTableSink implements DynamicTableSink { private final DorisReadOptions readOptions; private final DorisExecutionOptions executionOptions; private final TableSchema tableSchema; + private final Integer sinkParallelism; public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, - TableSchema tableSchema) { + TableSchema tableSchema, + Integer sinkParallelism) { this.options = options; this.readOptions = readOptions; this.executionOptions = executionOptions; this.tableSchema = tableSchema; + this.sinkParallelism = sinkParallelism; } @Override @@ -99,12 +102,12 @@ public class DorisDynamicTableSink implements DynamicTableSink { .setDorisReadOptions(readOptions) .setDorisExecutionOptions(executionOptions) .setSerializer(serializerBuilder.build()); - return SinkProvider.of(dorisSinkBuilder.build()); + return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism); } @Override public DynamicTableSink copy() { - return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema); + return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema, sinkParallelism); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org