Maybe it is a stupid question, but in Flink 1.15, with the following configs
enabled:

SHUTDOWN_ON_APPLICATION_FINISH = false
SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = true

I think the JobManager pod would not restart but would simply go to a
terminal FAILED state, right?
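
For reference, a minimal sketch of how these two options might be written out
as flink-conf.yaml entries. The key strings are my assumption of what the
DeploymentOptions constants map to, and the class and method names are purely
illustrative, so please check them against the 1.15 configuration docs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper only; not part of Flink or the Kubernetes operator.
class ApplicationModeFailureConfig {

    // Assumed flink-conf.yaml keys behind the two DeploymentOptions constants.
    static final String SHUTDOWN_ON_APPLICATION_FINISH =
            "execution.shutdown-on-application-finish";
    static final String SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR =
            "execution.submit-failed-job-on-application-error";

    // Collects the two settings from this mail in insertion order.
    static Map<String, String> failureHandlingOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(SHUTDOWN_ON_APPLICATION_FINISH, "false");
        options.put(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, "true");
        return options;
    }

    // Renders the options as "key: value" lines, one per option.
    static String toFlinkConfYaml(Map<String, String> options) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(toFlinkConfYaml(failureHandlingOptions()));
    }
}
```

The printed lines could be pasted into the Flink configuration of the
deployment, assuming the keys above are right.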

Gyula

On Mon, Sep 26, 2022 at 12:31 PM Matthias Pohl <matthias.p...@aiven.io>
wrote:

> Thanks Evgeniy for reaching out to the community and Gyula for picking it
> up. I haven't looked into the k8s operator in much detail, yet. So, help me
> out if I miss something here. But I'm afraid that this is not something
> that would be fixed by upgrading to 1.15.
> The issue here is that we're recovering from an external checkpoint using
> the same job ID (the default one used for any Flink cluster in Application
> Mode) and the same cluster ID, if I understand correctly. Now, the job is
> failing during initialization. Currently, this causes a global cleanup [1]:
> all HA data, including the checkpoints, is going to be deleted. I created
> FLINK-29415 [2] to cover this.
>
> I'm wondering whether we could work around this problem by specifying a
> random job ID through PipelineOptionsInternal [3] in the Kubernetes
> Operator. But I haven't looked into all the consequences around that. And
> it feels wrong to make this configuration parameter publicly usable.
>
> Another option might be to use ExecutionMode.RECOVERY in case of an
> initialization failure when recovering from an external Checkpoint in
> Application Mode (like we do it for internal recovery already).
>
> I'm looking forward to your opinion.
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663
> [2] https://issues.apache.org/jira/browse/FLINK-29415
> [3]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29
>
> On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> I see. I think we have seen this issue with others before. In Flink 1.15
>> it is solved by the newly introduced JobResultStore. The operator also
>> configures that automatically for 1.15 to avoid this.
>>
>> Gyula
>>
>> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov <eblyuti...@avito.ru>
>> wrote:
>>
>>> Thanks for the answer.
>>> I don't think this is an operator issue. The Kubernetes deployment just
>>> restarts the failed pod, and the restarted JobManager, finding no HA
>>> metadata, starts the job from an empty state.
>>>
>>> I'm looking for a way to prevent the JobManager from exiting in case of a
>>> job error (we use an application mode cluster).
>>>
>>>
>>>
>>> ------------------------------
>>> *From:* Gyula Fóra <gyula.f...@gmail.com>
>>> *Sent:* 20 September 2022, 19:49:37
>>> *To:* Evgeniy Lyutikov
>>> *Cc:* user@flink.apache.org
>>> *Subject:* Re: JobManager restarts on job failure
>>>
>>> The best thing for you to do would be to upgrade to Flink 1.15 and the
>>> latest operator version.
>>> In Flink 1.15 we have the option to interact with the Flink jobmanager
>>> even after the job FAILED and the operator leverages this for a much more
>>> robust behaviour.
>>>
>>> In any case, the operator should never start the job from an empty
>>> state (even if it FAILED). If you think that's happening, could you please
>>> open a JIRA ticket with the accompanying JM and operator logs?
>>>
>>> Thanks
>>> Gyula
>>>
>>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov <eblyuti...@avito.ru>
>>> wrote:
>>>
>>>> Hi,
>>>> We are using Flink 1.14.4 with the Flink Kubernetes operator.
>>>>
>>>> Sometimes when updating a job, it fails on startup, and Flink removes
>>>> all HA metadata and exits the JobManager.
>>>>
>>>>
>>>> 2022-09-14 14:54:44,534 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring
>>>> job 00000000000000000000000000000000 from Checkpoint 30829 @ 1663167158684
>>>> for 00000000000000000000000000000000 located at
>>>> s3p://flink-checkpoints/k8s-checkpoint-job-name/00000000000000000000000000000000/chk-30829.
>>>> 2022-09-14 14:54:44,638 INFO
>>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job
>>>> 00000000000000000000000000000000 reached terminal state FAILED.
>>>> org.apache.flink.runtime.client.JobInitializationException: Could not
>>>> start the JobMaster.
>>>> Caused by: java.util.concurrent.CompletionException:
>>>> java.lang.IllegalStateException: There is no operator for the state
>>>> 4e1d9dde287c33a35e7970cbe64a40fe
>>>> 2022-09-14 14:54:44,930 ERROR
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal
>>>> error occurred in the cluster entrypoint.
>>>> 2022-09-14 14:54:45,020 INFO
>>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>>>> Clean up the high availability data for job
>>>> 00000000000000000000000000000000.
>>>> 2022-09-14 14:54:45,020 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting
>>>> KubernetesApplicationClusterEntrypoint down with application status
>>>> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
>>>> 2022-09-14 14:54:45,026 INFO
>>>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
>>>> down rest endpoint.
>>>> 2022-09-14 14:54:46,122 INFO
>>>> akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting
>>>> down remote daemon.
>>>> 2022-09-14 14:54:46,321 INFO
>>>> akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting
>>>> shut down.
>>>>
>>>> Kubernetes restarts the JobManager pod, and the new instance, not
>>>> finding the HA metadata, starts the job from an empty state.
>>>> Is there an option to prevent the JobManager from exiting after a job
>>>> reaches the FAILED state?
>>>>
>>>