zentol commented on a change in pull request #18749: URL: https://github.com/apache/flink/pull/18749#discussion_r806852636
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java
##########
@@ -24,12 +24,11 @@ import java.util.concurrent.Executor;
 /**
- * {@code LocallyCleanableResource} is supposed to be used by any class that provides artifacts for
- * a given job that can be cleaned up locally. Artifacts considered to be local are located on the
- * JobManager instance itself and won't survive a failover scenario. These artifacts are, in
- * contrast to {@link GloballyCleanableResource} artifacts, going to be cleaned up even after the
- * job reaches a locally-terminated state.
+ * {@code LocallyCleanableResource} is supposed to be implemented by any class that provides
+ * artifacts for a given job that need to be cleaned up after the job reached a local terminal

Review comment:
Are these also cleaned up when the job reaches a global terminal state?

Review comment:
And if not, shouldn't they?
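If local artifacts should also be removed once a job reaches a global terminal state, one option is to expose a local resource through the global cleanup interface with a small adapter. This is a sketch under assumptions, not Flink code: `LocallyCleanableResource` and `GloballyCleanableResource` below are simplified stand-ins (String job IDs instead of `JobID`), and `localAsGlobal` and `runDemo` are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class LocalAsGlobalSketch {

    // Hypothetical, simplified stand-ins for the two cleanup interfaces
    // (the job ID is a plain String instead of Flink's JobID).
    interface LocallyCleanableResource {
        CompletableFuture<Void> localCleanupAsync(String jobId, Executor executor);
    }

    interface GloballyCleanableResource {
        CompletableFuture<Void> globalCleanupAsync(String jobId, Executor executor);
    }

    // Hypothetical adapter: exposes a local resource through the global
    // interface, so a global cleanup also removes the job's local artifacts.
    static GloballyCleanableResource localAsGlobal(LocallyCleanableResource local) {
        return local::localCleanupAsync;
    }

    // Runs the adapter once and returns the recorded cleanup calls.
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        LocallyCleanableResource registry =
                (jobId, executor) -> {
                    log.add("local cleanup of " + jobId);
                    return CompletableFuture.completedFuture(null);
                };
        // Runnable::run acts as a direct (same-thread) executor.
        localAsGlobal(registry).globalCleanupAsync("job-1", Runnable::run).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [local cleanup of job-1]
    }
}
```

The adapter is a method reference because the simplified global interface has a single abstract method with the same parameter and return types as the local one.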
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java
##########
@@ -101,13 +101,13 @@ public ResourceCleaner createLocalResourceCleaner(
 @Override
 public ResourceCleaner createGlobalResourceCleaner(
 ComponentMainThreadExecutor mainThreadExecutor) {
- return DefaultResourceCleaner.forGloballyCleanableResources(
 mainThreadExecutor, cleanupExecutor, retryStrategy)
- .withPrioritizedCleanup(jobManagerRunnerRegistry)
+ .withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry))
 .withRegularCleanup(jobGraphWriter)

Review comment:
I don't understand why we need to replicate things here. Shouldn't every local resource always be cleaned up on a global cleanup? Why can we not, in the case where we'd currently use the global cleaner, first call the local cleaner and then run the global one?

Is there any use-case where a local resource should _not_ be cleaned up on a global terminal state? Is there any use-case where a local resource should be cleaned up _after_ a global resource (or vice-versa, where a global resource should be cleaned up _before_ a local resource)?

These questions are primarily based on my assumption that the only benefit of duplicating everything is that it allows us to clean up things in a specific order that local cleanup -> global cleanup wouldn't provide.
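The alternative raised in the comment, calling the local cleaner first and then running the global one, amounts to sequential composition of two asynchronous steps. A minimal sketch, assuming a hypothetical single-method `Cleaner` interface rather than Flink's actual `ResourceCleaner`; `localThenGlobal` and `runDemo` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class LocalThenGlobalSketch {

    // Hypothetical single-method cleaner for one job; Flink's actual
    // ResourceCleaner has a different signature.
    interface Cleaner {
        CompletableFuture<Void> cleanupAsync(String jobId);
    }

    // Runs the local cleaner first; the global cleaner is only started once
    // the local cleanup has completed successfully.
    static Cleaner localThenGlobal(Cleaner local, Cleaner global) {
        return jobId ->
                local.cleanupAsync(jobId).thenCompose(ignored -> global.cleanupAsync(jobId));
    }

    // Records the order in which the two steps ran.
    static List<String> runDemo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Cleaner local = jobId -> CompletableFuture.runAsync(() -> log.add("local:" + jobId));
        Cleaner global = jobId -> CompletableFuture.runAsync(() -> log.add("global:" + jobId));
        localThenGlobal(local, global).cleanupAsync("job-1").join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [local:job-1, global:job-1]
    }
}
```

Because the global step is chained with `thenCompose`, a failed local cleanup skips the global one entirely. An ordering other than "all local, then all global" would need a different structure, which is the case the questions in the comment are probing for.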
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableResource.java
##########
@@ -24,12 +24,11 @@ import java.util.concurrent.Executor;
 /**
- * {@code LocallyCleanableResource} is supposed to be used by any class that provides artifacts for
- * a given job that can be cleaned up locally. Artifacts considered to be local are located on the
- * JobManager instance itself and won't survive a failover scenario. These artifacts are, in
- * contrast to {@link GloballyCleanableResource} artifacts, going to be cleaned up even after the
- * job reaches a locally-terminated state.
+ * {@code LocallyCleanableResource} is supposed to be implemented by any class that provides
+ * artifacts for a given job that need to be cleaned up after the job reached a local terminal

Review comment:
I'm not too fond of the phrasing because it contradicts what we're doing in this PR. The JMMG cleanup "_needs to be triggered for a global terminal state as well_", but does _not_ implement `GloballyCleanableResource`. I'd rather mention that local resources should be wrapped like in the DispatcherResourceCleanerFactory to ensure they are also called for jobs in a global terminal state.

Then we should file a follow-up ticket to clean this up such that every LocallyCleanableResource is automatically cleaned up when a job reaches a global terminal state. One approach for that could be to introduce a parent interface containing both cleanup methods: the Local interface has a default global impl that calls the local cleanup; the Global interface has a default local impl that doesn't do anything. Then we adjust all data structures to work against the parent, and call the respective method as needed based on the terminal state.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
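The follow-up proposed in the last review comment (a parent interface with both cleanup methods, where local resources default their global cleanup to the local one and global resources default their local cleanup to a no-op) could look roughly like the sketch below. All names are hypothetical simplifications (String job IDs, no executor parameter), not Flink's actual interfaces; `metricGroup` and `jobGraphStore` are illustrative stand-ins for one local and one global resource.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CleanableResourceSketch {

    // Hypothetical parent interface declaring both cleanup methods
    // (simplified: String job IDs, no executor parameter).
    interface CleanableResource {
        CompletableFuture<Void> localCleanupAsync(String jobId);

        CompletableFuture<Void> globalCleanupAsync(String jobId);
    }

    // Local resources: a global terminal state also triggers the local cleanup.
    interface LocallyCleanableResource extends CleanableResource {
        @Override
        default CompletableFuture<Void> globalCleanupAsync(String jobId) {
            return localCleanupAsync(jobId);
        }
    }

    // Global resources: nothing to do when the job only terminates locally.
    interface GloballyCleanableResource extends CleanableResource {
        @Override
        default CompletableFuture<Void> localCleanupAsync(String jobId) {
            return CompletableFuture.completedFuture(null);
        }
    }

    // Callers work against the parent type and pick the method by terminal state.
    static CompletableFuture<Void> cleanup(
            List<CleanableResource> resources, String jobId, boolean globallyTerminated) {
        CompletableFuture<?>[] futures =
                resources.stream()
                        .map(r -> globallyTerminated
                                ? r.globalCleanupAsync(jobId)
                                : r.localCleanupAsync(jobId))
                        .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    // Records which cleanup actually ran for one local and one global resource.
    static List<String> runDemo(boolean globallyTerminated) {
        List<String> log = new ArrayList<>();
        LocallyCleanableResource metricGroup =
                jobId -> {
                    log.add("local:" + jobId);
                    return CompletableFuture.completedFuture(null);
                };
        GloballyCleanableResource jobGraphStore =
                jobId -> {
                    log.add("global:" + jobId);
                    return CompletableFuture.completedFuture(null);
                };
        cleanup(List.of(metricGroup, jobGraphStore), "job-1", globallyTerminated).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(true)); // prints [local:job-1, global:job-1]
        System.out.println(runDemo(false)); // prints [local:job-1]
    }
}
```

This sketch simply waits for all resources with `CompletableFuture.allOf`; it does not model the prioritized cleanup step seen in the `DispatcherResourceCleanerFactory` diff, so any ordering requirements would still have to be layered on top.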