Hello,

We are running into issues when deploying Flink on Kubernetes using the flink-kubernetes-operator with a FlinkDeployment.

Occasionally, when a *JobManager* pod gets rotated out (by Karpenter in our case), the next JobManager is unable to reach a stable state and gets stuck in a crash loop caused by a DuplicateJobSubmissionException.

We increased terminationGracePeriodSeconds, but it doesn't seem to help.

Is it expected that the operator isn't able to get JobManagers back into a stable state? Do you have an idea of what we might be missing?

*Here are our Flink configurations:*

kubernetes.jobmanager.replicas, 1
execution.submit-failed-job-on-application-error, true
high-availability.cluster-id, my_name
kubernetes.jobmanager.cpu.limit-factor, 10
pipeline.max-parallelism, 6
kubernetes.service-account, flink
kubernetes.cluster-id, my_name
high-availability.storageDir, s3://my_bucket/recovery
taskmanager.memory.flink.size, 1024m
parallelism.default, 1
kubernetes.namespace, flink
fs.s3a.aws.credentials.provider, com.amazonaws.auth.DefaultAWSCredentialsProviderChain
kubernetes.jobmanager.owner.reference, apiVersion: flink.apache.org/v1beta1,kind:FlinkDeployment,uid:a42c1e37-8f5e-4ec0-a04f-00000,name:my_name,controller:false,blockOwnerDeletion:true
state.backend.type, rocksdb
kubernetes.container.image.ref, 00000.dkr.ecr.eu-west-3.amazonaws.com/data/my_image_ref
jobmanager.memory.flink.size, 1024m
taskmanager.memory.process.size, 2048m
kubernetes.internal.jobmanager.entrypoint.class, org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
pipeline.name, my_name
execution.savepoint.path, s3://my_bucket/savepoints/savepoint-044d28-f5000c2e4bc0
state.backend.local-recovery, false
state.backend.rocksdb.localdir, /opt/flink/state
kubernetes.pod-template-file.taskmanager, /tmp/flink_op_generated_podTemplate_9407366845247969567.yaml
state.backend.incremental, true
web.cancel.enable, false
execution.shutdown-on-application-finish, false
job-result-store.delete-on-commit, false
$internal.pipeline.job-id, 044d28b712536c1d1feed3475f2b8111
taskmanager.memory.managed.fraction, 0.6
$internal.flink.version, v1_19
execution.checkpointing.max-concurrent-checkpoints, 1
kubernetes.pod-template-file.jobmanager, /tmp/flink_op_generated_podTemplate_834737432685891333.yaml
blob.server.port, 6102
kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:5
job-result-store.storage-path, s3://my_bucket/recovery/job-result-store/my_name/9cf5a2e7-89c6-40e7-94dd-c272a2007000
fs.allowed-fallback-filesystems, s3
high-availability.type, kubernetes
state.savepoints.dir, s3://my_bucket/savepoints
$internal.application.program-args, -pyclientexec;/usr/bin/python3;-py;/opt/flink/usrlib/my_name.py;--restoreMode;CLAIM
taskmanager.numberOfTaskSlots, 2
kubernetes.rest-service.exposed.type, ClusterIP
high-availability.jobmanager.port, 6101
process.working-dir, /tmp/workdir
$internal.application.main, org.apache.flink.client.python.PythonDriver
execution.target, kubernetes-application
jobmanager.memory.process.size, 2048m
taskmanager.rpc.port, 6100
internal.cluster.execution-mode, NORMAL
kubernetes.jobmanager.tolerations, key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
pipeline.jars, local:///opt/flink/opt/flink-python-1.19.0.jar
state.checkpoints.dir, s3://my_bucket/checkpoints
jobmanager.memory.off-heap.size, 134217728b
jobmanager.memory.jvm-overhead.min, 805306368b
jobmanager.memory.jvm-metaspace.size, 268435456b
jobmanager.memory.heap.size, 939524096b
jobmanager.memory.jvm-overhead.max, 805306368b

*Example of logs when failing*

*Last pod before the error:*

INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..

(Nothing else is logged after this. In other cases, when there is no failure, we also see further termination logs, e.g. org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.)

*DuplicateJobSubmissionException crash loop:*

2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job e75c26c4679eb40acd89ed064665f9bb is submitted.
2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=e75c26c4679eb40acd89ed064665f9bb.
2024-08-26 09:16:36,510 INFO org.apache.flink.runtime.blob.FileSystemBlobStore [] - Creating highly available BLOB storage directory at s3://my_bucket/recovery/my-job-name/blob
2024-08-26 09:16:36,603 WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream [] - Application invoked the Syncable API against stream writing to recovery/my-job-name/blob/job_e75c26c4679eb40acd89ed064665f9bb/blob_p-dfdbbf7b0120ec73a562e6841144da852ede93ef-4db3fad9c1a552ff3e784386310bf022. This is unsupported
2024-08-26 09:16:36,614 WARN com.amazonaws.services.s3.internal.Mimetypes [] - Unable to find 'mime.types' file in classpath
2024-08-26 09:16:37,178 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb).
2024-08-26 09:16:37,299 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
Traceback (most recent call last):
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - Traceback (most recent call last):
File "/opt/flink/usrlib/my_job_name.py", line 159, in <module>
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/usrlib/my_job_name.py", line 159, in <module>
publish_my_job_name()
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - publish_my_job_name()
File "/opt/flink/usrlib/my_job_name.py", line 127, in publish_my_job_name
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/usrlib/my_job_name.py", line 127, in publish_my_job_name
table_env.execute_sql("""
2024-08-26 09:16:37,304 INFO org.apache.flink.client.python.PythonDriver [] - table_env.execute_sql("""
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1060)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1060)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'my-job-name'.
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'my-job-name'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110)
at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:88)
2024-08-26 09:16:37,305 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:88)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032)
2024-08-26 09:16:37,306 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032)
... 14 more
2024-08-26 09:16:37,306 INFO org.apache.flink.client.python.PythonDriver [] - ... 14 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.
2024-08-26 09:16:37,306 INFO org.apache.flink.client.python.PythonDriver [] - Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)

Thank you in advance.

Best regards,
Arthur