大家好:
我在使用1.6.2的时候遇到一个问题,flink1.6.2,hadoop-2.7.6,Flink on
YARN环境,flink配置了HA,配置文件内容如下:
jobmanager.execution.failover-strategy: individual
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
high-availability: zookeeper
high-availability.zookeeper.quorum:
172.23.71.133:2181,172.23.71.135:2181,172.23.71.136:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
yarn.reallocate-failed: true
yarn.maximum-failed-containers: 10
main函数中的主要逻辑:
DataSource dataSource = new DataSource(args);
DataStream<Message> messageStream = env.addSource(dataSource);
messageStream.map(new FlowMapper()).keyBy(0).flatMap(new
FilterflowMapper()).addSink(new RedisSink());
env.execute();
其中FlatMap中使用了 MapState<String,Integer> mapState;
报错信息:
Starting execution of program
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve
the execution result. (JobID: 246bb8240a70ce17683285bf87ba20a2)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at com.jd.ma.flink.TestMain.main(TestMain.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:290)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit JobGraph.
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:380)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal
server error., <Exception on server side:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to
submit job.
... 24 more
Caused by: java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set up
JobManager
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set
up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Incompatible failover
strategy - strategy 'Individual Task Restart' can only handle jobs with only
disconnected tasks.
at
org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy.notifyNewVertices(RestartIndividualStrategy.java:142)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:857)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
at
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
... 10 more
End of exception on server side>]
at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:351)
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:335)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
The program didn't contain a Flink job. Perhaps you forgot to call execute() on
the execution environment.
如果我Flink配置文件中不用HA配置,任务提交正常,请问下知道是什么原因造成的吗?