> If you want to save your checkpoint, you could refer to this document

What do you mean? We already persist our savepoints, and we never delete them
explicitly.
The problem is that Flink deleted the data from ZooKeeper when it shouldn't
have. Is it possible to start a job from a checkpoint using --fromSavepoint?
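
For anyone following along, this is how I read the mechanism in the quoted
analysis below: once handle() turns the failure into a status value, the
downstream whenComplete() callback no longer receives a throwable. The
following is a minimal plain-JDK sketch, not Flink code. Status,
shutDownCluster and downstreamSeesThrowable are hypothetical stand-ins made up
for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class HandleSwallowsThrowable {

    // Hypothetical stand-in for Flink's ApplicationStatus (not the real class).
    enum Status { SUCCEEDED, FAILED }

    // Hypothetical stand-in for dispatcher.shutDownCluster(status):
    // it completes normally, whatever the status is.
    static CompletableFuture<Status> shutDownCluster(Status status) {
        return CompletableFuture.completedFuture(status);
    }

    // Returns true if the downstream whenComplete callback saw a throwable.
    static boolean downstreamSeesThrowable(CompletableFuture<Void> applicationFuture) {
        final boolean[] sawThrowable = {false};
        applicationFuture
                // handle() consumes the throwable and maps it to a plain status value
                .handle((r, t) -> shutDownCluster(t != null ? Status.FAILED : Status.SUCCEEDED))
                .thenCompose(Function.identity())
                // by now the failure is only visible as Status.FAILED
                .whenComplete((status, throwable) -> sawThrowable[0] = throwable != null)
                .join();
        return sawThrowable[0];
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("job failed"));
        System.out.println(downstreamSeesThrowable(failed)); // prints "false"
    }
}
```

In this sketch the failed future reaches whenComplete() with a null throwable,
so a callback that only distinguishes "throwable" from "no throwable" takes the
normal shutdown path even though the job failed.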


On Sat, Sep 5, 2020, at 2:05 AM, Husky Zeng wrote:
> 
> Hi Cristian,
> 
> From this code, we can see that the Exception or Error is ignored in the
> call to dispatcher.shutDownCluster(applicationStatus).
> 
> ``
> org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster
> 
> return applicationCompletionFuture
>         .handle((r, t) -> {
>             final ApplicationStatus applicationStatus;
>             if (t != null) {
>                 final Optional<JobCancellationException> cancellationException =
>                         ExceptionUtils.findThrowable(t, JobCancellationException.class);
>
>                 if (cancellationException.isPresent()) {
>                     // this means the Flink Job was cancelled
>                     applicationStatus = ApplicationStatus.CANCELED;
>                 } else if (t instanceof CancellationException) {
>                     // this means that the future was cancelled
>                     applicationStatus = ApplicationStatus.UNKNOWN;
>                 } else {
>                     applicationStatus = ApplicationStatus.FAILED;
>                 }
>
>                 LOG.warn("Application {}: ", applicationStatus, t);
>             } else {
>                 applicationStatus = ApplicationStatus.SUCCEEDED;
>                 LOG.info("Application completed SUCCESSFULLY");
>             }
>             return dispatcher.shutDownCluster(applicationStatus);
>         })
>         .thenCompose(Function.identity());
> 
> ``
> 
> 
> So when it comes to java.util.concurrent.CompletableFuture#whenComplete,
> there is no throwable, only ApplicationStatus.FAILED, and the data is cleaned
> up.
> 
> 
> ``
> clusterComponent.getShutDownFuture().whenComplete(
>         (ApplicationStatus applicationStatus, Throwable throwable) -> {
>             if (throwable != null) {
>                 shutDownAsync(
>                         ApplicationStatus.UNKNOWN,
>                         ExceptionUtils.stringifyException(throwable),
>                         false);
>             } else {
>                 // This is the general shutdown path. If a separate more specific shutdown was
>                 // already triggered, this will do nothing
>                 shutDownAsync(
>                         applicationStatus,
>                         null,
>                         true);
>             }
>         });
> }
> 
> ``
> 
> If you want to save your checkpoint, you could refer to this document:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html
> 
> Alternatively, you could change the code in
> org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster so that,
> when the application has failed, the data is saved.
> 
> In fact, I'm wondering why it ignores the Throwable and defaults to deleting
> the HA data in every case. Is there anyone who could help me answer this
> question?
> 
> Best,
> Husky Zeng
> 
> 
> 
> 
> 
> -----
> Chinese,NanJing , Huawei.
>
