Hi Naci,

Thanks for your answer. We do not explicitly define the job ID. Since we are using the flink-kubernetes-operator, I suppose the operator handles this ID. The job is defined in the FlinkDeployment charts, where we have specs for the jobmanager, the taskmanager and the job:

  job:
    jarURI: local:///opt/flink/opt/flink-python-1.19.0.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/my_job.py", "--restoreMode", "CLAIM"]
    parallelism: 1
    upgradeMode: savepoint
    state: running
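By the way, we can read back the ID the operator assigned from the FlinkDeployment status. A quick check, assuming the CR is named my-job in the flink namespace as in our charts (hypothetical names):

  # prints the job ID the operator tracks for the running job
  kubectl -n flink get flinkdeployment my-job -o jsonpath='{.status.jobStatus.jobId}'

The $internal.pipeline.job-id entry in the configuration dump quoted below suggests the operator pins one fixed job ID across restarts, which might be how a restarted JobManager ends up colliding with the previous run's result in HA.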
We are testing without HA right now and can't reproduce the error yet, so this could indeed be linked. But wouldn't we need to keep HA anyway in production?

On the topic of HA, we have tried enabling it by adding *high-availability.type: kubernetes* and *high-availability.storageDir*, but even when we define *kubernetes.jobmanager.replicas: 2*, the deployment interprets it as *1* anyway. Anything we could be missing?

Our charts:

  apiVersion: flink.apache.org/v1beta1
  kind: FlinkDeployment
  metadata:
    name: my-job
  spec:
    image: {{ .Values.global.image.registry }}/{{ .Values.global.image.repository }}:{{ .Values.global.image.tag }}
    flinkVersion: v1_19
    ingress:
      template: "{{ .Values.ingress.host }}/my-job/"
      className: "traefik-traefik-ingress"
      annotations:
        traefik.ingress.kubernetes.io/router.entrypoints: "private"
        traefik.ingress.kubernetes.io/router.middlewares: "traefik-hstssecureheaders@kubernetescrd, flink-strip-prefix@kubernetescrd"
    flinkConfiguration:
      taskmanager.numberOfTaskSlots: "1"
      kubernetes.operator.savepoint.history.max.age: "72 h"
      kubernetes.operator.savepoint.history.max.count: "3"
      kubernetes.operator.jm-deployment-recovery.enabled: "true"
      kubernetes.operator.cluster.health-check.enabled: "true"
      kubernetes.operator.job.restart.failed: "true"
      kubernetes.operator.job.autoscaler.enabled: "true"
      kubernetes.operator.job.autoscaler.stabilization.interval: 1m
      kubernetes.operator.job.autoscaler.metrics.window: 5m
      kubernetes.operator.job.autoscaler.target.utilization: "0.6"
      kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
      kubernetes.operator.job.autoscaler.restart.time: 2m
      kubernetes.operator.job.autoscaler.catch-up.duration: 5m
      pipeline.max-parallelism: "6"
      kubernetes.jobmanager.replicas: "2"
      kubernetes.jobmanager.tolerations: key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
    serviceAccount: flink
    jobManager:
      podTemplate:
        spec:
          terminationGracePeriodSeconds: 600
          containers:
            - image: {{ .Values.global.image.registry }}/{{ .Values.global.image.repository }}:{{ .Values.global.image.tag }}
              name: flink-main-container
              resources:
                limits:
                  memory: "2048Mi"
                requests:
                  memory: "2048Mi"
                  cpu: "1"
          replicas: 2
          nodeSelector:
            churn-rate: low
          tolerations:
            - key: dedicated
              value: low-churn
              operator: Equal
              effect: NoSchedule
          envFrom:
            - secretRef:
                name: flink-external-secrets
    taskManager:
      podTemplate:
        spec:
          terminationGracePeriodSeconds: 600
          securityContext:
            runAsGroup: 9999
            runAsUser: 9999
            fsGroup: 9999
          containers:
            - image: {{ .Values.global.image.registry }}/{{ .Values.global.image.repository }}:{{ .Values.global.image.tag }}
              name: flink-main-container
              resources:
                limits:
                  memory: "2048Mi"
                requests:
                  memory: "2048Mi"
                  cpu: "0.01"
          replicas: 1
          nodeSelector:
            churn-rate: low
          tolerations:
            - key: dedicated
              value: low-churn
              operator: Equal
              effect: NoSchedule
    podTemplate:
      metadata:
        labels:
          app.kubernetes.io/part-of: flink
    job:
      jarURI: local:///opt/flink/opt/flink-python-1.19.0.jar
      entryClass: "org.apache.flink.client.python.PythonDriver"
      args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/my_job.py", "--restoreMode", "CLAIM"]
      parallelism: 1
      upgradeMode: savepoint
      state: {{ .Values.jobs.my_job.state }}

and some extra variables:

  taskmanager.memory.process.size: 2048m
  jobmanager.memory.process.size: 2048m
  taskmanager.memory.flink.size: 1024m
  jobmanager.memory.flink.size: 1024m
  taskmanager.memory.managed.fraction: 0.6
  kubernetes.jobmanager.cpu.limit-factor: 10
  state.backend.type: rocksdb
  state.backend.rocksdb.localdir: /opt/flink/state
  state.backend.incremental: true
  state.backend.local-recovery: false
  execution.checkpointing.max-concurrent-checkpoints: 1
  high-availability.type: kubernetes
  fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
  fs.allowed-fallback-filesystems: s3
  high-availability.storageDir: s3://my-bucket/recovery
  state.savepoints.dir: s3://my-bucket/savepoints
  state.checkpoints.dir: s3://my-bucket/checkpoints
  process.working-dir: /tmp/workdir
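One thing we noticed while re-reading the chart above: if the indentation is what it looks like, our replicas: 2 sits inside jobManager.podTemplate.spec, where Kubernetes would not recognize it as a pod field. A minimal sketch of where we understand the setting is meant to live, assuming the operator reads spec.jobManager.replicas and requires Kubernetes HA for standby JobManagers (our reading of the operator docs, not verified):

  spec:
    jobManager:
      # assumption: the operator takes the standby count from here,
      # not from the pod template and not from flinkConfiguration
      replicas: 2
    flinkConfiguration:
      high-availability.type: kubernetes
      high-availability.storageDir: s3://my-bucket/recovery

If that is right, both the kubernetes.jobmanager.replicas: "2" entry in flinkConfiguration and the replicas: 2 in the pod template could be silently ignored, which would match the deployment always coming up with a single JobManager.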
Best regards,
Arthur

On Wed, Aug 28, 2024 at 11:03 PM Naci Simsek <nacisim...@gmail.com> wrote:

> Hi Arthur,
>
> How do you submit your job? Are you explicitly setting a job ID when
> submitting it?
> Have you also tried without HA to see the behavior?
>
> It looks like the job is submitted with the same ID as the previous job,
> and the job result stored in HA does not let you submit it again with the
> same job_id.
>
> BR,
> Naci
>
> On 28. Aug 2024, at 17:32, Arthur Catrisse via user <user@flink.apache.org>
> wrote:
>
> Hello,
>
> We are running into issues when deploying Flink on Kubernetes using the
> flink-kubernetes-operator with a FlinkDeployment.
> Occasionally, when a *JobManager* gets rotated out (by karpenter in our
> case), the next JobManager is unable to reach a stable state and gets
> stuck in a crash loop with a DuplicateJobSubmissionException.
> We did increase terminationGracePeriodSeconds, but it doesn't seem to
> help.
> Is it expected that the operator isn't able to get JobManagers back into
> a stable state? Do you have an idea if we're missing something?
>
> *Here are our Flink configurations:*
>
> kubernetes.jobmanager.replicas, 1
> execution.submit-failed-job-on-application-error, true
> high-availability.cluster-id, my_name
> kubernetes.jobmanager.cpu.limit-factor, 10
> pipeline.max-parallelism, 6
> kubernetes.service-account, flink
> kubernetes.cluster-id, my_name
> high-availability.storageDir, s3://my_bucket/recovery
> taskmanager.memory.flink.size, 1024m
> parallelism.default, 1
> kubernetes.namespace, flink
> fs.s3a.aws.credentials.provider, com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> kubernetes.jobmanager.owner.reference, apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:a42c1e37-8f5e-4ec0-a04f-00000,name:my_name,controller:false,blockOwnerDeletion:true
> state.backend.type, rocksdb
> kubernetes.container.image.ref, 00000.dkr.ecr.eu-west-3.amazonaws.com/data/my_image_ref
> jobmanager.memory.flink.size, 1024m
> taskmanager.memory.process.size, 2048m
> kubernetes.internal.jobmanager.entrypoint.class, org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
> pipeline.name, my_name
> execution.savepoint.path, s3://my_bucket/savepoints/savepoint-044d28-f5000c2e4bc0
> state.backend.local-recovery, false
> state.backend.rocksdb.localdir, /opt/flink/state
> kubernetes.pod-template-file.taskmanager, /tmp/flink_op_generated_podTemplate_9407366845247969567.yaml
> state.backend.incremental, true
> web.cancel.enable, false
> execution.shutdown-on-application-finish, false
> job-result-store.delete-on-commit, false
> $internal.pipeline.job-id, 044d28b712536c1d1feed3475f2b8111
> taskmanager.memory.managed.fraction, 0.6
> $internal.flink.version, v1_19
> execution.checkpointing.max-concurrent-checkpoints, 1
> kubernetes.pod-template-file.jobmanager, /tmp/flink_op_generated_podTemplate_834737432685891333.yaml
> blob.server.port, 6102
> kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:5
> job-result-store.storage-path, s3://my_bucket/recovery/job-result-store/my_name/9cf5a2e7-89c6-40e7-94dd-c272a2007000
> fs.allowed-fallback-filesystems, s3
> high-availability.type, kubernetes
> state.savepoints.dir, s3://my_bucket/savepoints
> $internal.application.program-args, -pyclientexec;/usr/bin/python3;-py;/opt/flink/usrlib/my_name.py;--restoreMode;CLAIM
> taskmanager.numberOfTaskSlots, 2
> kubernetes.rest-service.exposed.type, ClusterIP
> high-availability.jobmanager.port, 6101
> process.working-dir, /tmp/workdir
> $internal.application.main, org.apache.flink.client.python.PythonDriver
> execution.target, kubernetes-application
> jobmanager.memory.process.size, 2048m
> taskmanager.rpc.port, 6100
> internal.cluster.execution-mode, NORMAL
> kubernetes.jobmanager.tolerations, key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
> execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
> pipeline.jars, local:///opt/flink/opt/flink-python-1.19.0.jar
> state.checkpoints.dir, s3://my_bucket/checkpoints
> jobmanager.memory.off-heap.size, 134217728b
> jobmanager.memory.jvm-overhead.min, 805306368b
> jobmanager.memory.jvm-metaspace.size, 268435456b
> jobmanager.memory.heap.size, 939524096b
> jobmanager.memory.jvm-overhead.max, 805306368b
>
> *Example of logs when failing*
>
> *Last pod before error:*
>
> INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
>
> (nothing else after, while other times, when there is no failure, we also
> see terminating logs such as
> org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
> etc.)
>
> *DuplicateJobSubmission error crash loop:*
>
> 2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job e75c26c4679eb40acd89ed064665f9bb is submitted.
> 2024-08-26 09:16:36,385 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=e75c26c4679eb40acd89ed064665f9bb.
> 2024-08-26 09:16:36,510 INFO org.apache.flink.runtime.blob.FileSystemBlobStore [] - Creating highly available BLOB storage directory at s3://my_bucket/recovery/my-job-name/blob
> 2024-08-26 09:16:36,603 WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream [] - Application invoked the Syncable API against stream writing to recovery/my-job-name/blob/job_e75c26c4679eb40acd89ed064665f9bb/blob_p-dfdbbf7b0120ec73a562e6841144da852ede93ef-4db3fad9c1a552ff3e784386310bf022. This is unsupported
> 2024-08-26 09:16:36,614 WARN com.amazonaws.services.s3.internal.Mimetypes [] - Unable to find 'mime.types' file in classpath
> 2024-08-26 09:16:37,178 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb).
> 2024-08-26 09:16:37,299 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'my-job-name' (e75c26c4679eb40acd89ed064665f9bb) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
> Traceback (most recent call last):
>   File "/opt/flink/usrlib/my_job_name.py", line 159, in <module>
>     publish_my_job_name()
>   File "/opt/flink/usrlib/my_job_name.py", line 127, in publish_my_job_name
>     table_env.execute_sql("""
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1060)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>   at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>   at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'my-job-name'.
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
>   at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
>   at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110)
>   at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:88)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032)
>   ... 14 more
> Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
>   at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
>
> Thank you in advance.
> Best regards,
> Arthur
>