rkhachatryan commented on a change in pull request #16153:
URL: https://github.com/apache/flink/pull/16153#discussion_r654583087



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -384,6 +358,22 @@ public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Try to unwrap a DelegatingStateBackend and get the inner delegated StateBackend. If the
+     * provided StateBackend is not a delegated StateBackend, just return itself.
+     *
+     * @param backend the provided StateBackend that may delegate another StateBackend.
+     * @return the root StateBackend W/O delegation.
+     */
+    @VisibleForTesting
+    public static StateBackend unwrapFromDelegatingStateBackend(StateBackend backend) {
+        if (backend != null && backend instanceof DelegatingStateBackend) {

Review comment:
   nit: Is this method used **only** for tests? If so, maybe move it to the `test` source directory?
   nit: I think the `null` check is unnecessary here, since `instanceof` already evaluates to `false` for `null`.
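
   To illustrate the second nit, here is a minimal sketch. It assumes hypothetical stand-in interfaces declared locally (they only mirror the names in the diff and are not Flink's real classes), and the method body is a guess at the intent, not the PR's actual code. It shows that dropping the explicit `null` check does not change behavior:

```java
// Hypothetical stand-ins for illustration only; not the real Flink types.
final class InstanceofNullSketch {
    public interface StateBackend {}

    public interface DelegatingStateBackend extends StateBackend {
        StateBackend getDelegatedStateBackend();
    }

    // No explicit null check: `null instanceof X` is always false in Java,
    // so a null argument falls through and is returned unchanged.
    public static StateBackend unwrap(StateBackend backend) {
        if (backend instanceof DelegatingStateBackend) {
            return ((DelegatingStateBackend) backend).getDelegatedStateBackend();
        }
        return backend;
    }

    public static void main(String[] args) {
        StateBackend plain = new StateBackend() {};
        DelegatingStateBackend wrapper = () -> plain;

        System.out.println(unwrap(null) == null);     // prints true
        System.out.println(unwrap(plain) == plain);   // prints true
        System.out.println(unwrap(wrapper) == plain); // prints true
    }
}
```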

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -607,6 +612,53 @@ public StateBackend getStateBackend() {
         return defaultStateBackend;
     }
 
+    /**
+     * Enable the change log for current state backend. this changelog allows operators to persist
+     * state changes in a very fine-grained manner, as described below:
+     *
+     * <p>Stateful operators write the state changes to that log (logging the state), in addition to
+     * applying them to the state tables in RocksDB or the in-mem Hashtable.
+     *
+     * <p>An operator can acknowledge a checkpoint as soon as the changes in the log have reached
+     * the durable checkpoint storage.
+     *
+     * <p>The state tables are persisted periodically, independent of the checkpoints. We call this
+     * the materialization of the state on the checkpoint storage.
+     *
+     * <p>Once the state is materialized on checkpoint storage, the state changelog can be truncated
+     * to the corresponding point.
+     *
+     * <p>It establish a way to drastically reduce the checkpoint interval for streaming
+     * applications across state backends. For more details please check the FLIP-158.
+     *
+     * <p>If this method is not called explicitly, it means no preference for enabling the change
+     * log. Configs for change log enabling will override in different config levels
+     * (job/local/cluster).
+     *
+     * @param enabled true if enable the change log for state backend explicitly, otherwise disable
+     *     the change log.
+     * @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
+     * @see #isChangelogStateBackendEnabled()
+     */
+    @PublicEvolving
+    public StreamExecutionEnvironment enableChangelogStateBackend(boolean enabled) {

Review comment:
   Very good description :+1: :slightly_smiling_face:

   Should we also mention that it currently applies only to keyed state? (So non-keyed operator state and channel state are checkpointed as usual.)

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -58,7 +58,14 @@
 
     private final StateBackend delegatedStateBackend;
 
-    public ChangelogStateBackend(StateBackend stateBackend) {
+    /**
+     * Delegate a state backend by a ChangelogStateBackend.
+     *
+     * <p>As FLINK-22678 mentioned, we currently hide this constructor from user.
+     *
+     * @param stateBackend the delegated state backend.
+     */
+    ChangelogStateBackend(StateBackend stateBackend) {

Review comment:
   Maybe also mark the class as `@Internal` to make the intention clear to the user?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -175,21 +174,16 @@ public static CheckpointStorage load(
                     CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDirectory.toString());
         }
 
-        // Legacy state backends always take precedence for backwards compatibility.
-        StateBackend rootStateBackend =
-                (configuredStateBackend instanceof DelegatingStateBackend)
-                        ? ((DelegatingStateBackend) configuredStateBackend)
-                                .getDelegatedStateBackend()
-                        : configuredStateBackend;
-
-        if (rootStateBackend instanceof CheckpointStorage) {

Review comment:
   Could you please explain the removal?
   IIUC, the changelog state backend can still be passed here (even if hidden from the user) when called from `StreamTask#createCheckpointStorage`.
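
   A rough sketch of the concern, assuming hypothetical stand-in types declared locally (the names mirror the diff, but none of this is Flink's actual code): if a delegating backend wraps a legacy backend that also implements `CheckpointStorage`, a plain `instanceof` check on the configured backend no longer detects it once the unwrapping step is removed.

```java
// Hypothetical stand-ins for illustration only; not the real Flink types.
final class LegacyPrecedenceSketch {
    public interface StateBackend {}

    public interface CheckpointStorage {}

    public interface DelegatingStateBackend extends StateBackend {
        StateBackend getDelegatedStateBackend();
    }

    // A "legacy" backend that doubles as its own checkpoint storage.
    public static final class LegacyBackend implements StateBackend, CheckpointStorage {}

    // Mirrors the removed lines: unwrap one level of delegation, then check.
    public static boolean detectedWithUnwrap(StateBackend configured) {
        StateBackend root =
                (configured instanceof DelegatingStateBackend)
                        ? ((DelegatingStateBackend) configured).getDelegatedStateBackend()
                        : configured;
        return root instanceof CheckpointStorage;
    }

    // What a check without the unwrapping step would see.
    public static boolean detectedWithoutUnwrap(StateBackend configured) {
        return configured instanceof CheckpointStorage;
    }

    public static void main(String[] args) {
        StateBackend legacy = new LegacyBackend();
        DelegatingStateBackend wrapped = () -> legacy;

        System.out.println(detectedWithUnwrap(wrapped));    // prints true
        System.out.println(detectedWithoutUnwrap(wrapped)); // prints false
    }
}
```

   Whether this case can actually occur depends on what the callers pass in, which is the question above.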
   
   cc: @curcur 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

