This is the voting thread for PIP-121. It will stay open for at least 48 hours.
https://github.com/apache/pulsar/issues/13315 Pasted below for quoting convenience. ----- ### Motivation We have geo-replication to support Pulsar cluster level failover. We can set up Pulsar cluster A as a primary cluster in data center A, and setup Pulsar cluster B as backup cluster in data center B. Then we configure geo-replication between cluster A and cluster B. All the clients are connected to the Pulsar cluster by DNS. If cluster A is down, we should switch the DNS to point the target Pulsar cluster from cluster A to cluster B. After the clients are resolved to cluster B, they can produce and consume messages normally. After cluster A recovers, the administrator should switch the DNS back to cluster A. However, the current method has two shortcomings. 1. The administrator should monitor the status of all Pulsar clusters, and switch the DNS as soon as possible when cluster A is down. The switch and recovery is not automatic and recovery time is controlled by the administrator, which will put the administrator under heavy load. 2. The Pulsar client and DNS system have a cache. When the administrator switches the DNS from cluster A to Cluster B, it will take some time for cache trigger timeout, which will delay client recovery time and lead to the product/consumer message failing. ### Goal It's better to provide an automatic cluster level failure recovery mechanism to make pulsar cluster failover more effective. We should support pulsar clients auto switching from cluster A to cluster B when it detects cluster A has been down according to the configured detecting policy and switch back to cluster A when it has recovered. The reason why we should switch back to cluster A is that most applications may be deployed in data center A and they have low network cost for communicating with pulsar cluster A. If they keep visiting pulsar cluster B, they have high network cost, and cause high produce/consume latency. In order to improve the DNS cache problem, we should provide an administrator controlled switch provider for administrators to update service URLs. In the end, we should provide an auto service URL switch provider and administrator controlled switch provider. ### Design We have already provided the `ServiceUrlProvider` interface to support different service URLs. In order to support automatic cluster level failure auto recovery, we can provide different ServiceUrlProvider implementations. For current requirements, we can provide `AutoClusterFailover` and `ControlledClusterFailover`. #### AutoClusterFailover In order to support auto switching from the primary cluster to the secondary, we can provide a probe task, which will probe the activity of the primary cluster and the secondary one. When it finds the primary cluster failed more than `failoverDelayMs`, it will switch to the secondary cluster by calling `updateServiceUrl`. After switching to the secondary cluster, the `AutoClusterFailover` will continue to probe the primary cluster. If the primary cluster comes back and remains active for `switchBackDelayMs`, it will switch back to the primary cluster. The APIs are listed as follows. In order to support multiple secondary clusters, use List to store secondary cluster urls. When the primary cluster probe fails for failoverDelayMs, it will start to probe the secondary cluster list one by one, once it finds the active cluster, it will switch to the target cluster. Notice: If you configured multiple clusters, you should turn on cluster level geo-replication to ensure the topic data sync between all primary and secondary clusters. Otherwise, it may distribute the topic data into different clusters. And the consumers won’t get the whole data of the topic. In order to support different authentication configurations between clusters, we provide the authentication relation configurations updated with the target cluster. ```Java public class AutoClusterFailover implements ServiceUrlProvider { private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) { // } @Override public void initialize(PulsarClient client) { this.pulsarClient = client; // start to probe primary cluster active or not executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> { // probe and switch }), intervalMs, intervalMs, TimeUnit.MILLISECONDS); } @Override public String getServiceUrl() { return this.currentPulsarServiceUrl; } @Override public void close() { this.executor.shutdown(); } // probe pulsar cluster available private boolean probeAvailable(String url, int timeout) { } ``` In order to create an `AutoClusterFailover` instance, we use `AutoClusterFailoverBuilder` interface to build the target instance. The `AutoClusterFailoverBuilder` interface is located in the `pulsar-client-api` package. In the `probeAvailable` method, we will probe the Pulsar service port, and check whether the port is open. This probe method has many disadvantages, such as We're connecting to a Pulsar proxy, but there are no available brokers Using Istio on the server side, which always accepts the connection even if the broker is in a bad state We might have deadlocks in (all) brokers and while the connections get accepted, the brokers are not able to serve them. In order to solve this problem, we’d better provide a health check command on the broker side, just like Zookeeper’s `ruok` command. We can use the probe port method first, and in the next step, we will provide the health check command on the broker side. #### ControlledClusterFailover If the users want to control the cluster switch operation, they can provide the current service URL by a http service. The `ControlledClusterFailover` will get the newest service url from the provided http service periodically. The APIs are listed as follows. ```Java public class ControlledClusterFailover implements ServiceUrlProvider { private ControlledClusterFailover(String defaultServiceUrl, String urlProvider) throws IOException { } @Override public void initialize(PulsarClient client) { this.pulsarClient = client; // start to check service url every 30 seconds executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> { // probe and switch }), interval, interval, TimeUnit.MILLISECONDS); } protected ControlledConfiguration fetchControlledConfiguration() throws IOException { // call the service to get controlled configuration } @Override public String getServiceUrl() { return this.currentPulsarServiceUrl; } @Override public void close() { this.executor.shutdown(); } protected static class ControlledConfiguration { private String serviceUrl; private String tlsTrustCertsFilePath; private String authPluginClassName; private String authParamsString; } ``` The configuration we get from the third url provider, we define it as java Bean by json format. In the configuration, we provide authentication-related parameters to support different clusters that have different authentication configurations. These authentication-related parameters can support all current authentication plugin types. In order to create an `ControlledClusterFailover` instance, we use the `ControlledClusterFailoverBuilder` interface to build the target instance. The `ControlledClusterFailoverBuilder` interface is located in the `pulsar-client-api` package. ### API Changes For the current `ServiceUrlProvider` interface, we should add a `close` method to close an allocated resource, such as a timer thread. ```Java public interface ServiceUrlProvider { /** * Close the resource that the provider allocated. * */ default void close() { // do nothing } /** * Update the authentication this client is using. * * @param authentication * * @throws IOException */ void updateAuthentication(Authentication authentication) throws IOException; /** * Update the tlsTrustCertsFilePath this client is using. * * @param tlsTrustCertsFilePath */ void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath); /** * Update the tlsTrustStorePath and tlsTrustStorePassword this client is using. * * @param tlsTrustStorePath * @param tlsTrustStorePassword */ void updateTlsTrustStorePathAndPassword(String tlsTrustStorePath, String tlsTrustStorePassword); } ``` ### Tests Add tests for the two service provider implementations. For `AutoClusterFailover`, when the primary cluster shuts down, it should switch to the secondary cluster. And then the primary cluster came back, we should switch back. For `ControlledClusterFailover`, when switching the service url on the http service side, it should switch to the newest service url. ### Implementation Prototype implementation PR: https://github.com/apache/pulsar/pull/13316