Hi Cristian,
From the code below, we can see that the Exception or Error is not passed on to
dispatcher.shutDownCluster(applicationStatus); only the resulting status is.

``
org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster

return applicationCompletionFuture
        .handle((r, t) -> {
            final ApplicationStatus applicationStatus;
            if (t != null) {

                final Optional<JobCancellationException> cancellationException =
                        ExceptionUtils.findThrowable(t, JobCancellationException.class);

                if (cancellationException.isPresent()) {
                    // this means the Flink Job was cancelled
                    applicationStatus = ApplicationStatus.CANCELED;
                } else if (t instanceof CancellationException) {
                    // this means that the future was cancelled
                    applicationStatus = ApplicationStatus.UNKNOWN;
                } else {
                    applicationStatus = ApplicationStatus.FAILED;
                }

                LOG.warn("Application {}: ", applicationStatus, t);
            } else {
                applicationStatus = ApplicationStatus.SUCCEEDED;
                LOG.info("Application completed SUCCESSFULLY");
            }
            return dispatcher.shutDownCluster(applicationStatus);
        })
        .thenCompose(Function.identity());

``


So by the time java.util.concurrent.CompletableFuture#whenComplete runs,
there is no throwable, only ApplicationStatus.FAILED, so the general shutdown
path is taken and the data is cleaned up.


``
        clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                                ApplicationStatus.UNKNOWN,
                                ExceptionUtils.stringifyException(throwable),
                                false);
                    } else {
                        // This is the general shutdown path. If a separate more
                        // specific shutdown was already triggered, this will do nothing
                        shutDownAsync(
                                applicationStatus,
                                null,
                                true);
                    }
                });
}

``
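
To make the effect easier to see, here is a minimal sketch using only plain
java.util.concurrent, with no Flink classes involved. The Status enum and the
method names toStatus and observe are made up for this illustration; they only
mimic the shape of the two snippets above. It shows that once handle() has turned
a failure into a normal value, a later whenComplete() callback receives a null
throwable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class HandleSwallowsThrowable {

    // Hypothetical stand-in for Flink's ApplicationStatus, for illustration only.
    enum Status { SUCCEEDED, FAILED }

    // Same shape as the first snippet: handle() maps the outcome to a status and
    // returns a future that completes normally; thenCompose() flattens it.
    static CompletableFuture<Status> toStatus(CompletableFuture<Void> completion) {
        return completion
                .handle((r, t) -> {
                    final Status status = (t != null) ? Status.FAILED : Status.SUCCEEDED;
                    return CompletableFuture.completedFuture(status);
                })
                .thenCompose(Function.identity());
    }

    // Reports what a downstream whenComplete() callback gets to see.
    // The input future is expected to be completed already, so the
    // callback runs synchronously in the calling thread.
    static String observe(CompletableFuture<Void> completion) {
        final AtomicReference<String> seen = new AtomicReference<>();
        toStatus(completion).whenComplete((status, throwable) ->
                seen.set("status=" + status + ", throwable=" + throwable));
        return seen.get();
    }

    public static void main(String[] args) {
        final CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("job failed"));
        // prints "status=FAILED, throwable=null"
        System.out.println(observe(failed));
    }
}
```

The original exception is only visible inside handle(); anything chained after
it has to rely on the status value alone.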

If you want to keep your checkpoints, you could refer to this document:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html

Alternatively, you could change the code in
org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster so that
the data is kept when the status is FAILED.
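
As a rough sketch of what such a change could look like, the decision might be
reduced to a small policy function. This is not how Flink behaves today. The
Status enum and shouldCleanupHaData are hypothetical names for this example, and
it assumes that the last boolean passed to shutDownAsync controls whether the HA
data is removed.

```java
public class CleanupDecision {

    // Hypothetical stand-in for Flink's ApplicationStatus, for illustration only.
    enum Status { SUCCEEDED, CANCELED, FAILED, UNKNOWN }

    // Hypothetical policy: remove HA data only when the application finished
    // or was cancelled on purpose; keep it for FAILED and UNKNOWN so that the
    // job can still be recovered.
    static boolean shouldCleanupHaData(Status status) {
        return status == Status.SUCCEEDED || status == Status.CANCELED;
    }

    public static void main(String[] args) {
        for (Status status : Status.values()) {
            System.out.println(status + " -> cleanup=" + shouldCleanupHaData(status));
        }
    }
}
```

With a policy like this, the general shutdown path would pass the computed flag
instead of a hard-coded true.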

In fact, I'm wondering why it ignores the Throwable and defaults to deleting the
HA data in every case. Could anyone help me answer this question?

Best,
Husky Zeng





-----
Chinese,NanJing , Huawei.