Hi Naci,
Thanks for your answer.

We do not explicitly define the job-id. As we are using
the flink-kubernetes-operator, I suppose the operator handles this ID.
The job is defined in our FlinkDeployment charts, where we have specs for
the jobmanager, the taskmanager, and the job:
job:
  jarURI: local:///opt/flink/opt/flink-python-1.19.0.jar
  entryClass: "org.apache.flink.client.python.PythonDriver"
  args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/my_job.py", "--restoreMode", "CLAIM"]
  parallelism: 1
  upgradeMode: savepoint
  state: running
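
For reference, when we want to double-check which job id the operator
actually used, we read it back from the CR status; a quick sketch
(assuming the deployment lives in the flink namespace, as in our setup):

kubectl -n flink get flinkdeployment my-job -o jsonpath='{.status.jobStatus.jobId}'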

We are testing without HA right now and can't reproduce the error yet, so
the two may indeed be linked. But wouldn't we need to keep HA anyway in
production?
On the topic of HA, we have tried enabling it by adding
high-availability.type: kubernetes and high-availability.storageDir, but
even when we set kubernetes.jobmanager.replicas: 2, the deployment
interprets it as 1 anyway. Is there anything we could be missing?
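
In case a concrete shape helps: our reading of the operator docs is that
the JobManager replica count should be set on spec.jobManager.replicas in
the CR (values like kubernetes.jobmanager.replicas in flinkConfiguration
seem to get overridden by the operator), and that replicas > 1 requires HA
to be enabled. A minimal sketch of what we are trying, under those
assumptions:

spec:
  flinkConfiguration:
    high-availability.type: kubernetes
    high-availability.storageDir: s3://my-bucket/recovery  # our existing HA dir
  jobManager:
    replicas: 2  # CR-level field, not a flinkConfiguration entry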

Our charts:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-job
spec:
  image: {{ .Values.global.image.registry }}/{{ .Values.global.image.repository }}:{{ .Values.global.image.tag }}
  flinkVersion: v1_19
  ingress:
    template: "{{ .Values.ingress.host }}/my-job/"
    className: "traefik-traefik-ingress"
    annotations:
      traefik.ingress.kubernetes.io/router.entrypoints: "private"
      traefik.ingress.kubernetes.io/router.middlewares: "traefik-hstssecureheaders@kubernetescrd, flink-strip-prefix@kubernetescrd"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    kubernetes.operator.savepoint.history.max.age: "72 h"
    kubernetes.operator.savepoint.history.max.count: "3"
    kubernetes.operator.jm-deployment-recovery.enabled: "true"
    kubernetes.operator.cluster.health-check.enabled: "true"
    kubernetes.operator.job.restart.failed: "true"
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: 1m
    kubernetes.operator.job.autoscaler.metrics.window: 5m
    kubernetes.operator.job.autoscaler.target.utilization: "0.6"
    kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
    kubernetes.operator.job.autoscaler.restart.time: 2m
    kubernetes.operator.job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "6"
    kubernetes.jobmanager.replicas: "2"
    kubernetes.jobmanager.tolerations: key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
  serviceAccount: flink
  jobManager:
    podTemplate:
      spec:
        terminationGracePeriodSeconds: 600
        containers:
          - image: {{ .Values.global.image.registry }}/{{ .Values.global.image.repository }}:{{ .Values.global.image.tag }}
            name: flink-main-container
            resources:
              limits:
                memory: "2048Mi"
              requests:
                memory: "2048Mi"
                cpu: "1"
    replicas: 2
    nodeSelector:
      churn-rate: low
    tolerations:
      - key: dedicated
        value: low-churn
        operator: Equal
        effect: NoSchedule
    envFrom:
      - secretRef:
          name: flink-external-secrets
  taskManager:
    podTemplate:
      spec:
        terminationGracePeriodSeconds: 600
        securityContext:
          runAsGroup: 9999
          runAsUser: 9999
          fsGroup: 9999
        containers:
          - image: {{ .Values.global.image.registry }}/{{ .Values.global.image.repository }}:{{ .Values.global.image.tag }}
            name: flink-main-container
            resources:
              limits:
                memory: "2048Mi"
              requests:
                memory: "2048Mi"
                cpu: "0.01"
    replicas: 1
    nodeSelector:
      churn-rate: low
    tolerations:
      - key: dedicated
        value: low-churn
        operator: Equal
        effect: NoSchedule
  podTemplate:
    metadata:
      labels:
        app.kubernetes.io/part-of: flink
  job:
    jarURI: local:///opt/flink/opt/flink-python-1.19.0.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/my_job.py", "--restoreMode", "CLAIM"]
    parallelism: 1
    upgradeMode: savepoint
    state: {{ .Values.jobs.my_job.state }}
And some extra variables:

taskmanager.memory.process.size: 2048m
jobmanager.memory.process.size: 2048m
taskmanager.memory.flink.size: 1024m
jobmanager.memory.flink.size: 1024m
taskmanager.memory.managed.fraction: 0.6
kubernetes.jobmanager.cpu.limit-factor: 10
state.backend.type: rocksdb
state.backend.rocksdb.localdir: /opt/flink/state
state.backend.incremental: true
state.backend.local-recovery: false
execution.checkpointing.max-concurrent-checkpoints: 1
high-availability.type: kubernetes
fs.s3a.aws.credentials.provider:
com.amazonaws.auth.DefaultAWSCredentialsProviderChain
fs.allowed-fallback-filesystems: s3
high-availability.storageDir: s3://my-bucket/recovery
state.savepoints.dir: s3://my-bucket/savepoints
state.checkpoints.dir: s3://my-bucket/checkpoints
process.working-dir: /tmp/workdir
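
To rule out a templating problem on our side, we are also probing what
actually lands on the cluster; a couple of quick checks (sketch, assuming
the operator names the JobManager Deployment after the FlinkDeployment,
my-job, and that everything lives in the flink namespace):

# replica count the operator actually applied to the JobManager Deployment
kubectl -n flink get deployment my-job -o jsonpath='{.spec.replicas}'

# the spec the operator is reconciling against
kubectl -n flink get flinkdeployment my-job -o yaml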

Best regards,
Arthur


On Wed, Aug 28, 2024 at 11:03 PM Naci Simsek <nacisim...@gmail.com> wrote:

> Hi Arthur,
>
> How do you submit your job? Are you explicitly setting the job id when
> submitting the job?
> Have you also tried without HA to see the behavior?
>
> It looks like the job was submitted with the same ID as the previous job,
> and the job result stored in HA does not let you submit it again with the
> same job_id.
>
> BR,
> Naci
>
> On 28. Aug 2024, at 17:32, Arthur Catrisse via user <user@flink.apache.org>
> wrote:
>
>
> Hello,
>
> We are running into issues when deploying Flink on Kubernetes using the
> flink-kubernetes-operator with a FlinkDeployment.
> Occasionally, when a JobManager gets rotated out (by Karpenter in our
> case), the next JobManager cannot reach a stable state and is stuck in a
> crash loop with a DuplicateJobSubmissionException.
> We did increase terminationGracePeriodSeconds, but it doesn't seem to
> help.
> Is it expected that the operator isn't able to get JobManagers back into a
> stable state? Do you have an idea if we're missing something?
>
> Here are our Flink configurations:
>
> kubernetes.jobmanager.replicas, 1
> execution.submit-failed-job-on-application-error, true
> high-availability.cluster-id, my_name
> kubernetes.jobmanager.cpu.limit-factor, 10
> pipeline.max-parallelism, 6
> kubernetes.service-account, flink
> kubernetes.cluster-id, my_name
> high-availability.storageDir, s3://my_bucket/recovery
> taskmanager.memory.flink.size, 1024m
> parallelism.default, 1
> kubernetes.namespace, flink
> fs.s3a.aws.credentials.provider, com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> kubernetes.jobmanager.owner.reference, apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:a42c1e37-8f5e-4ec0-a04f-00000,name:my_name,controller:false,blockOwnerDeletion:true
> state.backend.type, rocksdb
> kubernetes.container.image.ref, 00000.dkr.ecr.eu-west-3.amazonaws.com/data/my_image_ref
> jobmanager.memory.flink.size, 1024m
> taskmanager.memory.process.size, 2048m
> kubernetes.internal.jobmanager.entrypoint.class, org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
> pipeline.name, my_name
> execution.savepoint.path, s3://my_bucket/savepoints/savepoint-044d28-f5000c2e4bc0
> state.backend.local-recovery, false
> state.backend.rocksdb.localdir, /opt/flink/state
> kubernetes.pod-template-file.taskmanager, /tmp/flink_op_generated_podTemplate_9407366845247969567.yaml
> state.backend.incremental, true
> web.cancel.enable, false
> execution.shutdown-on-application-finish, false
> job-result-store.delete-on-commit, false
> $internal.pipeline.job-id, 044d28b712536c1d1feed3475f2b8111
> taskmanager.memory.managed.fraction, 0.6
> $internal.flink.version, v1_19
> execution.checkpointing.max-concurrent-checkpoints, 1
> kubernetes.pod-template-file.jobmanager, /tmp/flink_op_generated_podTemplate_834737432685891333.yaml
> blob.server.port, 6102
> kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:5
> job-result-store.storage-path, s3://my_bucket/recovery/job-result-store/my_name/9cf5a2e7-89c6-40e7-94dd-c272a2007000
> fs.allowed-fallback-filesystems, s3
> high-availability.type, kubernetes
> state.savepoints.dir, s3://my_bucket/savepoints
> $internal.application.program-args, -pyclientexec;/usr/bin/python3;-py;/opt/flink/usrlib/my_name.py;--restoreMode;CLAIM
> taskmanager.numberOfTaskSlots, 2
> kubernetes.rest-service.exposed.type, ClusterIP
> high-availability.jobmanager.port, 6101
> process.working-dir, /tmp/workdir
> $internal.application.main, org.apache.flink.client.python.PythonDriver
> execution.target, kubernetes-application
> jobmanager.memory.process.size, 2048m
> taskmanager.rpc.port, 6100
> internal.cluster.execution-mode, NORMAL
> kubernetes.jobmanager.tolerations, key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
> execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
> pipeline.jars, local:///opt/flink/opt/flink-python-1.19.0.jar
> state.checkpoints.dir, s3://my_bucket/checkpoints
> jobmanager.memory.off-heap.size, 134217728b
> jobmanager.memory.jvm-overhead.min, 805306368b
> jobmanager.memory.jvm-metaspace.size, 268435456b
> jobmanager.memory.heap.size, 939524096b
> jobmanager.memory.jvm-overhead.max, 805306368b
> Example of logs when failing
>
> Last pod before error:
> INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
> (Nothing else after this, whereas on a clean shutdown we also see terminating logs such as
> org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
> etc.)
>
>
> DuplicateJobSubmissionException crash loop:
> 2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job e75c26c4679eb40acd89ed064665f9bb is submitted.
> 2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=e75c26c4679eb40acd89ed064665f9bb.
> 2024-08-26 09:16:36,510 INFO org.apache.flink.runtime.blob.FileSystemBlobStore [] - Creating highly available BLOB storage directory at s3://my_bucket/recovery/my-job-name/blob
> 2024-08-26 09:16:36,603 WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream [] - Application invoked the Syncable API against stream writing to recovery/my-job-name/blob/job_e75c26c4679eb40acd89ed064665f9bb/blob_p-dfdbbf7b0120ec73a562e6841144da852ede93ef-4db3fad9c1a552ff3e784386310bf022. This is unsupported
> 2024-08-26 09:16:36,614 WARN com.amazonaws.services.s3.internal.Mimetypes [] - Unable to find 'mime.types' file in classpath
> 2024-08-26 09:16:37,178 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb).
> 2024-08-26 09:16:37,299 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
>
> Traceback (most recent call last):
>   File "/opt/flink/usrlib/my_job_name.py", line 159, in <module>
>     publish_my_job_name()
>   File "/opt/flink/usrlib/my_job_name.py", line 127, in publish_my_job_name
>     table_env.execute_sql("""
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1060)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>   at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>   at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'my-job-name'.
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
>   at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
>   at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110)
>   at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:88)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032)
>   ... 14 more
> Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
>   at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
>
> Thank you in advance.
> Best regards,
> Arthur
>
>
