mxm commented on code in PR #26640:
URL: https://github.com/apache/flink/pull/26640#discussion_r2137165179


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java:
##########
@@ -180,6 +188,14 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs(
         }
     }
 
+    @VisibleForTesting
+    long applyJitter() throws InterruptedException {
+        long sleepMilliseconds =
+                ThreadLocalRandom.current().nextLong(0, uploadJitter.toMillis() + 1);
+        Thread.sleep(sleepMilliseconds);
+        return sleepMilliseconds;

Review Comment:
   Looks like we will always run this code, even when the jitter is zero (the default). We can return early in that case:
   
   ```suggestion
       long applyJitter() throws InterruptedException {
           if (uploadJitter.isZero()) {
               return 0L;
           }
           long sleepMilliseconds =
                   ThreadLocalRandom.current().nextLong(0, uploadJitter.toMillis() + 1);
           Thread.sleep(sleepMilliseconds);
           return sleepMilliseconds;
   ```
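
For reference, a minimal stand-alone sketch of the early-return variant discussed above. The class name `JitterSketch` and the `uploadJitter` parameter are hypothetical (in the PR the value is a field of `RocksDBStateUploader`), and the sketch assumes the configured duration is never negative.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical holder class for illustration only; not part of the PR. */
final class JitterSketch {
    private JitterSketch() {}

    /**
     * Sleeps for a random time in [0, uploadJitter] milliseconds and returns the
     * time slept. Assumes uploadJitter is not negative.
     */
    static long applyJitter(Duration uploadJitter) throws InterruptedException {
        if (uploadJitter.isZero()) {
            // Jitter disabled: skip both the random draw and the sleep call.
            return 0L;
        }
        // The upper bound of nextLong is exclusive, hence the +1.
        long sleepMilliseconds =
                ThreadLocalRandom.current().nextLong(0, uploadJitter.toMillis() + 1);
        Thread.sleep(sleepMilliseconds);
        return sleepMilliseconds;
    }
}
```

Returning `0L` instead of a bare `return` keeps the `long` return type intact, so callers and tests can still assert on the value.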



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java:
##########
@@ -43,21 +43,26 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;

Review Comment:
   I think we should be using a single instance of `Random`. This will ensure that the seed is shared across all threads. `Random` is thread-safe.
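
A rough sketch of what a single shared `Random` could look like, under the assumption that the jitter duration is never negative. The class `SharedRandomJitterSketch`, the field `SHARED_RANDOM`, and the method `nextJitterMillis` are hypothetical names chosen for illustration, not code from the PR.

```java
import java.time.Duration;
import java.util.Random;

/** Hypothetical holder class for illustration only; not part of the PR. */
final class SharedRandomJitterSketch {
    // One generator (and therefore one seed) shared by every calling thread.
    // java.util.Random is documented as thread-safe.
    private static final Random SHARED_RANDOM = new Random();

    private SharedRandomJitterSketch() {}

    /** Returns a delay in [0, maxJitter.toMillis()] milliseconds without sleeping. */
    static long nextJitterMillis(Duration maxJitter) {
        long bound = maxJitter.toMillis() + 1; // exclusive upper bound, always >= 1 here
        // floorMod maps any long into [0, bound); the modulo bias is negligible
        // for the small bounds expected from a jitter setting.
        return Math.floorMod(SHARED_RANDOM.nextLong(), bound);
    }
}
```

One trade-off to weigh: the JDK documentation notes that concurrent use of one `Random` instance across threads may encounter contention, which is the case `ThreadLocalRandom` is designed for.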



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOptions.java:
##########
@@ -91,6 +93,15 @@ public class RocksDBOptions {
                                     + CLUSTER_IO_EXECUTOR_POOL_SIZE.key()
                                     + ")");
 
+    /** The number of millisecond for RocksDBStateBackend checkpoint file upload jitter. */
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Duration> CHECKPOINT_UPLOAD_JITTER =
+            ConfigOptions.key("state.backend.rocksdb.checkpoint.upload-jitter")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(0L))

Review Comment:
   ```suggestion
                       .defaultValue(Duration.ZERO)
   ```
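
The suggested change should not alter behavior, since both expressions denote the same zero-length duration. A small check, with the class name `DurationZeroSketch` being hypothetical:

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of the PR. */
final class DurationZeroSketch {
    private DurationZeroSketch() {}

    /** True if Duration.ZERO and Duration.ofMillis(0L) are interchangeable defaults. */
    static boolean sameDefault() {
        return Duration.ZERO.equals(Duration.ofMillis(0L)) && Duration.ZERO.isZero();
    }
}
```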


