Myasuka commented on a change in pull request #18976:
URL: https://github.com/apache/flink/pull/18976#discussion_r820419673



##########
File path: 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
##########
@@ -72,14 +73,19 @@ public FsStateChangelogStorage(
             ChangelogStorageMetricGroup metricGroup)
             throws IOException {
         this(

Review comment:
       If this constructor is only for testing, and `directScheduler` is only 
used here. How about remove this constructor to test scope package?

##########
File path: 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
##########
@@ -26,22 +27,30 @@
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toList;
 
 class TestingStateChangeUploader implements StateChangeUploader {
     private final Collection<StateChangeSet> uploaded = new 
CopyOnWriteArrayList<>();
-    private final List<UploadTask> tasks = new CopyOnWriteArrayList<>();
+    private final List<UploadTask> tasks;
     private boolean closed;
 
+    TestingStateChangeUploader() {
+        tasks = new CopyOnWriteArrayList<>();

Review comment:
       Maybe not related with this PR, why we must make it as a 
`CopyOnWriteArrayList`?

##########
File path: 
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -81,11 +111,20 @@ public void close() throws Exception {
      *
      * <p>NOTE: the action must be idempotent because of potential concurrent 
attempts.
      */
-    interface RetriableAction extends RunnableWithException {}
+    interface RetriableAction<Result> {
+        Result tryExecute() throws Exception;
+
+        void completeWithResult(Result result);
+
+        void discardResult(Result result) throws Exception;
+
+        void handleFailure(Throwable throwable);

Review comment:
       I think it would be clear to add javadocs for these methods. In other 
words, under what case, will `completeWithResult` be called, and under what 
case will `discardResult` be called. Moreover, what kind of failure would 
`handleFailure` handle. 

##########
File path: 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
##########
@@ -55,6 +65,62 @@ public void testFixedRetryLimit() throws Exception {
         testPolicy(5, RetryPolicy.fixed(5, 0, 0), FAILING_TASK);
     }
 
+    @Test
+    public void testDiscardOnTimeout() throws Exception {
+        int timeoutMs = 5;
+        int numAttempts = 7;
+        int successfulAttempt = numAttempts - 1;
+        List<Integer> completed = new CopyOnWriteArrayList<>();
+        List<Integer> discarded = new CopyOnWriteArrayList<>();
+        AtomicBoolean executionBlocked = new AtomicBoolean(true);
+        Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs * 
numAttempts * 2));

Review comment:
       The test itself is easy to expire due to the too short timeout duration. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to