This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 
by this push:
     new 154bcf4  update (#67)
154bcf4 is described below

commit 154bcf4161ba29a90d9d0603c241dda85ff85901
Author: wudi <676366...@qq.com>
AuthorDate: Wed Sep 28 08:59:12 2022 +0800

    update (#67)
    
    Co-authored-by: wudi <>
---
 .../java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java    | 2 +-
 .../java/org/apache/doris/flink/table/DorisDynamicTableSink.java     | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
index 0ea11d0..a055195 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
@@ -59,7 +59,7 @@ public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
 
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
-
+        outputFormat.flush();
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index cccdb45..813669b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -19,10 +19,11 @@ package org.apache.doris.flink.table;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.cfg.GenericDorisSinkFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.types.RowKind;
 
 /**
@@ -65,7 +66,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 .setExecutionOptions(executionOptions)
                 .setFieldDataTypes(tableSchema.getFieldDataTypes())
                 .setFieldNames(tableSchema.getFieldNames());
-        return OutputFormatProvider.of(builder.build());
+        return SinkFunctionProvider.of(new GenericDorisSinkFunction(builder.build()));
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to