Hello,

We are running into issues when deploying Flink on Kubernetes using the
flink-kubernetes-operator with a FlinkDeployment.
Occasionally, when a *JobManager* gets rotated out (by Karpenter in our
case), the next JobManager never reaches a stable state and is stuck in a
crash loop caused by a DuplicateJobSubmissionException.
We increased terminationGracePeriodSeconds, but it doesn't seem to help.
Is it expected that the operator is unable to bring the JobManager back to a
stable state? Do you have any idea what we might be missing?

*Here is our Flink configuration:*

kubernetes.jobmanager.replicas, 1
execution.submit-failed-job-on-application-error, true
high-availability.cluster-id, my_name
kubernetes.jobmanager.cpu.limit-factor, 10
pipeline.max-parallelism, 6
kubernetes.service-account, flink
kubernetes.cluster-id, my_name
high-availability.storageDir, s3://my_bucket/recovery
taskmanager.memory.flink.size, 1024m
parallelism.default, 1
kubernetes.namespace, flink
fs.s3a.aws.credentials.provider, com.amazonaws.auth.DefaultAWSCredentialsProviderChain
kubernetes.jobmanager.owner.reference, apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:a42c1e37-8f5e-4ec0-a04f-00000,name:my_name,controller:false,blockOwnerDeletion:true
state.backend.type, rocksdb
kubernetes.container.image.ref, 00000.dkr.ecr.eu-west-3.amazonaws.com/data/my_image_ref
jobmanager.memory.flink.size, 1024m
taskmanager.memory.process.size, 2048m
kubernetes.internal.jobmanager.entrypoint.class, org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
pipeline.name, my_name
execution.savepoint.path, s3://my_bucket/savepoints/savepoint-044d28-f5000c2e4bc0
state.backend.local-recovery, false
state.backend.rocksdb.localdir, /opt/flink/state
kubernetes.pod-template-file.taskmanager, /tmp/flink_op_generated_podTemplate_9407366845247969567.yaml
state.backend.incremental, true
web.cancel.enable, false
execution.shutdown-on-application-finish, false
job-result-store.delete-on-commit, false
$internal.pipeline.job-id, 044d28b712536c1d1feed3475f2b8111
taskmanager.memory.managed.fraction, 0.6
$internal.flink.version, v1_19
execution.checkpointing.max-concurrent-checkpoints, 1
kubernetes.pod-template-file.jobmanager, /tmp/flink_op_generated_podTemplate_834737432685891333.yaml
blob.server.port, 6102
kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:5
job-result-store.storage-path, s3://my_bucket/recovery/job-result-store/my_name/9cf5a2e7-89c6-40e7-94dd-c272a2007000
fs.allowed-fallback-filesystems, s3
high-availability.type, kubernetes
state.savepoints.dir, s3://my_bucket/savepoints
$internal.application.program-args, -pyclientexec;/usr/bin/python3;-py;/opt/flink/usrlib/my_name.py;--restoreMode;CLAIM
taskmanager.numberOfTaskSlots, 2
kubernetes.rest-service.exposed.type, ClusterIP
high-availability.jobmanager.port, 6101
process.working-dir, /tmp/workdir
$internal.application.main, org.apache.flink.client.python.PythonDriver
execution.target, kubernetes-application
jobmanager.memory.process.size, 2048m
taskmanager.rpc.port, 6100
internal.cluster.execution-mode, NORMAL
kubernetes.jobmanager.tolerations, key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
pipeline.jars, local:///opt/flink/opt/flink-python-1.19.0.jar
state.checkpoints.dir, s3://my_bucket/checkpoints
jobmanager.memory.off-heap.size, 134217728b
jobmanager.memory.jvm-overhead.min, 805306368b
jobmanager.memory.jvm-metaspace.size, 268435456b
jobmanager.memory.heap.size, 939524096b
jobmanager.memory.jvm-overhead.max, 805306368b

*Examples of logs when it fails*

*Last pod before the error:*
INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..

(Nothing is logged after this. By contrast, when the rotation goes well, we
see further shutdown logs, for example:
org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
etc.)
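To check this pattern across several rotations, we could run a small heuristic over the last JobManager pod's log. shutdown_looks_complete is a hypothetical helper of our own, not a Flink API, and it only encodes the observation above: whether the SIGTERM line is followed by the Pekko "Shutting down remote daemon." line or not.

```python
from typing import List, Optional

# Hypothetical heuristic: both markers are copied from the log lines quoted
# above; they are not an official Flink "clean shutdown" signal.
SIGTERM_MARKER = "RECEIVED SIGNAL 15: SIGTERM"
CLEAN_SHUTDOWN_MARKER = "Shutting down remote daemon."


def shutdown_looks_complete(log_lines: List[str]) -> Optional[bool]:
    """Return None if no SIGTERM line is found; otherwise True when the Pekko
    "Shutting down remote daemon." line appears after the SIGTERM line, and
    False when the log stops before reaching it."""
    sigterm_index = next(
        (i for i, line in enumerate(log_lines) if SIGTERM_MARKER in line),
        None,
    )
    if sigterm_index is None:
        return None
    # Only look at what was logged after the SIGTERM was received.
    return any(
        CLEAN_SHUTDOWN_MARKER in line for line in log_lines[sigterm_index + 1 :]
    )


if __name__ == "__main__":
    failing_pod = [
        "INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - "
        "RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.",
        "INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - "
        "Shutting KubernetesApplicationClusterEntrypoint down with application "
        "status UNKNOWN.",
    ]
    print(shutdown_looks_complete(failing_pod))  # False
```

A False result would only tell us that the JobManager was killed before finishing its shutdown; it says nothing on its own about why the next JobManager then fails.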


*DuplicateJobSubmissionException crash loop:*
2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job e75c26c4679eb40acd89ed064665f9bb is submitted.
2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=e75c26c4679eb40acd89ed064665f9bb.
2024-08-26 09:16:36,510 INFO org.apache.flink.runtime.blob.FileSystemBlobStore [] - Creating highly available BLOB storage directory at s3://my_bucket/recovery/my-job-name/blob
2024-08-26 09:16:36,603 WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream [] - Application invoked the Syncable API against stream writing to recovery/my-job-name/blob/job_e75c26c4679eb40acd89ed064665f9bb/blob_p-dfdbbf7b0120ec73a562e6841144da852ede93ef-4db3fad9c1a552ff3e784386310bf022. This is unsupported
2024-08-26 09:16:36,614 WARN com.amazonaws.services.s3.internal.Mimetypes [] - Unable to find 'mime.types' file in classpath
2024-08-26 09:16:37,178 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb).
2024-08-26 09:16:37,299 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
Traceback (most recent call last):
  File "/opt/flink/usrlib/my_job_name.py", line 159, in <module>
    publish_my_job_name()
  File "/opt/flink/usrlib/my_job_name.py", line 127, in publish_my_job_name
    table_env.execute_sql("""
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1060)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'my-job-name'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110)
    at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:88)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032)
    ... 14 more
Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
    at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
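Our reading of the Flink source, which may be wrong, is that with HA enabled the StandaloneDispatcher warning in the log above is emitted when the job result store (job-result-store.storage-path in our configuration) already holds an entry, dirty or clean, for the submitted job ID. To check this for e75c26c4679eb40acd89ed064665f9bb, we could list that storage path (for example with aws s3 ls) and pass the keys to a helper like the one below. job_result_entry is hypothetical, and the file-name convention it relies on (<jobId>.json and <jobId>_DIRTY.json) is an assumption we have not confirmed for 1.19.

```python
from typing import Iterable, Optional

# ASSUMPTION: file names used by Flink's file-system job result store, as we
# understand FileSystemJobResultStore; verify against your Flink version.
#   <jobId>_DIRTY.json -> terminal state recorded, cleanup not yet committed
#   <jobId>.json       -> terminal state recorded and cleanup committed
DIRTY_SUFFIX = "_DIRTY.json"
CLEAN_SUFFIX = ".json"


def job_result_entry(object_keys: Iterable[str], job_id: str) -> Optional[str]:
    """Classify job_id as "dirty", "clean" or None (no entry), given the object
    keys listed under job-result-store.storage-path."""
    # Only the last path segment matters; S3 keys carry the full prefix.
    file_names = {key.rsplit("/", 1)[-1] for key in object_keys}
    if job_id + DIRTY_SUFFIX in file_names:
        return "dirty"
    if job_id + CLEAN_SUFFIX in file_names:
        return "clean"
    return None


if __name__ == "__main__":
    # Hypothetical listing result, not taken from our bucket.
    keys = [
        "recovery/job-result-store/my_name/9cf5a2e7-89c6-40e7-94dd-c272a2007000/"
        "e75c26c4679eb40acd89ed064665f9bb.json",
    ]
    print(job_result_entry(keys, "e75c26c4679eb40acd89ed064665f9bb"))  # clean
```

If that reading is right, either kind of entry would make the Dispatcher reject a resubmission with the same job ID, and since we set job-result-store.delete-on-commit to false, clean entries are kept rather than removed after cleanup.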

Thank you in advance.
Best regards,
Arthur
