Hi Stefan,

Thanks for the answer.
Fixing the uids solved the problem, so that's no longer an issue.
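
Concretely, we now give every stateful operator a stable uid so that state
from the savepoint gets re-assigned to the same logical operator after an
upgrade. Roughly like this (variable and operator names are just
illustrative, not our real code):

    DataStream<String> events = env
        .addSource(kafkaConsumer)          // kafkaConsumer: our FlinkKafkaConsumer010 instance
        .uid("kafka-source");              // same uid in the old and in the new job

    events
        .map(new EnrichmentFunction())     // placeholder for our stateful operator
        .uid("enrichment-map");
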
The savepoint directory is there, but the RocksDB state is not restored
after restarting the application, because that state directory was removed
when I stopped the application. It looks like the savepoints themselves
don't contain the RocksDB state files. What we have is:
env.setStateBackend(
    new RocksDBStateBackend(new FsStateBackend(stateStoreLocation, true)))
-> this location got emptied.
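
To keep the savepoints out of that location, my understanding is that we
should point the default savepoint target directory somewhere else, e.g. in
flink-conf.yaml (the path below is just an example):

    # default target directory for savepoints, deliberately separate from
    # stateStoreLocation so that cleaning up state/checkpoints never touches them
    state.savepoints.dir: hdfs:///flink/savepoints

Is that what you meant by not putting savepoints into the checkpoint
directory?
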
In the meantime we switched to cancelWithSavepoint, which doesn't have this
behavior and works fine; however, the YARN application status then ends up
as FAILED instead of the SUCCEEDED we had when stopping.

What are the use cases of stopping? We implemented it because we wanted to
ensure that the application shuts down cleanly and we don't end up in an
inconsistent/broken state.
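
For context, our stoppable sources follow roughly this pattern (simplified
sketch; the class name and record handling are illustrative):

    public class StoppableEventSource implements SourceFunction<String>, StoppableFunction {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("record");   // the real source emits actual records here
                }
                Thread.sleep(100);           // just to keep the sketch from busy-looping
            }
        }

        @Override
        public void cancel() {
            running = false;                 // hard cancellation
        }

        @Override
        public void stop() {
            running = false;                 // graceful stop: run() drains and returns
        }
    }

The idea was that stop() lets the run() loop finish normally, so the job
ends as finished/succeeded instead of being cancelled.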

Thanks,
Peter


2018-06-11 11:31 GMT+02:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> > Am 08.06.2018 um 01:16 schrieb Peter Zende <peter.ze...@gmail.com>:
> >
> > Hi all,
> >
> > We have a streaming pipeline (Flink 1.4.2) for which we implemented
> > stoppable sources to be able to gracefully exit from the job with YARN
> > state "finished/succeeded".
> > This works fine, however after creating a savepoint, stopping the job
> > (stop event) and restarting it we noticed that the RocksDB state hasn't
> > been recovered. It looks like it's because the state directory on HDFS
> > was emptied after issuing a stop event. This isn't the case when we cancel
> > the job, but we'd like to distinguish between job failures and stop events.
> > After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still
> > not clear why this is the intended behavior.
> > Should we use cancel instead?
>
> Savepoints should _not_ be cleaned up in case of stop or cancellation;
> checkpoints should be cleaned up. Where are you storing the created
> savepoints? They should not go into the checkpoint directory. Stop is
> intended to be a more „graceful“ variant of cancel, but I think it is
> rarely used with Flink. I would prefer cancel unless you really need to
> use stoppable for some particular reason.
>
> > When we back up the local state directory, stop the job, copy back the
> > directory and start a new job from the savepoint, then it works fine.
> > Another issue is that when we restart the job with a different source (1st
> > job: HDFS and Kafka, 2nd job: Kafka), each having uids set, the recovery
> > from savepoint doesn't fail but the local state isn't restored. Is there
> > any trick besides setting allowNonRestoredState?
>
>
> I need to clarify here: when you say „each having uids set“, do you set
> the same uids for both types of sources? The uid must match, because Flink
> will reassign the state in a restore based on the uids, i.e. state x goes
> to the operator with the same uid as the uid of the operator that created
> it in the previous job. The purpose of the allowNonRestoredState flag is
> to tolerate that some state from a checkpoint/savepoint does not find a
> matching operator to which it should be assigned (no operator with a
> matching uid exists in the job graph). For example, you want this if you
> removed operators from the job.
>
> Best,
> Stefan
>
>
