masteryhx commented on code in PR #26498: URL: https://github.com/apache/flink/pull/26498#discussion_r2059892915
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java: ########## @@ -267,18 +278,49 @@ private void snapshotActiveBuckets( final long checkpointId, final ListState<byte[]> bucketStatesContainer) throws Exception { - for (Bucket<IN, BucketID> bucket : activeBuckets.values()) { - final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId); - + long start = System.currentTimeMillis(); + List<CompletableFuture<BucketState<BucketID>>> futures = + activeBuckets.values().stream() + .map( + bucket -> + CompletableFuture.supplyAsync( + () -> { + try { + BucketState<BucketID> bucketState = + bucket.onReceptionOfCheckpoint( + checkpointId); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Subtask {} checkpointing: {}", + subtaskIndex, + bucketState); + } + return bucketState; + } catch (IOException e) { + throw new CompletionException(e); Review Comment: Add some context for any exception like subtask, bucket? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java: ########## @@ -36,7 +37,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * The manager of the different active buckets in the {@link StreamingFileSink}. Review Comment: How about also supporting FileSink, the refactored one ? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java: ########## @@ -267,18 +278,49 @@ private void snapshotActiveBuckets( final long checkpointId, final ListState<byte[]> bucketStatesContainer) throws Exception { - for (Bucket<IN, BucketID> bucket : activeBuckets.values()) { - final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId); - + long start = System.currentTimeMillis(); + List<CompletableFuture<BucketState<BucketID>>> futures = Review Comment: How about adding a test about partial active buckets failure ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org