curcur commented on a change in pull request #14943: URL: https://github.com/apache/flink/pull/14943#discussion_r578931859
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java ########## @@ -242,6 +265,53 @@ public static StateBackend fromApplicationOrConfigOrDefault( return backend; } + public static StateBackend loadStateBackend( + @Nullable StateBackend fromApplication, + Configuration config, + ClassLoader classLoader, + @Nullable Logger logger) + throws IllegalConfigurationException, DynamicCodeLoadingException, IOException { + + final StateBackend backend = + fromApplicationOrConfigOrDefault(fromApplication, config, classLoader, logger); + + if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG)) { + + Preconditions.checkArgument( + backend instanceof DelegatedStateBackend, "backend is not delegable"); + + LOG.info( + "Delegate State Backend is used, and the root State Backend is {}", + backend.getClass().getSimpleName()); + + // ChangelogStateBackend resides in a separate module, load it using reflection + try { + Constructor<? extends DelegateStateBackend> constructor = + Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader) + .asSubclass(DelegateStateBackend.class) + .getConstructor(DelegatedStateBackend.class); + + Class.forName(ROCKSDB_STATE_BACKEND_FACTORY, false, classLoader); Review comment: I've added it to test classloader; it should be removed. Sorry. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org