Hi,
After discussing this offline with Wang Yang, we suspect that a JobManager
failover occurred. Could you share the full JobManager log?
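In case it helps, here is a rough sketch of how the log could be collected.
It assumes the cluster runs in the current kubectl namespace and that the
JobManager Deployment is named after kubernetes.cluster-id
(batch-job-cluster in your command). The helper name collect_jm_logs and the
output file names are only illustrative.

```shell
# Hypothetical helper: dump the JobManager log for a native Kubernetes
# Flink cluster. Assumes the Deployment name equals the cluster ID.
collect_jm_logs() {
  cluster_id="$1"
  out_dir="${2:-.}"

  # Log of the currently running JobManager container.
  kubectl logs "deployment/${cluster_id}" > "${out_dir}/jobmanager.log"

  # Log of the previous container instance, if the container was restarted.
  # This fails when there was no restart, so the error is ignored.
  kubectl logs "deployment/${cluster_id}" --previous \
    > "${out_dir}/jobmanager-previous.log" 2>/dev/null || true
}
```

Running `collect_jm_logs batch-job-cluster` would write jobmanager.log (and,
if the container restarted, jobmanager-previous.log) to the current directory.
If the pod itself was replaced, the earlier log may no longer be available
this way.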
Best,
Guowei


On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal <lukas.dr...@gmail.com> wrote:

> Hi,
>
> I would like to use native Kubernetes execution [1] for a batch job and
> leave the scheduling to Kubernetes. Flink version: 1.12.2.
>
> Kubernetes job:
> apiVersion: batch/v1beta1
> kind: CronJob
> metadata:
>   name: scheduled-job
> spec:
>   schedule: "*/1 * * * *"
>   jobTemplate:
>     spec:
>       template:
>         metadata:
>           labels:
>             app: super-flink-batch-job
>         spec:
>           containers:
>           - name: runner
>             image: localhost:5000/batch-flink-app-v3:latest
>             imagePullPolicy: Always
>             command:
>               - /bin/sh
>               - -c
>               - >-
>                 /opt/flink/bin/flink run-application
>                 --target kubernetes-application
>                 -Dkubernetes.service-account=flink-service-account
>                 -Dkubernetes.rest-service.exposed.type=NodePort
>                 -Dkubernetes.cluster-id=batch-job-cluster
>                 -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>                 -Ds3.endpoint=http://minio-1616518256:9000
>                 -Ds3.access-key=ACCESSKEY
>                 -Ds3.secret-key=SECRETKEY
>                 -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>                 -Ds3.path-style-access=true
>                 -Ds3.ssl.enabled=false
>                 -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>                 -Dhigh-availability.storageDir=s3://flink/flink-ha
>                 local:///opt/flink/usrlib/job.jar
>           restartPolicy: OnFailure
>
>
> This works well for me, but I would like to write the result to the archive
> path and show it in the History Server (running as a separate deployment in
> Kubernetes).
>
> Every run creates a job with JobId=00000000000000000000000000000000, which
> obviously leads to:
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
> already been submitted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> ~[?:1.8.0_282]
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> ~[?:1.8.0_282]
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
> ~[?:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_282]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_282]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_282]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> ... 10 more
>
> I assume this is because each run spawns a completely new cluster.
>
> Can I somehow set the job ID, or am I trying to do something unsupported?
>
> Thanks for any advice.
>
> L.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
>
