XComp commented on code in PR #23531:
URL: https://github.com/apache/flink/pull/23531#discussion_r1374561765


##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java:
##########
@@ -65,4 +65,11 @@ public class JobResultStoreOptions {
                                     + "are, instead, marked as clean to 
indicate their state. In this "
                                     + "case, Flink no longer has ownership and 
the resources need to "
                                     + "be cleaned up by the user.");
+
+    
@Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<Integer> TIME_TO_REMOVE_CLEAN_JOB_RESULT =
+            ConfigOptions.key("job-result-store.ttl-clean-job-result")
+                    .intType()
+                    .defaultValue(10)

Review Comment:
   Didn't we agree on infinity as the default value here? :thinking: 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java:
##########
@@ -65,4 +65,11 @@ public class JobResultStoreOptions {
                                     + "are, instead, marked as clean to 
indicate their state. In this "
                                     + "case, Flink no longer has ownership and 
the resources need to "
                                     + "be cleaned up by the user.");
+
+    
@Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<Integer> TIME_TO_REMOVE_CLEAN_JOB_RESULT =
+            ConfigOptions.key("job-result-store.ttl-clean-job-result")
+                    .intType()

Review Comment:
   ```suggestion
                       .durationType()
   ```
   There is a `durationType()` for config parameters. That allows the user to 
specify any time unit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java:
##########
@@ -65,4 +65,11 @@ public class JobResultStoreOptions {
                                     + "are, instead, marked as clean to 
indicate their state. In this "
                                     + "case, Flink no longer has ownership and 
the resources need to "
                                     + "be cleaned up by the user.");
+
+    
@Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<Integer> TIME_TO_REMOVE_CLEAN_JOB_RESULT =
+            ConfigOptions.key("job-result-store.ttl-clean-job-result")

Review Comment:
   The parameter name doesn't specify the embedded job result store as target 
component. Users who use the file-based component might be surprised that this 
option doesn't have any effect.
   
   But this keeps me wondering: Shouldn't we also offer this functionality to 
the file-based `JobResultStore`? WDYT? This allows for automatic removal of 
files after a certain amount of time. Sticking to the default value being set 
to `infinity` wouldn't change the current behavior.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java:
##########
@@ -65,4 +65,11 @@ public class JobResultStoreOptions {
                                     + "are, instead, marked as clean to 
indicate their state. In this "
                                     + "case, Flink no longer has ownership and 
the resources need to "
                                     + "be cleaned up by the user.");
+
+    
@Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+    public static final ConfigOption<Integer> TIME_TO_REMOVE_CLEAN_JOB_RESULT =

Review Comment:
   I'm wondering whether we want to start a dev ML discussion on that one 
considering that configuration parameters are considered public API (I guess, a 
dedicated FLIP would be overkill here). WDYT?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########
@@ -67,7 +79,7 @@ public boolean hasDirtyJobResultEntryInternal(JobID jobId) {
 
     @Override
     public boolean hasCleanJobResultEntryInternal(JobID jobId) {
-        return cleanJobResults.containsKey(jobId);
+        return cleanJobResults.asMap().containsKey(jobId);

Review Comment:
   ```suggestion
           return cleanJobResults.getIfPresent(jobId) != null;
   ```
   nit: There is a interface method that appears to be a better fit to the use 
case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to