tillrohrmann commented on a change in pull request #11109:
URL: https://github.com/apache/flink/pull/11109#discussion_r414622511



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
##########
@@ -233,8 +233,7 @@ public void allocatePages(
                        Set<MemorySegment> segmentsForOwner = 
currentSegmentsForOwner == null ?
                                new HashSet<>(numberOfPages) : 
currentSegmentsForOwner;
                        for (long i = numberOfPages; i > 0; i--) {
-                               int size = getPageSize();
-                               MemorySegment segment = 
allocateOffHeapUnsafeMemory(size, owner, () -> 
memoryBudget.releaseMemory(size));
+                               MemorySegment segment = 
allocateOffHeapUnsafeMemory(getPageSize(), owner, this::releasePage);

Review comment:
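       I think `this::releasePage` should also be pulled out of the loop (assigned to a local variable before the `for`), so that the method reference is not re-evaluated on every iteration.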

##########
File path: 
flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
##########
@@ -43,326 +47,338 @@
        private static final Logger LOG = 
LoggerFactory.getLogger(JavaGcCleanerWrapper.class);
 
        private static final Collection<CleanerProvider> CLEANER_PROVIDERS =
-               Arrays.asList(LegacyCleanerProvider.INSTANCE, 
Java9CleanerProvider.INSTANCE);
-       private static final CleanerFactory CLEANER_FACTORY = findGcCleaner();
+               Arrays.asList(createLegacyCleanerProvider(), 
createJava9CleanerProvider());
+       private static final CleanerManager CLEANER_MANAGER = 
findGcCleanerManager();
+
+       private static CleanerProvider createLegacyCleanerProvider() {
+               String name = "Legacy (before Java 9) cleaner";
+               ReflectionUtils reflectionUtils = new ReflectionUtils(name + " 
provider");
+               String cleanerClassName = "sun.misc.Cleaner";
+               return new CleanerProvider(
+                       name,
+                       new CleanerFactoryProvider(
+                               name,
+                               reflectionUtils,
+                               cleanerClassName,
+                               Optional::empty, // there is no Cleaner object, 
static method of its class will be called to create it
+                               "create", // static method of Cleaner class to 
create it
+                               cleanerClassName, // Cleaner is Cleanable in 
this case
+                               "clean"),
+                       new PendingCleanersRunnerProvider(
+                               name,
+                               reflectionUtils,
+                               "sun.misc.SharedSecrets",
+                               "sun.misc.JavaLangRefAccess",
+                               "getJavaLangRefAccess",
+                               "tryHandlePendingReference"));
+       }
+
+       private static CleanerProvider createJava9CleanerProvider() {
+               String name = "New Java 9+ cleaner";
+               ReflectionUtils reflectionUtils = new ReflectionUtils(name + " 
provider");
+               String cleanerClassName = "java.lang.ref.Cleaner";
+               return new CleanerProvider(
+                       name,
+                       new CleanerFactoryProvider(
+                               name,
+                               reflectionUtils,
+                               cleanerClassName,
+                               () -> {
+                                       Class<?> cleanerClass = 
reflectionUtils.findClass(cleanerClassName);
+                                       Method cleanerCreateMethod = 
reflectionUtils.findMethod(cleanerClass, "create");
+                                       try {
+                                               return 
Optional.of(cleanerCreateMethod.invoke(null));
+                                       } catch (IllegalAccessException | 
InvocationTargetException e) {
+                                               throw new 
FlinkRuntimeException("Failed to create a Java 9 Cleaner", e);
+                                       }
+                               },
+                               "register",
+                               "java.lang.ref.Cleaner$Cleanable",
+                               "clean"),
+                       new PendingCleanersRunnerProvider(
+                               name,
+                               reflectionUtils,
+                               "jdk.internal.misc.SharedSecrets",
+                               "jdk.internal.misc.JavaLangRefAccess",
+                               "getJavaLangRefAccess",
+                               "waitForReferenceProcessing"));
+       }
 
-       private static CleanerFactory findGcCleaner() {
-               CleanerFactory foundCleanerFactory = null;
+       private static CleanerManager findGcCleanerManager() {
+               CleanerManager foundCleanerManager = null;
                Throwable t = null;
                for (CleanerProvider cleanerProvider : CLEANER_PROVIDERS) {
-                       //noinspection OverlyBroadCatchBlock
                        try {
-                               foundCleanerFactory = 
cleanerProvider.createCleanerFactory();
+                               foundCleanerManager = 
cleanerProvider.createCleanerManager();
                                break;
                        } catch (Throwable e) {
                                t = ExceptionUtils.firstOrSuppressed(e, t);
                        }
                }
 
-               if (foundCleanerFactory == null) {
+               if (foundCleanerManager == null) {
                        String errorMessage = String.format("Failed to find GC 
Cleaner among available providers: %s", CLEANER_PROVIDERS);
                        throw new Error(errorMessage, t);
                }
-               return foundCleanerFactory;
+               return foundCleanerManager;
        }
 
        public static Runnable create(Object owner, Runnable cleanOperation) {
-               return CLEANER_FACTORY.create(owner, cleanOperation);
+               return CLEANER_MANAGER.create(owner, cleanOperation);
        }
 
-       public static boolean waitForGcToRunReadyCleaners() throws 
InterruptedException {
-               return CLEANER_FACTORY.waitForGcToRunReadyCleaners();
+       public static boolean tryRunPendingCleaners() throws 
InterruptedException {
+               return CLEANER_MANAGER.tryRunPendingCleaners();
        }
 
-       private static Class<?> findClass(String className, String 
errorMessage) {
-               try {
-                       return Class.forName(className);
-               } catch (ClassNotFoundException e) {
-                       throw new FlinkRuntimeException(errorMessage, e);
+       private static class CleanerProvider {
+               private final String cleanerName;
+               private final CleanerFactoryProvider cleanerFactoryProvider;
+               private final PendingCleanersRunnerProvider 
pendingCleanersRunnerProvider;
+
+               private CleanerProvider(
+                               String cleanerName,
+                               CleanerFactoryProvider cleanerFactoryProvider,
+                               PendingCleanersRunnerProvider 
pendingCleanersRunnerProvider) {
+                       this.cleanerName = cleanerName;
+                       this.cleanerFactoryProvider = cleanerFactoryProvider;
+                       this.pendingCleanersRunnerProvider = 
pendingCleanersRunnerProvider;
                }
-       }
-
-       @FunctionalInterface
-       private interface CleanerProvider {
-               CleanerFactory createCleanerFactory() throws 
ClassNotFoundException;
-       }
 
-       private interface CleanerFactory {
-               Runnable create(Object owner, Runnable cleanOperation);
+               private CleanerManager createCleanerManager() {
+                       return new CleanerManager(
+                               cleanerName,
+                               cleanerFactoryProvider.createCleanerFactory(),
+                               
pendingCleanersRunnerProvider.createPendingCleanersRunner());
+               }
 
-               boolean waitForGcToRunReadyCleaners() throws 
InterruptedException;
+               @Override
+               public String toString() {
+                       return cleanerName + " provider";
+               }
        }
 
-       private enum LegacyCleanerProvider implements CleanerProvider {
-               INSTANCE;
-
-               private static final String LEGACY_CLEANER_CLASS_NAME = 
"sun.misc.Cleaner";
-               private static final String LEGACY_SHARED_SECRETS = 
"sun.misc.SharedSecrets";
-               private static final String LEGACY_JAVA_LANG_REF_ACCESS = 
"sun.misc.JavaLangRefAccess";
+       private static class CleanerManager {
+               private final String cleanerName;
+               private final CleanerFactory cleanerFactory;
+               private final PendingCleanersRunner pendingCleanersRunner;
+
+               private CleanerManager(
+                               String cleanerName,
+                               CleanerFactory cleanerFactory,
+                               PendingCleanersRunner pendingCleanersRunner) {
+                       this.cleanerName = cleanerName;
+                       this.cleanerFactory = cleanerFactory;
+                       this.pendingCleanersRunner = pendingCleanersRunner;
+               }
 
-               @Override
-               public CleanerFactory createCleanerFactory() {
-                       Class<?> cleanerClass = 
findClass(LEGACY_CLEANER_CLASS_NAME, "Failed to find Java legacy Cleaner 
class");
-                       Class<?> sharedSecretsClass = 
findClass(LEGACY_SHARED_SECRETS, "Failed to find Java legacy SharedSecrets 
class");
-                       Class<?> javaLangRefAccessClass = 
findClass(LEGACY_JAVA_LANG_REF_ACCESS, "Failed to find Java legacy 
JavaLangRefAccess class");
-                       Method cleanerCreateMethod = 
getCleanerCreateMethod(cleanerClass);
-                       Method cleanerCleanMethod = 
getCleanerCleanMethod(cleanerClass);
-                       Method getJavaLangRefAccessMethod = 
getSharedSecretsGetJavaLangRefAccessMethod(sharedSecretsClass);
-                       Method tryHandlePendingReferenceMethod = 
getJavaLangRefAccessTryHandlePendingReference(javaLangRefAccessClass);
-                       return new LegacyCleanerFactory(cleanerCreateMethod, 
cleanerCleanMethod, getJavaLangRefAccessMethod, 
tryHandlePendingReferenceMethod);
+               private Runnable create(Object owner, Runnable cleanOperation) {
+                       return cleanerFactory.create(owner, cleanOperation);
                }
 
-               private static Method getCleanerCreateMethod(Class<?> 
cleanerClass) {
-                       try {
-                               return cleanerClass.getMethod("create", 
Object.class, Runnable.class);
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy Cleaner#create method", e);
-                       }
+               private boolean tryRunPendingCleaners() throws 
InterruptedException {
+                       return pendingCleanersRunner.tryRunPendingCleaners();
                }
 
-               private static Method getCleanerCleanMethod(Class<?> 
cleanerClass) {
-                       try {
-                               return cleanerClass.getMethod("clean");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy Cleaner#clean method", e);
-                       }
+               @Override
+               public String toString() {
+                       return cleanerName + " manager";
                }
+       }
 
-               private static Method 
getSharedSecretsGetJavaLangRefAccessMethod(Class<?> sharedSecretsClass) {
-                       try {
-                               return 
sharedSecretsClass.getMethod("getJavaLangRefAccess");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy SharedSecrets#getJavaLangRefAccess method", e);
-                       }
+       private static class CleanerFactoryProvider {
+               private final String cleanerName;
+               private final ReflectionUtils reflectionUtils;
+               private final String cleanerClassName;
+               private final Supplier<Optional<Object>> cleanerSupplier;
+               private final String cleanableCreationMethodName;
+               private final String cleanableClassName;
+               private final String cleanMethodName;
+
+               private CleanerFactoryProvider(
+                               String cleanerName,
+                               ReflectionUtils reflectionUtils,
+                               String cleanerClassName,
+                               Supplier<Optional<Object>> cleanerSupplier,
+                               String cleanableCreationMethodName, // Cleaner 
is a factory for Cleanable
+                               String cleanableClassName,
+                               String cleanMethodName) {
+                       this.cleanerName = cleanerName;
+                       this.reflectionUtils = reflectionUtils;
+                       this.cleanerClassName = cleanerClassName;
+                       this.cleanerSupplier = cleanerSupplier;
+                       this.cleanableCreationMethodName = 
cleanableCreationMethodName;
+                       this.cleanableClassName = cleanableClassName;
+                       this.cleanMethodName = cleanMethodName;
                }
 
-               private static Method 
getJavaLangRefAccessTryHandlePendingReference(Class<?> javaLangRefAccessClass) {
-                       try {
-                               return 
javaLangRefAccessClass.getMethod("tryHandlePendingReference");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy JavaLangRefAccess#tryHandlePendingReference method", e);
-                       }
+               private CleanerFactory createCleanerFactory() {
+                       Class<?> cleanerClass = 
reflectionUtils.findClass(cleanerClassName);
+                       Method cleanableCreationMethod = 
reflectionUtils.findMethod(
+                               cleanerClass,
+                               cleanableCreationMethodName,
+                               Object.class,
+                               Runnable.class);
+                       Class<?> cleanableClass = 
reflectionUtils.findClass(cleanableClassName);
+                       Method cleanMethod = 
reflectionUtils.findMethod(cleanableClass, cleanMethodName);
+                       return new CleanerFactory(
+                               cleanerName,
+                               cleanerSupplier.get().orElse(null), // static 
method of Cleaner class will be called to create Cleanable
+                               cleanableCreationMethod,
+                               cleanMethod);
                }
 
                @Override
                public String toString() {
-                       return "Legacy cleaner provider before Java 9 using " + 
LEGACY_CLEANER_CLASS_NAME;
+                       return cleanerName + " factory provider using " + 
cleanerClassName;
                }
        }
 
-       private static final class LegacyCleanerFactory implements 
CleanerFactory {
-               private final Method cleanerCreateMethod;
-               private final Method cleanerCleanMethod;
-               private final Method getJavaLangRefAccessMethod;
-               private final Method tryHandlePendingReferenceMethod;
-
-               private LegacyCleanerFactory(
-                               Method cleanerCreateMethod,
-                               Method cleanerCleanMethod,
-                               Method getJavaLangRefAccessMethod,
-                               Method tryHandlePendingReferenceMethod) {
-                       this.cleanerCreateMethod = cleanerCreateMethod;
-                       this.cleanerCleanMethod = cleanerCleanMethod;
-                       this.getJavaLangRefAccessMethod = 
getJavaLangRefAccessMethod;
-                       this.tryHandlePendingReferenceMethod = 
tryHandlePendingReferenceMethod;
+       private static class CleanerFactory {
+               private final String cleanerName;
+               private final Object cleaner;
+               private final Method cleanableCreationMethod;
+               private final Method cleanMethod;
+
+               private CleanerFactory(
+                       String cleanerName,
+                       Object cleaner,
+                       Method cleanableCreationMethod,
+                       Method cleanMethod) {
+                       this.cleanerName = cleanerName;
+                       this.cleaner = cleaner;
+                       this.cleanableCreationMethod = cleanableCreationMethod;
+                       this.cleanMethod = cleanMethod;
                }
 
-               @Override
-               public Runnable create(Object owner, Runnable cleanupOperation) 
{
-                       Object cleaner;
+               private Runnable create(Object owner, Runnable 
cleanupOperation) {
+                       Object cleanable;
                        try {
-                               cleaner = cleanerCreateMethod.invoke(null, 
owner, cleanupOperation);
+                               cleanable = 
cleanableCreationMethod.invoke(cleaner, owner, cleanupOperation);
                        } catch (IllegalAccessException | 
InvocationTargetException e) {
-                               throw new Error("Failed to create a Java legacy 
Cleaner", e);
+                               throw new Error("Failed to create a " + 
cleanerName, e);
                        }
                        String ownerString = owner.toString(); // lambda should 
not capture the owner object
                        return () -> {
                                try {
-                                       cleanerCleanMethod.invoke(cleaner);
+                                       cleanMethod.invoke(cleanable);
                                } catch (IllegalAccessException | 
InvocationTargetException e) {
-                                       String message = String.format("FATAL 
UNEXPECTED - Failed to invoke a Java legacy Cleaner for %s", ownerString);
+                                       String message = String.format("FATAL 
UNEXPECTED - Failed to invoke a %s for %s", cleanerName, ownerString);
                                        LOG.error(message, e);
                                        throw new Error(message, e);
                                }
                        };
                }
-
-               @Override
-               public boolean waitForGcToRunReadyCleaners() {
-                       Object javaLangRefAccess = getJavaLangRefAccess();
-                       try {
-                               return (Boolean) 
tryHandlePendingReferenceMethod.invoke(javaLangRefAccess);
-                       } catch (IllegalAccessException | 
InvocationTargetException e) {
-                               String message = "FATAL UNEXPECTED - Failed to 
invoke JavaLangRefAccess#tryHandlePending";
-                               LOG.error(message, e);
-                               throw new Error(message, e);
-                       }
-               }
-
-               private Object getJavaLangRefAccess() {
-                       try {
-                               return getJavaLangRefAccessMethod.invoke(null);
-                       } catch (IllegalAccessException | 
InvocationTargetException e) {
-                               String message = "FATAL UNEXPECTED - Failed to 
invoke SharedSecrets#getJavaLangRefAccess";
-                               LOG.error(message, e);
-                               throw new Error(message, e);
-                       }
-               }
        }
 
-       /** New cleaner provider for Java 9+. */
-       private enum Java9CleanerProvider implements CleanerProvider {
-               INSTANCE;
-
-               private static final String JAVA9_CLEANER_CLASS_NAME = 
"java.lang.ref.Cleaner";
-               private static final String LEGACY_SHARED_SECRETS = 
"jdk.internal.misc.SharedSecrets";
-               private static final String LEGACY_JAVA_LANG_REF_ACCESS = 
"jdk.internal.misc.JavaLangRefAccess";
-
-               @Override
-               public CleanerFactory createCleanerFactory() {
-                       Class<?> cleanerClass = 
findClass(JAVA9_CLEANER_CLASS_NAME, "Failed to find Java 9 Cleaner class");
-                       Class<?> sharedSecretsClass = 
findClass(LEGACY_SHARED_SECRETS, "Failed to find Java 9 SharedSecrets class");
-                       Class<?> javaLangRefAccessClass = 
findClass(LEGACY_JAVA_LANG_REF_ACCESS, "Failed to find Java 9 JavaLangRefAccess 
class");
-                       Method cleanerCreateMethod = 
getCleanerCreateMethod(cleanerClass);
-                       Object cleaner = createCleaner(cleanerCreateMethod);
-                       Method cleanerRegisterMethod = 
getCleanerRegisterMethod(cleanerClass);
-                       Class<?> cleanableClass = findCleanableClass();
-                       Method cleanMethod = getCleanMethod(cleanableClass);
-                       Method getJavaLangRefAccessMethod = 
getSharedSecretsGetJavaLangRefAccessMethod(sharedSecretsClass);
-                       Method waitForReferenceProcessingMethod = 
getJavaLangRefAccessWaitForReferenceProcessing(javaLangRefAccessClass);
-                       return new Java9CleanerFactory(
-                               cleaner,
-                               cleanerRegisterMethod,
-                               cleanMethod,
-                               getJavaLangRefAccessMethod,
-                               waitForReferenceProcessingMethod);
+       private static class PendingCleanersRunnerProvider {
+               private final String cleanerName;
+               private final ReflectionUtils reflectionUtils;
+               private final String sharedSecretsClassName;
+               private final String javaLangRefAccessClassName;
+               private final String getJavaLangRefAccessName;
+               private final String tryHandlePendingReferenceName;
+
+               private PendingCleanersRunnerProvider(
+                               String cleanerName,
+                               ReflectionUtils reflectionUtils,
+                               String sharedSecretsClassName,
+                               String javaLangRefAccessClassName,
+                               String getJavaLangRefAccessName,
+                               String tryHandlePendingReferenceName) {
+                       this.cleanerName = cleanerName;
+                       this.reflectionUtils = reflectionUtils;
+                       this.sharedSecretsClassName = sharedSecretsClassName;
+                       this.javaLangRefAccessClassName = 
javaLangRefAccessClassName;
+                       this.getJavaLangRefAccessName = 
getJavaLangRefAccessName;
+                       this.tryHandlePendingReferenceName = 
tryHandlePendingReferenceName;
                }
 
-               private static Method getCleanerCreateMethod(Class<?> 
cleanerClass) {
-                       try {
-                               return cleanerClass.getMethod("create");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java 9 Cleaner#create method", e);
-                       }
+               private PendingCleanersRunner createPendingCleanersRunner() {
+                       Class<?> sharedSecretsClass = 
reflectionUtils.findClass(sharedSecretsClassName);
+                       Class<?> javaLangRefAccessClass = 
reflectionUtils.findClass(javaLangRefAccessClassName);
+                       Method getJavaLangRefAccessMethod = 
reflectionUtils.findMethod(sharedSecretsClass, getJavaLangRefAccessName);
+                       Method tryHandlePendingReferenceMethod = 
reflectionUtils.findMethod(
+                               javaLangRefAccessClass,
+                               tryHandlePendingReferenceName);
+                       return new 
PendingCleanersRunner(getJavaLangRefAccessMethod, 
tryHandlePendingReferenceMethod);
                }
 
-               private static Object createCleaner(Method cleanerCreateMethod) 
{
-                       try {
-                               return cleanerCreateMethod.invoke(null);
-                       } catch (IllegalAccessException | 
InvocationTargetException e) {
-                               throw new FlinkRuntimeException("Failed to 
create a Java 9 Cleaner", e);
-                       }
+               @Override
+               public String toString() {
+                       return "Pending " + cleanerName + "s runner provider";
                }
+       }
 
-               private static Method getCleanerRegisterMethod(Class<?> 
cleanerClass) {
-                       try {
-                               return cleanerClass.getMethod("register", 
Object.class, Runnable.class);
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java 9 Cleaner#create method", e);
-                       }
-               }
+       private static class PendingCleanersRunner {
+               private final Method getJavaLangRefAccessMethod;
+               private final Method waitForReferenceProcessingMethod;
 
-               private static Class<?> findCleanableClass() {
-                       try {
-                               return 
Class.forName("java.lang.ref.Cleaner$Cleanable");
-                       } catch (ClassNotFoundException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java 9 Cleaner#Cleanable class", e);
-                       }
+               private PendingCleanersRunner(Method 
getJavaLangRefAccessMethod, Method waitForReferenceProcessingMethod) {
+                       this.getJavaLangRefAccessMethod = 
getJavaLangRefAccessMethod;
+                       this.waitForReferenceProcessingMethod = 
waitForReferenceProcessingMethod;
                }
 
-               private static Method getCleanMethod(Class<?> cleanableClass) {
+               private boolean tryRunPendingCleaners() throws 
InterruptedException {
+                       Object javaLangRefAccess = getJavaLangRefAccess();
                        try {
-                               return cleanableClass.getMethod("clean");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java 9 Cleaner$Cleanable#clean method", e);
+                               return (Boolean) 
waitForReferenceProcessingMethod.invoke(javaLangRefAccess);
+                       } catch (IllegalAccessException | 
InvocationTargetException e) {
+                               throwIfCauseIsInterruptedException(e);
+                               return throwInvokationError(e, 
javaLangRefAccess, waitForReferenceProcessingMethod);
                        }
                }
 
-               private static Method 
getSharedSecretsGetJavaLangRefAccessMethod(Class<?> sharedSecretsClass) {
+               private Object getJavaLangRefAccess() {
                        try {
-                               return 
sharedSecretsClass.getMethod("getJavaLangRefAccess");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java 9 SharedSecrets#getJavaLangRefAccess method", e);
+                               return getJavaLangRefAccessMethod.invoke(null);
+                       } catch (IllegalAccessException | 
InvocationTargetException e) {
+                               return throwInvokationError(e, null, 
waitForReferenceProcessingMethod);
                        }
                }
 
-               private static Method 
getJavaLangRefAccessWaitForReferenceProcessing(Class<?> javaLangRefAccessClass) 
{
-                       try {
-                               return 
javaLangRefAccessClass.getMethod("waitForReferenceProcessing");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java 9 JavaLangRefAccess#waitForReferenceProcessing method", e);
+               private static void 
throwIfCauseIsInterruptedException(Throwable t) throws InterruptedException {
+                       // if the original wrapped method can throw 
InterruptedException
+                       // then we may want to explicitly handle in the user 
code for certain implementations
+                       if (t.getCause() instanceof InterruptedException) {
+                               throw (InterruptedException) t.getCause();
                        }
                }
 
-               @Override
-               public String toString() {
-                       return "New cleaner provider for Java 9+" + 
JAVA9_CLEANER_CLASS_NAME;
+               private static <T> T throwInvokationError(Throwable t, 
@Nullable Object obj, Method method) {

Review comment:
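       Typo: "Invokation" should be spelled "Invocation". The call sites in `tryRunPendingCleaners` and `getJavaLangRefAccess` need the same rename.
       ```suggestion
                private static <T> T throwInvocationError(Throwable t, 
@Nullable Object obj, Method method) {
       ```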

##########
File path: 
flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
##########
@@ -43,326 +47,338 @@
        private static final Logger LOG = 
LoggerFactory.getLogger(JavaGcCleanerWrapper.class);
 
        private static final Collection<CleanerProvider> CLEANER_PROVIDERS =
-               Arrays.asList(LegacyCleanerProvider.INSTANCE, 
Java9CleanerProvider.INSTANCE);
-       private static final CleanerFactory CLEANER_FACTORY = findGcCleaner();
+               Arrays.asList(createLegacyCleanerProvider(), 
createJava9CleanerProvider());
+       private static final CleanerManager CLEANER_MANAGER = 
findGcCleanerManager();
+
+       private static CleanerProvider createLegacyCleanerProvider() {
+               String name = "Legacy (before Java 9) cleaner";
+               ReflectionUtils reflectionUtils = new ReflectionUtils(name + " 
provider");
+               String cleanerClassName = "sun.misc.Cleaner";
+               return new CleanerProvider(
+                       name,
+                       new CleanerFactoryProvider(
+                               name,
+                               reflectionUtils,
+                               cleanerClassName,
+                               Optional::empty, // there is no Cleaner object, 
static method of its class will be called to create it
+                               "create", // static method of Cleaner class to 
create it
+                               cleanerClassName, // Cleaner is Cleanable in 
this case
+                               "clean"),
+                       new PendingCleanersRunnerProvider(
+                               name,
+                               reflectionUtils,
+                               "sun.misc.SharedSecrets",
+                               "sun.misc.JavaLangRefAccess",
+                               "getJavaLangRefAccess",
+                               "tryHandlePendingReference"));
+       }
+
+       private static CleanerProvider createJava9CleanerProvider() {
+               String name = "New Java 9+ cleaner";
+               ReflectionUtils reflectionUtils = new ReflectionUtils(name + " 
provider");
+               String cleanerClassName = "java.lang.ref.Cleaner";
+               return new CleanerProvider(
+                       name,
+                       new CleanerFactoryProvider(
+                               name,
+                               reflectionUtils,
+                               cleanerClassName,
+                               () -> {
+                                       Class<?> cleanerClass = 
reflectionUtils.findClass(cleanerClassName);
+                                       Method cleanerCreateMethod = 
reflectionUtils.findMethod(cleanerClass, "create");
+                                       try {
+                                               return 
Optional.of(cleanerCreateMethod.invoke(null));
+                                       } catch (IllegalAccessException | 
InvocationTargetException e) {
+                                               throw new 
FlinkRuntimeException("Failed to create a Java 9 Cleaner", e);
+                                       }
+                               },
+                               "register",
+                               "java.lang.ref.Cleaner$Cleanable",
+                               "clean"),
+                       new PendingCleanersRunnerProvider(
+                               name,
+                               reflectionUtils,
+                               "jdk.internal.misc.SharedSecrets",
+                               "jdk.internal.misc.JavaLangRefAccess",
+                               "getJavaLangRefAccess",
+                               "waitForReferenceProcessing"));
+       }
 
-       private static CleanerFactory findGcCleaner() {
-               CleanerFactory foundCleanerFactory = null;
+       private static CleanerManager findGcCleanerManager() {
+               CleanerManager foundCleanerManager = null;
                Throwable t = null;
                for (CleanerProvider cleanerProvider : CLEANER_PROVIDERS) {
-                       //noinspection OverlyBroadCatchBlock
                        try {
-                               foundCleanerFactory = 
cleanerProvider.createCleanerFactory();
+                               foundCleanerManager = 
cleanerProvider.createCleanerManager();
                                break;
                        } catch (Throwable e) {
                                t = ExceptionUtils.firstOrSuppressed(e, t);
                        }
                }
 
-               if (foundCleanerFactory == null) {
+               if (foundCleanerManager == null) {
                        String errorMessage = String.format("Failed to find GC 
Cleaner among available providers: %s", CLEANER_PROVIDERS);
                        throw new Error(errorMessage, t);
                }
-               return foundCleanerFactory;
+               return foundCleanerManager;
        }
 
        public static Runnable create(Object owner, Runnable cleanOperation) {
-               return CLEANER_FACTORY.create(owner, cleanOperation);
+               return CLEANER_MANAGER.create(owner, cleanOperation);
        }
 
-       public static boolean waitForGcToRunReadyCleaners() throws 
InterruptedException {
-               return CLEANER_FACTORY.waitForGcToRunReadyCleaners();
+       public static boolean tryRunPendingCleaners() throws 
InterruptedException {
+               return CLEANER_MANAGER.tryRunPendingCleaners();
        }
 
-       private static Class<?> findClass(String className, String 
errorMessage) {
-               try {
-                       return Class.forName(className);
-               } catch (ClassNotFoundException e) {
-                       throw new FlinkRuntimeException(errorMessage, e);
+       private static class CleanerProvider {
+               private final String cleanerName;
+               private final CleanerFactoryProvider cleanerFactoryProvider;
+               private final PendingCleanersRunnerProvider 
pendingCleanersRunnerProvider;
+
+               private CleanerProvider(
+                               String cleanerName,
+                               CleanerFactoryProvider cleanerFactoryProvider,
+                               PendingCleanersRunnerProvider 
pendingCleanersRunnerProvider) {
+                       this.cleanerName = cleanerName;
+                       this.cleanerFactoryProvider = cleanerFactoryProvider;
+                       this.pendingCleanersRunnerProvider = 
pendingCleanersRunnerProvider;
                }
-       }
-
-       @FunctionalInterface
-       private interface CleanerProvider {
-               CleanerFactory createCleanerFactory() throws 
ClassNotFoundException;
-       }
 
-       private interface CleanerFactory {
-               Runnable create(Object owner, Runnable cleanOperation);
+               private CleanerManager createCleanerManager() {
+                       return new CleanerManager(
+                               cleanerName,
+                               cleanerFactoryProvider.createCleanerFactory(),
+                               
pendingCleanersRunnerProvider.createPendingCleanersRunner());
+               }
 
-               boolean waitForGcToRunReadyCleaners() throws 
InterruptedException;
+               @Override
+               public String toString() {
+                       return cleanerName + " provider";
+               }
        }
 
-       private enum LegacyCleanerProvider implements CleanerProvider {
-               INSTANCE;
-
-               private static final String LEGACY_CLEANER_CLASS_NAME = 
"sun.misc.Cleaner";
-               private static final String LEGACY_SHARED_SECRETS = 
"sun.misc.SharedSecrets";
-               private static final String LEGACY_JAVA_LANG_REF_ACCESS = 
"sun.misc.JavaLangRefAccess";
+       private static class CleanerManager {
+               private final String cleanerName;
+               private final CleanerFactory cleanerFactory;
+               private final PendingCleanersRunner pendingCleanersRunner;
+
+               private CleanerManager(
+                               String cleanerName,
+                               CleanerFactory cleanerFactory,
+                               PendingCleanersRunner pendingCleanersRunner) {
+                       this.cleanerName = cleanerName;
+                       this.cleanerFactory = cleanerFactory;
+                       this.pendingCleanersRunner = pendingCleanersRunner;
+               }
 
-               @Override
-               public CleanerFactory createCleanerFactory() {
-                       Class<?> cleanerClass = 
findClass(LEGACY_CLEANER_CLASS_NAME, "Failed to find Java legacy Cleaner 
class");
-                       Class<?> sharedSecretsClass = 
findClass(LEGACY_SHARED_SECRETS, "Failed to find Java legacy SharedSecrets 
class");
-                       Class<?> javaLangRefAccessClass = 
findClass(LEGACY_JAVA_LANG_REF_ACCESS, "Failed to find Java legacy 
JavaLangRefAccess class");
-                       Method cleanerCreateMethod = 
getCleanerCreateMethod(cleanerClass);
-                       Method cleanerCleanMethod = 
getCleanerCleanMethod(cleanerClass);
-                       Method getJavaLangRefAccessMethod = 
getSharedSecretsGetJavaLangRefAccessMethod(sharedSecretsClass);
-                       Method tryHandlePendingReferenceMethod = 
getJavaLangRefAccessTryHandlePendingReference(javaLangRefAccessClass);
-                       return new LegacyCleanerFactory(cleanerCreateMethod, 
cleanerCleanMethod, getJavaLangRefAccessMethod, 
tryHandlePendingReferenceMethod);
+               private Runnable create(Object owner, Runnable cleanOperation) {
+                       return cleanerFactory.create(owner, cleanOperation);
                }
 
-               private static Method getCleanerCreateMethod(Class<?> 
cleanerClass) {
-                       try {
-                               return cleanerClass.getMethod("create", 
Object.class, Runnable.class);
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy Cleaner#create method", e);
-                       }
+               private boolean tryRunPendingCleaners() throws 
InterruptedException {
+                       return pendingCleanersRunner.tryRunPendingCleaners();
                }
 
-               private static Method getCleanerCleanMethod(Class<?> 
cleanerClass) {
-                       try {
-                               return cleanerClass.getMethod("clean");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy Cleaner#clean method", e);
-                       }
+               @Override
+               public String toString() {
+                       return cleanerName + " manager";
                }
+       }
 
-               private static Method 
getSharedSecretsGetJavaLangRefAccessMethod(Class<?> sharedSecretsClass) {
-                       try {
-                               return 
sharedSecretsClass.getMethod("getJavaLangRefAccess");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy SharedSecrets#getJavaLangRefAccess method", e);
-                       }
+       private static class CleanerFactoryProvider {
+               private final String cleanerName;
+               private final ReflectionUtils reflectionUtils;
+               private final String cleanerClassName;
+               private final Supplier<Optional<Object>> cleanerSupplier;
+               private final String cleanableCreationMethodName;
+               private final String cleanableClassName;
+               private final String cleanMethodName;
+
+               private CleanerFactoryProvider(
+                               String cleanerName,
+                               ReflectionUtils reflectionUtils,
+                               String cleanerClassName,
+                               Supplier<Optional<Object>> cleanerSupplier,
+                               String cleanableCreationMethodName, // Cleaner 
is a factory for Cleanable
+                               String cleanableClassName,
+                               String cleanMethodName) {
+                       this.cleanerName = cleanerName;
+                       this.reflectionUtils = reflectionUtils;
+                       this.cleanerClassName = cleanerClassName;
+                       this.cleanerSupplier = cleanerSupplier;
+                       this.cleanableCreationMethodName = 
cleanableCreationMethodName;
+                       this.cleanableClassName = cleanableClassName;
+                       this.cleanMethodName = cleanMethodName;
                }
 
-               private static Method 
getJavaLangRefAccessTryHandlePendingReference(Class<?> javaLangRefAccessClass) {
-                       try {
-                               return 
javaLangRefAccessClass.getMethod("tryHandlePendingReference");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy JavaLangRefAccess#tryHandlePendingReference method", e);
-                       }
+               private CleanerFactory createCleanerFactory() {
+                       Class<?> cleanerClass = 
reflectionUtils.findClass(cleanerClassName);
+                       Method cleanableCreationMethod = 
reflectionUtils.findMethod(
+                               cleanerClass,
+                               cleanableCreationMethodName,
+                               Object.class,
+                               Runnable.class);
+                       Class<?> cleanableClass = 
reflectionUtils.findClass(cleanableClassName);
+                       Method cleanMethod = 
reflectionUtils.findMethod(cleanableClass, cleanMethodName);
+                       return new CleanerFactory(
+                               cleanerName,
+                               cleanerSupplier.get().orElse(null), // static 
method of Cleaner class will be called to create Cleanable
+                               cleanableCreationMethod,
+                               cleanMethod);
                }
 
                @Override
                public String toString() {
-                       return "Legacy cleaner provider before Java 9 using " + 
LEGACY_CLEANER_CLASS_NAME;
+                       return cleanerName + " factory provider using " + 
cleanerClassName;
                }
        }
 
-       private static final class LegacyCleanerFactory implements 
CleanerFactory {
-               private final Method cleanerCreateMethod;
-               private final Method cleanerCleanMethod;
-               private final Method getJavaLangRefAccessMethod;
-               private final Method tryHandlePendingReferenceMethod;
-
-               private LegacyCleanerFactory(
-                               Method cleanerCreateMethod,
-                               Method cleanerCleanMethod,
-                               Method getJavaLangRefAccessMethod,
-                               Method tryHandlePendingReferenceMethod) {
-                       this.cleanerCreateMethod = cleanerCreateMethod;
-                       this.cleanerCleanMethod = cleanerCleanMethod;
-                       this.getJavaLangRefAccessMethod = 
getJavaLangRefAccessMethod;
-                       this.tryHandlePendingReferenceMethod = 
tryHandlePendingReferenceMethod;
+       private static class CleanerFactory {
+               private final String cleanerName;
+               private final Object cleaner;
+               private final Method cleanableCreationMethod;
+               private final Method cleanMethod;
+
+               private CleanerFactory(
+                       String cleanerName,
+                       Object cleaner,

Review comment:
       ```suggestion
                        @Nullable Object cleaner,
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
##########
@@ -43,326 +47,338 @@
        private static final Logger LOG = 
LoggerFactory.getLogger(JavaGcCleanerWrapper.class);
 
        private static final Collection<CleanerProvider> CLEANER_PROVIDERS =
-               Arrays.asList(LegacyCleanerProvider.INSTANCE, 
Java9CleanerProvider.INSTANCE);
-       private static final CleanerFactory CLEANER_FACTORY = findGcCleaner();
+               Arrays.asList(createLegacyCleanerProvider(), 
createJava9CleanerProvider());
+       private static final CleanerManager CLEANER_MANAGER = 
findGcCleanerManager();
+
+       private static CleanerProvider createLegacyCleanerProvider() {
+               String name = "Legacy (before Java 9) cleaner";
+               ReflectionUtils reflectionUtils = new ReflectionUtils(name + " 
provider");
+               String cleanerClassName = "sun.misc.Cleaner";
+               return new CleanerProvider(
+                       name,
+                       new CleanerFactoryProvider(
+                               name,
+                               reflectionUtils,
+                               cleanerClassName,
+                               Optional::empty, // there is no Cleaner object, 
static method of its class will be called to create it
+                               "create", // static method of Cleaner class to 
create it
+                               cleanerClassName, // Cleaner is Cleanable in 
this case
+                               "clean"),
+                       new PendingCleanersRunnerProvider(
+                               name,
+                               reflectionUtils,
+                               "sun.misc.SharedSecrets",
+                               "sun.misc.JavaLangRefAccess",
+                               "getJavaLangRefAccess",
+                               "tryHandlePendingReference"));
+       }
+
+       private static CleanerProvider createJava9CleanerProvider() {
+               String name = "New Java 9+ cleaner";
+               ReflectionUtils reflectionUtils = new ReflectionUtils(name + " 
provider");
+               String cleanerClassName = "java.lang.ref.Cleaner";
+               return new CleanerProvider(
+                       name,
+                       new CleanerFactoryProvider(
+                               name,
+                               reflectionUtils,
+                               cleanerClassName,
+                               () -> {
+                                       Class<?> cleanerClass = 
reflectionUtils.findClass(cleanerClassName);
+                                       Method cleanerCreateMethod = 
reflectionUtils.findMethod(cleanerClass, "create");
+                                       try {
+                                               return 
Optional.of(cleanerCreateMethod.invoke(null));
+                                       } catch (IllegalAccessException | 
InvocationTargetException e) {
+                                               throw new 
FlinkRuntimeException("Failed to create a Java 9 Cleaner", e);
+                                       }
+                               },
+                               "register",
+                               "java.lang.ref.Cleaner$Cleanable",
+                               "clean"),
+                       new PendingCleanersRunnerProvider(
+                               name,
+                               reflectionUtils,
+                               "jdk.internal.misc.SharedSecrets",
+                               "jdk.internal.misc.JavaLangRefAccess",
+                               "getJavaLangRefAccess",
+                               "waitForReferenceProcessing"));
+       }
 
-       private static CleanerFactory findGcCleaner() {
-               CleanerFactory foundCleanerFactory = null;
+       private static CleanerManager findGcCleanerManager() {
+               CleanerManager foundCleanerManager = null;
                Throwable t = null;
                for (CleanerProvider cleanerProvider : CLEANER_PROVIDERS) {
-                       //noinspection OverlyBroadCatchBlock
                        try {
-                               foundCleanerFactory = 
cleanerProvider.createCleanerFactory();
+                               foundCleanerManager = 
cleanerProvider.createCleanerManager();
                                break;
                        } catch (Throwable e) {
                                t = ExceptionUtils.firstOrSuppressed(e, t);
                        }
                }
 
-               if (foundCleanerFactory == null) {
+               if (foundCleanerManager == null) {
                        String errorMessage = String.format("Failed to find GC 
Cleaner among available providers: %s", CLEANER_PROVIDERS);
                        throw new Error(errorMessage, t);
                }
-               return foundCleanerFactory;
+               return foundCleanerManager;
        }
 
        public static Runnable create(Object owner, Runnable cleanOperation) {

Review comment:
       Maybe rename this to `createCleaner`.

##########
File path: 
flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
##########
@@ -43,326 +47,338 @@
        private static final Logger LOG = 
LoggerFactory.getLogger(JavaGcCleanerWrapper.class);
 
        private static final Collection<CleanerProvider> CLEANER_PROVIDERS =
-               Arrays.asList(LegacyCleanerProvider.INSTANCE, 
Java9CleanerProvider.INSTANCE);
-       private static final CleanerFactory CLEANER_FACTORY = findGcCleaner();
+               Arrays.asList(createLegacyCleanerProvider(), 
createJava9CleanerProvider());
+       private static final CleanerManager CLEANER_MANAGER = 
findGcCleanerManager();
+
+       private static CleanerProvider createLegacyCleanerProvider() {
+               String name = "Legacy (before Java 9) cleaner";
+               ReflectionUtils reflectionUtils = new ReflectionUtils(name + " 
provider");
+               String cleanerClassName = "sun.misc.Cleaner";
+               return new CleanerProvider(
+                       name,
+                       new CleanerFactoryProvider(
+                               name,
+                               reflectionUtils,
+                               cleanerClassName,
+                               Optional::empty, // there is no Cleaner object, 
static method of its class will be called to create it
+                               "create", // static method of Cleaner class to 
create it
+                               cleanerClassName, // Cleaner is Cleanable in 
this case
+                               "clean"),
+                       new PendingCleanersRunnerProvider(
+                               name,
+                               reflectionUtils,
+                               "sun.misc.SharedSecrets",
+                               "sun.misc.JavaLangRefAccess",
+                               "getJavaLangRefAccess",
+                               "tryHandlePendingReference"));
+       }
+
+       private static CleanerProvider createJava9CleanerProvider() {
+               String name = "New Java 9+ cleaner";
+               ReflectionUtils reflectionUtils = new ReflectionUtils(name + " 
provider");
+               String cleanerClassName = "java.lang.ref.Cleaner";
+               return new CleanerProvider(
+                       name,
+                       new CleanerFactoryProvider(
+                               name,
+                               reflectionUtils,
+                               cleanerClassName,
+                               () -> {
+                                       Class<?> cleanerClass = 
reflectionUtils.findClass(cleanerClassName);
+                                       Method cleanerCreateMethod = 
reflectionUtils.findMethod(cleanerClass, "create");
+                                       try {
+                                               return 
Optional.of(cleanerCreateMethod.invoke(null));
+                                       } catch (IllegalAccessException | 
InvocationTargetException e) {
+                                               throw new 
FlinkRuntimeException("Failed to create a Java 9 Cleaner", e);
+                                       }
+                               },
+                               "register",
+                               "java.lang.ref.Cleaner$Cleanable",
+                               "clean"),
+                       new PendingCleanersRunnerProvider(
+                               name,
+                               reflectionUtils,
+                               "jdk.internal.misc.SharedSecrets",
+                               "jdk.internal.misc.JavaLangRefAccess",
+                               "getJavaLangRefAccess",
+                               "waitForReferenceProcessing"));
+       }
 
-       private static CleanerFactory findGcCleaner() {
-               CleanerFactory foundCleanerFactory = null;
+       private static CleanerManager findGcCleanerManager() {
+               CleanerManager foundCleanerManager = null;
                Throwable t = null;
                for (CleanerProvider cleanerProvider : CLEANER_PROVIDERS) {
-                       //noinspection OverlyBroadCatchBlock
                        try {
-                               foundCleanerFactory = 
cleanerProvider.createCleanerFactory();
+                               foundCleanerManager = 
cleanerProvider.createCleanerManager();
                                break;
                        } catch (Throwable e) {
                                t = ExceptionUtils.firstOrSuppressed(e, t);
                        }
                }
 
-               if (foundCleanerFactory == null) {
+               if (foundCleanerManager == null) {
                        String errorMessage = String.format("Failed to find GC 
Cleaner among available providers: %s", CLEANER_PROVIDERS);
                        throw new Error(errorMessage, t);
                }
-               return foundCleanerFactory;
+               return foundCleanerManager;
        }
 
        public static Runnable create(Object owner, Runnable cleanOperation) {
-               return CLEANER_FACTORY.create(owner, cleanOperation);
+               return CLEANER_MANAGER.create(owner, cleanOperation);
        }
 
-       public static boolean waitForGcToRunReadyCleaners() throws 
InterruptedException {
-               return CLEANER_FACTORY.waitForGcToRunReadyCleaners();
+       public static boolean tryRunPendingCleaners() throws 
InterruptedException {
+               return CLEANER_MANAGER.tryRunPendingCleaners();
        }
 
-       private static Class<?> findClass(String className, String 
errorMessage) {
-               try {
-                       return Class.forName(className);
-               } catch (ClassNotFoundException e) {
-                       throw new FlinkRuntimeException(errorMessage, e);
+       private static class CleanerProvider {
+               private final String cleanerName;
+               private final CleanerFactoryProvider cleanerFactoryProvider;
+               private final PendingCleanersRunnerProvider 
pendingCleanersRunnerProvider;
+
+               private CleanerProvider(
+                               String cleanerName,
+                               CleanerFactoryProvider cleanerFactoryProvider,
+                               PendingCleanersRunnerProvider 
pendingCleanersRunnerProvider) {
+                       this.cleanerName = cleanerName;
+                       this.cleanerFactoryProvider = cleanerFactoryProvider;
+                       this.pendingCleanersRunnerProvider = 
pendingCleanersRunnerProvider;
                }
-       }
-
-       @FunctionalInterface
-       private interface CleanerProvider {
-               CleanerFactory createCleanerFactory() throws 
ClassNotFoundException;
-       }
 
-       private interface CleanerFactory {
-               Runnable create(Object owner, Runnable cleanOperation);
+               private CleanerManager createCleanerManager() {
+                       return new CleanerManager(
+                               cleanerName,
+                               cleanerFactoryProvider.createCleanerFactory(),
+                               
pendingCleanersRunnerProvider.createPendingCleanersRunner());
+               }
 
-               boolean waitForGcToRunReadyCleaners() throws 
InterruptedException;
+               @Override
+               public String toString() {
+                       return cleanerName + " provider";
+               }
        }
 
-       private enum LegacyCleanerProvider implements CleanerProvider {
-               INSTANCE;
-
-               private static final String LEGACY_CLEANER_CLASS_NAME = 
"sun.misc.Cleaner";
-               private static final String LEGACY_SHARED_SECRETS = 
"sun.misc.SharedSecrets";
-               private static final String LEGACY_JAVA_LANG_REF_ACCESS = 
"sun.misc.JavaLangRefAccess";
+       private static class CleanerManager {
+               private final String cleanerName;
+               private final CleanerFactory cleanerFactory;
+               private final PendingCleanersRunner pendingCleanersRunner;
+
+               private CleanerManager(
+                               String cleanerName,
+                               CleanerFactory cleanerFactory,
+                               PendingCleanersRunner pendingCleanersRunner) {
+                       this.cleanerName = cleanerName;
+                       this.cleanerFactory = cleanerFactory;
+                       this.pendingCleanersRunner = pendingCleanersRunner;
+               }
 
-               @Override
-               public CleanerFactory createCleanerFactory() {
-                       Class<?> cleanerClass = 
findClass(LEGACY_CLEANER_CLASS_NAME, "Failed to find Java legacy Cleaner 
class");
-                       Class<?> sharedSecretsClass = 
findClass(LEGACY_SHARED_SECRETS, "Failed to find Java legacy SharedSecrets 
class");
-                       Class<?> javaLangRefAccessClass = 
findClass(LEGACY_JAVA_LANG_REF_ACCESS, "Failed to find Java legacy 
JavaLangRefAccess class");
-                       Method cleanerCreateMethod = 
getCleanerCreateMethod(cleanerClass);
-                       Method cleanerCleanMethod = 
getCleanerCleanMethod(cleanerClass);
-                       Method getJavaLangRefAccessMethod = 
getSharedSecretsGetJavaLangRefAccessMethod(sharedSecretsClass);
-                       Method tryHandlePendingReferenceMethod = 
getJavaLangRefAccessTryHandlePendingReference(javaLangRefAccessClass);
-                       return new LegacyCleanerFactory(cleanerCreateMethod, 
cleanerCleanMethod, getJavaLangRefAccessMethod, 
tryHandlePendingReferenceMethod);
+               private Runnable create(Object owner, Runnable cleanOperation) {
+                       return cleanerFactory.create(owner, cleanOperation);
                }
 
-               private static Method getCleanerCreateMethod(Class<?> 
cleanerClass) {
-                       try {
-                               return cleanerClass.getMethod("create", 
Object.class, Runnable.class);
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy Cleaner#create method", e);
-                       }
+               private boolean tryRunPendingCleaners() throws 
InterruptedException {
+                       return pendingCleanersRunner.tryRunPendingCleaners();
                }
 
-               private static Method getCleanerCleanMethod(Class<?> 
cleanerClass) {
-                       try {
-                               return cleanerClass.getMethod("clean");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy Cleaner#clean method", e);
-                       }
+               @Override
+               public String toString() {
+                       return cleanerName + " manager";
                }
+       }
 
-               private static Method 
getSharedSecretsGetJavaLangRefAccessMethod(Class<?> sharedSecretsClass) {
-                       try {
-                               return 
sharedSecretsClass.getMethod("getJavaLangRefAccess");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy SharedSecrets#getJavaLangRefAccess method", e);
-                       }
+       private static class CleanerFactoryProvider {
+               private final String cleanerName;
+               private final ReflectionUtils reflectionUtils;
+               private final String cleanerClassName;
+               private final Supplier<Optional<Object>> cleanerSupplier;
+               private final String cleanableCreationMethodName;
+               private final String cleanableClassName;
+               private final String cleanMethodName;
+
+               private CleanerFactoryProvider(
+                               String cleanerName,
+                               ReflectionUtils reflectionUtils,
+                               String cleanerClassName,
+                               Supplier<Optional<Object>> cleanerSupplier,
+                               String cleanableCreationMethodName, // Cleaner 
is a factory for Cleanable
+                               String cleanableClassName,
+                               String cleanMethodName) {
+                       this.cleanerName = cleanerName;
+                       this.reflectionUtils = reflectionUtils;
+                       this.cleanerClassName = cleanerClassName;
+                       this.cleanerSupplier = cleanerSupplier;
+                       this.cleanableCreationMethodName = 
cleanableCreationMethodName;
+                       this.cleanableClassName = cleanableClassName;
+                       this.cleanMethodName = cleanMethodName;
                }
 
-               private static Method 
getJavaLangRefAccessTryHandlePendingReference(Class<?> javaLangRefAccessClass) {
-                       try {
-                               return 
javaLangRefAccessClass.getMethod("tryHandlePendingReference");
-                       } catch (NoSuchMethodException e) {
-                               throw new FlinkRuntimeException("Failed to find 
Java legacy JavaLangRefAccess#tryHandlePendingReference method", e);
-                       }
+               private CleanerFactory createCleanerFactory() {
+                       Class<?> cleanerClass = 
reflectionUtils.findClass(cleanerClassName);
+                       Method cleanableCreationMethod = 
reflectionUtils.findMethod(
+                               cleanerClass,
+                               cleanableCreationMethodName,
+                               Object.class,
+                               Runnable.class);
+                       Class<?> cleanableClass = 
reflectionUtils.findClass(cleanableClassName);
+                       Method cleanMethod = 
reflectionUtils.findMethod(cleanableClass, cleanMethodName);
+                       return new CleanerFactory(
+                               cleanerName,
+                               cleanerSupplier.get().orElse(null), // static 
method of Cleaner class will be called to create Cleanable
+                               cleanableCreationMethod,
+                               cleanMethod);
                }
 
                @Override
                public String toString() {
-                       return "Legacy cleaner provider before Java 9 using " + 
LEGACY_CLEANER_CLASS_NAME;
+                       return cleanerName + " factory provider using " + 
cleanerClassName;
                }
        }
 
-       private static final class LegacyCleanerFactory implements 
CleanerFactory {
-               private final Method cleanerCreateMethod;
-               private final Method cleanerCleanMethod;
-               private final Method getJavaLangRefAccessMethod;
-               private final Method tryHandlePendingReferenceMethod;
-
-               private LegacyCleanerFactory(
-                               Method cleanerCreateMethod,
-                               Method cleanerCleanMethod,
-                               Method getJavaLangRefAccessMethod,
-                               Method tryHandlePendingReferenceMethod) {
-                       this.cleanerCreateMethod = cleanerCreateMethod;
-                       this.cleanerCleanMethod = cleanerCleanMethod;
-                       this.getJavaLangRefAccessMethod = 
getJavaLangRefAccessMethod;
-                       this.tryHandlePendingReferenceMethod = 
tryHandlePendingReferenceMethod;
+       private static class CleanerFactory {
+               private final String cleanerName;
+               private final Object cleaner;

Review comment:
       ```suggestion
                @Nullable
                private final Object cleaner;
       ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to