sjwiesman commented on a change in pull request #13797: URL: https://github.com/apache/flink/pull/13797#discussion_r562842079
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.DynamicCodeLoadingException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** This class contains utility methods to load checkpoint storage from configurations. */ +public class CheckpointStorageLoader { + + public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager"; + + public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem"; + + /** + * Loads the checkpoint storage from the configuration, from the parameter + * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}. + * + * <p>The state backends can be specified either via their shortcut name, or via the class name + * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified, + * the factory is instantiated (via its zero-argument constructor) and its {@link + * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called. + * + * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value + * #FILE_SYSTEM_STORAGE_NAME}'. + * + * @param config The configuration to load the checkpoint storage from + * @param classLoader The class loader that should be used to load the checkpoint storage + * @param logger Optionally, a logger to log actions to (may be null) + * @return The instantiated checkpoint storage. + * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and + * the factory class was not found or the factory could not be instantiated + * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when + * creating / configuring the checkpoint storage in the factory + * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the + * checkpoint storage + */ + public static CheckpointStorage loadCheckpointStorageFromConfig( + ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) + throws IllegalStateException, DynamicCodeLoadingException, IOException { + + Preconditions.checkNotNull(config, "config"); + Preconditions.checkNotNull(classLoader, "classLoader"); + + final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE); + if (storageName == null) { + return null; + } + + switch (storageName.toLowerCase()) { + case JOB_MANAGER_STORAGE_NAME: + throw new IllegalStateException( + "JobManagerCheckpointStorage is not yet implemented"); + + case FILE_SYSTEM_STORAGE_NAME: + throw new IllegalStateException( + "FileSystemCheckpointStorage is not yet implemented"); + + default: + if (logger != null) { + logger.info("Loading state backend via factory {}", storageName); + } + + CheckpointStorageFactory<?> factory; + try { + @SuppressWarnings("rawtypes") + Class<? extends CheckpointStorageFactory> clazz = + Class.forName(storageName, false, classLoader) + .asSubclass(CheckpointStorageFactory.class); + + factory = clazz.newInstance(); + } catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured state backend factory class: " + storageName, + e); + } catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException( + "The class configured under '" + + CheckpointingOptions.CHECKPOINT_STORAGE.key() + + "' is not a valid checkpoint storage factory (" + + storageName + + ')', + e); + } + + return factory.createFromConfig(config, classLoader); + } + } + + public static CheckpointStorage fromApplicationOrConfigOrDefault( + @Nullable CheckpointStorage fromApplication, Review comment: Flink's style guide says not to use `Optional` for function arguments. https://flink.apache.org/contributing/code-style-and-quality-java.html#java-optional ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org