rkhachatryan commented on a change in pull request #13912:
URL: https://github.com/apache/flink/pull/13912#discussion_r566247386



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -24,19 +24,46 @@
 import java.io.IOException;
 
 /**
- * CheckpointStorage defines how checkpoint snapshots are persisted for fault 
tolerance. Various
- * implementations store their checkpoints in different fashions and have 
different requirements and
- * availability guarantees.
+ * CheckpointStorage defines how {@link StateBackend}'s checkpoint their state 
for fault tolerance

Review comment:
       checkpoint their state -> store their state?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -24,19 +24,46 @@
 import java.io.IOException;
 
 /**
- * CheckpointStorage defines how checkpoint snapshots are persisted for fault 
tolerance. Various
- * implementations store their checkpoints in different fashions and have 
different requirements and
- * availability guarantees.
+ * CheckpointStorage defines how {@link StateBackend}'s checkpoint their state 
for fault tolerance
+ * in streaming applications. Various implementations store their checkpoints 
in different fashions
+ * and have different requirements and availability guarantees.
  *
- * <p>For example, JobManagerCheckpointStorage stores checkpoints in the 
memory of the JobManager.
- * It is lightweight and without additional dependencies but is not highly 
available and only
- * supports small state sizes. This checkpoint storage policy is convenient 
for local testing and
+ * <p>For example, {@link 
org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
+ * JobManagerCheckpointStorage} stores checkpoints in the memory of the 
JobManager. It is
+ * lightweight and without additional dependencies but is not highly available 
and only supports

Review comment:
       IIUC, high availability can still be achieved with `JobManagerCheckpointStorage`. What can't be achieved is scalability.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata 
(and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but 
also accepts null for
+ * both of then, in which case creating externalized checkpoint is not 
possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to 
enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists 
the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory 
that includes the
+ * checkpoint number, such as {@code
+ * 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code 
hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in 
which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory 
collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements 
CheckpointStorage {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Storage Properties
+    // ------------------------------------------------------------------------
+
+    /** The path where checkpoints will be stored, or null, if none has been 
configured. */
+    @Nullable private Path baseCheckpointPath;
+
+    /** The path where savepoints will be stored, or null, if none has been 
configured. */
+    @Nullable private Path baseSavepointPath;
+
+    @Override
+    public CompletedCheckpointStorageLocation resolveCheckpoint(String 
pointer) throws IOException {
+        return 
AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no 
custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default 
directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getSavepointPath() {
+        return baseSavepointPath;
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(String baseSavepointPath) {
+        setSavepointPath(new Path(baseSavepointPath));
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(@Nullable Path baseSavepointPath) {
+        this.baseSavepointPath = baseSavepointPath == null ? null : 
validatePath(baseSavepointPath);
+    }
+
+    /**
+     * Sets the given savepoint directory, or the values defined in the given 
configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the 
value in the
+     * configuration. If the configuration does not specify a value, it is 
possible that the
+     * savepoint directory in the savepoint storage will be null.
+     *
+     * @param baseSavepointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setSavepointPath(Path baseSavepointPath, ReadableConfig 
config) {
+        this.baseSavepointPath =
+                parameterOrConfigured(
+                        baseSavepointPath, config, 
CheckpointingOptions.SAVEPOINT_DIRECTORY);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no 
custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default 
directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getCheckpointPath() {
+        return baseCheckpointPath;
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(String baseCheckpointPath) {
+        setCheckpointPath(new Path(baseCheckpointPath));
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(@Nullable Path baseCheckpointPath) {
+        this.baseCheckpointPath =
+                baseCheckpointPath == null ? null : 
validatePath(baseCheckpointPath);
+    }
+
+    /**
+     * Sets the given checkpoint directory, or the values defined in the given 
configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the 
value in the
+     * configuration. If the configuration does not specify a value, it is 
possible that the
+     * checkpoint directory in the checkpoint storage will be null.
+     *
+     * @param baseCheckpointPath The checkpoint base directory to use (or 
null).
+     * @param config The configuration to read values from.
+     */
+    public void setCheckpointPath(Path baseCheckpointPath, ReadableConfig 
config) {
+        this.baseCheckpointPath =
+                parameterOrConfigured(
+                        baseCheckpointPath, config, 
CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks the validity of the path's scheme and path.
+     *
+     * @param path The path to check.
+     * @return The URI as a Path.
+     * @throws IllegalArgumentException Thrown, if the URI misses scheme or 
path.
+     */
+    protected static Path validatePath(Path path) {
+        final URI uri = path.toUri();
+        final String scheme = uri.getScheme();
+        final String pathPart = uri.getPath();
+
+        // some validity checks
+        if (scheme == null) {
+            throw new IllegalArgumentException(
+                    "The scheme (hdfs://, file://, etc) is null. "
+                            + "Please specify the file system scheme 
explicitly in the URI.");
+        }
+        if (pathPart == null) {
+            throw new IllegalArgumentException(
+                    "The path to store the checkpoint data in is null. "
+                            + "Please specify a directory path for the 
checkpoint data.");
+        }
+        if (pathPart.length() == 0 || pathPart.equals("/")) {
+            throw new IllegalArgumentException("Cannot use the root directory 
for checkpoints.");
+        }
+
+        return path;
+    }
+
+    @Nullable
+    protected static Path parameterOrConfigured(

Review comment:
       How about moving this method to `ReadableConfig` (replacing the `Path` type with `T` and adding a `Function<String, T>` argument)?
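
   A minimal sketch of what such a generic helper could look like (the class name `ConfigParameterUtil` and its placement are only illustrative, not part of the PR):

```java
import java.util.function.Function;

import javax.annotation.Nullable;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;

/** Illustrative only: a generic "explicit parameter wins over configuration" helper. */
final class ConfigParameterUtil {

    @Nullable
    static <T> T parameterOrConfigured(
            @Nullable T parameter,
            ReadableConfig config,
            ConfigOption<String> option,
            Function<String, T> converter) {
        // An explicitly passed parameter always takes precedence over the configuration.
        if (parameter != null) {
            return parameter;
        }
        // Otherwise fall back to the configured value, converting it with the supplied function.
        return config.getOptional(option).map(converter).orElse(null);
    }
}
```

   The call sites in `AbstractFileCheckpointStorage` would then simply pass `Path::new` as the converter.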

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata 
(and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but 
also accepts null for
+ * both of then, in which case creating externalized checkpoint is not 
possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to 
enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists 
the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory 
that includes the
+ * checkpoint number, such as {@code
+ * 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code 
hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in 
which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory 
collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements 
CheckpointStorage {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Storage Properties
+    // ------------------------------------------------------------------------
+
+    /** The path where checkpoints will be stored, or null, if none has been 
configured. */
+    @Nullable private Path baseCheckpointPath;
+
+    /** The path where savepoints will be stored, or null, if none has been 
configured. */
+    @Nullable private Path baseSavepointPath;
+
+    @Override
+    public CompletedCheckpointStorageLocation resolveCheckpoint(String 
pointer) throws IOException {
+        return 
AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no 
custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default 
directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getSavepointPath() {
+        return baseSavepointPath;
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(String baseSavepointPath) {
+        setSavepointPath(new Path(baseSavepointPath));
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(@Nullable Path baseSavepointPath) {
+        this.baseSavepointPath = baseSavepointPath == null ? null : 
validatePath(baseSavepointPath);
+    }
+
+    /**
+     * Sets the given savepoint directory, or the values defined in the given 
configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the 
value in the
+     * configuration. If the configuration does not specify a value, it is 
possible that the
+     * savepoint directory in the savepoint storage will be null.
+     *
+     * @param baseSavepointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setSavepointPath(Path baseSavepointPath, ReadableConfig 
config) {
+        this.baseSavepointPath =
+                parameterOrConfigured(
+                        baseSavepointPath, config, 
CheckpointingOptions.SAVEPOINT_DIRECTORY);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no 
custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default 
directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getCheckpointPath() {
+        return baseCheckpointPath;
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(String baseCheckpointPath) {
+        setCheckpointPath(new Path(baseCheckpointPath));
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(@Nullable Path baseCheckpointPath) {
+        this.baseCheckpointPath =
+                baseCheckpointPath == null ? null : 
validatePath(baseCheckpointPath);
+    }
+
+    /**
+     * Sets the given checkpoint directory, or the values defined in the given 
configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the 
value in the
+     * configuration. If the configuration does not specify a value, it is 
possible that the
+     * checkpoint directory in the checkpoint storage will be null.
+     *
+     * @param baseCheckpointPath The checkpoint base directory to use (or 
null).
+     * @param config The configuration to read values from.
+     */
+    public void setCheckpointPath(Path baseCheckpointPath, ReadableConfig 
config) {
+        this.baseCheckpointPath =
+                parameterOrConfigured(
+                        baseCheckpointPath, config, 
CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks the validity of the path's scheme and path.
+     *
+     * @param path The path to check.
+     * @return The URI as a Path.
+     * @throws IllegalArgumentException Thrown, if the URI misses scheme or 
path.
+     */
+    protected static Path validatePath(Path path) {

Review comment:
       How about moving this method to the `Path` class itself?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -24,19 +24,46 @@
 import java.io.IOException;
 
 /**
- * CheckpointStorage defines how checkpoint snapshots are persisted for fault 
tolerance. Various
- * implementations store their checkpoints in different fashions and have 
different requirements and
- * availability guarantees.
+ * CheckpointStorage defines how {@link StateBackend}'s checkpoint their state 
for fault tolerance
+ * in streaming applications. Various implementations store their checkpoints 
in different fashions
+ * and have different requirements and availability guarantees.
  *
- * <p>For example, JobManagerCheckpointStorage stores checkpoints in the 
memory of the JobManager.
- * It is lightweight and without additional dependencies but is not highly 
available and only
- * supports small state sizes. This checkpoint storage policy is convenient 
for local testing and
+ * <p>For example, {@link 
org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
+ * JobManagerCheckpointStorage} stores checkpoints in the memory of the 
JobManager. It is
+ * lightweight and without additional dependencies but is not highly available 
and only supports
+ * small state sizes. This checkpoint storage policy is convenient for local 
testing and
  * development.
  *
- * <p>FileSystemCheckpointStorage stores checkpoints in a filesystem. For 
systems like HDFS, NFS
+ * <p>{@link org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
+ * FileSystemCheckpointStorage} stores checkpoints in a filesystem. For 
systems like HDFS, NFS
  * Drives, S3, and GCS, this storage policy supports large state size, in the 
magnitude of many
  * terabytes while providing a highly available foundation for stateful 
applications. This
  * checkpoint storage policy is recommended for most production deployments.
+ *
+ * <h2>Raw Bytes Storage</h2>
+ *
+ * <p>The {@code CheckpointStorage} creates services for <i>raw bytes 
storage</i>.
+ *
+ * <p>The <i>raw bytes storage</i> (through the {@link 
CheckpointStreamFactory}) is the fundamental
+ * service that simply stores bytes in a fault tolerant fashion. This service 
is used by the
+ * JobManager to store checkpoint and recovery metadata and is typically also 
used by the keyed- and
+ * operator state backends to store checkpointed state.
+ *
+ * <h2>Serializability</h2>
+ *
+ * <p>State Backends need to be {@link java.io.Serializable serializable}, 
because they distributed
+ * across parallel processes (for distributed execution) together with the 
streaming application
+ * code.
+ *
+ * <p>Because of that, {@code CheckpointStorage} implementations are meant to 
be like
+ * <i>factories</i> that create the proper states stores that provide access 
to the persistent. That
+ * way, the Checkpoint Storage can be very lightweight (contain only 
configurations) which makes it
+ * easier to be serializable.
+ *
+ * <h2>Thread Safety</h2>
+ *
+ * <p>Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+ * streams concurrently.
  */
 @PublicEvolving
 public interface CheckpointStorage extends java.io.Serializable {

Review comment:
       I think the `resolveCheckpoint` javadoc is outdated, as it
   1. mentions state backends, and
   2. implies some snapshot validation (while all it actually does is open a file or something like that).

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
##########
@@ -32,17 +32,15 @@
     public static final ConfigOption<String> STATE_BACKEND =
             ConfigOptions.key("state.backend")
                     .noDefaultValue()
-                    .withDescription("The state backend to be used to store 
and checkpoint state.");
+                    .withDescription("The state backend to be used to store 
state.");
 
     /** The checkpoint storage used to checkpoint state. */
     @Documentation.Section(value = 
Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
-    @Documentation.ExcludeFromDocumentation(
-            "Hidden until FileSystemStorage and JobManagerStorage are 
implemented")
     public static final ConfigOption<String> CHECKPOINT_STORAGE =
             ConfigOptions.key("state.checkpoint-storage")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("The state backend to be used to 
checkpoint state.");
+                    .withDescription("The checkpoint storage to be used to 
checkpoint state.");

Review comment:
       I'd copy or move here (and to the javadoc) the part about how to configure this option from the `CheckpointStorageLoader.fromConfig` javadoc. It talks about shortcuts, class names, etc., which might be relevant here.
   
   There, should "state backends" be replaced with "implementations"?
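
   For reference, a small sketch of the two ways of setting the option that the `fromConfig` javadoc describes (assuming the shortcut names are `"jobmanager"` and `"filesystem"`, as used by `CheckpointStorageLoader`):

```java
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

final class CheckpointStorageConfigExample {

    static Configuration viaShortcut() {
        // 1. A shortcut name that CheckpointStorageLoader resolves to a built-in implementation.
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager"); // or "filesystem"
        return config;
    }

    static Configuration viaFactoryClass() {
        // 2. The fully qualified name of a CheckpointStorageFactory implementation.
        Configuration config = new Configuration();
        config.set(
                CheckpointingOptions.CHECKPOINT_STORAGE,
                "org.apache.flink.runtime.state.storage.JobManagerCheckpointStorageFactory");
        return config;
    }
}
```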

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata 
(and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but 
also accepts null for
+ * both of then, in which case creating externalized checkpoint is not 
possible, and it is not

Review comment:
       ```suggestion
    * both of them, in which case creating externalized checkpoint is not possible, and it is not
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -162,6 +178,13 @@ public static CheckpointStorage load(
         Preconditions.checkNotNull(classLoader, "classLoader");
         Preconditions.checkNotNull(configuredStateBackend, "statebackend");
 
+        if (defaultSavepointDirectory != null) {
+            config.set(
+                    CheckpointingOptions.SAVEPOINT_DIRECTORY, 
defaultSavepointDirectory.toString());

Review comment:
       Could you add a brief motivation for this change to the commit message?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -24,19 +24,46 @@
 import java.io.IOException;
 
 /**
- * CheckpointStorage defines how checkpoint snapshots are persisted for fault 
tolerance. Various
- * implementations store their checkpoints in different fashions and have 
different requirements and
- * availability guarantees.
+ * CheckpointStorage defines how {@link StateBackend}'s checkpoint their state 
for fault tolerance
+ * in streaming applications. Various implementations store their checkpoints 
in different fashions
+ * and have different requirements and availability guarantees.
  *
- * <p>For example, JobManagerCheckpointStorage stores checkpoints in the 
memory of the JobManager.
- * It is lightweight and without additional dependencies but is not highly 
available and only
- * supports small state sizes. This checkpoint storage policy is convenient 
for local testing and
+ * <p>For example, {@link 
org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
+ * JobManagerCheckpointStorage} stores checkpoints in the memory of the 
JobManager. It is
+ * lightweight and without additional dependencies but is not highly available 
and only supports
+ * small state sizes. This checkpoint storage policy is convenient for local 
testing and
  * development.
  *
- * <p>FileSystemCheckpointStorage stores checkpoints in a filesystem. For 
systems like HDFS, NFS
+ * <p>{@link org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
+ * FileSystemCheckpointStorage} stores checkpoints in a filesystem. For 
systems like HDFS, NFS
  * Drives, S3, and GCS, this storage policy supports large state size, in the 
magnitude of many
  * terabytes while providing a highly available foundation for stateful 
applications. This
  * checkpoint storage policy is recommended for most production deployments.
+ *
+ * <h2>Raw Bytes Storage</h2>
+ *
+ * <p>The {@code CheckpointStorage} creates services for <i>raw bytes 
storage</i>.
+ *
+ * <p>The <i>raw bytes storage</i> (through the {@link 
CheckpointStreamFactory}) is the fundamental
+ * service that simply stores bytes in a fault tolerant fashion. This service 
is used by the
+ * JobManager to store checkpoint and recovery metadata and is typically also 
used by the keyed- and
+ * operator state backends to store checkpointed state.
+ *
+ * <h2>Serializability</h2>
+ *
+ * <p>State Backends need to be {@link java.io.Serializable serializable}, 
because they distributed

Review comment:
       ```suggestion
    * <p>Implementations need to be {@link java.io.Serializable serializable}, because they are distributed
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -84,12 +88,23 @@
 
         switch (storageName.toLowerCase()) {
             case JOB_MANAGER_STORAGE_NAME:
-                throw new UnsupportedOperationException(
-                        "JobManagerCheckpointStorage is not yet implemented");
+                if (logger != null) {

Review comment:
       What do you think about using the usual `private static final` logger, named after this class (`CheckpointStorageLoader.class`)?
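
   For example, something along these lines (a sketch, not part of the PR):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointStorageLoader {

    // Class-scoped logger instead of threading a @Nullable Logger parameter through the methods.
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointStorageLoader.class);

    // fromConfig()/load() could then log unconditionally, e.g.:
    //     LOG.info("Checkpoint storage is set to JobManager");
}
```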

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata 
(and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but 
also accepts null for
+ * both of then, in which case creating externalized checkpoint is not 
possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to 
enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists 
the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory 
that includes the
+ * checkpoint number, such as {@code
+ * 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code 
hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in 
which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory 
collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements 
CheckpointStorage {

Review comment:
       I made a comment to remove this class (and hierarchy). If we still decide to keep it:
   1. I don't think it should be `@PublicEvolving`
   2. Nor public
   3. Rename `AbstractFileCheckpointStorage` to `AbstractFileSystemCheckpointStorage` for consistency?
   4. The javadoc still refers to state backends sometimes

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorageFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.CheckpointStorageFactory;
+
+/** A factory that creates an {@link FileSystemCheckpointStorage} from a 
configuration. */
+@PublicEvolving
+public class JobManagerCheckpointStorageFactory
+        implements CheckpointStorageFactory<JobManagerCheckpointStorage> {
+
+    @Override
+    public JobManagerCheckpointStorage createFromConfig(

Review comment:
       How about moving this method to `JobManagerCheckpointStorage` and making it static?
   (and removing this class)
   
   ditto: `FileSystemCheckpointStorageFactory`
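
   Roughly like this (illustrative only, and assuming `configure(ReadableConfig, ClassLoader)` returns the concrete type, as the javadoc in this PR suggests):

```java
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;

public class JobManagerCheckpointStorage /* ... as in the PR ... */ {

    /** Hypothetical replacement for JobManagerCheckpointStorageFactory#createFromConfig. */
    public static JobManagerCheckpointStorage createFromConfig(
            ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
        // Start from a default instance and let it pick up configured values
        // (e.g. the default savepoint directory), just like the factory does today.
        return new JobManagerCheckpointStorage().configure(config, classLoader);
    }
}
```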

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.ConfigurableCheckpointStorage;
+import 
org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link CheckpointStorage} checkpoints state directly to the 
JobManager's memory (hence the
+ * name), but savepoints will be persisted to a file system.
+ *
+ * <p>This checkpoint storage is primarily for experimentation, quick local 
setups, or for streaming
+ * applications that have very small state: Because it requires checkpoints to 
go through the
+ * JobManager's memory, larger state will occupy larger portions of the 
JobManager's main memory,
+ * reducing operational stability. For any other setup, the {@link 
FileSystemCheckpointStorage}
+ * should be used. The {@code FileSystemCheckpointStorage} but checkpoints 
state directly to files
+ * rather than to the JobManager's memory, thus supporting larger state sizes 
and more highly
+ * available recovery.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * <p>State checkpointing with this storage is subject to the following 
conditions:
+ *
+ * <ul>
+ *   <li>Each individual state must not exceed the configured maximum state 
size (see {@link
+ *       #getMaxStateSize()}.
+ *   <li>All state from one task (i.e., the sum of all operator states and 
keyed states from all
+ *       chained operators of the task) must not exceed what the RPC system 
supports, which is be
+ *       default < 10 MB. That limit can be configured up, but that is 
typically not advised.
+ *   <li>The sum of all states in the application times all retained 
checkpoints must comfortably
+ *       fit into the JobManager's JVM heap space.
+ * </ul>
+ *
+ * <h1>Persistence Guarantees</h1>
+ *
+ * <p>For the use cases where the state sizes can be handled by this 
checkpoint storage, the storage
+ * does guarantee persistence for savepoints, externalized checkpoints (if 
configured), and
+ * checkpoints (when high-availability is configured).
+ *
+ * <h1>Configuration</h1>
+ *
+ * <p>As for all checkpoint storage, this storage policy can either be 
configured within the
+ * application (by creating the storage with the respective constructor 
parameters and setting it on
+ * the execution environment) or by specifying it in the Flink configuration.
+ *
+ * <p>If the checkpoint storage was specified in the application, it may pick 
up additional
+ * configuration parameters from the Flink configuration. For example, if the 
storage if configured
+ * in the application without a default savepoint directory, it will pick up a 
default savepoint
+ * directory specified in the Flink configuration of the running job/cluster. 
That behavior is
+ * implemented via the {@link #configure(ReadableConfig, ClassLoader)} method.
+ */
+@PublicEvolving
+public class JobManagerCheckpointStorage extends AbstractFileCheckpointStorage
+        implements CheckpointStorage, ConfigurableCheckpointStorage {

Review comment:
       I don't think that inheritance is necessary here. After moving two static methods from `AbstractFileCheckpointStorage`, only two fields and a call to the static `AbstractFsCheckpointStorageAccess.resolveCheckpointPointer` are inherited.
   
   Having JM storage extend FS storage is confusing to me, as the TM doesn't actually use the FS.
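
   Rough sketch of what `JobManagerCheckpointStorage` could look like without the base class (field and method names are taken from the PR; the overall structure is just an assumption):

```java
import javax.annotation.Nullable;

import java.io.IOException;

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.ConfigurableCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;

public class JobManagerCheckpointStorage
        implements CheckpointStorage, ConfigurableCheckpointStorage {

    /** The default savepoint directory, or null if none has been configured. */
    @Nullable private Path baseSavepointPath;

    /** The default checkpoint directory, or null if none has been configured. */
    @Nullable private Path baseCheckpointPath;

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        // Delegating to the shared static helper replaces the inherited implementation.
        return AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
    }

    // path getters/setters and the JM-specific fields as in the PR ...
}
```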

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata 
(and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but 
also accepts null for
+ * both of then, in which case creating externalized checkpoint is not 
possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to 
enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists 
the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base 
directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will 
create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory 
that includes the
+ * checkpoint number, such as {@code
+ * 
hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code 
hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in 
which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory 
collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements 
CheckpointStorage {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Storage Properties
+    // ------------------------------------------------------------------------
+
+    /** The path where checkpoints will be stored, or null, if none has been 
configured. */
+    @Nullable private Path baseCheckpointPath;
+
+    /** The path where savepoints will be stored, or null, if none has been 
configured. */
+    @Nullable private Path baseSavepointPath;
+
+    @Override
+    public CompletedCheckpointStorageLocation resolveCheckpoint(String 
pointer) throws IOException {
+        return 
AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no 
custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default 
directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getSavepointPath() {
+        return baseSavepointPath;
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(String baseSavepointPath) {
+        setSavepointPath(new Path(baseSavepointPath));
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(@Nullable Path baseSavepointPath) {
+        this.baseSavepointPath = baseSavepointPath == null ? null : 
validatePath(baseSavepointPath);
+    }
+
+    /**
+     * Sets the given savepoint directory, or the values defined in the given 
configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the 
value in the
+     * configuration. If the configuration does not specify a value, it is 
possible that the
+     * savepoint directory in the savepoint storage will be null.
+     *
+     * @param baseSavepointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setSavepointPath(Path baseSavepointPath, ReadableConfig 
config) {
+        this.baseSavepointPath =
+                parameterOrConfigured(
+                        baseSavepointPath, config, 
CheckpointingOptions.SAVEPOINT_DIRECTORY);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no 
custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default 
directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getCheckpointPath() {
+        return baseCheckpointPath;
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(String baseCheckpointPath) {
+        setCheckpointPath(new Path(baseCheckpointPath));
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(@Nullable Path baseCheckpointPath) {
+        this.baseCheckpointPath =
+                baseCheckpointPath == null ? null : 
validatePath(baseCheckpointPath);
+    }
+
+    /**
+     * Sets the given checkpoint directory, or the values defined in the given 
configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the 
value in the
+     * configuration. If the configuration does not specify a value, it is 
possible that the
+     * checkpoint directory in the checkpoint storage will be null.
+     *
+     * @param baseCheckpointPath The checkpoint base directory to use (or 
null).
+     * @param config The configuration to read values from.
+     */
+    public void setCheckpointPath(Path baseCheckpointPath, ReadableConfig 
config) {
+        this.baseCheckpointPath =
+                parameterOrConfigured(
+                        baseCheckpointPath, config, 
CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks the validity of the path's scheme and path.
+     *
+     * @param path The path to check.
+     * @return The URI as a Path.
+     * @throws IllegalArgumentException Thrown, if the URI misses scheme or 
path.
+     */
+    protected static Path validatePath(Path path) {
+        final URI uri = path.toUri();
+        final String scheme = uri.getScheme();
+        final String pathPart = uri.getPath();
+
+        // some validity checks
+        if (scheme == null) {
+            throw new IllegalArgumentException(
+                    "The scheme (hdfs://, file://, etc) is null. "
+                            + "Please specify the file system scheme 
explicitly in the URI.");
+        }
+        if (pathPart == null) {
+            throw new IllegalArgumentException(
+                    "The path to store the checkpoint data in is null. "
+                            + "Please specify a directory path for the 
checkpoint data.");
+        }
+        if (pathPart.length() == 0 || pathPart.equals("/")) {
+            throw new IllegalArgumentException("Cannot use the root directory 
for checkpoints.");
+        }
+
+        return path;
+    }
+
+    @Nullable
+    protected static Path parameterOrConfigured(
+            @Nullable Path path, ReadableConfig config, ConfigOption<String> 
option) {
+        if (path != null) {
+            return path;
+        } else {
+            String configValue = config.get(option);
+            try {
+                return configValue == null ? null : new Path(configValue);

Review comment:
       nit: `return config.getOptional(option).map(Path::new).orElse(null);` ?
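
   With that, the whole helper could shrink to something like this (a sketch; the exception wrapping from the original try/catch is left out here):

```java
@Nullable
protected static Path parameterOrConfigured(
        @Nullable Path path, ReadableConfig config, ConfigOption<String> option) {
    if (path != null) {
        return path;
    }
    // The Optional-based lookup avoids the explicit null check on the configured value.
    return config.getOptional(option).map(Path::new).orElse(null);
}
```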

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/FileSystemCheckpointStorage.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.ConfigurableCheckpointStorage;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
+import org.apache.flink.util.MathUtils;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link FileSystemCheckpointStorage} checkpoints state as files to a file 
system.
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory 
that includes the
+ * checkpoint number, such as {@code 
hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * <p>This checkpoint storage stores small state chunks directly with the 
metadata, to avoid
+ * creating many small files. The threshold for that is configurable. When 
increasing this
+ * threshold, the size of the checkpoint metadata increases. The checkpoint 
metadata of all retained
+ * completed checkpoints needs to fit into the JobManager's heap memory. This 
is typically not a
+ * problem, unless the threshold {@link #getMinFileSizeThreshold()} is 
increased significantly.
+ *
+ * <h1>Persistence Guarantees</h1>
+ *
+ * <p>Checkpoints from this checkpoint storage are as persistent and available 
as filesystem that is
+ * written to. If the file system is a persistent distributed file system, 
this checkpoint storage
+ * supports highly available setups. The backend additionally supports 
savepoints and externalized
+ * checkpoints.
+ *
+ * <h1>Configuration</h1>
+ *
+ * <p>As for all checkpoint storage policies, this backend can either be 
configured within the
+ * application (by creating the backend with the respective constructor 
parameters and setting it on
+ * the execution environment) or by specifying it in the Flink configuration.
+ *
+ * <p>If the checkpoint storage was specified in the application, it may pick 
up additional
+ * configuration parameters from the Flink configuration. For example, if the 
backend if configured
+ * in the application without a default savepoint directory, it will pick up a 
default savepoint
+ * directory specified in the Flink configuration of the running job/cluster. 
That behavior is
+ * implemented via the {@link #configure(ReadableConfig, ClassLoader)} method.
+ */
+@PublicEvolving
+public class FileSystemCheckpointStorage extends AbstractFileCheckpointStorage
+        implements CheckpointStorage, ConfigurableCheckpointStorage {
+
+    private static final long serialVersionUID = -8191916350224044011L;
+
+    /** Maximum size of state that is stored with the metadata, rather than in 
files (1 MiByte). */
+    private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * State below this size will be stored as part of the metadata, rather 
than in files. A value
+     * of '-1' means not yet configured, in which case the default will be 
used.
+     */
+    private final int fileStateThreshold;
+
+    /**
+     * The write buffer size for created checkpoint stream, this should not be 
less than file state
+     * threshold when we want state below that threshold stored as part of 
metadata not files. A
+     * value of '-1' means not yet configured, in which case the default will 
be used.
+     */
+    private final int writeBufferSize;
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the 
file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either 
specify the authority (host
+     * and port), or that the Hadoop configuration that describes that 
information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     */
+    public FileSystemCheckpointStorage(String checkpointDirectory) {
+        this(new Path(checkpointDirectory));
+    }
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the 
file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either 
specify the authority (host
+     * and port), or that the Hadoop configuration that describes that 
information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     */
+    public FileSystemCheckpointStorage(Path checkpointDirectory) {
+        this(checkpointDirectory, -1, -1);
+    }
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the 
file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either 
specify the authority (host
+     * and port), or that the Hadoop configuration that describes that 
information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     */
+    public FileSystemCheckpointStorage(URI checkpointDirectory) {
+        this(new Path(checkpointDirectory));
+    }
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the 
file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either 
specify the authority (host
+     * and port), or that the Hadoop configuration that describes that 
information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     * @param fileStateSizeThreshold State below this size will be stored as 
part of the metadata,
+     *     rather than in files. If -1, the value configured in the runtime 
configuration will be
+     *     used, or the default value (1KB) if nothing is configured.
+     */
+    public FileSystemCheckpointStorage(URI checkpointDirectory, int 
fileStateSizeThreshold) {
+        this(new Path(checkpointDirectory), fileStateSizeThreshold, -1);
+    }
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the 
file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 
'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either 
specify the authority (host
+     * and port), or that the Hadoop configuration that describes that 
information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     * @param fileStateSizeThreshold State below this size will be stored as 
part of the metadata,
+     *     rather than in files. If -1, the value configured in the runtime 
configuration will be
+     *     used, or the default value (1KB) if nothing is configured.
+     * @param writeBufferSize Write buffer size used to serialize state. If 
-1, the value configured
+     *     in the runtime configuration will be used, or the default value 
(4KB) if nothing is
+     *     configured.
+     */
+    public FileSystemCheckpointStorage(
+            Path checkpointDirectory, int fileStateSizeThreshold, int 
writeBufferSize) {
+
+        checkNotNull(checkpointDirectory, "checkpoint directory is null");
+        checkArgument(
+                fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= 
MAX_FILE_STATE_THRESHOLD,
+                "The threshold for file state size must be in [-1, %s], where 
'-1' means to use "
+                        + "the value from the deployment's configuration.",
+                MAX_FILE_STATE_THRESHOLD);
+        checkArgument(
+                writeBufferSize >= -1,
+                "The write buffer size must be not less than '-1', where '-1' 
means to use "
+                        + "the value from the deployment's configuration.");
+
+        this.fileStateThreshold = fileStateSizeThreshold;
+        this.writeBufferSize = writeBufferSize;
+        setCheckpointPath(checkpointDirectory);
+    }
+
+    /**
+     * Private constructor that creates a re-configured copy of the checkpoint 
storage.
+     *
+     * @param original The checkpoint storage to re-configure
+     * @param configuration The configuration
+     */
+    private FileSystemCheckpointStorage(
+            FileSystemCheckpointStorage original,
+            ReadableConfig configuration,
+            ClassLoader classLoader) {

Review comment:
       `classLoader` is never used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

