Hi,

Did you enable externalized checkpoints? [1]
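If not: by default the checkpoint metadata is only kept for failure recovery and is discarded when the job is cancelled, so nothing shows up under state.checkpoints.dir. A minimal sketch against the Flink 1.3 API (reusing the env from your snippet below):

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

    // Write checkpoint metadata to state.checkpoints.dir and keep it when
    // the job is cancelled, so the job can later be restarted from it.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

With DELETE_ON_CANCELLATION the metadata would survive failures but would still be removed on an explicit cancel.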
Best,
Tony Wei

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints

2017-10-24 13:07 GMT+08:00 vipul singh <neoea...@gmail.com>:

> Thanks Aljoscha for the answer above.
>
> I am experimenting with savepoints and checkpoints on my end, so that we
> can build a fault-tolerant application with exactly-once semantics.
>
> I have been able to test various scenarios, but I have doubts about one
> use case.
>
> My app is running on an EMR cluster, and I am trying to test the case
> when the EMR cluster is terminated. I have read that state.checkpoints.dir
> is responsible for storing the checkpoint metadata, which links to the
> data files in state.backend.fs.checkpointdir.
>
> For my application I have configured both state.backend.fs.checkpointdir
> and state.checkpoints.dir.
>
> I also have the following in my main app:
>
>     env.enableCheckpointing(CHECKPOINT_TIME_MS)
>
>     val CHECKPOINT_LOCATION =
>       s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>
>     val backend: RocksDBStateBackend =
>       new RocksDBStateBackend(CHECKPOINT_LOCATION)
>
>     env.setStateBackend(backend)
>     env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
>     env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>
> In the application startup logs I can see the state.backend.fs.checkpointdir
> and state.checkpoints.dir values being loaded. However, when a checkpoint
> happens, I don't see any content in the metadata dir. Is there something I
> am missing? Please let me know. I am using Flink version 1.3.
>
> Thanks,
> Vipul
>
> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> Flink does not rely on file system operations to list contents; all
>> necessary file paths are stored in the metadata file, as you guessed.
>> This is the reason savepoints also work with file systems that "only"
>> have read-after-write consistency.
>>
>> Best,
>> Aljoscha
>>
>> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote:
>>
>> Thanks Stefan for the answers above. These are really helpful.
>>
>> I have a few follow-up questions:
>>
>> 1. I see my savepoints are created in a folder, which has a _metadata
>> file and another file. Looking at the code
>> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191>,
>> it seems like the metadata file contains the task states, operator
>> states and master states
>> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>.
>> What is the purpose of the other file in the savepoint folder? My guess
>> is it should be a checkpoint file?
>>
>> 2. I am planning to use S3 as my state backend, so I want to ensure
>> that application restarts are not affected by the read-after-write
>> consistency of S3 (if I use S3 as a savepoint backend). I am curious how
>> Flink restores data from the _metadata file and the other file. Does the
>> _metadata file contain paths to these other files, or would it do a
>> listing on the S3 folder?
>>
>> Please let me know,
>>
>> Thanks,
>> Vipul
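Putting Aljoscha's answer and the savepoint questions above together, the layout under discussion is roughly the following (all names are placeholders; the exact file names depend on the state backend):

    s3://<bucket>/<savepoints>/savepoint-<id>/
        _metadata     <- task, operator and master states, plus the absolute
                         paths of the data files below
        <data-file>   <- state written by the state backend; located via the
                         paths stored in _metadata, never by listing the folder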
>>
>> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I have answered your questions inline:
>>>
>>> 1. It seems to me that checkpoints can be treated as Flink's internal
>>> recovery mechanism, and savepoints act more as user-defined recovery
>>> points. Would that be a correct assumption?
>>>
>>> You could see it that way, but I would describe savepoints more as
>>> user-defined *restart* points than *recovery* points. Please take a
>>> look at my answers in this thread, because they cover most of your
>>> question:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>>>
>>> 2. While cancelling an application with the -s option, it specifies
>>> the savepoint location. Is there a way during application startup to
>>> identify the last known savepoint in a folder by itself, and restart
>>> from there? Since I am saving my savepoints on S3, I want to avoid
>>> issues arising from the ls command on S3 due to its read-after-write
>>> consistency.
>>>
>>> I don't think that this feature exists; you have to specify the
>>> savepoint.
>>>
>>> 3. Suppose my application has a checkpoint at point t1, and say I
>>> cancel this application sometime in the future before the next
>>> available checkpoint (say t1+x). If I start the application without
>>> specifying the savepoint, it will start from the last known checkpoint
>>> (at t1), which won't have the application state saved, since I had
>>> cancelled the application. Would this be a correct assumption?
>>>
>>> If you restart a cancelled application, it will not consider
>>> checkpoints. They are only considered in recovery on failure. You need
>>> to specify a savepoint or externalized checkpoint for restarts to make
>>> explicit that you intend to restart a job, and not to run a new
>>> instance of the job.
>>>
>>> 4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be
>>> the same as manually saving regular savepoints?
>>>
>>> Not the same, because checkpoints and savepoints differ in certain
>>> aspects, but both methods leave you with something that survives job
>>> cancellation and can be used to restart from a certain state.
>>>
>>> Best,
>>> Stefan
>>
>>
>> --
>> Thanks,
>> Vipul
>
>
> --
> Thanks,
> Vipul
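For reference, the cancel-with-savepoint and restart steps discussed in the thread map onto the Flink 1.3 CLI roughly as follows (job id, bucket and jar name are placeholders):

    # take a savepoint and cancel the job in one step
    bin/flink cancel -s s3://<bucket>/<savepoints>/ <jobId>

    # restart the job from that savepoint (or from an externalized
    # checkpoint's metadata path)
    bin/flink run -s <savepointPath> <application.jar>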