Hi Arthur,

How do you submit your job? Are you explicitly setting the job ID when submitting it?
Have you also tried it without HA to see how it behaves?

It looks like the job is being submitted with the same ID as the previous job, and the job result stored in HA does not allow a job to be submitted again with the same job_id.
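To illustrate the idea, here is a simplified toy model of that check. The classes (ToyJobResultStore, ToyDispatcher, DuplicateJobSubmission) are hypothetical and are NOT Flink's actual API. The model assumes the job result store is shared across JobManagers (as the HA job result store on S3 is) and never removes entries; in a real setup, whether and when an entry is removed depends on settings such as job-result-store.delete-on-commit.

```python
"""Toy model (hypothetical, NOT Flink's API) of how a persisted job result
blocks resubmitting a job with the same job ID."""

from dataclasses import dataclass, field


class DuplicateJobSubmission(Exception):
    """Stand-in for Flink's DuplicateJobSubmissionException."""


@dataclass
class ToyJobResultStore:
    # Job IDs whose final result was recorded. In a real HA setup this lives in
    # durable storage, so it survives a JobManager restart; here it is just an
    # in-memory set that never removes entries (an assumption of this model).
    terminal_job_ids: set[str] = field(default_factory=set)

    def record_terminal(self, job_id: str) -> None:
        self.terminal_job_ids.add(job_id)

    def has_entry(self, job_id: str) -> bool:
        return job_id in self.terminal_job_ids


@dataclass
class ToyDispatcher:
    store: ToyJobResultStore
    running: set[str] = field(default_factory=set)

    def submit(self, job_id: str) -> str:
        # A job ID that already has a recorded terminal result is rejected,
        # mirroring the "Ignoring JobGraph submission" warning in the logs.
        if self.store.has_entry(job_id):
            raise DuplicateJobSubmission(
                f"Job {job_id} already reached a globally-terminal state"
            )
        self.running.add(job_id)
        return job_id


if __name__ == "__main__":
    store = ToyJobResultStore()  # shared, i.e. "persisted" across JobManagers
    fixed_id = "e75c26c4679eb40acd89ed064665f9bb"  # job ID seen in the logs

    first_jm = ToyDispatcher(store)
    first_jm.submit(fixed_id)
    store.record_terminal(fixed_id)  # the previous run ended and was recorded

    second_jm = ToyDispatcher(store)  # replacement JobManager, same store
    try:
        second_jm.submit(fixed_id)  # same fixed job ID is rejected
    except DuplicateJobSubmission as exc:
        print("rejected:", exc)

    print("accepted:", second_jm.submit("0" * 32))  # a fresh ID goes through
```

The point of the model is only that the rejection is keyed on the job ID alone: a replacement JobManager that resubmits the same fixed ID hits the stored result, while a new ID would not.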

BR,
Naci

On 28. Aug 2024, at 17:32, Arthur Catrisse via user <user@flink.apache.org> wrote:


Hello,

We are running into issues when deploying Flink on Kubernetes using the flink-kubernetes-operator with a FlinkDeployment.
Occasionally, when a JobManager gets rotated out (by Karpenter in our case), the next JobManager cannot reach a stable state and gets stuck in a crash loop caused by a DuplicateJobSubmissionException.
We did increase terminationGracePeriodSeconds, but it doesn't seem to help.
Is it expected that the operator cannot get JobManagers back into a stable state? Do you have any idea whether we are missing something?

Here is our Flink configuration:

kubernetes.jobmanager.replicas, 1
execution.submit-failed-job-on-application-error, true
high-availability.cluster-id, my_name
kubernetes.jobmanager.cpu.limit-factor, 10
pipeline.max-parallelism, 6
kubernetes.service-account, flink
kubernetes.cluster-id, my_name
high-availability.storageDir, s3://my_bucket/recovery
taskmanager.memory.flink.size, 1024m
parallelism.default, 1
kubernetes.namespace, flink
fs.s3a.aws.credentials.provider, com.amazonaws.auth.DefaultAWSCredentialsProviderChain
kubernetes.jobmanager.owner.reference, apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:a42c1e37-8f5e-4ec0-a04f-00000,name:my_name,controller:false,blockOwnerDeletion:true
state.backend.type, rocksdb
kubernetes.container.image.ref, 00000.dkr.ecr.eu-west-3.amazonaws.com/data/my_image_ref
jobmanager.memory.flink.size, 1024m
taskmanager.memory.process.size, 2048m
kubernetes.internal.jobmanager.entrypoint.class, org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
pipeline.name, my_name
execution.savepoint.path, s3://my_bucket/savepoints/savepoint-044d28-f5000c2e4bc0
state.backend.local-recovery, false
state.backend.rocksdb.localdir, /opt/flink/state
kubernetes.pod-template-file.taskmanager, /tmp/flink_op_generated_podTemplate_9407366845247969567.yaml
state.backend.incremental, true
web.cancel.enable, false
execution.shutdown-on-application-finish, false
job-result-store.delete-on-commit, false
$internal.pipeline.job-id, 044d28b712536c1d1feed3475f2b8111
taskmanager.memory.managed.fraction, 0.6
$internal.flink.version, v1_19
execution.checkpointing.max-concurrent-checkpoints, 1
kubernetes.pod-template-file.jobmanager, /tmp/flink_op_generated_podTemplate_834737432685891333.yaml
blob.server.port, 6102
kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:5
job-result-store.storage-path, s3://my_bucket/recovery/job-result-store/my_name/9cf5a2e7-89c6-40e7-94dd-c272a2007000
fs.allowed-fallback-filesystems, s3
high-availability.type, kubernetes
state.savepoints.dir, s3://my_bucket/savepoints
$internal.application.program-args, -pyclientexec;/usr/bin/python3;-py;/opt/flink/usrlib/my_name.py;--restoreMode;CLAIM
taskmanager.numberOfTaskSlots, 2
kubernetes.rest-service.exposed.type, ClusterIP
high-availability.jobmanager.port, 6101
process.working-dir, /tmp/workdir
$internal.application.main, org.apache.flink.client.python.PythonDriver
execution.target, kubernetes-application
jobmanager.memory.process.size, 2048m
taskmanager.rpc.port, 6100
internal.cluster.execution-mode, NORMAL
kubernetes.jobmanager.tolerations, key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
pipeline.jars, local:///opt/flink/opt/flink-python-1.19.0.jar
state.checkpoints.dir, s3://my_bucket/checkpoints
jobmanager.memory.off-heap.size, 134217728b
jobmanager.memory.jvm-overhead.min, 805306368b
jobmanager.memory.jvm-metaspace.size, 268435456b
jobmanager.memory.heap.size, 939524096b
jobmanager.memory.jvm-overhead.max, 805306368b

Example logs from a failure:
Last pod before the error:
INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
(Nothing is logged after this. On other occasions, when it does not fail, we also see further shutdown logs, e.g.
org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
etc.)

DuplicateJobSubmissionException crash loop:
2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job e75c26c4679eb40acd89ed064665f9bb is submitted.
2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=e75c26c4679eb40acd89ed064665f9bb.
2024-08-26 09:16:36,510 INFO org.apache.flink.runtime.blob.FileSystemBlobStore [] - Creating highly available BLOB storage directory at s3://my_bucket/recovery/my-job-name/blob
2024-08-26 09:16:36,603 WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream [] - Application invoked the Syncable API against stream writing to recovery/my-job-name/blob/job_e75c26c4679eb40acd89ed064665f9bb/blob_p-dfdbbf7b0120ec73a562e6841144da852ede93ef-4db3fad9c1a552ff3e784386310bf022. This is unsupported
2024-08-26 09:16:36,614 WARN com.amazonaws.services.s3.internal.Mimetypes [] - Unable to find 'mime.types' file in classpath
2024-08-26 09:16:37,178 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb).
2024-08-26 09:16:37,299 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
Traceback (most recent call last):
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - Traceback (most recent call last):
File "/opt/flink/usrlib/my_job_name.py", line 159, in <module>
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/usrlib/my_job_name.py", line 159, in <module>
publish_my_job_name()
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - publish_my_job_name()
File "/opt/flink/usrlib/my_job_name.py", line 127, in publish_my_job_name
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/usrlib/my_job_name.py", line 127, in publish_my_job_name
table_env.execute_sql("""
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - table_env.execute_sql("""
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1060)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1060)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'my-job-name'.
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'my-job-name'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110)
at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:88)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:88)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032)
2024-08-26 09:16:37,306 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032)
... 14 more
2024-08-26 09:16:37,306 INFO org.apache.flink.client.python.PythonDriver [] - ... 14 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.
2024-08-26 09:16:37,306 INFO org.apache.flink.client.python.PythonDriver [] - Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)

Thank you in advance.
Best regards,
Arthur
