XComp commented on a change in pull request #18749: URL: https://github.com/apache/flink/pull/18749#discussion_r806241092
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java
##########
@@ -192,7 +192,6 @@ private void assertLocalCleanupNotTriggered() {
         assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isNotDone();
         assertThat(jobGraphWriterLocalCleanupFuture).isNotDone();
         assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
-        assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);

Review comment:
There are also cases where the local and global cleanup must be performed in a synchronized fashion (e.g. `BlobServer`), which requires the two interfaces (i.e. the two different types of cleanup methods) to be operable independently of each other. I updated the JavaDoc of `GloballyCleanableResource` and `LocallyCleanableResource` to reflect this.
The decision on which method to call is now based on whether the job reaches a locally terminal or globally terminal state. I think that makes the intention of these two interfaces clearer. WDYT?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
##########
@@ -38,7 +39,7 @@
  * tasks any more
  */
 public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup>
-        implements LocallyCleanableResource {
+        implements LocallyCleanableResource, GloballyCleanableResource {

Review comment:
That's actually a cleaner approach I hadn't thought of. That would also apply to the `JobManagerRunnerRegistry`. It makes sense because the `DispatcherResourceCleanerFactory` is more closely tied to the `Dispatcher`. I will adapt the PR accordingly and update the JavaDoc to fit.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
##########
@@ -101,13 +101,13 @@ public ResourceCleaner createLocalResourceCleaner(
     @Override
     public ResourceCleaner createGlobalResourceCleaner(
             ComponentMainThreadExecutor mainThreadExecutor) {
-
         return DefaultResourceCleaner.forGloballyCleanableResources(
                         mainThreadExecutor, cleanupExecutor, retryStrategy)
-                .withPrioritizedCleanup(jobManagerRunnerRegistry)
+                .withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry))
                 .withRegularCleanup(jobGraphWriter)

Review comment:
> These questions are primarily based on my assumption that the only benefit of duplicating everything is that it allows us to clean up things in a specific order that local cleanup -> global cleanup wouldn't provide.

The global cleanup of the `BlobServer` and the `DefaultJobGraphStore` cleans the local data and the global data atomically. This wouldn't be possible if we let the local data be handled by the `localCleanupAsync` call and the global data be handled by the `globalCleanupAsync` call: it would leave us in an intermediate, inconsistent state. That's the reason why we don't add a dependency between local cleanup and global cleanup. Each resource has to take care of the local and the global cleanup individually to ensure consistency.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java
##########
@@ -24,12 +24,11 @@
 import java.util.concurrent.Executor;
 
 /**
- * {@code LocallyCleanableResource} is supposed to be used by any class that provides artifacts for
- * a given job that can be cleaned up locally. Artifacts considered to be local are located on the
- * JobManager instance itself and won't survive a failover scenario. These artifacts are, in
- * contrast to {@link GloballyCleanableResource} artifacts, going to be cleaned up even after the
- * job reaches a locally-terminated state.
+ * {@code LocallyCleanableResource} is supposed to be implemented by any class that provides
+ * artifacts for a given job that need to be cleaned up after the job reached a local terminal

Review comment:
I made the JavaDoc a bit more specific for the local cleanup vs. global cleanup discussion.
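
For illustration, the atomicity argument from the `DispatcherResourceCleanerFactory` comment can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the code in the PR: `LocallyCleanable`, `GloballyCleanable`, `ToyBlobStore`, the `cleanup` helper and the string job IDs are hypothetical, simplified stand-ins. Only the method names `localCleanupAsync` and `globalCleanupAsync` and the helper name `ofLocalResource` are taken from the discussion and the diff; the body of `ofLocalResource` is a guess.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical, simplified stand-ins for illustration; not Flink's real classes.
class CleanupSketch {

    /** Cleanup to run once the job reaches a locally terminal state. */
    interface LocallyCleanable {
        CompletableFuture<Void> localCleanupAsync(String jobId, Executor executor);
    }

    /** Cleanup to run once the job reaches a globally terminal state. */
    interface GloballyCleanable {
        CompletableFuture<Void> globalCleanupAsync(String jobId, Executor executor);
    }

    /** Toy resource keeping per-job data in a local cache and in a simulated shared store. */
    static final class ToyBlobStore implements LocallyCleanable, GloballyCleanable {
        private final Object lock = new Object();
        private final Set<String> localCache = new HashSet<>();
        private final Set<String> sharedStore = new HashSet<>();

        void put(String jobId) {
            synchronized (lock) {
                localCache.add(jobId);
                sharedStore.add(jobId);
            }
        }

        @Override
        public CompletableFuture<Void> localCleanupAsync(String jobId, Executor executor) {
            // Local terminal state: drop only the local copy, keep the shared data for recovery.
            return CompletableFuture.runAsync(() -> {
                synchronized (lock) {
                    localCache.remove(jobId);
                }
            }, executor);
        }

        @Override
        public CompletableFuture<Void> globalCleanupAsync(String jobId, Executor executor) {
            // Global terminal state: remove local and shared data under one lock, so no
            // observer sees the intermediate state that chaining local -> global would expose.
            return CompletableFuture.runAsync(() -> {
                synchronized (lock) {
                    localCache.remove(jobId);
                    sharedStore.remove(jobId);
                }
            }, executor);
        }

        boolean hasLocal(String jobId) {
            synchronized (lock) {
                return localCache.contains(jobId);
            }
        }

        boolean hasShared(String jobId) {
            synchronized (lock) {
                return sharedStore.contains(jobId);
            }
        }
    }

    /** Assumed shape of an adapter that reuses a local cleanup as the global cleanup. */
    static GloballyCleanable ofLocalResource(LocallyCleanable resource) {
        return resource::localCleanupAsync;
    }

    /** Picks the cleanup method based on the kind of terminal state the job reached. */
    static CompletableFuture<Void> cleanup(
            ToyBlobStore store, String jobId, boolean globallyTerminal, Executor executor) {
        return globallyTerminal
                ? store.globalCleanupAsync(jobId, executor)
                : store.localCleanupAsync(jobId, executor);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs each task on the calling thread
        ToyBlobStore store = new ToyBlobStore();
        store.put("job-a");
        store.put("job-b");

        cleanup(store, "job-a", false, direct).join(); // locally terminal
        cleanup(store, "job-b", true, direct).join();  // globally terminal

        System.out.println("job-a local=" + store.hasLocal("job-a")
                + " shared=" + store.hasShared("job-a"));
        System.out.println("job-b local=" + store.hasLocal("job-b")
                + " shared=" + store.hasShared("job-b"));
    }
}
```

In this sketch neither cleanup calls the other. Each one is responsible for leaving the resource consistent for its own terminal state, which is the reason given above for not adding a dependency between local and global cleanup.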