masteryhx commented on code in PR #26498:
URL: https://github.com/apache/flink/pull/26498#discussion_r2059892915


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java:
##########
@@ -267,18 +278,49 @@ private void snapshotActiveBuckets(
             final long checkpointId, final ListState<byte[]> 
bucketStatesContainer)
             throws Exception {
 
-        for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
-            final BucketState<BucketID> bucketState = 
bucket.onReceptionOfCheckpoint(checkpointId);
-
+        long start = System.currentTimeMillis();
+        List<CompletableFuture<BucketState<BucketID>>> futures =
+                activeBuckets.values().stream()
+                        .map(
+                                bucket ->
+                                        CompletableFuture.supplyAsync(
+                                                () -> {
+                                                    try {
+                                                        BucketState<BucketID> 
bucketState =
+                                                                
bucket.onReceptionOfCheckpoint(
+                                                                        
checkpointId);
+                                                        if 
(LOG.isDebugEnabled()) {
+                                                            LOG.debug(
+                                                                    "Subtask 
{} checkpointing: {}",
+                                                                    
subtaskIndex,
+                                                                    
bucketState);
+                                                        }
+                                                        return bucketState;
+                                                    } catch (IOException e) {
+                                                        throw new 
CompletionException(e);

Review Comment:
   Add some context for any exception like subtask, bucket?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java:
##########
@@ -36,7 +37,13 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 /**
  * The manager of the different active buckets in the {@link 
StreamingFileSink}.

Review Comment:
   How about also supporting FileSink, the refactored one ?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java:
##########
@@ -267,18 +278,49 @@ private void snapshotActiveBuckets(
             final long checkpointId, final ListState<byte[]> 
bucketStatesContainer)
             throws Exception {
 
-        for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
-            final BucketState<BucketID> bucketState = 
bucket.onReceptionOfCheckpoint(checkpointId);
-
+        long start = System.currentTimeMillis();
+        List<CompletableFuture<BucketState<BucketID>>> futures =

Review Comment:
   How about adding a test about partial active buckets failure ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to