Hi,

After some discussion with Wang Yang offline, it seems that there might have been a JobManager failover. Would you mind sharing the full JobManager log?

Best,
Guowei
On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal <lukas.dr...@gmail.com> wrote:
> Hi,
>
> I would like to use native Kubernetes execution [1] for one batch job and
> let Kubernetes handle the scheduling. Flink version: 1.12.2.
>
> Kubernetes job:
>
> apiVersion: batch/v1beta1
> kind: CronJob
> metadata:
>   name: scheduled-job
> spec:
>   schedule: "*/1 * * * *"
>   jobTemplate:
>     spec:
>       template:
>         metadata:
>           labels:
>             app: super-flink-batch-job
>         spec:
>           containers:
>           - name: runner
>             image: localhost:5000/batch-flink-app-v3:latest
>             imagePullPolicy: Always
>             command:
>             - /bin/sh
>             - -c
>             - /opt/flink/bin/flink run-application --target kubernetes-application
>               -Dkubernetes.service-account=flink-service-account
>               -Dkubernetes.rest-service.exposed.type=NodePort
>               -Dkubernetes.cluster-id=batch-job-cluster
>               -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>               -Ds3.endpoint=http://minio-1616518256:9000
>               -Ds3.access-key=ACCESSKEY
>               -Ds3.secret-key=SECRETKEY
>               -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>               -Ds3.path-style-access=true -Ds3.ssl.enabled=false
>               -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>               -Dhigh-availability.storageDir=s3://flink/flink-ha
>               local:///opt/flink/usrlib/job.jar
>           restartPolicy: OnFailure
>
> This works well for me, but I would like to write the result to the archive
> path and show it in the History Server (running as a separate deployment in
> k8s).
>
> Every run it creates JobId=00000000000000000000000000000000, which obviously
> leads to:
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
> already been submitted.
>   at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282]
>   at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282]
>   at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:417) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96) ~[?:?]
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_282]
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
>   at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>   ... 10 more
>
> I assume it is because it will spawn a completely new cluster for each run.
>
> Can I somehow set the jobId, or am I trying to do something unsupported/bad?
>
> Thanks for the advice.
>
> L.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
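One thing worth noting about the manifest above: it passes the same `-Dkubernetes.cluster-id=batch-job-cluster` on every CronJob run, so each new application cluster sees the HA state left behind under that cluster id and the job (which in application mode with HA gets the fixed all-zeros job id) looks like a duplicate submission. A minimal sketch of one possible workaround, giving every run its own cluster id so it starts with fresh HA metadata (the `batch-job` prefix and the epoch-seconds suffix are illustrative assumptions, not something prescribed by Flink):

```shell
#!/bin/sh
# Sketch: derive a unique cluster-id for this CronJob run so a brand-new
# application cluster (with its own HA ConfigMaps) is created each time.
# Any per-run-unique string would do; epoch seconds is just one option.
CLUSTER_ID="batch-job-$(date +%s)"
echo "Submitting with cluster-id: ${CLUSTER_ID}"

# The submission itself would then reference that id, e.g. (kept commented
# here; all other -D options stay exactly as in the manifest above):
# /opt/flink/bin/flink run-application --target kubernetes-application \
#   -Dkubernetes.cluster-id="${CLUSTER_ID}" \
#   ... \
#   local:///opt/flink/usrlib/job.jar
```

Note that old HA entries and terminated clusters would then accumulate under distinct ids, so some cleanup (or simply not enabling HA for a short-lived batch job) may be preferable; which trade-off fits depends on the deployment.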