mxm commented on code in PR #26640:
URL: https://github.com/apache/flink/pull/26640#discussion_r2139665565


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java:
##########
@@ -39,25 +39,32 @@
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 /** Help class for uploading RocksDB state files. */
 public class RocksDBStateUploader implements Closeable {
     private static final int READ_BUFFER_SIZE = 16 * 1024;
-
+    private final Duration uploadJitter;
+    private final Random random;
     private final RocksDBStateDataTransferHelper transfer;
 
     @VisibleForTesting
-    public RocksDBStateUploader(int numberOfSnapshottingThreads) {
-        
this(RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads));
+    public RocksDBStateUploader(int numberOfSnapshottingThreads, Duration 
uploadJitter) {
+        this(
+                
RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads),
+                uploadJitter);
     }
 
-    public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer) {
+    public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer, 
Duration uploadJitter) {
         this.transfer = transfer;
+        this.uploadJitter = uploadJitter;
+        this.random = new Random(uploadJitter.toMillis());

Review Comment:
   This will use the same seed in all the TaskManagers. Not sure if that is 
what we want. Probably using the default constructor is fine.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java:
##########
@@ -180,6 +189,17 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs(
         }
     }
 
+    @VisibleForTesting
+    long applyJitter() throws InterruptedException {

Review Comment:
   We can split this into two methods. One for calculating the wait time, 
another for doing the sleep. Alternatively, you could pass in the sleep method 
as a parameter, so you can swap it in the tests.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java:
##########
@@ -180,6 +189,17 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs(
         }
     }
 
+    @VisibleForTesting
+    long applyJitter() throws InterruptedException {

Review Comment:
   Why are we returning long here? For testing? Probably better to do a 
`Thread.sleep(getWaitTime())` then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to