Just wanted to throw in a couple more details here based on what I have learned working with Kubernetes.
All processes restart (a lost JobManager restarts eventually). Should be given in Kubernetes:
* This works very well; we run multiple jobs with a single JobManager and Flink/Kubernetes recovers quite well.

A way for TaskManagers to discover the restarted JobManager. Should work via Kubernetes as well (restarted containers retain the external hostname):
* We use StatefulSets, which provide a DNS-based discovery mechanism. Provided DNS is set up correctly with TTLs, this works well. You could also leverage the built-in Kubernetes Services if you are only running a single JobManager: Kubernetes will just route the traffic to the single pod. This works fine with a single JobManager (I have tested it). However, multiple JobManagers won't work, because Kubernetes will route the traffic round-robin across the JobManagers.

A way to isolate different "leader sessions" against each other. Flink currently uses ZooKeeper to also attach a "leader session ID" to leader election, which is a fencing token to avoid that processes talk to each other despite having different views on who is the leader, or whether the leader lost and re-gained leadership:
* This is probably the most difficult thing. You could leverage the built-in etcd cluster, although connecting directly to the Kubernetes etcd database is probably a bad idea. You should be able to create a counter using the PATCH API that Kubernetes supplies, which follows JSON Patch (https://tools.ietf.org/html/rfc6902); you could probably leverage the "test" operation (https://tools.ietf.org/html/rfc6902#section-4.6) to allow for atomic updates to counters. Combining this with custom resources (https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources) should give a good way to work with etcd without actually connecting to the Kubernetes etcd directly. This integration would require modifying the JobManager leader election code.

A distributed atomic counter for the checkpoint ID. This is crucial to ensure correctness of checkpoints in the presence of JobManager failures and re-elections or split-brain situations:
* This is very similar to the above; we should be able to accomplish it through the PATCH API combined with an update-if-unchanged condition (see the sketch below).

If you don't want to actually rip into the JobManager code, the etcd-operator (https://github.com/coreos/etcd-operator) would be a good way to bring up an etcd cluster that is separate from the core Kubernetes etcd database. Combined with zetcd, you could probably have that up and running quickly.

Thanks,
James Bucher
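To make the PATCH idea above concrete, here is a minimal sketch of an update-if-unchanged counter kept in a ConfigMap, guarded by a JSON Patch "test" operation. The namespace, ConfigMap name, and data key are made up for illustration, and the token/CA paths are the usual in-cluster service-account defaults; treat it as a sketch of the primitive, not anything Flink ships.

```python
# Hypothetical compare-and-swap counter via the Kubernetes JSON Patch API.
# Assumes a ConfigMap "flink-ha-counters" in namespace "flink" whose data
# already contains a "checkpoint-id" entry (all names are illustrative).
import json
import requests

API = "https://kubernetes.default.svc"  # in-cluster API server
CM_URL = API + "/api/v1/namespaces/flink/configmaps/flink-ha-counters"
TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"


def try_increment_checkpoint_id():
    """Bump the counter by one; return the new value, or None if we lost the race."""
    headers = {"Authorization": "Bearer " + open(TOKEN_PATH).read().strip()}

    # Read the current value (ConfigMap data values are strings).
    cm = requests.get(CM_URL, headers=headers, verify=CA_PATH).json()
    current = cm["data"]["checkpoint-id"]
    proposed = str(int(current) + 1)

    # The "test" op (RFC 6902, section 4.6) makes the whole patch fail if
    # someone else changed the value in the meantime, so the replace is atomic.
    patch = [
        {"op": "test", "path": "/data/checkpoint-id", "value": current},
        {"op": "replace", "path": "/data/checkpoint-id", "value": proposed},
    ]
    resp = requests.patch(
        CM_URL,
        headers={**headers, "Content-Type": "application/json-patch+json"},
        data=json.dumps(patch),
        verify=CA_PATH,
    )
    return int(proposed) if resp.ok else None
```

A caller would simply retry in a loop until the patch is accepted, and the same test-then-replace pattern should also work against a custom resource instead of a ConfigMap.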
From: Hao Sun <ha...@zendesk.com>
Date: Monday, August 21, 2017 at 9:45 AM
To: Stephan Ewen <se...@apache.org>, Shannon Carey <sca...@expedia.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink HA with Kubernetes, without Zookeeper

Thanks Shannon for the https://github.com/coreos/zetcd tip, I will check that out and share my results if we proceed on that path.

Thanks Stephan for the details, this is very useful, I was about to ask what exactly is stored in ZooKeeper, haha.

On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen <se...@apache.org> wrote:

Hi!

That is a very interesting proposition. In cases where you have a single master only, you may get away with quite good guarantees without ZK. In fact, Flink does not store significant data in ZK at all; it only uses locks and counters.

You can have a setup without ZK, provided you have the following:
- All processes restart (a lost JobManager restarts eventually). Should be given in Kubernetes.
- A way for TaskManagers to discover the restarted JobManager. Should work via Kubernetes as well (restarted containers retain the external hostname).
- A way to isolate different "leader sessions" against each other. Flink currently uses ZooKeeper to also attach a "leader session ID" to leader election, which is a fencing token to avoid that processes talk to each other despite having different views on who is the leader, or whether the leader lost and re-gained leadership.
- An atomic marker for what is the latest completed checkpoint.
- A distributed atomic counter for the checkpoint ID. This is crucial to ensure correctness of checkpoints in the presence of JobManager failures and re-elections or split-brain situations.

I would assume that etcd can provide all of those services. The best way to integrate it would probably be to add an implementation of Flink's "HighAvailabilityServices" based on etcd. Have a look at this class: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

If you want to contribute an extension of Flink using etcd, that would be awesome. This should have a FLIP, though, and a plan for how to set up rigorous unit testing of that implementation (because its correctness is crucial to Flink's HA resilience).

Best,
Stephan

On Mon, Aug 21, 2017 at 4:15 PM, Shannon Carey <sca...@expedia.com> wrote:

Zookeeper should still be necessary even in that case, because it is where the JobManager stores information that needs to be recovered after the JobManager fails.

We're eyeing https://github.com/coreos/zetcd as a way to run Zookeeper on top of Kubernetes' etcd cluster so that we don't have to rely on a separate Zookeeper cluster. However, we haven't tried it yet.

-Shannon

From: Hao Sun <ha...@zendesk.com>
Date: Sunday, August 20, 2017 at 9:04 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Flink HA with Kubernetes, without Zookeeper

Hi, I am new to Flink and trying to bring up a Flink cluster on top of Kubernetes. For an HA setup with Kubernetes, I think I just need one JobManager and do not need ZooKeeper? I will store all state in S3 buckets, so in case of failure, Kubernetes can just bring up a new JobManager without losing anything? I want to confirm my assumptions above make sense. Thanks
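Stephan's "leader session" and fencing-token point above is also where etcd's conditional writes fit naturally. Below is a similarly illustrative sketch (the endpoint and key names are invented, and it uses the etcd v2 HTTP keys API) of acquiring leadership with a TTL'd key whose value doubles as the fencing token; an etcd-backed HighAvailabilityServices implementation would build on primitives like this rather than on this exact code.

```python
# Hypothetical leader election with a fencing token against etcd's v2 keys API.
# prevExist=false means the PUT only succeeds if no leader key exists yet.
import uuid
import requests

ETCD = "http://etcd.flink.svc:2379"  # assumed etcd endpoint
LEADER_KEY = ETCD + "/v2/keys/flink/default/leader"


def try_acquire_leadership(ttl_seconds=30):
    """Attempt to become leader; return our leader session ID if we won, else None."""
    session_id = str(uuid.uuid4())
    resp = requests.put(
        LEADER_KEY,
        params={"prevExist": "false"},
        data={"value": session_id, "ttl": ttl_seconds},
    )
    # A real leader would keep refreshing the TTL while it is alive, and
    # contenders would retry after the key expires.
    return session_id if resp.ok else None


def current_leader_session():
    """Fencing check: read the session ID of whoever currently holds leadership."""
    resp = requests.get(LEADER_KEY)
    return resp.json()["node"]["value"] if resp.ok else None
```

The stored session ID is what followers would compare against incoming messages, so that commands from a JobManager that lost and later regained leadership under a new session are rejected, which is exactly the fencing behavior Stephan describes ZooKeeper providing today.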