[ 
https://issues.apache.org/jira/browse/FLINK-24506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias updated FLINK-24506:
-----------------------------
    Description: 
FLINK-19463 introduced the separation of {{StateBackend}} and 
{{CheckpointStorage}}. Before that, both were part of the same 
interface implementation, 
[AbstractFileStateBackend|https://github.com/apache/flink/blob/0a76daba0a428a322f0273d7dc6a70966f62bf26/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java].
 {{FsStateBackend}} was used as a default implementation before {{1.13}}.

Versions before {{1.13}} initialized the checkpoint directory when instantiating the 
state backend (see 
[FsStateBackendFactory|https://github.com/apache/flink/blob/release-1.12/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java#L46]).
 Starting with {{1.13}}, the {{CheckpointStorage}} is loaded by the 
{{CheckpointStorageLoader.load}} method, which is called in several places:
 * Savepoint disposal (through {{Checkpoints.loadCheckpointStorage}}), which 
relies only on the cluster configuration (no 
application checkpoint storage is passed)
 * {{SchedulerBase}} initialization (through {{DefaultExecutionGraphBuilder}}), 
which uses the cluster’s configuration and also the application 
configuration (i.e. the {{JobGraph}}’s setting); the latter would be considered if 
{{CheckpointConfig#configure}} included the checkpoint storage
 * {{StreamTask}} on the {{TaskManager}} side, which uses the 
configuration passed in by the {{JobVertex}} for the application’s 
{{CheckpointStorage}} and the {{TaskManager}}’s configuration (coming from 
the session cluster) for the fallback {{CheckpointStorage}}

The issue is that the checkpoint directory is not set in the 
{{CheckpointConfig}}. Hence, it is not picked up as a 
job-related property, and Flink always uses the fallback provided by the session 
cluster configuration.
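
The precedence described above can be modelled in a few lines of plain Java. This is only a sketch and does not use Flink's classes: {{CheckpointDirSketch}}, {{resolve}}, {{jobLevelDirAsReported}} and {{jobLevelDirIfPropagated}} are hypothetical names, the maps stand in for the configuration passed into the {{StreamExecutionEnvironment}} and for the session cluster configuration, and {{state.checkpoints.dir}} is assumed to be the relevant configuration key.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the lookup order described in this issue; not Flink code.
final class CheckpointDirSketch {
    // Assumed configuration key for the checkpoint directory.
    static final String CHECKPOINT_DIR_KEY = "state.checkpoints.dir";

    // A job-level directory wins; otherwise the cluster configuration is the fallback.
    static Optional<String> resolve(Optional<String> jobLevelDir,
                                    Map<String, String> clusterConfig) {
        if (jobLevelDir.isPresent()) {
            return jobLevelDir;
        }
        return Optional.ofNullable(clusterConfig.get(CHECKPOINT_DIR_KEY));
    }

    // Reported behaviour: the directory in the environment's configuration
    // never reaches the job-level settings, so nothing is passed on.
    static Optional<String> jobLevelDirAsReported(Map<String, String> envConfig) {
        return Optional.empty();
    }

    // Expected behaviour: the directory is copied into the job-level settings.
    static Optional<String> jobLevelDirIfPropagated(Map<String, String> envConfig) {
        return Optional.ofNullable(envConfig.get(CHECKPOINT_DIR_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> envConfig =
                Map.of(CHECKPOINT_DIR_KEY, "file:///job-checkpoints");
        Map<String, String> clusterConfig =
                Map.of(CHECKPOINT_DIR_KEY, "file:///cluster-checkpoints");

        System.out.println("reported: "
                + resolve(jobLevelDirAsReported(envConfig), clusterConfig));
        System.out.println("expected: "
                + resolve(jobLevelDirIfPropagated(envConfig), clusterConfig));
    }
}
```

In this model the reported path resolves to the cluster's directory even though the job supplied its own, which is the symptom described in this issue.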

  was:
FLINK-19463 introduced the separation of `StateBackend` and 
`CheckpointStorage`. Before that, both were included in the same interface 
implementation 
[AbstractFileStateBackend|https://github.com/apache/flink/blob/0a76daba0a428a322f0273d7dc6a70966f62bf26/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java].
 `FsStateBackend` was used as a default implementation pre-1.13.

pre-{{1.13}} initialized the checkpoint directory when instantiating the state 
backend (see 
[FsStateBackendFactory|https://github.com/apache/flink/blob/release-1.12/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java#L46]).
 Starting from {{1.13}} loading the {{CheckpointStorage}} is done by the 
{{CheckpointStorageLoader.load}} method that is called in various places:
* Savepoint Disposal (through {{Checkpoints.loadCheckpointStorage}}) where it 
only relies on the configuration passed in by the cluster configuration (no 
application checkpoint storage is passed)
* {{SchedulerBase}} initialization (through DefaultExecutionGraphBuilder) where 
it’s based on the cluster’s configuration but also the application 
configuration (i.e. the {{JobGraph}}’s setting) that would be considered if 
{{CheckpointConfig#configure}} would have the checkpoint storage included
* {{StreamTask}} on the {{TaskManager}}’s side where it’s based on the 
configuration passed in by the {{JobVertex}} for the application’s 
{{CheckpointStorage}} and the {[TaskManager}}’s configuration (coming from the 
session cluster) for the fallback {{CheckpointStorage}}

The issue is that we don't set the checkpoint directory in the 
{{CheckpointConfig}}. Hence, it's not going to get picked up as a job-related 
property. Flink always uses the fallback provided by the session cluster 
configuration.


> checkpoint directory is not configurable through the Flink configuration 
> passed into the StreamExecutionEnvironment
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24506
>                 URL: https://issues.apache.org/jira/browse/FLINK-24506
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration, Runtime / State Backends
>    Affects Versions: 1.14.0, 1.13.2
>            Reporter: Matthias
>            Priority: Major
>
> FLINK-19463 introduced the separation of {{StateBackend}} and 
> {{CheckpointStorage}}. Before that, both were part of the same 
> interface implementation, 
> [AbstractFileStateBackend|https://github.com/apache/flink/blob/0a76daba0a428a322f0273d7dc6a70966f62bf26/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java].
>  {{FsStateBackend}} was used as a default implementation before {{1.13}}.
>
> Versions before {{1.13}} initialized the checkpoint directory when instantiating the 
> state backend (see 
> [FsStateBackendFactory|https://github.com/apache/flink/blob/release-1.12/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java#L46]).
>  Starting with {{1.13}}, the {{CheckpointStorage}} is loaded by the 
> {{CheckpointStorageLoader.load}} method, which is called in several places:
>  * Savepoint disposal (through {{Checkpoints.loadCheckpointStorage}}), 
> which relies only on the cluster 
> configuration (no application checkpoint storage is passed)
>  * {{SchedulerBase}} initialization (through {{DefaultExecutionGraphBuilder}}), 
> which uses the cluster’s configuration and also the application 
> configuration (i.e. the {{JobGraph}}’s setting); the latter would be considered 
> if {{CheckpointConfig#configure}} included the checkpoint storage
>  * {{StreamTask}} on the {{TaskManager}} side, which uses the 
> configuration passed in by the {{JobVertex}} for the application’s 
> {{CheckpointStorage}} and the {{TaskManager}}’s configuration (coming 
> from the session cluster) for the fallback {{CheckpointStorage}}
>
> The issue is that the checkpoint directory is not set in the 
> {{CheckpointConfig}}. Hence, it is not picked up as a 
> job-related property, and Flink always uses the fallback provided by the session 
> cluster configuration.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
