Maybe it is a stupid question but in Flink 1.15 with the following configs enabled:
SHUTDOWN_ON_APPLICATION_FINISH = false SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = true I think jobmanager pod would not restart but simply go to a terminal failed state right? Gyula On Mon, Sep 26, 2022 at 12:31 PM Matthias Pohl <matthias.p...@aiven.io> wrote: > Thanks Evgeniy for reaching out to the community and Gyula for picking it > up. I haven't looked into the k8s operator in much detail, yet. So, help me > out if I miss something here. But I'm afraid that this is not something > that would be fixed by upgrading to 1.15. > The issue here is that we're recovering from an external checkpoint using > the same job ID (the default one used for any Flink cluster in Application > Mode) and the same cluster ID, if I understand correctly. Now, the job is > failing during initialization. Currently, this causes a global cleanup [1]. > All HA data including the checkpoints are going to be deleted. I created > FLINK-29415 [2] to cover this. > > I'm wondering whether we could work around this problem by specifying a > random job ID through PipelineOptionsInternal [3] in the Kubernetes > Operator. But I haven't looked into all the consequences around that. And > it feels wrong to make this configuration parameter publicly usable. > > Another option might be to use ExecutionMode.RECOVERY in case of an > initialization failure when recovering from an external Checkpoint in > Application Mode (like we do it for internal recovery already). > > I'm looking forward to your opinion. > Matthias > > [1] > https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663 > [2] https://issues.apache.org/jira/browse/FLINK-29415 > [3] > https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29 > > On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra <gyula.f...@gmail.com> wrote: > >> I see I think we have seen this issue with others before, in Flink 1.15 >> it is solved by the newly introduced JobResultStore. The operator also >> configures that automatically for 1.15 to avoid this. >> >> Gyula >> >> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov <eblyuti...@avito.ru> >> wrote: >> >>> Thanks for the answer. >>> I think this is not about the operator issue, kubernetes deployment just >>> restarts the fallen pod, restarted jobmanager without HA metadata >>> starts the job itself from an empty state. >>> >>> I'm looking for a way to prevent it from exiting in case of an job error >>> (we use application mode cluster). >>> >>> >>> >>> ------------------------------ >>> *От:* Gyula Fóra <gyula.f...@gmail.com> >>> *Отправлено:* 20 сентября 2022 г. 19:49:37 >>> *Кому:* Evgeniy Lyutikov >>> *Копия:* user@flink.apache.org >>> *Тема:* Re: JobManager restarts on job failure >>> >>> The best thing for you to do would be to upgrade to Flink 1.15 and the >>> latest operator version. >>> In Flink 1.15 we have the option to interact with the Flink jobmanager >>> even after the job FAILED and the operator leverages this for a much more >>> robust behaviour. >>> >>> In any case the operator should not ever start the job from an empty >>> state (even if it FAILED), if you think that's happening could you please >>> open a JIRA ticket with the accompanying JM and Operator logs? >>> >>> Thanks >>> Gyula >>> >>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov <eblyuti...@avito.ru> >>> wrote: >>> >>>> Hi, >>>> We using flink 1.14.4 with flink kubernetes operator. >>>> >>>> Sometimes when updating a job, it fails on startup and flink removes >>>> all HA metadata and exits the jobmanager. >>>> >>>> >>>> 2022-09-14 14:54:44,534 INFO >>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring >>>> job 00000000000000000000000000000000 from Checkpoint 30829 @ 1663167158684 >>>> for 00000000000000000000000000000000 located at >>>> s3p://flink-checkpoints/k8s-checkpoint-job-name/00000000000000000000000000000000/chk-30829. >>>> 2022-09-14 14:54:44,638 INFO >>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job >>>> 00000000000000000000000000000000 reached terminal state FAILED. >>>> org.apache.flink.runtime.client.JobInitializationException: Could not >>>> start the JobMaster. >>>> Caused by: java.util.concurrent.CompletionException: >>>> java.lang.IllegalStateException: There is no operator for the state >>>> 4e1d9dde287c33a35e7970cbe64a40fe >>>> 2022-09-14 14:54:44,930 ERROR >>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal >>>> error occurred in the cluster entrypoint. >>>> 2022-09-14 14:54:45,020 INFO >>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - >>>> Clean up the high availability data for job >>>> 00000000000000000000000000000000. >>>> 2022-09-14 14:54:45,020 INFO >>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting >>>> KubernetesApplicationClusterEntrypoint down with application status >>>> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. >>>> 2022-09-14 14:54:45,026 INFO >>>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting >>>> down rest endpoint. >>>> 2022-09-14 14:54:46,122 INFO >>>> akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting >>>> down remote daemon. >>>> 2022-09-14 14:54:46,321 INFO >>>> akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting >>>> shut down. >>>> >>>> Kubernetes restarts the pod jobmanager and the new instance, not >>>> finding the HA metadata, starts the job from an empty state. >>>> Is there some option to prevent jobmanager from exiting after an job FAILED >>>> state? >>>> >>>> >>>> * ------------------------------ *“This message contains confidential >>>> information/commercial secret. If you are not the intended addressee of >>>> this message you may not copy, save, print or forward it to any third party >>>> and you are kindly requested to destroy this message and notify the sender >>>> thereof by email. >>>> Данное сообщение содержит конфиденциальную информацию/информацию, >>>> являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом >>>> данного сообщения, Вы не вправе копировать, сохранять, печатать или >>>> пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и >>>> уведомить об этом отправителя электронным письмом.” >>>> >>>