This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch branch-for-flink-before-1.13 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push: new 154bcf4 update (#67) 154bcf4 is described below commit 154bcf4161ba29a90d9d0603c241dda85ff85901 Author: wudi <676366...@qq.com> AuthorDate: Wed Sep 28 08:59:12 2022 +0800 update (#67) Co-authored-by: wudi <> --- .../java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java | 2 +- .../java/org/apache/doris/flink/table/DorisDynamicTableSink.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java index 0ea11d0..a055195 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java @@ -59,7 +59,7 @@ public class GenericDorisSinkFunction<T> extends RichSinkFunction<T> @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { - + outputFormat.flush(); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index cccdb45..813669b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -19,10 +19,11 @@ package org.apache.doris.flink.table; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.cfg.GenericDorisSinkFunction; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.OutputFormatProvider; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.types.RowKind; /** @@ -65,7 +66,7 @@ public class DorisDynamicTableSink implements DynamicTableSink { .setExecutionOptions(executionOptions) .setFieldDataTypes(tableSchema.getFieldDataTypes()) .setFieldNames(tableSchema.getFieldNames()); - return OutputFormatProvider.of(builder.build()); + return SinkFunctionProvider.of(new GenericDorisSinkFunction(builder.build())); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org