Hi team, I have a similar use case. Do we have any answers on this? When we trigger a savepoint, can we store that information in ZooKeeper as well? That way I can avoid S3 file listing and do not have to use other external services.
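There does not appear to be a built-in way to push the path into ZooKeeper when a savepoint is triggered, but launcher code can do it itself: store the path that the savepoint command prints, and read it back before issuing flink run -s. Below is a minimal sketch using Apache Curator, assuming ZooKeeper is already reachable (e.g. from an HA setup); the connect string, znode path, and object name are illustrative placeholders, not anything provided by Flink.

    import java.nio.charset.StandardCharsets

    import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
    import org.apache.curator.retry.ExponentialBackoffRetry

    /** Launcher-side helper: remember the last savepoint path in ZooKeeper. */
    object SavepointPathStore {

      private val zkConnect = "zk-host:2181"            // placeholder
      private val znodePath = "/my-app/last-savepoint"  // placeholder

      private def withClient[T](f: CuratorFramework => T): T = {
        val client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
        client.start()
        try f(client) finally client.close()
      }

      /** Store the path printed by `flink savepoint` / `flink cancel -s`. */
      def store(savepointPath: String): Unit = withClient { client =>
        val bytes = savepointPath.getBytes(StandardCharsets.UTF_8)
        if (client.checkExists().forPath(znodePath) == null) {
          client.create().creatingParentsIfNeeded().forPath(znodePath, bytes)
        } else {
          client.setData().forPath(znodePath, bytes)
        }
        ()
      }

      /** Read the last stored path, to pass to `flink run -s <path>` on restart. */
      def lastSavepoint(): Option[String] = withClient { client =>
        Option(client.checkExists().forPath(znodePath))
          .map(_ => new String(client.getData.forPath(znodePath), StandardCharsets.UTF_8))
      }
    }

The same pattern works with any small, strongly consistent store, which is essentially the DynamoDB idea raised further down this thread.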
On Wed, Oct 25, 2017 at 11:19 PM vipul singh <neoea...@gmail.com> wrote:

> As a followup to the above, is there a way to get the last checkpoint metadata location inside the *notifyCheckpointComplete* method? I tried poking around, but didn't see a way to achieve this. Or is there any other way to save the actual checkpoint metadata location into a datastore (DynamoDB etc.)?
>
> We are looking to save the savepoint/externalized checkpoint metadata location in some storage space, so that we can pass this information to the flink run command during recovery (thereby removing the possibility of any read-after-write consistency issues arising from listing file paths etc.).
>
> Thanks,
> Vipul
>
> On Tue, Oct 24, 2017 at 11:53 PM, vipul singh <neoea...@gmail.com> wrote:
>
>> Thanks Aljoscha for the explanations. I was able to recover from the last externalized checkpoint by using flink run -s <metadata file> <options>.
>>
>> I am curious, are there any options at the moment to save the metadata file name to some other place like DynamoDB? The reason I am asking is that for the launcher code we are writing, we want to ensure that if a Flink job crashes, we can just start it from the last known externalized checkpoint. In the present scenario, we have to list the contents of the S3 bucket that stores the metadata to find the last metadata file written before the failure, and there is a window where we might run into S3 read-after-write consistency issues. Thoughts?
>>
>> On Tue, Oct 24, 2017 at 2:13 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> That distinction with externalised checkpoints is a bit of a pitfall, and I'm hoping that we can actually get rid of that distinction in the next version or the version after that. With that change, all checkpoints would always be externalised, since it's not really any noticeable overhead.
>>>
>>> Regarding read-after-write consistency, you should be fine since the "externalised checkpoint", i.e. the metadata, is only one file. If you know the file path (either from the Flink dashboard or by looking at the S3 bucket) you can restore from it.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 24. Oct 2017, at 08:22, vipul singh <neoea...@gmail.com> wrote:
>>>
>>> Thanks Tony, that was the issue. I was thinking that when we use RocksDB and provide an S3 path, it uses externalized checkpoints by default. Thanks so much!
>>>
>>> I have one followup question. Say in the above case I terminate the cluster; since the metadata is on S3 and not on local storage, does Flink avoid S3's read-after-write consistency issues? Is that a valid concern, or is that case handled for externalized checkpoints as well, so that no file system listing is involved when retrieving externalized checkpoints from S3?
>>>
>>> Thanks,
>>> Vipul
>>>
>>> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Did you enable externalized checkpoints? [1]
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints
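For reference, a minimal sketch of the enablement Tony is pointing at, assuming the Flink 1.3 CheckpointConfig API used elsewhere in this thread; the checkpoint interval is an illustrative placeholder.

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000L) // placeholder interval

    // Externalized checkpoints are off by default; without this call no
    // checkpoint metadata is written to state.checkpoints.dir.
    // RETAIN_ON_CANCELLATION keeps the metadata after a cancel so the job
    // can later be restarted with `flink run -s <metadata file>`.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

DELETE_ON_CANCELLATION is the other cleanup mode, for the case where the metadata should only survive failures, not an explicit cancel.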
>>>> 2017-10-24 13:07 GMT+08:00 vipul singh <neoea...@gmail.com>:
>>>>
>>>>> Thanks Aljoscha for the answer above.
>>>>>
>>>>> I am experimenting with savepoints and checkpoints on my end, so that we can build a fault-tolerant application with exactly-once semantics.
>>>>>
>>>>> I have been able to test various scenarios, but have doubts about one use case.
>>>>>
>>>>> My app is running on an EMR cluster, and I am trying to test the case where the EMR cluster is terminated. I have read that *state.checkpoints.dir* is responsible for storing the metadata information, which links to the data files in *state.backend.fs.checkpointdir*.
>>>>>
>>>>> For my application I have configured both *state.backend.fs.checkpointdir* and *state.checkpoints.dir*.
>>>>>
>>>>> Also I have the following in my main app:
>>>>>
>>>>> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>>>>>
>>>>> val CHECKPOINT_LOCATION = s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>>>>>
>>>>> val backend: RocksDBStateBackend = new RocksDBStateBackend(CHECKPOINT_LOCATION)
>>>>>
>>>>> env.setStateBackend(backend)
>>>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
>>>>> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
>>>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>>>>>
>>>>> In the application startup logs I can see the *state.backend.fs.checkpointdir* and *state.checkpoints.dir* values being loaded. However, when the checkpoint happens I don't see any content in the metadata dir. Is there something I am missing? Please let me know. I am using Flink version 1.3.
>>>>>
>>>>> Thanks,
>>>>> Vipul
>>>>>
>>>>> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Flink does not rely on file system operations to list contents; all necessary file paths are stored in the metadata file, as you guessed. This is the reason savepoints also work with file systems that "only" have read-after-write consistency.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote:
>>>>>>
>>>>>> Thanks Stefan for the answers above. These are really helpful.
>>>>>>
>>>>>> I have a few followup questions:
>>>>>>
>>>>>> 1. I see my savepoints are created in a folder which has a _metadata file and another file. Looking at the code <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191> it seems like the metadata file contains task states, operator states and master states <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>. What is the purpose of the other file in the savepoint folder? My guess is it should be a checkpoint file?
>>>>>>
>>>>>> 2. I am planning to use S3 as my state backend, so I want to ensure that application restarts are not affected by S3's read-after-write consistency (if I use S3 as a savepoint backend). I am curious how Flink restores data from the _metadata file and the other file. Does the _metadata file contain the paths to these other files, or would it do a listing on the S3 folder?
>>>>>>
>>>>>> Please let me know.
>>>>>>
>>>>>> Thanks,
>>>>>> Vipul
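As an illustration of the answer above: the restore path handed to the CLI is the metadata file itself, so no bucket listing is involved. A sketch with placeholder bucket, directory, and jar names; the savepoint directory name is whatever the trigger command printed.

    # Restore from a savepoint / retained externalized checkpoint by passing
    # the metadata path directly to `flink run -s`; Flink reads the referenced
    # state files from the paths recorded inside _metadata.
    flink run -s s3://<bucket>/<savepoints-dir>/savepoint-xxxx/_metadata <your-job.jar> [args]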
>>>>>> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have answered your questions inline:
>>>>>>>
>>>>>>> 1. It seems to me that checkpoints can be treated as Flink's internal recovery mechanism, and savepoints act more as user-defined recovery points. Would that be a correct assumption?
>>>>>>>
>>>>>>> You could see it that way, but I would describe savepoints more as user-defined *restart* points than *recovery* points. Please take a look at my answers in this thread, because they cover most of your question: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>>>>>>>
>>>>>>> 2. While cancelling an application with the -s option, it specifies the savepoint location. Is there a way during application startup to identify the last known savepoint from a folder by itself, and restart from there? Since I am saving my savepoints on S3, I want to avoid issues arising from an *ls* command on S3 due to S3's read-after-write consistency.
>>>>>>>
>>>>>>> I don't think that this feature exists, you have to specify the savepoint.
>>>>>>>
>>>>>>> 3. Suppose my application has a checkpoint at point t1, and say I cancel this application sometime before the next available checkpoint (say t1+x). If I start the application without specifying the savepoint, it will start from the last known checkpoint (at t1), which won't have the application state saved, since I had cancelled the application. Would this be a correct assumption?
>>>>>>>
>>>>>>> If you restart a canceled application it will not consider checkpoints. They are only considered in recovery on failure. You need to specify a savepoint or externalized checkpoint for restarts to make explicit that you intend to restart a job, and not to run a new instance of the job.
>>>>>>>
>>>>>>> 4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be the same as manually saving regular savepoints?
>>>>>>>
>>>>>>> Not the same, because checkpoints and savepoints are different in certain aspects, but both methods leave you with something that survives job cancelation and can be used to restart from a certain state.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
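To put the restart-vs-recovery distinction above into CLI terms, a sketch with placeholder paths and job id; the savepoint path printed by the cancel command is the value worth persisting (for example in ZooKeeper, as asked at the top of this thread).

    # Take a savepoint and cancel the job in one step; the command prints
    # the savepoint path.
    flink cancel -s s3://<bucket>/<savepoints-dir> <jobID>

    # Restarting is always explicit: pass the stored path back to flink run.
    flink run -s <savepoint-path-printed-above> <your-job.jar> [args]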