Thanks Tony, that was the issue. I was thinking that when we use Rocksdb and provide an s3 path, it uses externalized checkpoints by default. Thanks so much!
I have one followup question. Say in above case, I terminate the cluster, and since the metadata is on s3, and not on local storage, does flink avoid read after write consistency of s3? Would it be a valid concern, or we handle that case in externalized checkpoints as well, and dont deal with file system operations while dealing with retrieving externalized checkpoints on s3. Thanks, Vipul On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <tony19920...@gmail.com> wrote: > Hi, > > Did you enable externalized checkpoints? [1] > > Best, > Tony Wei > > [1] https://ci.apache.org/projects/flink/flink-docs- > release-1.3/setup/checkpoints.html#externalized-checkpoints > > 2017-10-24 13:07 GMT+08:00 vipul singh <neoea...@gmail.com>: > >> Thanks Aljoscha for the answer above. >> >> I am experimenting with savepoints and checkpoints on my end, so that we >> built fault tolerant application with exactly once semantics. >> >> I have been able to test various scenarios, but have doubts about one use >> case. >> >> My app is running on an emr cluster, and I am trying to test the case >> when a emr cluster is terminated. I have read that >> *state.checkpoints.dir *is responsible for storing metadata information, >> and links to data files in *state.backend.fs.checkpointdir.* >> >> For my application I have configured both >> *state.backend.fs.checkpointdir* and *state.checkpoints.dir* >> >> Also I have the following in my main app: >> >> env.enableCheckpointing(CHECKPOINT_TIME_MS) >> >> val CHECKPOINT_LOCATION = >> s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb" >> >> val backend:RocksDBStateBackend = >> new RocksDBStateBackend(CHECKPOINT_LOCATION) >> >> env.setStateBackend(backend) >> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE) >> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS) >> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT) >> >> >> In the application startup logs I can see >> *state.backend.fs.checkpointdir* and *state.checkpoints.dir, *values >> being loaded. However when the checkpoint happens I dont see any content in >> the metadata dir. Is there something I am missing? Please let me know. I am >> using flink version 1.3 >> >> Thanks, >> Vipul >> >> >> >> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org> >> wrote: >> >>> Hi, >>> >>> Flink does not rely on file system operations to list contents, all >>> necessary file paths are stored in the meta data file, as you guessed. This >>> is the reason savepoints also work with file systems that "only" have >>> read-after-write consistency. >>> >>> Best, >>> Aljoscha >>> >>> >>> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote: >>> >>> Thanks Stefan for the answers above. These are really helpful. >>> >>> I have a few followup questions: >>> >>> 1. I see my savepoints are created in a folder, which has a >>> _metadata file and another file. Looking at the code >>> >>> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191> >>> it seems like the metadata file contains tasks states, operator >>> state and master states >>> >>> <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>. >>> What is the purpose of the other file in the savepoint folder? My guess >>> is >>> it should be a checkpoint file? >>> 2. I am planning to use s3 as my state backend, so want to ensure >>> that application restarts are not affected by read-after-write >>> consistency >>> of s3( if I use s3 as a savepoint backend). I am curious how flink >>> restores >>> data from the _metadata file, and the other file? Does the _metadata file >>> contain path to these other files? or would it do a listing on the s3 >>> folder? >>> >>> >>> Please let me know, >>> >>> Thanks, >>> Vipul >>> >>> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter < >>> s.rich...@data-artisans.com> wrote: >>> >>>> Hi, >>>> >>>> I have answered your questions inline: >>>> >>>> >>>> 1. It seems to me that checkpoints can be treated as flink internal >>>> recovery mechanism, and savepoints act more as user-defined recovery >>>> points. Would that be a correct assumption? >>>> >>>> You could see it that way, but I would describe savepoints more as >>>> user-defined *restart* points than *recovery* points. Please take a look at >>>> my answers in this thread, because they cover most of your question: >>>> >>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab >>>> ble.com/difference-between-checkpoints-amp-savepoints-td14787.html . >>>> >>>> >>>> 1. While cancelling an application with -s option, it specifies the >>>> savepoint location. Is there a way during application startup to >>>> identify >>>> the last know savepoint from a folder by itself, and restart from there. >>>> Since I am saving my savepoints on s3, I want to avoid issues arising >>>> from >>>> *ls* command on s3 due to read-after-write consistency of s3. >>>> >>>> I don’t think that this feature exists, you have to specify the >>>> savepoint. >>>> >>>> >>>> 1. Suppose my application has a checkpoint at point t1, and say i >>>> cancel this application sometime in future before the next available >>>> checkpoint( say t1+x). If I start the application without specifying the >>>> savepoint, it will start from the last known checkpoint(at t1), which >>>> wont >>>> have the application state saved, since I had cancelled the application. >>>> Would this is a correct assumption? >>>> >>>> If you restart a canceled application it will not consider checkpoints. >>>> They are only considered in recovery on failure. You need to specify a >>>> savepoint or externalized checkpoint for restarts to make explicit that you >>>> intend to restart a job, and not to run a new instance of the job. >>>> >>>> >>>> 1. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION >>>> be same as manually saving regular savepoints? >>>> >>>> Not the same, because checkpoints and savepoints are different in >>>> certain aspects, but both methods leave you with something that survives >>>> job cancelation and can be used to restart from a certain state. >>>> >>>> Best, >>>> Stefan >>>> >>>> >>> >>> >>> -- >>> Thanks, >>> Vipul >>> >>> >>> >> >> >> -- >> Thanks, >> Vipul >> > > -- Thanks, Vipul