mxm commented on code in PR #26640: URL: https://github.com/apache/flink/pull/26640#discussion_r2139665565
########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java: ########## @@ -39,25 +39,32 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** Help class for uploading RocksDB state files. */ public class RocksDBStateUploader implements Closeable { private static final int READ_BUFFER_SIZE = 16 * 1024; - + private final Duration uploadJitter; + private final Random random; private final RocksDBStateDataTransferHelper transfer; @VisibleForTesting - public RocksDBStateUploader(int numberOfSnapshottingThreads) { - this(RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads)); + public RocksDBStateUploader(int numberOfSnapshottingThreads, Duration uploadJitter) { + this( + RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads), + uploadJitter); } - public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer) { + public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer, Duration uploadJitter) { this.transfer = transfer; + this.uploadJitter = uploadJitter; + this.random = new Random(uploadJitter.toMillis()); Review Comment: This will use the same seed in all the TaskManagers. Not sure if that is what we want. Probably using the default constructor is fine. ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java: ########## @@ -180,6 +189,17 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs( } } + @VisibleForTesting + long applyJitter() throws InterruptedException { Review Comment: We can split this into two methods. One for calculating the wait time, another for doing the sleep. 
Alternatively, you could pass the sleep function in as a parameter, so that the tests can swap it out. ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java: ########## @@ -180,6 +189,17 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs( } } + @VisibleForTesting + long applyJitter() throws InterruptedException { Review Comment: Why does this method return a `long`? If the return value exists only for testing, it is probably better to compute the wait time in a separate method and call `Thread.sleep(getWaitTime())` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org