Hi Peter,

this sounds very strange. I just tried to reproduce the issue locally, but
for me it worked without a problem. Could you maybe share the JobManager
logs at DEBUG level with us?

As a side note, enabling the asynchronous checkpointing mode for the
FsStateBackend does not have an effect on the RocksDBStateBackend. You
should rather call `new RocksDBStateBackend(new
FsStateBackend(stateStoreLocation), true)` if you want to enable
asynchronous checkpointing.
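
For reference, here is a minimal, untested sketch of that setup (the class
name and the `hdfs:///flink/checkpoints` path are placeholders, not taken
from your job):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder; point this at the durable checkpoint/savepoint location.
        String stateStoreLocation = "hdfs:///flink/checkpoints";

        // The wrapped FsStateBackend only determines where the snapshot files
        // end up; the boolean flag is passed to the RocksDBStateBackend itself,
        // as suggested above.
        env.setStateBackend(
                new RocksDBStateBackend(new FsStateBackend(stateStoreLocation), true));

        // ... build the rest of the pipeline and call env.execute(...)
    }
}
```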

Cheers,
Till

On Fri, Jun 15, 2018 at 9:57 AM Peter Zende <peter.ze...@gmail.com> wrote:

> Hi Stefan,
>
> Thanks for the answer.
> Fixing the uids solved the problem; that's not an issue anymore.
> The savepoint directory is there, but the RocksDB state is not restored
> after restarting the application, because that state directory was removed
> when I stopped the application. It looks like the savepoints themselves
> don't contain the RocksDB state files. What we have is:
> env.setStateBackend(new RocksDBStateBackend(new FsStateBackend(stateStoreLocation, true)))
> -> this location got emptied.
> In the meantime we switched to cancelWithSavepoint, which doesn't have
> this behavior and works fine; however, the YARN application status ends up
> as FAILED instead of SUCCEEDED, which is what we got when stopping.
>
> What are the use cases of stopping? We implemented it because we wanted
> to ensure that the application shuts down correctly and we don't end up in
> an inconsistent/broken state.
>
> Thanks,
> Peter
>
>
> 2018-06-11 11:31 GMT+02:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Hi,
>>
>> > On 08.06.2018, at 01:16, Peter Zende <peter.ze...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > We have a streaming pipeline (Flink 1.4.2) for which we implemented
>> > stoppable sources to be able to gracefully exit from the job with YARN
>> > state "finished/succeeded".
>> > This works fine; however, after creating a savepoint, stopping the job
>> > (stop event) and restarting it, we noticed that the RocksDB state hasn't
>> > been recovered. It looks like it's because the state directory on HDFS
>> > was emptied after issuing a stop event. This isn't the case when we cancel
>> > the job, but we'd like to distinguish between job failures and stop events.
>> > After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still
>> > not clear why this is the intended behavior.
>> > Should we use cancel instead?
>>
>> Savepoints should _not_ be cleaned up in case of stop or cancellation;
>> checkpoints should be cleaned up. Where are you storing the created
>> savepoints? They should not go into the checkpoint directory. Stop is
>> intended to be a more „graceful“ variant of cancel, but I think it is
>> rarely used with Flink. I would prefer cancel, unless you really need to
>> use stoppable for some particular reason.
>>
>> > When we back up the local state directory, stop the job, copy the
>> > directory back and start a new job from the savepoint, then it works fine.
>> > Another issue is that when we restart the job with a different source
>> > (1st job: HDFS and Kafka, 2nd job: Kafka), each having uids set, the
>> > recovery from the savepoint doesn't fail, but the local state isn't
>> > restored. Is there any trick besides setting allowNonRestoredState?
>>
>>
>> I need to clarify here: when you say „each having uids set“, do you set
>> the same uids for both types of sources? The uids must match, because Flink
>> reassigns state on restore based on the uids, i.e. state x goes to the
>> operator with the same uid as the operator that created it in the previous
>> job. The purpose of the allowNonRestoredState flag is to tolerate that some
>> state from a checkpoint/savepoint does not find a matching operator to
>> which it could be assigned (no operator with a matching uid exists in the
>> job graph). For example, you want this if you removed operators from the
>> job.
>>
>> Best,
>> Stefan
>>
>>
>
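
To illustrate the uid matching discussed in the quoted mails above, here is a
minimal, untested sketch. The uid strings ("kafka-source", "hdfs-source",
"enrichment-map") and the stand-in sources are placeholders, not taken from
the actual jobs; the point is only that state is reassigned on restore by uid,
and that state whose uid no longer exists in the new job graph is only
tolerated when restoring with -n / --allowNonRestoredState.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidRestoreSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real Kafka source; only the uids matter for the restore.
        env.fromElements("from-kafka")
                .uid("kafka-source")       // keep this exact uid in the restored job
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .uid("enrichment-map")     // stateful downstream operators need stable uids too
                .print();

        // The first job additionally had something like
        //   env.fromElements("from-hdfs").uid("hdfs-source")...
        // If that source is dropped in the second job, its state no longer finds a
        // matching uid; restoring from the savepoint then only succeeds when the
        // -n / --allowNonRestoredState option is set.

        env.execute("uid restore sketch");
    }
}
```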
