Hi Peter,

this sounds very strange. I just tried to reproduce the issue locally, but for me it worked without a problem. Could you share the jobmanager logs at DEBUG log level with us?

As a side note, enabling the asynchronous checkpointing mode for the FsStateBackend has no effect on the RocksDBStateBackend. You should instead call `new RocksDBStateBackend(new FsStateBackend(stateStoreLocation), true)` if you want to enable asynchronous checkpointing.
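A minimal sketch of that setup with the Scala DataStream API (the state location below is a made-up placeholder; in Peter's job it points to HDFS):

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StateBackendSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder location for the checkpoint data.
    val stateStoreLocation = "hdfs:///flink/checkpoints"

    // Passing `true` to the inner FsStateBackend only configures the
    // FsStateBackend itself; to get the behavior described above, pass
    // the flag to the RocksDBStateBackend constructor instead:
    env.setStateBackend(
      new RocksDBStateBackend(new FsStateBackend(stateStoreLocation), true))

    // ... sources, transformations, env.execute(...) ...
  }
}
```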
Cheers,
Till

On Fri, Jun 15, 2018 at 9:57 AM Peter Zende <peter.ze...@gmail.com> wrote:

> Hi Stefan,
>
> Thanks for the answer.
> Fixing the uids solved the problem; that's not an issue anymore.
> The savepoint directory is there, but the RocksDB state is not restored
> after restarting the application, because that state directory was removed
> when I stopped the application. It looks like the savepoints themselves
> don't contain the RocksDB state files. What we have is:
>
> env.setStateBackend(new RocksDBStateBackend(new FsStateBackend(stateStoreLocation, true)))
>
> -> this location got emptied.
>
> In the meantime we switched to cancelWithSavepoint, which doesn't have
> this behavior and works fine; however, the YARN application status ends up
> as FAILED instead of SUCCEEDED, which is what we had when stopping.
>
> What are the use cases of stopping? We implemented it because we wanted
> to ensure that the application shuts down correctly and we don't end up in
> an inconsistent/broken state.
>
> Thanks,
> Peter
>
>
> 2018-06-11 11:31 GMT+02:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Hi,
>>
>> > On 08.06.2018 at 01:16, Peter Zende <peter.ze...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > We have a streaming pipeline (Flink 1.4.2) for which we implemented
>> > stoppable sources to be able to gracefully exit from the job with the
>> > YARN state "finished/succeeded".
>> > This works fine; however, after creating a savepoint, stopping the job
>> > (stop event) and restarting it, we noticed that the RocksDB state hadn't
>> > been recovered. It looks like this is because the state directory on HDFS
>> > was emptied after issuing a stop event. This isn't the case when we cancel
>> > the job, but we'd like to distinguish between job failures and stop events.
>> > After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's
>> > still not clear why this is the intended behavior.
>> > Should we use cancel instead?
>>
>> Savepoints should _not_ be cleaned up in case of stop or cancellation;
>> checkpoints should be cleaned up. Where are you storing the created
>> savepoints? They should not go into the checkpoint directory. Stop is
>> intended to be a more „graceful“ variant of cancel, but I think it is
>> rarely used with Flink. I would prefer cancel unless you really need
>> stoppable for some particular reason.
>>
>> > When we back up the local state directory, stop the job, copy back the
>> > directory and start a new job from the savepoint, then it works fine.
>> > Another issue is that when we restart the job with different sources
>> > (1st job: HDFS and Kafka, 2nd job: Kafka), each having uids set, the
>> > recovery from the savepoint doesn't fail, but the local state isn't
>> > restored. Is there any trick besides setting allowNonRestoredState?
>>
>> I need to clarify here: when you say „each having uids set“, do you set
>> the same uids for both types of sources? The uids must match, because
>> Flink will reassign the state in a restore based on the uids, i.e. state x
>> goes to the operator with the same uid as the uid of the operator that
>> created it in the previous job.
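To make the point about matching uids concrete, here is a rough sketch of the first job variant with both sources carrying explicit, stable uids (Scala API; the topic name, path, uids and the Kafka 0.11 connector are all illustrative assumptions):

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object JobOne {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")

    // Kafka source: its uid is what the savepoint state is keyed on, so it
    // must stay identical in every job that should restore this state.
    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer011[String]("events", new SimpleStringSchema(), props))
      .uid("kafka-source")

    // HDFS source, present only in this first job variant, with its own
    // stable uid.
    val hdfsStream = env
      .readTextFile("hdfs:///data/input")
      .uid("hdfs-source")

    kafkaStream.union(hdfsStream).print()
    env.execute("job-one")
  }
}
```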
>> The flag allowNonRestoredState has the purpose of tolerating that some
>> state from a checkpoint/savepoint does not find a matching operator to
>> which it should be assigned (no operator with a matching uid exists in
>> the job graph). For example, you want this if you removed operators from
>> the job.
>>
>> Best,
>> Stefan
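For completeness, a matching sketch of the second job variant, which keeps only the Kafka source: the `kafka-source` uid still lines up with the savepoint, while the state of the removed `hdfs-source` operator no longer finds a matching operator, which is exactly the case allowNonRestoredState is meant to tolerate (again, all names are illustrative; the CLI flag in the comment is the `-n`/`--allowNonRestoredState` option of `flink run`):

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object JobTwo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")

    // Same uid as in the first job, so the Kafka source state from the
    // savepoint is reassigned to this operator on restore.
    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer011[String]("events", new SimpleStringSchema(), props))
      .uid("kafka-source")

    // The "hdfs-source" operator from the first job is gone. Its state in
    // the savepoint has no matching uid here, so the restore only succeeds
    // if the job is started with allowNonRestoredState, e.g.
    //   flink run -s <savepointPath> --allowNonRestoredState <jar>

    kafkaStream.print()
    env.execute("job-two")
  }
}
```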