luoyuxia commented on a change in pull request #16027:
URL: https://github.com/apache/flink/pull/16027#discussion_r645228194



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
##########
@@ -26,29 +27,52 @@
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
+
 /** Writer for emitting {@link PartitionCommitInfo} to downstream. */
 public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, 
PartitionCommitInfo> {
 
     private static final long serialVersionUID = 2L;
 
+    private final List<String> partitionKeys;
+    private final Configuration conf;
+
     private transient Set<String> currentNewPartitions;
     private transient TreeMap<Long, Set<String>> newPartitions;
     private transient Set<String> committablePartitions;
 
+    private transient PartitionCommitTrigger trigger;
+
     public StreamingFileWriter(
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<
                             IN, String, ? extends 
StreamingFileSink.BucketsBuilder<IN, String, ?>>
-                    bucketsBuilder) {
+                    bucketsBuilder,
+            List<String> partitionKeys,
+            Configuration conf) {
         super(bucketCheckInterval, bucketsBuilder);
+        this.partitionKeys = partitionKeys;
+        this.conf = conf;
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
+        if (isPartitionCommitTriggerEnabled()) {
+            trigger =
+                    PartitionCommitTrigger.create(
+                            context.isRestored(),
+                            context.getOperatorStateStore(),

Review comment:
       Thanks for your suggestion.
   I have refactor the PartitionCommitTrigger and remove any state store in 
StreamFileWriter.
   Look forward your review again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to