Thanks, Evgeniy, for reaching out to the community, and Gyula for picking it
up. I haven't looked into the k8s operator in much detail yet, so help me
out if I miss something here. But I'm afraid that this is not something
that would be fixed by upgrading to 1.15.
The issue here is that we're recovering from an external checkpoint using
the same job ID (the default one used for any Flink cluster in Application
Mode) and the same cluster ID, if I understand correctly. The job then
fails during initialization, which currently triggers a global cleanup [1]:
all HA data, including the checkpoints, is deleted. I created
FLINK-29415 [2] to cover this.

I'm wondering whether we could work around this problem by specifying a
random job ID through PipelineOptionsInternal [3] in the Kubernetes
Operator. But I haven't looked into all the consequences around that. And
it feels wrong to make this configuration parameter publicly usable.
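
To make the random job ID idea concrete, here is a minimal sketch, not
operator code. It assumes that the option behind [3] has the key
`$internal.pipeline.job-id` and accepts a 32-character hex string; the
class and helper names are made up for illustration, and how the operator
would actually inject the value is left open.

```java
import java.security.SecureRandom;
import java.util.Map;

public class RandomJobIdSketch {

    // Assumed key of the option defined in PipelineOptionsInternal; verify against [3].
    static final String JOB_ID_KEY = "$internal.pipeline.job-id";

    private static final SecureRandom RANDOM = new SecureRandom();

    // Builds 16 random bytes and renders them as 32 lowercase hex characters,
    // the same shape as the all-zero default job ID seen in the logs.
    static String randomJobIdHex() {
        byte[] bytes = new byte[16];
        RANDOM.nextBytes(bytes);
        StringBuilder hex = new StringBuilder(32);
        for (byte b : bytes) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    // Hypothetical helper: the single entry that would be added to the
    // Flink configuration of each new deployment.
    static Map<String, String> jobIdOverride() {
        return Map.of(JOB_ID_KEY, randomJobIdHex());
    }

    public static void main(String[] args) {
        jobIdOverride().forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

The intent is that a cleanup after a failed initialization would then be
scoped to the fresh ID rather than the shared default one. Whether that
holds in all cases is exactly the open question.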

Another option might be to use ExecutionMode.RECOVERY in case of an
initialization failure when recovering from an external checkpoint in
Application Mode (as we already do for internal recovery).

I'm looking forward to your opinion.
Matthias

[1]
https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663
[2] https://issues.apache.org/jira/browse/FLINK-29415
[3]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29

On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> I see. I think we have seen this issue with others before. In Flink 1.15 it
> is solved by the newly introduced JobResultStore. The operator also
> configures that automatically for 1.15 to avoid this.
>
> Gyula
>
> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov <eblyuti...@avito.ru>
> wrote:
>
>> Thanks for the answer.
>> I don't think this is an operator issue. The Kubernetes deployment just
>> restarts the failed pod, and the restarted JobManager, finding no HA
>> metadata, starts the job from an empty state.
>>
>> I'm looking for a way to prevent the JobManager from exiting in case of a
>> job error (we use an Application Mode cluster).
>>
>>
>>
>> ------------------------------
>> *From:* Gyula Fóra <gyula.f...@gmail.com>
>> *Sent:* September 20, 2022, 19:49:37
>> *To:* Evgeniy Lyutikov
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: JobManager restarts on job failure
>>
>> The best thing for you to do would be to upgrade to Flink 1.15 and the
>> latest operator version.
>> In Flink 1.15 we have the option to interact with the Flink jobmanager
>> even after the job FAILED and the operator leverages this for a much more
>> robust behaviour.
>>
>> In any case, the operator should never start the job from an empty
>> state (even if it FAILED). If you think that's happening, could you please
>> open a JIRA ticket with the accompanying JM and Operator logs?
>>
>> Thanks
>> Gyula
>>
>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov <eblyuti...@avito.ru>
>> wrote:
>>
>>> Hi,
>>> We are using Flink 1.14.4 with the Flink Kubernetes operator.
>>>
>>> Sometimes, when we update a job, it fails on startup, and Flink removes
>>> all HA metadata and the JobManager exits.
>>>
>>>
>>> 2022-09-14 14:54:44,534 INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring
>>> job 00000000000000000000000000000000 from Checkpoint 30829 @ 1663167158684
>>> for 00000000000000000000000000000000 located at
>>> s3p://flink-checkpoints/k8s-checkpoint-job-name/00000000000000000000000000000000/chk-30829.
>>> 2022-09-14 14:54:44,638 INFO
>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job
>>> 00000000000000000000000000000000 reached terminal state FAILED.
>>> org.apache.flink.runtime.client.JobInitializationException: Could not
>>> start the JobMaster.
>>> Caused by: java.util.concurrent.CompletionException:
>>> java.lang.IllegalStateException: There is no operator for the state
>>> 4e1d9dde287c33a35e7970cbe64a40fe
>>> 2022-09-14 14:54:44,930 ERROR
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal
>>> error occurred in the cluster entrypoint.
>>> 2022-09-14 14:54:45,020 INFO
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>>> Clean up the high availability data for job
>>> 00000000000000000000000000000000.
>>> 2022-09-14 14:54:45,020 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting
>>> KubernetesApplicationClusterEntrypoint down with application status
>>> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
>>> 2022-09-14 14:54:45,026 INFO
>>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
>>> down rest endpoint.
>>> 2022-09-14 14:54:46,122 INFO
>>> akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting
>>> down remote daemon.
>>> 2022-09-14 14:54:46,321 INFO
>>> akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting
>>> shut down.
>>>
>>> Kubernetes restarts the JobManager pod, and the new instance, not finding
>>> the HA metadata, starts the job from an empty state.
>>> Is there an option to prevent the JobManager from exiting after the job
>>> reaches the FAILED state?
>>>
>>>
>>>
>>
