Hi,

Thanks for providing the logs. From the logs, this is a known bug [1]. As a workaround, you could use `$internal.pipeline.job-id` to set your own job id (thanks to Wang Yang). Keep in mind that this option is meant only for internal use and may change in a future release, so please keep an eye on [1] for the proper solution.
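For example, something along these lines might work when submitting the application (just a sketch, not tested here: the value has to be a valid JobID, i.e. 32 hex characters, and most of your other -D options are omitted for brevity):

    # Sketch only: "$internal.pipeline.job-id" is an internal option and may change.
    # The single quotes keep the shell from expanding the leading "$".
    # Replace the value with your own 32-hex-character id.
    /opt/flink/bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=batch-job-cluster \
        '-D$internal.pipeline.job-id=b2c38f223b71b44c1a4b0dfee4bd2b71' \
        local:///opt/flink/usrlib/job.jar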
[1] https://issues.apache.org/jira/browse/FLINK-19358

Best,
Guowei


On Thu, Mar 25, 2021 at 5:31 PM Lukáš Drbal <lukas.dr...@gmail.com> wrote:

> Hello,
>
> sure. Here is the log from the first run, which succeeded -
> https://pastebin.com/tV75ZS5S
> and here is the one from the second run (it's the same for all subsequent runs) -
> https://pastebin.com/pwTFyGvE
>
> My Dockerfile is pretty simple, just the wordcount example + S3:
>
> FROM flink:1.12.2
>
> RUN mkdir -p $FLINK_HOME/usrlib
> COPY flink-examples-batch_2.12-1.12.2-WordCount.jar $FLINK_HOME/usrlib/wordcount.jar
>
> RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
> COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
>
> Thanks!
>
> On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi,
>> After some discussion with Wang Yang offline, it seems that there might
>> be a jobmanager failover. So would you like to share the full jobmanager log?
>> Best,
>> Guowei
>>
>>
>> On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal <lukas.dr...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I would like to use native Kubernetes execution [1] for one batch job
>>> and let Kubernetes do the scheduling. Flink version: 1.12.2.
>>>
>>> Kubernetes job:
>>>
>>> apiVersion: batch/v1beta1
>>> kind: CronJob
>>> metadata:
>>>   name: scheduled-job
>>> spec:
>>>   schedule: "*/1 * * * *"
>>>   jobTemplate:
>>>     spec:
>>>       template:
>>>         metadata:
>>>           labels:
>>>             app: super-flink-batch-job
>>>         spec:
>>>           containers:
>>>           - name: runner
>>>             image: localhost:5000/batch-flink-app-v3:latest
>>>             imagePullPolicy: Always
>>>             command:
>>>             - /bin/sh
>>>             - -c
>>>             - /opt/flink/bin/flink run-application --target kubernetes-application
>>>               -Dkubernetes.service-account=flink-service-account
>>>               -Dkubernetes.rest-service.exposed.type=NodePort
>>>               -Dkubernetes.cluster-id=batch-job-cluster
>>>               -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>>>               -Ds3.endpoint=http://minio-1616518256:9000
>>>               -Ds3.access-key=ACCESSKEY
>>>               -Ds3.secret-key=SECRETKEY
>>>               -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>>>               -Ds3.path-style-access=true -Ds3.ssl.enabled=false
>>>               -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>               -Dhigh-availability.storageDir=s3://flink/flink-ha
>>>               local:///opt/flink/usrlib/job.jar
>>>           restartPolicy: OnFailure
>>>
>>>
>>> This works well for me, but I would also like to write the result to the
>>> archive path and show it in the History Server (running as a separate
>>> deployment in k8s).
>>>
>>> Every run creates JobId=00000000000000000000000000000000, which
>>> obviously leads to
>>>
>>> Caused by: java.util.concurrent.ExecutionException:
>>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>> already been submitted.
>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282]
>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282]
>>>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:417) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96) ~[?:?]
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_282]
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
>>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>     ... 10 more
>>>
>>> I assume it is because it will spawn a completely new cluster for each
>>> run.
>>>
>>> Can I somehow set the jobId, or am I trying to do something
>>> unsupported/bad?
>>>
>>> Thanks for the advice.
>>>
>>> L.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
>>
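Tying the suggestion above back to the CronJob: the `/bin/sh -c` argument could generate a fresh JobID for every scheduled run, so each submission gets its own id instead of the fixed 00000000000000000000000000000000. A rough sketch (assuming `openssl` is available in the image; any source of 32 hex characters would do, and the other -D options from the original command are omitted):

    # Sketch only: one JobID per scheduled run.
    # "$internal.pipeline.job-id" is internal and may change between releases.
    JOB_ID=$(openssl rand -hex 16) && \
    /opt/flink/bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=batch-job-cluster \
        "-D\$internal.pipeline.job-id=${JOB_ID}" \
        local:///opt/flink/usrlib/job.jar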