rkhachatryan commented on a change in pull request #16341:
URL: https://github.com/apache/flink/pull/16341#discussion_r664023741



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -669,13 +677,18 @@ private void stopTaskExecutorServices() throws Exception {
                             taskInformation.getJobVertexId(),
                             tdd.getSubtaskIndex());
 
+            final StateChangelogStorage<?> changelogStorage =
+                    changelogStoragesManager.stateChangelogStorageForJob(
+                            jobId, jobInformation.getJobConfiguration());

Review comment:
       I think we are currently missing an API to set 
`STATE_CHANGE_LOG_STORAGE` from the user program (similar to 
`enableChangelogStateBackend()` for `ENABLE_STATE_CHANGE_LOG`).
   
   If we add it (and implement it similarly), the value will arrive in 
`taskInformation.getTaskConfiguration()`.
   
   The value configured in yaml will arrive in 
`taskManagerConfiguration.getConfiguration()`, as you pointed out.
   So, if the API is added, `taskInformation.getTaskConfiguration()` should 
override `taskManagerConfiguration.getConfiguration()`.
   
   For consistency, the API should be added in this PR 
(something like `enableChangelogStateBackend("memory")`).
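
   To make the proposed precedence concrete, here is a minimal sketch in plain 
Java. It is not Flink code: the two maps stand in for 
`taskInformation.getTaskConfiguration()` and 
`taskManagerConfiguration.getConfiguration()`, and the class name, the 
`resolveStorage` helper, the key string, the `"filesystem"` value and the 
`"memory"` fallback are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Map;

public class ChangelogStorageResolution {
    // Assumed key name standing in for STATE_CHANGE_LOG_STORAGE.
    static final String STORAGE_KEY = "state.backend.changelog.storage";
    // Assumed fallback when neither configuration sets the key.
    static final String FALLBACK = "memory";

    // Hypothetical helper: the job-level (user program) value wins over the yaml value.
    static String resolveStorage(
            Map<String, String> taskConfig, Map<String, String> taskManagerConfig) {
        String fromJob = taskConfig.get(STORAGE_KEY);
        if (fromJob != null) {
            return fromJob;
        }
        String fromYaml = taskManagerConfig.get(STORAGE_KEY);
        return fromYaml != null ? fromYaml : FALLBACK;
    }

    public static void main(String[] args) {
        Map<String, String> yaml = Collections.singletonMap(STORAGE_KEY, "filesystem");
        Map<String, String> job = Collections.singletonMap(STORAGE_KEY, "memory");

        // Set in both places: the value from the user program is used.
        System.out.println(resolveStorage(job, yaml));
        // Set only in yaml: the yaml value is used.
        System.out.println(resolveStorage(Collections.emptyMap(), yaml));
    }
}
```

   In the real code the lookup would presumably go through the `Configuration` 
objects rather than raw maps; the sketch only shows the order in which the two 
sources are consulted.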




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


