XComp commented on a change in pull request #18749:
URL: https://github.com/apache/flink/pull/18749#discussion_r806241092



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java
##########
@@ -192,7 +192,6 @@ private void assertLocalCleanupNotTriggered() {
         assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isNotDone();
         assertThat(jobGraphWriterLocalCleanupFuture).isNotDone();
         assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
-        
assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);

Review comment:
       There are also cases where the local and global cleanup shall be 
performed in a synchronized fashion (e.g. `BlobServer`) which required to have 
the two interfaces (i.e. the two different types of cleanup methods) being 
operatable independent from each other. I updated the JavaDoc of 
`GloballyCleanableResource` and `LocallyCleanableResource` to reflect. The 
decision on what method to call is now based on whether the job reaches a 
locally terminal or globally terminal state. I think that makes the intention 
of these two interfaces clearer. WDYT?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java
##########
@@ -192,7 +192,6 @@ private void assertLocalCleanupNotTriggered() {
         assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isNotDone();
         assertThat(jobGraphWriterLocalCleanupFuture).isNotDone();
         assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
-        
assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);

Review comment:
       There are also cases where the local and global cleanup shall be 
performed in a synchronized fashion (e.g. `BlobServer`) which required to have 
the two interfaces (i.e. the two different types of cleanup methods) being 
operatable independent from each other. I updated the JavaDoc of 
`GloballyCleanableResource` and `LocallyCleanableResource` to reflect this. The 
decision on what method to call is now based on whether the job reaches a 
locally terminal or globally terminal state. I think that makes the intention 
of these two interfaces clearer. WDYT?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
##########
@@ -38,7 +39,7 @@
  * tasks any more
  */
 public class JobManagerMetricGroup extends 
ComponentMetricGroup<JobManagerMetricGroup>
-        implements LocallyCleanableResource {
+        implements LocallyCleanableResource, GloballyCleanableResource {

Review comment:
       That's actually a cleaner approach I haven't thought of. That would also 
apply to the `JobManagerRunnerRegistry`. It makes sense because the 
`DispatcherResourceCleanerFactory` is more closely tight to the `Dispatcher`. I 
will adapt the PR accordingly and update the JavaDoc to fit.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
##########
@@ -101,13 +101,13 @@ public ResourceCleaner createLocalResourceCleaner(
     @Override
     public ResourceCleaner createGlobalResourceCleaner(
             ComponentMainThreadExecutor mainThreadExecutor) {
-
         return DefaultResourceCleaner.forGloballyCleanableResources(
                         mainThreadExecutor, cleanupExecutor, retryStrategy)
-                .withPrioritizedCleanup(jobManagerRunnerRegistry)
+                
.withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry))
                 .withRegularCleanup(jobGraphWriter)

Review comment:
       > These question are primarily based on my assumption that the only 
benefit of duplicating everything is that it allows us to cleanup things in a 
specific order that local cleanup -> global cleanup wouldn't provide.
   
   The global cleanup of the `BlobServer` and the `DefaultJobGraphStore` clean 
the local data and global data atomically. This wouldn't be possible if we say 
that we let the local data being handled by the `localCleanupAsync` call and 
global data being handled by the `globalCleanupAsync` call. It would leave us 
in an intermediate inconsistent state. 
   
   That's the reason why we don't add a dependency between local cleanup and 
global cleanup. The individual resource has take care of the cleanup of each of 
the cleanups individually.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
##########
@@ -101,13 +101,13 @@ public ResourceCleaner createLocalResourceCleaner(
     @Override
     public ResourceCleaner createGlobalResourceCleaner(
             ComponentMainThreadExecutor mainThreadExecutor) {
-
         return DefaultResourceCleaner.forGloballyCleanableResources(
                         mainThreadExecutor, cleanupExecutor, retryStrategy)
-                .withPrioritizedCleanup(jobManagerRunnerRegistry)
+                
.withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry))
                 .withRegularCleanup(jobGraphWriter)

Review comment:
       > These question are primarily based on my assumption that the only 
benefit of duplicating everything is that it allows us to cleanup things in a 
specific order that local cleanup -> global cleanup wouldn't provide.
   
   The global cleanup of the `BlobServer` and the `DefaultJobGraphStore` clean 
the local data and global data atomically. This wouldn't be possible if we say 
that we let the local data being handled by the `localCleanupAsync` call and 
global data being handled by the `globalCleanupAsync` call. It would leave us 
in an intermediate inconsistent state. 
   
   That's the reason why we don't add a dependency between local cleanup and 
global cleanup. The individual resource has to take care of the cleanup of each 
of the cleanups individually.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
##########
@@ -101,13 +101,13 @@ public ResourceCleaner createLocalResourceCleaner(
     @Override
     public ResourceCleaner createGlobalResourceCleaner(
             ComponentMainThreadExecutor mainThreadExecutor) {
-
         return DefaultResourceCleaner.forGloballyCleanableResources(
                         mainThreadExecutor, cleanupExecutor, retryStrategy)
-                .withPrioritizedCleanup(jobManagerRunnerRegistry)
+                
.withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry))
                 .withRegularCleanup(jobGraphWriter)

Review comment:
       > These question are primarily based on my assumption that the only 
benefit of duplicating everything is that it allows us to cleanup things in a 
specific order that local cleanup -> global cleanup wouldn't provide.
   
   The global cleanup of the `BlobServer` and the `DefaultJobGraphStore` clean 
the local data and global data atomically. This wouldn't be possible if we say 
that we let the local data being handled by the `localCleanupAsync` call and 
global data being handled by the `globalCleanupAsync` call. It would leave us 
in an intermediate inconsistent state. 
   
   That's the reason why we don't add a dependency between local cleanup and 
global cleanup. The individual resource has to take care of the local and the 
global cleanup individually to ensure consistency.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java
##########
@@ -24,12 +24,11 @@
 import java.util.concurrent.Executor;
 
 /**
- * {@code LocallyCleanableResource} is supposed to be used by any class that 
provides artifacts for
- * a given job that can be cleaned up locally. Artifacts considered to be 
local are located on the
- * JobManager instance itself and won't survive a failover scenario. These 
artifacts are, in
- * contrast to {@link GloballyCleanableResource} artifacts, going to be 
cleaned up even after the
- * job reaches a locally-terminated state.
+ * {@code LocallyCleanableResource} is supposed to be implemented by any class 
that provides
+ * artifacts for a given job that need to be cleaned up after the job reached 
a local terminal

Review comment:
       I made the JavaDoc a bit more specific for the local cleanup vs global 
cleanup discussion




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to