Hi,

How did you start the job masters? Could you maybe share the logs of all
components? It looks as if the leader election is not working properly. One
thing to make sure of is that you specify a different cluster ID for every
new HA cluster via `high-availability.cluster-id: cluster_xy`. That way you
separate the ZNodes in ZooKeeper so that every cluster uses its own nodes
and does not interfere with other clusters. Usually this happens via the
JobID, but in the case of the `StandaloneJobClusterEntrypoint` we set it to
0. More recently, this was slightly changed. See
https://issues.apache.org/jira/browse/FLINK-12617 for more information.
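
For example, assuming the default high-availability.zookeeper.path.root of
/flink, two job clusters could be kept apart like this (the cluster names
below are only placeholders):

# flink-conf.yaml of job cluster A
high-availability.cluster-id: cluster_a
# -> its HA ZNodes end up under /flink/cluster_a/...

# flink-conf.yaml of job cluster B
high-availability.cluster-id: cluster_b
# -> its HA ZNodes end up under /flink/cluster_b/...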

Cheers,
Till

On Mon, Jul 1, 2019 at 11:36 AM Alex.Hu <huyc...@163.com> wrote:

> *Hi, All:*
>
> *   I ran into the problems with Flink 1.6.0 on Kubernetes that Till
> mentioned in "HA for 1.6.0 job cluster with docker-compose" on this mailing
> list. The JIRA referenced there, FLINK-10291, was closed for 1.7.0, but I
> still see similar errors with Flink 1.7.2 on Kubernetes. Could you please
> help me check whether my settings are the problem? Here are my settings:*
> web.log.path: /var/log/flink/flinkweb.log
> taskmanager.log.path: /var/log/flink/taskmanager/task.log
>
> jobmanager.rpc.address: tdh2
> jobmanager.rpc.port: 16223
> jobstore.cache-size: 5368709120
> jobstore.expiration-time: 864000
> jobmanager.heap.size: 4096m
>
> taskmanager.heap.size:  6000m
> taskmanager.numberOfTaskSlots: 6
> parallelism.default: 2
>
> high-availability: zookeeper
> high-availability.storageDir: hdfs:///flink1/ha/
> high-availability.zookeeper.quorum: tdh2:2181,tdh4:2181,tdh3:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.client.acl: open
> high-availability.jobmanager.port: 62236-62239
>
> rest.port: 18801
> io.tmp.dirs: /data/disk1:/data/disk2:/data/disk3:/data/disk4:/data/disk5
>
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.contexts: Client
> security.kerberos.login.keytab: /etc/flink/conf/hdfs.keytab
> security.kerberos.login.principal: hdfs
>
> blob.server.port: 16224
> query.server.port: 16225
>
>
>    *And here is the latest error report; the earliest error report is in
> the forwarded message below:*
> apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token
> not set: Ignoring message
> LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146,
> LocalRpcInvocation(requestRestAddress(Time))) sent to 
> akka.tcp://flink@tdh2:62236/user/dispatcher
> because the fencing token is null.
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)
> ... 14 common frames omitted
> 2019-07-01 17:10:40.159 [flink-rest-server-netty-worker-thread-39] ERROR
> o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not
> retrieve the redirect address.
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
> token not set: Ignoring message
> LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146,
> LocalRpcInvocation(requestRestAddress(Time))) sent to 
> akka.tcp://flink@tdh2:62236/user/dispatcher
> because the fencing token is null.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
> at akka.actor.ActorRef.tell(ActorRef.scala:130)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendErrorIfSender(AkkaRpcActor.java:371)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:57)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:
> Fencing token not set: Ignoring message
> LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146,
> LocalRpcInvocation(requestRestAddress(Time))) sent to 
> akka.tcp://flink@tdh2:62236/user/dispatcher
> because the fencing token is null.
>
>
>
>
>
>
> -------- Forwarding messages --------
> From: "Alex.Hu" <huyc...@163.com>
> Date: 2019-07-01 10:17:15
> To: "Yang Wang" <danrtsey...@gmail.com>
> Cc: user <user@flink.apache.org>
> Subject: Re:Re: About "Flink 1.7.0 HA based on zookeepers "
> Hi, Wang:
>     Thank you very much for answering my question!
>     I want to start multiple JobManagers on Kubernetes because, as I
> understand the jobmanager_high_availability documentation, standalone mode
> with two JobManager nodes should allow a failed JobManager to be replaced
> seamlessly when its node goes down.
>     In a follow-up test last Friday, I did run only a single JobManager on
> Kubernetes with jobmanager_high_availability enabled and labeled several
> nodes as JobManager nodes, so that Kubernetes would automatically restart
> the failed JobManager on another node. But there is still some switch-over
> time, so I am not sure whether Flink on Kubernetes can achieve the seamless
> JobManager failover described above.
>
>
> At 2019-06-28 14:11:27, "Yang Wang" <danrtsey...@gmail.com> wrote:
>
> Hi, Hu
>
> I am not sure why you need to start multiple JobManagers on Kubernetes.
> As described in the manual [1], we use a Deployment with a replica count of
> 1 so that Kubernetes detects a crashed JobManager and starts a new one. What
> you need to do is add the high-availability configuration [2] to
> flink-conf.yaml. You could use a ConfigMap [3] to store your flink-conf.yaml
> and mount it into the JobManager pod, or alternatively update the
> flink-conf.yaml in your Flink image.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html
> [3]
> https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/
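>
> For illustration, a minimal sketch of [3] applied to Flink could look like
> the following; the resource name flink-config, the ZooKeeper hosts, and the
> mount path /opt/flink/conf are assumptions for this example, not taken from
> the linked docs:
>
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
> data:
>   flink-conf.yaml: |
>     high-availability: zookeeper
>     high-availability.storageDir: hdfs:///flink/ha/
>     high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
> ---
> # In the JobManager Deployment (replicas: 1), the relevant parts of the pod
> # spec would then be:
> #   volumes:
> #     - name: flink-config-volume
> #       configMap:
> #         name: flink-config
> #   containers:
> #     - name: jobmanager
> #       volumeMounts:
> #         - name: flink-config-volume
> #           mountPath: /opt/flink/conf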
>
> 胡逸才 <huyc...@163.com> wrote on Fri, Jun 28, 2019 at 11:09 AM:
>
>> Hi Tan:
>> I have the same problem as you when running Flink 1.7.2 on Kubernetes in
>> HA mode. May I ask whether you have solved it, and how? After I started the
>> two JobManagers normally, I tried to kill one of them and it could not
>> restart properly. Both JobManagers reported this error; the specific log is
>> as follows:
>>
>>
>>
>>
>> 2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN
>>  akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote
>> connection to [null] failed with java.net.ConnectException: Connection
>> refused: tdh2/192.168.208.55:56529
>> 2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN
>>  akka.remote.ReliableDeliverySupervisor
>> flink-akka.remote.default-remote-dispatcher-14 - Association with remote
>> system [akka.tcp://flink@tdh2:56529] has failed, address is now gated
>> for [50] ms. Reason: [Association failed with [akka.tcp://flink@tdh2:56529]]
>> Caused by: [Connection refused: tdh2/192.168.208.55:56529]
>> 2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN
>>  akka.remote.ReliableDeliverySupervisor
>> flink-akka.remote.default-remote-dispatcher-14 - Association with remote
>> system [akka.tcp://flink@tdh2:56529] has failed, address is now gated
>> for [50] ms. Reason: [Association failed with [akka.tcp://flink@tdh2:56529]]
>> Caused by: [Connection refused: tdh2/192.168.208.55:56529]
>> 2019-06-28 09:57:57.260 [flink-rest-server-netty-worker-thread-7] ERROR
>> o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not
>> retrieve the redirect address.
>> java.util.concurrent.CompletionException:
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] after
>> [10000 ms]. Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>> at
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>> at
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] after
>> [10000 ms]. Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
>> at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>> ... 9 common frames omitted
>>
>>
>>
>>
>
>
>
>
>
>
>
