Hello, sure. Here is the log from the first run, which succeeded: https://pastebin.com/tV75ZS5S and here is the log from the second run (it is the same for every subsequent run): https://pastebin.com/pwTFyGvE
My Dockerfile is pretty simple, just the WordCount example plus S3 support:

FROM flink:1.12.2

RUN mkdir -p $FLINK_HOME/usrlib
COPY flink-examples-batch_2.12-1.12.2-WordCount.jar $FLINK_HOME/usrlib/wordcount.jar

RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/

Thanks!

On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
> After some discussion with Wang Yang offline, it seems that there might be
> a jobmanager failover. So would you like to share the full jobmanager log?
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal <lukas.dr...@gmail.com> wrote:
>
>> Hi,
>>
>> I would like to use native Kubernetes execution [1] for one batch job and
>> let Kubernetes handle the scheduling. Flink version: 1.12.2.
>>
>> Kubernetes job:
>>
>> apiVersion: batch/v1beta1
>> kind: CronJob
>> metadata:
>>   name: scheduled-job
>> spec:
>>   schedule: "*/1 * * * *"
>>   jobTemplate:
>>     spec:
>>       template:
>>         metadata:
>>           labels:
>>             app: super-flink-batch-job
>>         spec:
>>           containers:
>>           - name: runner
>>             image: localhost:5000/batch-flink-app-v3:latest
>>             imagePullPolicy: Always
>>             command:
>>             - /bin/sh
>>             - -c
>>             - /opt/flink/bin/flink run-application --target kubernetes-application
>>               -Dkubernetes.service-account=flink-service-account
>>               -Dkubernetes.rest-service.exposed.type=NodePort
>>               -Dkubernetes.cluster-id=batch-job-cluster
>>               -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>>               -Ds3.endpoint=http://minio-1616518256:9000
>>               -Ds3.access-key=ACCESSKEY
>>               -Ds3.secret-key=SECRETKEY
>>               -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>>               -Ds3.path-style-access=true
>>               -Ds3.ssl.enabled=false
>>               -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>               -Dhigh-availability.storageDir=s3://flink/flink-ha
>>               local:///opt/flink/usrlib/job.jar
>>           restartPolicy: OnFailure
>>
>> This works well for me, but I would like to write the result to the
>> archive path and
>> show it in the History Server (running as a separate
>> deployment in k8s).
>>
>> Every run creates JobId=00000000000000000000000000000000, which obviously
>> leads to:
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>> already been submitted.
>>   at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282]
>>   at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282]
>>   at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:417) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96) ~[?:?]
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_282]
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
>>   at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>   ... 10 more
>>
>> I assume this is because it spawns a completely new cluster for each run.
>>
>> Can I somehow set the jobId, or am I trying to do something unsupported/bad?
>>
>> Thanks for any advice.
>>
>> L.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
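P.S. One workaround I am experimenting with (an assumption on my side, not something confirmed in this thread): since every CronJob run here reuses the same kubernetes.cluster-id (batch-job-cluster) and the same HA storageDir, each new application cluster finds the HA metadata of the previous run and refuses the job as already submitted. Giving each invocation its own cluster-id and HA path should avoid the collision. A sketch of the container command, echoed instead of executed, with RUN_ID being a hypothetical per-run value:

```shell
#!/bin/sh
# Sketch only, not verified against a real cluster: derive a unique
# cluster-id per cron invocation so each application cluster gets its own
# HA metadata, and a previous run's recorded job id cannot collide.
RUN_ID="batch-job-$(date +%Y%m%d-%H%M%S)"

# Build the run-application command (only the HA-relevant flags shown);
# echoed here rather than executed so the sketch is self-contained.
CMD="/opt/flink/bin/flink run-application --target kubernetes-application \
  -Dkubernetes.service-account=flink-service-account \
  -Dkubernetes.cluster-id=${RUN_ID} \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=s3://flink/flink-ha/${RUN_ID} \
  local:///opt/flink/usrlib/job.jar"

echo "$CMD"
```

Note that per-run HA directories accumulate in S3 and would need periodic cleanup; and if HA is only enabled here as a side effect of the setup rather than a hard requirement, simply dropping the two high-availability flags for a one-shot batch job might be the simpler fix.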