curcur commented on a change in pull request #14943: URL: https://github.com/apache/flink/pull/14943#discussion_r578932063
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java ########## @@ -242,6 +265,53 @@ public static StateBackend fromApplicationOrConfigOrDefault( return backend; } + public static StateBackend loadStateBackend( + @Nullable StateBackend fromApplication, + Configuration config, + ClassLoader classLoader, + @Nullable Logger logger) + throws IllegalConfigurationException, DynamicCodeLoadingException, IOException { + + final StateBackend backend = + fromApplicationOrConfigOrDefault(fromApplication, config, classLoader, logger); + + if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG)) { + + Preconditions.checkArgument( + backend instanceof DelegatedStateBackend, "backend is not delegable"); + + LOG.info( + "Delegate State Backend is used, and the root State Backend is {}", + backend.getClass().getSimpleName()); + + // ChangelogStateBackend resides in a separate module, load it using reflection + try { + Constructor<? extends DelegateStateBackend> constructor = + Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader) + .asSubclass(DelegateStateBackend.class) + .getConstructor(DelegatedStateBackend.class); + + Class.forName(ROCKSDB_STATE_BACKEND_FACTORY, false, classLoader); Review comment: Thanks for the careful review, it may cause bugs that are strange/difficult to debug. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org