Just wanted to throw in a couple more details here from what I have learned 
from working with Kubernetes.

All processes restart (a lost JobManager restarts eventually). Should be given 
in Kubernetes:

  *   This works very well. We run multiple jobs with a single JobManager, and 
Flink/Kubernetes recovers quite well.

A way for TaskManagers to discover the restarted JobManager. Should work via 
Kubernetes as well (restarted containers retain the external hostname):

  *   We use StatefulSets, which provide a DNS-based discovery mechanism. 
Provided DNS is set up correctly with TTLs, this works well. You could also 
leverage a built-in Kubernetes Service if you are only running a single 
JobManager; Kubernetes will just route the traffic to that one pod (I have 
tested this). With multiple JobManagers, however, this won't work, because 
Kubernetes will route the traffic round-robin across the JobManagers.
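
To make the StatefulSet option concrete, here is a minimal sketch of the 
stable DNS name a StatefulSet pod gets when it is backed by a headless 
Service. The names "flink-jobmanager" and "default" are made up for 
illustration, and "cluster.local" is only the usual default cluster domain, 
so check what your cluster actually uses.

```python
def statefulset_pod_dns(statefulset: str, ordinal: int, service: str,
                        namespace: str = "default",
                        cluster_domain: str = "cluster.local") -> str:
    """Build the stable DNS name of one StatefulSet pod.

    Pods are named <statefulset>-<ordinal> and are resolvable under the
    governing (headless) Service as <pod>.<service>.<namespace>.svc.<domain>.
    """
    pod_name = f"{statefulset}-{ordinal}"
    return f"{pod_name}.{service}.{namespace}.svc.{cluster_domain}"


# Hypothetical names: a single-replica StatefulSet behind a headless Service.
jobmanager_address = statefulset_pod_dns("flink-jobmanager", 0, "flink-jobmanager")
print(jobmanager_address)
```

The resulting name is what you would point the TaskManagers at (for example 
through jobmanager.rpc.address), since it stays the same when the pod is 
rescheduled.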

A way to isolate different "leader sessions" against each other. Flink 
currently uses ZooKeeper to also attach a "leader session ID" to leader 
election, which is a fencing token to avoid that processes talk to each other 
despite having different views on who is the leader, or whether the leader lost 
and re-gained leadership:

  *   This is probably the most difficult part. You could leverage the 
built-in etcd cluster, although connecting directly to the Kubernetes etcd 
database is probably a bad idea. Instead, you should be able to create a 
counter using the PATCH API that Kubernetes supplies, which follows JSON Patch: 
https://tools.ietf.org/html/rfc6902. You could probably leverage the "test" 
operation (https://tools.ietf.org/html/rfc6902#section-4.6) to allow for atomic 
updates to counters. Combining this with custom resources: 
https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources
 should give a good way to work with etcd without actually connecting to the 
Kubernetes etcd directly. This integration would require modifying the 
JobManager leader election code.
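
As a rough sketch of the idea (not something I have run against a real 
cluster): a JSON Patch that first tests the current leader session ID and only 
then replaces it behaves like a compare-and-swap. The field 
/spec/leaderSessionId and the resource layout are made up, and apply_patch 
below is a small in-memory stand-in for the API server that only understands 
"test" and "replace" on object members (no JSON Pointer escaping). Against 
Kubernetes the same patch list would be sent as the PATCH body with content 
type application/json-patch+json.

```python
import copy
import json


class PatchTestFailed(Exception):
    """Raised when a JSON Patch "test" operation does not match."""


def apply_patch(document: dict, patch: list) -> dict:
    """Apply "test"/"replace" operations atomically and return the new document.

    Works on a deep copy, so a failed "test" leaves the input untouched,
    mirroring the all-or-nothing behaviour RFC 6902 requires.
    """
    result = copy.deepcopy(document)
    for op in patch:
        keys = op["path"].lstrip("/").split("/")
        parent = result
        for key in keys[:-1]:
            parent = parent[key]
        leaf = keys[-1]
        if op["op"] == "test":
            if leaf not in parent or parent[leaf] != op["value"]:
                raise PatchTestFailed(op["path"])
        elif op["op"] == "replace":
            if leaf not in parent:
                raise KeyError(op["path"])
            parent[leaf] = op["value"]
        else:
            raise ValueError(f"unsupported op: {op['op']}")
    return result


def claim_leadership_patch(expected_session: str, new_session: str) -> list:
    """Patch that succeeds only if the caller's view of the leader is current."""
    path = "/spec/leaderSessionId"  # hypothetical field on a custom resource
    return [
        {"op": "test", "path": path, "value": expected_session},
        {"op": "replace", "path": path, "value": new_session},
    ]


resource = {"spec": {"leaderSessionId": "session-1"}}
resource = apply_patch(resource, claim_leadership_patch("session-1", "session-2"))

try:
    # A process that still believes session-1 is current gets fenced off.
    apply_patch(resource, claim_leadership_patch("session-1", "session-3"))
except PatchTestFailed:
    print("stale view of the leader, patch rejected")

print(json.dumps(resource))
```

The second patch is rejected because its "test" no longer matches, which is 
the fencing behaviour the leader session ID is meant to give.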

A distributed atomic counter for the checkpoint ID. This is crucial to ensure 
correctness of checkpoints in the presence of JobManager failures and 
re-elections or split-brain situations.

  *   This is very similar to the above. We should be able to accomplish it 
through the PATCH API combined with a conditional update (apply the change 
only if a test on the current value succeeds).
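
A minimal sketch of how such a counter could be driven, assuming the backing 
store offers some form of conditional update. VersionedStore is a hypothetical 
in-memory stand-in for that store (for instance a custom resource updated 
through PATCH); it is not a Flink or Kubernetes API. The caller reads the 
current value, tries to write value + 1 on the condition that nothing changed, 
and retries if it lost the race.

```python
import threading


class VersionedStore:
    """In-memory stand-in for a store that supports conditional updates."""

    def __init__(self, value: int = 0):
        self._value = value
        self._lock = threading.Lock()

    def read(self) -> int:
        with self._lock:
            return self._value

    def compare_and_set(self, expected: int, new: int) -> bool:
        """Write `new` only if the stored value still equals `expected`."""
        with self._lock:
            if self._value != expected:
                return False
            self._value = new
            return True


def next_checkpoint_id(store: VersionedStore) -> int:
    """Hand out the next ID; retry whenever another writer got in first."""
    while True:
        current = store.read()
        if store.compare_and_set(current, current + 1):
            return current + 1


store = VersionedStore()
ids = []


def worker(count: int) -> None:
    for _ in range(count):
        ids.append(next_checkpoint_id(store))


# Four competing "JobManagers", each requesting 50 checkpoint IDs.
threads = [threading.Thread(target=worker, args=(50,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(ids), len(set(ids)))
```

Even with several writers racing, no ID is handed out twice, which is the 
property that matters for checkpoints after a JobManager failover.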

If you don't want to dig deep into the JobManager code, the etcd Operator 
(https://github.com/coreos/etcd-operator) would be a good way to bring up an 
etcd cluster that is separate from the core Kubernetes etcd database. Combined 
with zetcd, you could probably have that up and running quickly.

Thanks,
James Bucher

From: Hao Sun <ha...@zendesk.com>
Date: Monday, August 21, 2017 at 9:45 AM
To: Stephan Ewen <se...@apache.org>, Shannon Carey <sca...@expedia.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink HA with Kubernetes, without Zookeeper

Thanks Shannon for the https://github.com/coreos/zetcd tip. I will check that 
out and share my results if we proceed on that path.
Thanks Stephan for the details, this is very useful. I was about to ask what 
exactly is stored in ZooKeeper, haha.

On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen <se...@apache.org> wrote:
Hi!

That is a very interesting proposition. In cases where you have a single master 
only, you may get away with quite good guarantees without ZK. In fact, Flink 
does not store significant data in ZK at all, it only uses locks and counters.

You can have a setup without ZK, provided you have the following:

  - All processes restart (a lost JobManager restarts eventually). Should be 
given in Kubernetes.

  - A way for TaskManagers to discover the restarted JobManager. Should work 
via Kubernetes as well (restarted containers retain the external hostname)

  - A way to isolate different "leader sessions" against each other. Flink 
currently uses ZooKeeper to also attach a "leader session ID" to leader 
election, which is a fencing token to avoid that processes talk to each other 
despite having different views on who is the leader, or whether the leader lost 
and re-gained leadership.

  - An atomic marker for what is the latest completed checkpoint.

  - A distributed atomic counter for the checkpoint ID. This is crucial to 
ensure correctness of checkpoints in the presence of JobManager failures and 
re-elections or split-brain situations.

I would assume that etcd can provide all of those services. The best way to 
integrate it would probably be to add an implementation of Flink's 
"HighAvailabilityServices" based on etcd.

Have a look at this class: 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

If you want to contribute an extension of Flink using etcd, that would be 
awesome.
This should have a FLIP though, and a plan on how to set up rigorous unit 
testing of that implementation (because its correctness is very crucial to 
Flink's HA resilience).

Best,
Stephan


On Mon, Aug 21, 2017 at 4:15 PM, Shannon Carey <sca...@expedia.com> wrote:
Zookeeper should still be necessary even in that case, because it is where the 
JobManager stores information which needs to be recovered after the JobManager 
fails.

We're eyeing https://github.com/coreos/zetcd as a way to run Zookeeper on top 
of Kubernetes' etcd cluster so that we don't have to rely on a separate 
Zookeeper cluster. However, we haven't tried it yet.

-Shannon

From: Hao Sun <ha...@zendesk.com>
Date: Sunday, August 20, 2017 at 9:04 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Flink HA with Kubernetes, without Zookeeper

Hi, I am new to Flink and trying to bring up a Flink cluster on top of 
Kubernetes.

For HA setup, with kubernetes, I think I just need one job manager and do not 
need Zookeeper? I will store all states to S3 buckets. So in case of failure, 
kubernetes can just bring up a new job manager without losing anything?

I want to confirm my assumptions above make sense. Thanks
