Hi,

That distinction around externalised checkpoints is a bit of a pitfall, and I'm hoping we can get rid of it in the next version or the one after that. With that change, all checkpoints would always be externalised, since externalising the metadata adds no noticeable overhead.
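Until then, externalisation has to be switched on explicitly via the CheckpointConfig. A minimal sketch of what that looks like on 1.3 (the 60s interval below is only an example value):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Regular checkpointing; 60s is only an example interval.
env.enableCheckpointing(60000)

// Write the checkpoint metadata to state.checkpoints.dir and keep it when
// the job is cancelled, so it can be used for a later restore.
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)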
Regarding read-after-write consistency, you should be fine, since the "externalised checkpoint", i.e. the metadata, is only one file. If you know the file path (either from the Flink dashboard or by looking at the S3 bucket) you can restore from it.

Best,
Aljoscha

> On 24. Oct 2017, at 08:22, vipul singh <neoea...@gmail.com> wrote:
>
> Thanks Tony, that was the issue. I was thinking that when we use RocksDB and provide an S3 path, it uses externalized checkpoints by default. Thanks so much!
>
> I have one follow-up question. Say in the above case I terminate the cluster; since the metadata is on S3 and not on local storage, does Flink avoid issues with the read-after-write consistency of S3? Would that be a valid concern, or is that case handled for externalized checkpoints as well, i.e. no file-system listing operations are needed when retrieving externalized checkpoints from S3?
>
> Thanks,
> Vipul
>
> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <tony19920...@gmail.com> wrote:
> Hi,
>
> Did you enable externalized checkpoints? [1]
>
> Best,
> Tony Wei
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints
>
> 2017-10-24 13:07 GMT+08:00 vipul singh <neoea...@gmail.com>:
> Thanks Aljoscha for the answer above.
>
> I am experimenting with savepoints and checkpoints on my end, so that we can build a fault-tolerant application with exactly-once semantics.
>
> I have been able to test various scenarios, but have doubts about one use case.
>
> My app is running on an EMR cluster, and I am trying to test the case where the EMR cluster is terminated. I have read that state.checkpoints.dir is responsible for storing the metadata information and links to the data files in state.backend.fs.checkpointdir.
>
> For my application I have configured both state.backend.fs.checkpointdir and state.checkpoints.dir.
>
> Also I have the following in my main app:
>
> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>
> val CHECKPOINT_LOCATION = s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>
> val backend: RocksDBStateBackend = new RocksDBStateBackend(CHECKPOINT_LOCATION)
>
> env.setStateBackend(backend)
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>
> In the application startup logs I can see the state.backend.fs.checkpointdir and state.checkpoints.dir values being loaded. However, when the checkpoint happens I don't see any content in the metadata dir. Is there something I am missing? Please let me know. I am using Flink version 1.3.
>
> Thanks,
> Vipul
>
> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
>
> Flink does not rely on file system operations to list contents; all necessary file paths are stored in the metadata file, as you guessed. This is the reason savepoints also work with file systems that "only" have read-after-write consistency.
>
> Best,
> Aljoscha
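(Note on the restore step itself: you pass the path of that metadata file to the client just like a savepoint path. Roughly, with placeholder bucket/path and jar name:

bin/flink run -s s3://<bucket>/<path-to-checkpoint-metadata> your-job.jar

The pointers to the actual state files are then read from the metadata, not from a directory listing.)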
>> On 10. Oct 2017, at 03:01, vipul singh <neoea...@gmail.com> wrote:
>>
>> Thanks Stefan for the answers above. These are really helpful.
>>
>> I have a few follow-up questions:
>>
>> - I see my savepoints are created in a folder, which has a _metadata file and another file. Looking at the code <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java#L191> it seems like the metadata file contains task states, operator state and master states <https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java#L87>. What is the purpose of the other file in the savepoint folder? My guess is that it is a checkpoint file?
>> - I am planning to use S3 as my state backend, so I want to ensure that application restarts are not affected by the read-after-write consistency of S3 (if I use S3 as a savepoint backend). I am curious how Flink restores data from the _metadata file and the other file. Does the _metadata file contain the paths to these other files, or would it do a listing on the S3 folder?
>>
>> Please let me know,
>>
>> Thanks,
>> Vipul
>>
>> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>> Hi,
>>
>> I have answered your questions inline:
>>
>>> It seems to me that checkpoints can be treated as a Flink-internal recovery mechanism, and savepoints act more as user-defined recovery points. Would that be a correct assumption?
>>
>> You could see it that way, but I would describe savepoints more as user-defined *restart* points than *recovery* points. Please take a look at my answers in this thread, because they cover most of your question: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>>
>>> While cancelling an application with the -s option, it specifies the savepoint location. Is there a way during application startup to identify the last known savepoint from a folder by itself, and restart from there? Since I am saving my savepoints on S3, I want to avoid issues arising from an ls command on S3 due to the read-after-write consistency of S3.
>>
>> I don't think that this feature exists, you have to specify the savepoint.
>>
>>> Suppose my application has a checkpoint at point t1, and say I cancel this application sometime in the future before the next available checkpoint (say t1+x). If I start the application without specifying the savepoint, it will start from the last known checkpoint (at t1), which won't have the application state saved, since I had cancelled the application. Would this be a correct assumption?
>>
>> If you restart a cancelled application it will not consider checkpoints. They are only considered in recovery on failure. You need to specify a savepoint or externalized checkpoint for restarts to make explicit that you intend to restart a job, and not to run a new instance of the job.
>>
>>> Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION be the same as manually saving regular savepoints?
>>
>> Not the same, because checkpoints and savepoints are different in certain aspects, but both methods leave you with something that survives job cancellation and can be used to restart from a certain state.
>>
>> Best,
>> Stefan
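(For completeness, the savepoint side of the above is driven from the CLI; roughly, with placeholder job id and target directory:

bin/flink savepoint <jobId> s3://<bucket>/<savepoint-dir>
bin/flink cancel -s s3://<bucket>/<savepoint-dir> <jobId>

Resuming afterwards uses the same "run -s <path>" shown further up, pointed at the savepoint that was written.)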
>> --
>> Thanks,
>> Vipul
>
> --
> Thanks,
> Vipul
>
> --
> Thanks,
> Vipul