This is the voting thread for PIP-121. It will stay open for at least 48
hours.

Pasted below for quoting convenience.

### Motivation
We have geo-replication to support Pulsar cluster-level failover. We
can set up Pulsar cluster A as the primary cluster in data center A and
set up Pulsar cluster B as the backup cluster in data center B, then
configure geo-replication between cluster A and cluster B. All the
clients connect to the Pulsar cluster through DNS. If cluster A goes
down, we should switch the DNS to point from cluster A to cluster B.
Once the clients resolve to cluster B, they can produce and consume
messages normally. After cluster A recovers, the administrator should
switch the DNS back to cluster A.

However, the current method has two shortcomings.
1. The administrator has to monitor the status of all Pulsar clusters
and switch the DNS as soon as possible when cluster A is down. Neither
the switch nor the recovery is automatic, and the recovery time depends
on the administrator, which puts the administrator under heavy load.
2. The Pulsar client and the DNS system both have a cache. When the
administrator switches the DNS from cluster A to cluster B, it takes
some time for the cached entries to time out, which delays client
recovery and causes produce/consume failures in the meantime.

### Goal
We should provide an automatic cluster-level failure recovery
mechanism to make Pulsar cluster failover more effective. Pulsar
clients should switch automatically from cluster A to cluster B when
they detect, according to the configured detection policy, that
cluster A is down, and switch back to cluster A once it has recovered.
The reason to switch back to cluster A is that most applications may
be deployed in data center A, where communicating with Pulsar cluster
A has a low network cost. If they keep using Pulsar cluster B, they
pay a higher network cost and see higher produce/consume latency.

To mitigate the DNS cache problem, we should provide an
administrator-controlled switch provider that lets administrators
update service URLs.

In the end, we should provide an automatic service URL switch provider
and an administrator-controlled switch provider.

### Design
We already provide the `ServiceUrlProvider` interface to support
different service URLs. To support automatic cluster-level failure
recovery, we can provide different `ServiceUrlProvider`
implementations. For the current requirements, we can provide
`AutoClusterFailover` and `ControlledClusterFailover`.

#### AutoClusterFailover
To support automatic switching from the primary cluster to the
secondary, we can provide a probe task that checks whether the primary
cluster and the secondary one are alive. When the primary cluster has
been failing for more than `failoverDelayMs`, the provider switches to
the secondary cluster by calling `updateServiceUrl`. After switching
to the secondary cluster, `AutoClusterFailover` continues to probe the
primary cluster. If the primary cluster comes back and remains active
for `switchBackDelayMs`, the provider switches back to the primary
cluster.
The APIs are listed as follows.

To support multiple secondary clusters, the secondary cluster URLs are
stored in a `List`. When the primary cluster probe has failed for
`failoverDelayMs`, the provider probes the secondary cluster list one
by one and switches to the first active cluster it finds. Note: if you
configure multiple clusters, you should turn on cluster-level
geo-replication so that topic data is synced between the primary and
all secondary clusters. Otherwise, the topic data may be spread across
different clusters, and consumers won’t get the complete data of the
topic.
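
The switching rules described above can be sketched as a small state
machine. This is only an illustration, not the proposed implementation:
the class name `FailoverDecider`, the `tick` method, and the injected
probe predicate are hypothetical, and the sketch treats "failed for
`failoverDelayMs`" as "at least that long". It also leaves open what
happens when the active secondary cluster itself fails, since the
proposal does not specify it.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical decision logic for an auto-failover provider; not the Pulsar implementation. */
class FailoverDecider {
    private final String primary;
    private final List<String> secondaries;
    private final long failoverDelayMs;
    private final long switchBackDelayMs;
    private final Predicate<String> probe; // assumed: returns true when the cluster at the URL looks alive

    private String current;
    private long primaryFailedSince = -1;    // -1: no ongoing primary failure is being timed
    private long primaryRecoveredSince = -1; // -1: no ongoing primary recovery is being timed

    FailoverDecider(String primary, List<String> secondaries,
                    long failoverDelayMs, long switchBackDelayMs, Predicate<String> probe) {
        this.primary = primary;
        this.secondaries = secondaries;
        this.failoverDelayMs = failoverDelayMs;
        this.switchBackDelayMs = switchBackDelayMs;
        this.probe = probe;
        this.current = primary;
    }

    /** Runs one probe round at time nowMs and returns the URL the client should use. */
    String tick(long nowMs) {
        boolean primaryUp = probe.test(primary);
        if (current.equals(primary)) {
            if (primaryUp) {
                primaryFailedSince = -1; // a healthy probe resets the failure timer
            } else {
                if (primaryFailedSince < 0) {
                    primaryFailedSince = nowMs;
                }
                if (nowMs - primaryFailedSince >= failoverDelayMs) {
                    // Probe the secondaries in order and take the first active one.
                    for (String candidate : secondaries) {
                        if (probe.test(candidate)) {
                            current = candidate;
                            primaryFailedSince = -1;
                            break;
                        }
                    }
                }
            }
        } else if (!primaryUp) {
            primaryRecoveredSince = -1; // the primary must stay up for the whole delay
        } else {
            if (primaryRecoveredSince < 0) {
                primaryRecoveredSince = nowMs;
            }
            if (nowMs - primaryRecoveredSince >= switchBackDelayMs) {
                current = primary;
                primaryRecoveredSince = -1;
            }
        }
        return current;
    }
}
```

Passing the timestamp in explicitly keeps the rules easy to test; a
real provider would call something like `tick` from its scheduled probe
task and pass the result to `updateServiceUrl` when it changes.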

To support different authentication configurations between clusters,
the authentication-related configuration is updated together with the
target cluster.

public class AutoClusterFailover implements ServiceUrlProvider {

    private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) {
        // ...
    }

    @Override
    public void initialize(PulsarClient client) {
        this.pulsarClient = client;

        // start to probe primary cluster active or not
        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
            // probe and switch
        }), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public String getServiceUrl() {
        return this.currentPulsarServiceUrl;
    }

    @Override
    public void close() {
        // ...
    }

    // probe pulsar cluster available
    private boolean probeAvailable(String url, int timeout) {
        // ...
    }
}

To create an `AutoClusterFailover` instance, we use the
`AutoClusterFailoverBuilder` interface to build the target instance.
The `AutoClusterFailoverBuilder` interface is located in the
`pulsar-client-api` package.

In the `probeAvailable` method, we probe the Pulsar service port and
check whether the port is open. This probe method has several
disadvantages, such as:
1. We may be connecting to a Pulsar proxy that has no available
brokers behind it.
2. With Istio on the server side, the connection is always accepted
even if the broker is in a bad state.
3. All brokers might be deadlocked, so connections get accepted but
the brokers are not able to serve them.

To solve this problem, we’d better provide a health check command on
the broker side, just like ZooKeeper’s `ruok` command. We can use the
port probe method first and, in the next step, provide the health
check command on the broker side.
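
A port probe of the kind described above could look roughly like the
following. It is a minimal sketch, not the code from the prototype: the
class name `PortProbe` is hypothetical, and it assumes the service URL
names a single `host:port` pair that `java.net.URI` can parse (a
multi-host URL would need extra handling).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/** Hypothetical helper: checks only that the service port accepts TCP connections. */
class PortProbe {
    /**
     * Returns true if a TCP connection to the host and port in serviceUrl
     * succeeds within timeoutMs. A successful connect says nothing about
     * broker health, which is the limitation discussed above.
     */
    static boolean probeAvailable(String serviceUrl, int timeoutMs) {
        try (Socket socket = new Socket()) {
            URI uri = URI.create(serviceUrl); // e.g. pulsar://host:6650
            if (uri.getHost() == null || uri.getPort() < 0) {
                return false; // no usable host:port in the URL
            }
            socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), timeoutMs);
            return true;
        } catch (IOException | IllegalArgumentException e) {
            // Refused, timed out, unresolvable host, or malformed URL.
            return false;
        }
    }
}
```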

#### ControlledClusterFailover
If users want to control the cluster switch operation themselves, they
can expose the current service URL through an HTTP service. The
`ControlledClusterFailover` periodically fetches the newest service
URL from that HTTP service.
The APIs are listed as follows.

public class ControlledClusterFailover implements ServiceUrlProvider {

    private ControlledClusterFailover(String defaultServiceUrl, String urlProvider) throws IOException {
        // ...
    }

    @Override
    public void initialize(PulsarClient client) {
        this.pulsarClient = client;

        // start to check service url every 30 seconds
        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
            // probe and switch
        }), interval, interval, TimeUnit.MILLISECONDS);
    }

    protected ControlledConfiguration fetchControlledConfiguration() throws IOException {
        // call the service to get controlled configuration
    }

    @Override
    public String getServiceUrl() {
        return this.currentPulsarServiceUrl;
    }

    @Override
    public void close() {
        // ...
    }

    protected static class ControlledConfiguration {
        private String serviceUrl;
        private String tlsTrustCertsFilePath;

        private String authPluginClassName;
        private String authParamsString;
    }
}

The configuration fetched from the third-party URL provider is defined
as a Java bean serialized in JSON format. The configuration carries
authentication-related parameters so that clusters with different
authentication configurations are supported. These
authentication-related parameters can support all current
authentication plugin types.
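
One polling round of the controlled provider can be sketched as
follows. The names `ControlledSwitchSketch`, `Config`, and `poll` are
hypothetical, the HTTP call and JSON decoding are replaced by an
injected `Supplier`, and the JSON shape in the comment is an assumption
derived from the bean's field names rather than a documented format.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of one polling round; not the Pulsar implementation. */
class ControlledSwitchSketch {
    // Assumed JSON returned by the HTTP service (keys mirror the bean fields, values are examples):
    // {"serviceUrl": "pulsar+ssl://cluster-b:6651", "tlsTrustCertsFilePath": "/path/to/ca.pem",
    //  "authPluginClassName": "org.apache.pulsar.client.impl.auth.AuthenticationToken",
    //  "authParamsString": "token:xxx"}

    /** Stand-in for ControlledConfiguration with the same four fields. */
    static class Config {
        final String serviceUrl;
        final String tlsTrustCertsFilePath;
        final String authPluginClassName;
        final String authParamsString;

        Config(String serviceUrl, String tlsTrustCertsFilePath,
               String authPluginClassName, String authParamsString) {
            this.serviceUrl = serviceUrl;
            this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
            this.authPluginClassName = authPluginClassName;
            this.authParamsString = authParamsString;
        }
    }

    private final Supplier<Config> fetcher; // assumed: calls the HTTP service and decodes the JSON
    private String currentServiceUrl;
    private Config lastApplied;             // configuration behind the most recent switch, if any

    ControlledSwitchSketch(String defaultServiceUrl, Supplier<Config> fetcher) {
        this.currentServiceUrl = defaultServiceUrl;
        this.fetcher = fetcher;
    }

    /** Returns true when the fetched service URL differs from the current one and a switch was made. */
    boolean poll() {
        Config fetched = fetcher.get();
        if (fetched == null || fetched.serviceUrl == null
                || fetched.serviceUrl.equals(currentServiceUrl)) {
            return false; // nothing usable, or nothing changed
        }
        // A real provider would also hand the TLS and authentication
        // settings in `fetched` to the client as part of the switch.
        currentServiceUrl = fetched.serviceUrl;
        lastApplied = fetched;
        return true;
    }

    String getServiceUrl() {
        return currentServiceUrl;
    }

    Config lastApplied() {
        return lastApplied;
    }
}
```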

To create a `ControlledClusterFailover` instance, we use the
`ControlledClusterFailoverBuilder` interface to build the target
instance. The `ControlledClusterFailoverBuilder` interface is located
in the `pulsar-client-api` package.

### API Changes
For the current `ServiceUrlProvider` interface, we should add a
`close` method to close an allocated resource, such as a timer thread.

public interface ServiceUrlProvider {
    /**
     * Close the resource that the provider allocated.
     */
    default void close() {
        // do nothing
    }
}

The following methods update the authentication and TLS settings that
the client is using.

    /**
     * Update the authentication this client is using.
     *
     * @param authentication
     * @throws IOException
     */
    void updateAuthentication(Authentication authentication) throws IOException;

    /**
     * Update the tlsTrustCertsFilePath this client is using.
     *
     * @param tlsTrustCertsFilePath
     */
    void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath);

    /**
     * Update the tlsTrustStorePath and tlsTrustStorePassword this client is using.
     *
     * @param tlsTrustStorePath
     * @param tlsTrustStorePassword
     */
    void updateTlsTrustStorePathAndPassword(String tlsTrustStorePath, String tlsTrustStorePassword);

### Tests
Add tests for the two service provider implementations.

For `AutoClusterFailover`, when the primary cluster shuts down, the
client should switch to the secondary cluster. When the primary
cluster comes back, the client should switch back to it.

For `ControlledClusterFailover`, when the service URL is changed on
the HTTP service side, the client should switch to the newest service
URL.

### Implementation
Prototype implementation PR:
