Hi Tison,

thanks for starting this discussion. I think your mail includes multiple
points that are worth treating separately (it might even make sense to
have separate discussion threads for them). Please correct me if I've
misunderstood anything:

1. Adding new non-ha HAServices:

Based on your description I could see the "ZooKeeper-light" non-ha
HAServices implementation working. Would any changes to the existing
interfaces be needed? How would the LeaderServer integrate into the
lifecycle of the cluster entrypoint?
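
For reference, this is roughly the leader-related slice of the existing
interface that a new implementation would have to provide (paraphrased
from HighAvailabilityServices; not the complete interface, so details
may differ):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

    public interface HighAvailabilityServices {
        LeaderRetrievalService getResourceManagerLeaderRetriever();
        LeaderRetrievalService getDispatcherLeaderRetriever();
        LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
        LeaderRetrievalService getJobManagerLeaderRetriever(
            JobID jobID, String defaultJobManagerAddress);
        LeaderElectionService getResourceManagerLeaderElectionService();
        LeaderElectionService getDispatcherLeaderElectionService();
        LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
        // ... plus persistence-related methods (checkpoint recovery
        // factory, job graph store, blob store) and lifecycle methods
    }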

2. Replacing existing non-ha HAServices with LeaderServer implementation:

I'm not sure whether we need to enforce that every non-ha HAServices
implementation works as you've described. I think it is pretty much an
implementation detail whether the services talk to a LeaderServer or
are started with a pre-configured address. I also think that it is fair
to have different implementations with different characteristics and
usage scenarios. As you've said, the EmbeddedHaServices target
single-process cluster setups and are only used by the MiniCluster.

What I like about the StandaloneHaServices is that they are dead simple
(apart from the configuration). With a new implementation based on the
LeaderServer, the client side becomes much more complex because it now
needs to handle all kinds of network issues properly. Moreover, it adds
complexity to the system because it starts a new distributed component
which needs to be managed. I could see the new implementation replacing
the EmbeddedHaServices once it has matured enough, but I wouldn't start
with removing them.

You are right that, because we don't know the JM address before the JM
is started, we need to send the address with every slot request.
Moreover, we have the method #getJobManagerLeaderRetriever(JobID,
defaultJMAddress) on the HAServices. While this is not super nice, I
don't think it is a fundamental problem at the moment. What we pay is a
couple of extra bytes we need to send over the network.
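
For context, this is roughly how the standalone implementation handles
the default address today (paraphrased excerpt from
StandaloneHaServices, so details may differ slightly):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;

    // The caller passes the JM address in because it cannot be known at
    // configuration time; "retrieval" then just hands back that fixed
    // address under the constant non-ha leader id.
    public LeaderRetrievalService getJobManagerLeaderRetriever(
            JobID jobID, String defaultJobManagerAddress) {
        return new StandaloneLeaderRetrievalService(
            defaultJobManagerAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
    }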

Configuration-wise, I'm not so sure whether we gain much by replacing
the StandaloneHaServices with the LeaderServer-based implementation.
For the new implementation, one needs to configure a static address at
cluster start-up time as well. The only benefit I can see is that we
don't need to send the JM address to the RM and TMs. But as I've said,
I don't think this is a big enough problem to justify introducing new
HAServices. Instead, I could see removing it once the LeaderServer
HAServices implementation has proven to be stable.

3. Configuration of HAServices:

I agree that Flink's address and port configuration is not done
consistently. It might make sense to group the address and port
configuration under the ha services configuration section. Maybe it
also makes sense to rename ha services into ServiceDiscovery because it
also works in the non-ha case. It could then be possible to only
configure address and port if one is using the non-ha services, for
example. However, this definitely deserves a separate discussion and
design because one needs to check where exactly the respective
configuration options are being used.
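
To make the grouping idea concrete, here is a sketch of what such
options could look like (all key names purely illustrative; none of
them exists today):

    import org.apache.flink.configuration.ConfigOption;
    import static org.apache.flink.configuration.ConfigOptions.key;

    // Illustrative sketch of grouped service-discovery options.
    public class ServiceDiscoveryOptions {

        // which discovery backend to use: "none" (pre-configured
        // addresses), "zookeeper", ...
        public static final ConfigOption<String> MODE =
            key("service-discovery.mode").defaultValue("none");

        // only consulted in the "none" case, where addresses are
        // pre-configured
        public static final ConfigOption<String> ADDRESS =
            key("service-discovery.address").noDefaultValue();

        public static final ConfigOption<Integer> PORT =
            key("service-discovery.port").defaultValue(6123);
    }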

I think improving the configuration of HAServices is actually orthogonal to
introducing the LeaderServer HAServices implementation and could also be
done for the existing HAServices.

4. Clean up of HAServices implementations:

You are right that some of the existing HAServices implementations are
"dead code" at the moment. They are the result of implementation ideas
which haven't been completed. I would suggest starting a separate
discussion on what to do with them.

Cheers,
Till

On Mon, Sep 9, 2019 at 9:16 AM Zili Chen <wander4...@gmail.com> wrote:

> Hi devs,
>
> I'd like to start a discussion thread on the topic of how we provide
> retrieval services in non-high-availability scenarios. To clarify
> terminology, non-high-availability refers to StandaloneHaServices
> and EmbeddedHaServices.
>
> ***The problem***
>
> We notice that the retrieval services of the current
> StandaloneHaServices (pre-configured) and EmbeddedHaServices
> (in-memory) have their respective problems.
>
> For the pre-configured scenario, we now have a
> getJobManagerLeaderRetriever(JobID, defaultJMAddress) method to work
> around the problem that it is impossible to configure the JM address
> in advance. The parameter defaultJMAddress is not in use with any
> other high-availability mode. Also, in the MiniCluster scenario and
> anywhere else where pre-configuring the leader address is impossible,
> StandaloneHaServices cannot be used.
>
> For the in-memory case, it clearly doesn't fit any distributed
> scenario.
>
> ***The proposal***
>
> In order to address the inconsistency between the pre-configured
> retrieval services and the zookeeper-based retrieval services, we
> reconsider the promises given by "non-high-availability" and regard it
> as a service similar to the zookeeper-based one, except that it does
> not tolerate node failure. Thus, we implement a service that acts like
> a standalone zookeeper cluster, named LeaderServer.
>
> A leader server is an actor that runs on the jobmanager actor system
> and reacts to leader contender registrations and leader retriever
> requests. If the jobmanager fails, the associated leader server fails,
> too, which is exactly what "non-high-availability" means here.
>
> In order to communicate with the leader server, we start a leader
> client per high-availability services instance (JM, TM,
> ClusterClient). When the leader election service starts, it registers
> the contender with the leader server via the leader client (by akka
> communication); when a leader retriever starts, it registers itself
> with the leader server via the leader client.
>
> The leader server handles leader election internally, just like the
> Embedded implementation, and notifies retrievers with the new leader
> information whenever a new leader is elected.
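>
> A rough sketch of what the leader server could look like (all class
> and message names are illustrative, not final; the state handling is
> kept deliberately simple):
>
>     import java.util.HashMap;
>     import java.util.HashSet;
>     import java.util.Map;
>     import java.util.Set;
>
>     import akka.actor.AbstractActor;
>     import akka.actor.ActorRef;
>
>     public class LeaderServer extends AbstractActor {
>
>         // "component" identifies e.g. the resource manager, the
>         // dispatcher, or jobmanager-<JobID>
>         public static final class RegisterContender {
>             final String component; final String address;
>             public RegisterContender(String c, String a) { component = c; address = a; }
>         }
>         public static final class RegisterRetriever {
>             final String component;
>             public RegisterRetriever(String c) { component = c; }
>         }
>         public static final class LeaderChanged {
>             final String component; final String address;
>             public LeaderChanged(String c, String a) { component = c; address = a; }
>         }
>
>         private final Map<String, String> leaders = new HashMap<>();
>         private final Map<String, Set<ActorRef>> retrievers = new HashMap<>();
>
>         @Override
>         public Receive createReceive() {
>             return receiveBuilder()
>                 .match(RegisterContender.class, msg -> {
>                     // no real election in the non-ha case: the single
>                     // contender per component becomes leader
>                     leaders.put(msg.component, msg.address);
>                     for (ActorRef r :
>                             retrievers.getOrDefault(msg.component, new HashSet<>())) {
>                         r.tell(new LeaderChanged(msg.component, msg.address), getSelf());
>                     }
>                 })
>                 .match(RegisterRetriever.class, msg -> {
>                     retrievers.computeIfAbsent(msg.component, k -> new HashSet<>())
>                         .add(getSender());
>                     String leader = leaders.get(msg.component);
>                     if (leader != null) {
>                         // late retrievers immediately learn the current leader
>                         getSender().tell(new LeaderChanged(msg.component, leader), getSelf());
>                     }
>                 })
>                 .build();
>         }
>     }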
>
> In this way, we unify the view of retrieval services in all
> scenarios:
>
> 1. Configure a name service to communicate with. In zookeeper mode it
> is zookeeper; in non-high-availability mode it is the leader server.
> 2. Any retrieval request is sent to the name service and handled by
> that service.
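>
> For example, the configuration then differs only in where the name
> service lives (the leader-server mode value and key below are
> illustrative; they don't exist today):
>
>     import org.apache.flink.configuration.Configuration;
>
>     public class NameServiceConfigExample {
>         public static void main(String[] args) {
>             // zookeeper mode: the name service is the zookeeper quorum
>             Configuration zk = new Configuration();
>             zk.setString("high-availability", "zookeeper");
>             zk.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181");
>
>             // non-high-availability mode: the name service is the leader server
>             Configuration nonHa = new Configuration();
>             nonHa.setString("high-availability", "leader-server");
>             nonHa.setString("leader-server.address", "jm-host:6123");
>         }
>     }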
>
> Apart from a unified view, there are other advantages:
>
> + We no longer need the special method
> getJobManagerLeaderRetriever(JobID, defaultJMAddress); instead, we use
> getJobManagerLeaderRetriever(JobID). Consequently, we need not include
> the JobManager address in the slot request, where it might become
> stale during transmission (see the call-site sketch after this list).
>
> + Separated configuration concerns for launch and retrieval.
> JobManager address & port and REST address & port are configured only
> when launching a cluster (in the YARN scenario, there is even no need
> to configure them). When retrieval is requested, one configures only
> the connection info of the name service (zk or leader server).
>
> + The Embedded implementation could also be included in this
> abstraction without any regression on multiple-leader simulation for
> test purposes. Actually, the leader server acts as a limited
> standalone zookeeper cluster. Thus (and this is where this proposal
> comes from), when we refactor the metadata storage with the
> transaction store proposed in FLINK-10333, we only have to take care
> of the zookeeper implementation and a unified non-high-availability
> implementation.
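>
> To illustrate the first point above at the call site (sketch; assume
> an HAServices instance haServices and a job id jobId are in scope):
>
>     // today: the TaskExecutor needs the address carried by the slot request
>     LeaderRetrievalService retrieverToday =
>         haServices.getJobManagerLeaderRetriever(jobId, addressFromSlotRequest);
>
>     // with the leader server: the name service resolves the address,
>     // so the slot request no longer has to carry it
>     LeaderRetrievalService retrieverProposed =
>         haServices.getJobManagerLeaderRetriever(jobId);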
>
> ***Clean up***
>
> It is also noticed that there are several stale & unimplemented
> high-availability services implementations which I'd like to remove to
> get a clean codebase to work on for this thread and FLINK-10333. They
> are:
>
> - YarnHighAvailabilityServices
> - AbstractYarnNonHaServices
> - YarnIntraNonHaMasterServices
> - YarnPreConfiguredMasterNonHaServices
> - SingleLeaderElectionService
> - FsNegativeRunningJobsRegistry
>
> Any feedback is appreciated.
>
> Best,
> tison.
>
