Hello guys.

We run a stand-alone cluster that executes a single job (if you are familiar with 
the way Ververica Platform runs Flink jobs, we use a very similar approach). It 
runs Flink 1.11.1 straight from the official Docker image.

Usually, when our jobs crash for any reason, they will resume from the latest 
checkpoint. This is the expected behavior and has been working fine for years.

But we encountered an issue with a job that apparently crashed because it lost 
connectivity with ZooKeeper.

The logs for this job can be found here: https://pastebin.com/raw/uH9KDU2L (I 
redacted boring or private stuff and annotated the relevant parts).

From what I can tell, this call was executed:

```
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
        applicationStatus,
        null,
        true);
```
https://github.com/apache/flink/blob/6b9cdd41743edd24a929074d62a57b84e7b2dd97/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L243-L246

which seems pretty dangerous because it ends up calling

HighAvailabilityServices.closeAndCleanupAllData()
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java#L225-L239
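
For context, my understanding is that the HighAvailabilityServices interface 
exposes two shutdown variants, roughly like this (paraphrased from the interface, 
so the exact Javadoc wording may differ):

```
// Paraphrased from org.apache.flink.runtime.highavailability.HighAvailabilityServices:

// Shuts the services down but LEAVES the HA data (ZooKeeper nodes, pointers to
// checkpoint metadata) in place, so a restarted cluster can still recover.
void close() throws Exception;

// Shuts the services down AND deletes all the data they wrote -- after this
// there is nothing left to recover from.
void closeAndCleanupAllData() throws Exception;
```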

To me this looks like a dangerous default... why would we ever want to delete 
the checkpoint metadata, except when explicitly canceling/stopping the job?

I think that if/else branch means something like: if the job crashed (i.e. 
`throwable != null`), then DO NOT wipe out the state; otherwise, delete it. But 
in this case it seems like `throwable` was indeed null, which caused the job to 
delete the checkpoint data before dying.
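
If my reading is right, the surrounding handler looks roughly like this (a 
paraphrase of how I understand ClusterEntrypoint, not the literal source, so 
names and arguments are approximate):

```
// Paraphrase of the shutdown handler as I understand it: the HA cleanup flag
// is only false on the crash path.
shutDownFuture.whenComplete((applicationStatus, throwable) -> {
    if (throwable != null) {
        // crash: keep the HA data so the job can recover from its last checkpoint
        shutDownAsync(ApplicationStatus.UNKNOWN, throwable.getMessage(), false);
    } else {
        // "clean" completion: wipe the HA data, including the checkpoint metadata
        shutDownAsync(applicationStatus, null, true);
    }
});
```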

At this point I'm really just guessing... I don't know for sure if this is what 
happened in this case. Hopefully someone with more knowledge of how this works 
can give us a hand.

Thanks.
