Hi,

Did you enable externalized checkpoints? [1]
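
Checkpoint metadata is only written to *state.checkpoints.dir* when
externalized checkpoints are enabled. A minimal sketch of enabling them on
the 1.3 API (the interval value is just an example):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L) // checkpoint every 60s (example value)

// Keep the checkpoint metadata around even when the job is cancelled,
// so the job can later be restarted from it.
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)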

Best,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints

2017-10-24 13:07 GMT+08:00 vipul singh <neoea...@gmail.com>:

> Thanks Aljoscha for the answer above.
>
> I am experimenting with savepoints and checkpoints on my end, so that we
> can build a fault-tolerant application with exactly-once semantics.
>
> I have been able to test various scenarios, but have doubts about one use
> case.
>
> My app is running on an EMR cluster, and I am trying to test the case where
> the EMR cluster is terminated. I have read that *state.checkpoints.dir* is
> responsible for storing metadata information, which links to the data files
> in *state.backend.fs.checkpointdir*.
>
> For my application I have configured both
> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*.
>
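> In flink-conf.yaml the two entries look roughly like this (actual bucket
> and base path elided):
>
> state.backend.fs.checkpointdir: s3://<bucket>/<base-path>/checkpoints/data
> state.checkpoints.dir: s3://<bucket>/<base-path>/checkpoints/metadata
>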
> Also I have the following in my main app:
>
> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>
> val CHECKPOINT_LOCATION =
>   s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>
> val backend: RocksDBStateBackend =
>   new RocksDBStateBackend(CHECKPOINT_LOCATION)
>
> env.setStateBackend(backend)
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>
>
> In the application startup logs I can see the *state.backend.fs.checkpointdir*
> and *state.checkpoints.dir* values being loaded. However, when a
> checkpoint happens I don't see any content in the metadata dir. Is there
> something I am missing? Please let me know. I am using Flink version 1.3.
>
> Thanks,
> Vipul
>
>
>
> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> Flink does not rely on file system operations to list contents; all
>> necessary file paths are stored in the metadata file, as you guessed. This
>> is the reason savepoints also work with file systems that "only" have
>> read-after-write consistency.
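>>
>> Concretely, a savepoint directory looks roughly like this (names are
>> illustrative):
>>
>> s3://<bucket>/savepoints/savepoint-<id>/
>>   _metadata     <- task, operator, and master state, plus the absolute
>>                    paths of the data files below
>>   <data file>   <- the actual state data referenced from _metadata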
>>
>> Best,
>> Aljoscha
>>
>>
>> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote:
>>
>> Thanks Stefan for the answers above. These are really helpful.
>>
>> I have a few followup questions:
>>
>>    1. I see my savepoints are created in a folder, which has a _metadata
>>    file and another file. Looking at the code
>>    
>> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>
>>    it seems like the metadata file contains task states, operator state,
>>    and master states
>>    
>> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
>>    What is the purpose of the other file in the savepoint folder? My guess
>>    is that it is a checkpoint file?
>>    2. I am planning to use S3 as my state backend, so I want to ensure
>>    that application restarts are not affected by the read-after-write
>>    consistency of S3 (if I use S3 as a savepoint backend). I am curious how
>>    Flink restores data from the _metadata file and the other file. Does the
>>    _metadata file contain the paths to these other files, or would it do a
>>    listing on the S3 folder?
>>
>>
>> Please let me know,
>>
>> Thanks,
>> Vipul
>>
>> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I have answered your questions inline:
>>>
>>>
>>>    1. It seems to me that checkpoints can be treated as Flink's internal
>>>    recovery mechanism, and savepoints act more as user-defined recovery
>>>    points. Would that be a correct assumption?
>>>
>>> You could see it that way, but I would describe savepoints more as
>>> user-defined *restart* points than *recovery* points. Please take a look at
>>> my answers in this thread, because they cover most of your question:
>>>
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>>>
>>>
>>>    2. While cancelling an application with the -s option, it specifies the
>>>    savepoint location. Is there a way during application startup to identify
>>>    the last known savepoint from a folder by itself, and restart from there?
>>>    Since I am saving my savepoints on S3, I want to avoid issues arising
>>>    from an *ls* command on S3 due to the read-after-write consistency of S3.
>>>
>>> I don’t think that this feature exists; you have to specify the
>>> savepoint explicitly.
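>>>
>>> For example, to resume explicitly from a known savepoint path (the path
>>> is a placeholder):
>>>
>>> flink run -s s3://<bucket>/<path>/savepoint-<id> your-job.jar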
>>>
>>>
>>>    3. Suppose my application has a checkpoint at point t1, and say I
>>>    cancel this application sometime in the future before the next
>>>    available checkpoint (say t1+x). If I start the application without
>>>    specifying the savepoint, it will start from the last known checkpoint
>>>    (at t1), which won't have the application state saved, since I had
>>>    cancelled the application. Would this be a correct assumption?
>>>
>>> If you restart a cancelled application, it will not consider checkpoints;
>>> they are only considered for recovery on failure. You need to specify a
>>> savepoint or externalized checkpoint for restarts to make explicit that you
>>> intend to restart a job, and not to run a new instance of the job.
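>>>
>>> For example, with the CLI (job ID and target directory are placeholders):
>>>
>>> # take a savepoint and cancel the job in one step
>>> flink cancel -s s3://<bucket>/<path>/savepoints <jobId>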
>>>
>>>
>>>    4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be
>>>    the same as manually saving regular savepoints?
>>>
>>> Not the same, because checkpoints and savepoints are different in
>>> certain aspects, but both methods leave you with something that survives
>>> job cancellation and can be used to restart from a certain state.
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>
>>
>> --
>> Thanks,
>> Vipul
>>
>>
>>
>
>
> --
> Thanks,
> Vipul
>
