+1 (binding) Enrico
On Mon, Jan 10, 2022 at 07:45, Hang Chen <chenh...@apache.org> wrote:
>
> This is the voting thread for PIP-121. It will stay open for at least 48
> hours.
>
> https://github.com/apache/pulsar/issues/13315
>
> Pasted below for quoting convenience.
>
> -----
> ### Motivation
> We have geo-replication to support Pulsar cluster-level failover. We
> can set up Pulsar cluster A as the primary cluster in data center A, and
> set up Pulsar cluster B as a backup cluster in data center B. Then we
> configure geo-replication between cluster A and cluster B. All the
> clients connect to the Pulsar cluster through DNS. If cluster A is
> down, we have to switch the DNS so that it points to cluster B instead
> of cluster A. Once the clients resolve to cluster B, they can produce
> and consume messages normally. After cluster A recovers, the
> administrator has to switch the DNS back to cluster A.
>
> However, the current method has two shortcomings.
> 1. The administrator has to monitor the status of all Pulsar clusters
> and switch the DNS as soon as possible when cluster A is down. The
> switch and recovery are not automatic, and the recovery time depends
> on the administrator, which puts the administrator under heavy load.
> 2. Both the Pulsar client and the DNS system have caches. When the
> administrator switches the DNS from cluster A to cluster B, it takes
> some time for the caches to expire, which delays client recovery and
> causes producer/consumer operations to fail in the meantime.
>
> ### Goal
> We should provide an automatic cluster-level failure recovery
> mechanism to make Pulsar cluster failover more effective. Pulsar
> clients should switch automatically from cluster A to cluster B when
> they detect, according to the configured detection policy, that
> cluster A is down, and switch back to cluster A once it has recovered.
> We switch back to cluster A because most applications may be
> deployed in data center A, where communicating with Pulsar cluster A
> has a low network cost. If they keep accessing Pulsar cluster B, they
> incur a high network cost and high produce/consume latency.
>
> To mitigate the DNS cache problem, we should provide an
> administrator-controlled switch provider that lets administrators
> update service URLs.
>
> In summary, we should provide an automatic service URL switch provider
> and an administrator-controlled switch provider.
>
> ### Design
> We already provide the `ServiceUrlProvider` interface to support
> different service URLs. To support automatic cluster-level failure
> recovery, we can provide different `ServiceUrlProvider`
> implementations. For the current requirements, we can provide
> `AutoClusterFailover` and `ControlledClusterFailover`.
>
> #### AutoClusterFailover
> To support automatic switching from the primary cluster to the
> secondary, we can provide a probe task that checks the availability of
> the primary cluster and the secondary one. When the primary cluster
> has been failing for more than `failoverDelayMs`, it switches to the
> secondary cluster by calling `updateServiceUrl`. After switching to
> the secondary cluster, `AutoClusterFailover` keeps probing the primary
> cluster. If the primary cluster comes back and remains available for
> `switchBackDelayMs`, it switches back to the primary cluster.
> The APIs are listed as follows.
>
> To support multiple secondary clusters, we store the secondary cluster
> URLs in a List. When the primary cluster probe has failed for
> `failoverDelayMs`, it probes the secondary clusters one by one; once
> it finds an available cluster, it switches to that cluster.
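> Notice: if you configure multiple clusters, you should turn on
> cluster-level geo-replication to keep the topic data in sync across
> all primary and secondary clusters. Otherwise, the topic data may be
> spread across different clusters, and consumers won't get all of the
> topic's data.
>
> To support different authentication configurations between clusters,
> the authentication-related configurations are updated together with
> the target cluster.
>
> ```Java
> import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
>
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.net.URI;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> import org.apache.pulsar.client.api.PulsarClient;
> import org.apache.pulsar.client.api.ServiceUrlProvider;
>
> public class AutoClusterFailover implements ServiceUrlProvider {
>     private final ScheduledExecutorService executor =
>             Executors.newSingleThreadScheduledExecutor();
>     private PulsarClient pulsarClient;
>     private volatile String currentPulsarServiceUrl;
>     private long intervalMs;
>
>     private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) {
>         // copy the primary/secondary URLs, delays and probe interval from the builder
>     }
>
>     @Override
>     public void initialize(PulsarClient client) {
>         this.pulsarClient = client;
>
>         // start to probe whether the primary cluster is available
>         executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
>             // probe and switch
>         }), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
>     }
>
>     @Override
>     public String getServiceUrl() {
>         return this.currentPulsarServiceUrl;
>     }
>
>     @Override
>     public void close() {
>         this.executor.shutdown();
>     }
>
>     // probe whether the Pulsar cluster is available by checking that its
>     // service port accepts TCP connections (assumes a single host:port URL)
>     private boolean probeAvailable(String url, int timeout) {
>         try (Socket socket = new Socket()) {
>             URI uri = URI.create(url);
>             socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), timeout);
>             return true;
>         } catch (Exception e) {
>             return false;
>         }
>     }
> }
> ```
>
> To create an `AutoClusterFailover` instance, we use the
> `AutoClusterFailoverBuilder` interface to build the target instance.
> The `AutoClusterFailoverBuilder` interface is located in the
> `pulsar-client-api` package.
>
> In the `probeAvailable` method, we probe the Pulsar service port and
> check whether the port is open. This probe method has several
> disadvantages, for example:
> 1. We're connecting to a Pulsar proxy, but there are no available
> brokers.
> 2. Istio is used on the server side, and it always accepts the
> connection even if the broker is in a bad state.
> 3. We might have deadlocks in (all) brokers, so while the connections
> get accepted, the brokers are not able to serve them.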
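The probe-and-switch step that the `AutoClusterFailover` outline leaves as a comment follows the timing rules described in the AutoClusterFailover section: fail over once the primary has been down for `failoverDelayMs`, try the secondaries in order, and switch back only after the primary has stayed up for `switchBackDelayMs`. The minimal sketch below illustrates those rules under stated assumptions: `FailoverStateMachine` and `tick` are hypothetical names, not Pulsar APIs, and the probe is injected as a `Predicate<String>` (for example a TCP port check like `probeAvailable`) so the logic can be exercised without real clusters.

```java
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of AutoClusterFailover's timing rules.
 * Not the Pulsar implementation; names are illustrative only.
 */
final class FailoverStateMachine {
    private final String primary;
    private final List<String> secondaries;
    private final long failoverDelayMs;
    private final long switchBackDelayMs;
    private final Predicate<String> probe; // e.g. a TCP port check

    private String current;
    private long primaryDownSince = -1; // -1: primary not observed down
    private long primaryUpSince = -1;   // -1: primary not observed up while on a secondary

    FailoverStateMachine(String primary, List<String> secondaries,
                         long failoverDelayMs, long switchBackDelayMs,
                         Predicate<String> probe) {
        this.primary = primary;
        this.secondaries = List.copyOf(secondaries);
        this.failoverDelayMs = failoverDelayMs;
        this.switchBackDelayMs = switchBackDelayMs;
        this.probe = probe;
        this.current = primary;
    }

    /** Runs one probe round at time nowMs and returns the service URL to use. */
    String tick(long nowMs) {
        boolean primaryUp = probe.test(primary);
        if (current.equals(primary)) {
            if (primaryUp) {
                primaryDownSince = -1;
                return current;
            }
            if (primaryDownSince < 0) {
                primaryDownSince = nowMs;
            }
            if (nowMs - primaryDownSince >= failoverDelayMs) {
                // Probe the secondaries in order and fail over to the first available one.
                for (String candidate : secondaries) {
                    if (probe.test(candidate)) {
                        current = candidate;
                        primaryDownSince = -1;
                        primaryUpSince = -1;
                        break;
                    }
                }
            }
        } else if (primaryUp) {
            if (primaryUpSince < 0) {
                primaryUpSince = nowMs;
            }
            // Switch back only after the primary has stayed up for switchBackDelayMs.
            if (nowMs - primaryUpSince >= switchBackDelayMs) {
                current = primary;
                primaryUpSince = -1;
            }
        } else {
            primaryUpSince = -1; // primary went down again: restart the switch-back timer
        }
        return current;
    }
}
```

In a provider like `AutoClusterFailover`, the scheduled task would call `tick(System.currentTimeMillis())` and, whenever the returned URL differs from the current one, call `updateServiceUrl` on the client. Keeping time as an explicit argument makes the delays easy to test; the sketch deliberately ignores the case where the active secondary itself fails.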
> To address these limitations of the port probe, we'd better provide a
> health check command on the broker side, like ZooKeeper's `ruok`
> command. We can use the port probe method first, and in the next step,
> we will provide the health check command on the broker side.
>
> #### ControlledClusterFailover
> If users want to control the cluster switch operation, they can
> provide the current service URL through an HTTP service.
> `ControlledClusterFailover` periodically fetches the newest service
> URL from the provided HTTP service.
> The APIs are listed as follows.
> ```Java
> import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
>
> import java.io.IOException;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> import org.apache.pulsar.client.api.PulsarClient;
> import org.apache.pulsar.client.api.ServiceUrlProvider;
>
> public class ControlledClusterFailover implements ServiceUrlProvider {
>     private final ScheduledExecutorService executor =
>             Executors.newSingleThreadScheduledExecutor();
>     private final long interval = 30_000; // check interval in milliseconds
>     private final String urlProvider;
>     private PulsarClient pulsarClient;
>     private volatile String currentPulsarServiceUrl;
>
>     private ControlledClusterFailover(String defaultServiceUrl, String urlProvider)
>             throws IOException {
>         this.currentPulsarServiceUrl = defaultServiceUrl;
>         this.urlProvider = urlProvider;
>     }
>
>     @Override
>     public void initialize(PulsarClient client) {
>         this.pulsarClient = client;
>
>         // start to check the service URL every 30 seconds
>         executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
>             // fetch the configuration and switch if it changed
>         }), interval, interval, TimeUnit.MILLISECONDS);
>     }
>
>     protected ControlledConfiguration fetchControlledConfiguration()
>             throws IOException {
>         // call the HTTP service at urlProvider to get the controlled configuration
>     }
>
>     @Override
>     public String getServiceUrl() {
>         return this.currentPulsarServiceUrl;
>     }
>
>     @Override
>     public void close() {
>         this.executor.shutdown();
>     }
>
>     protected static class ControlledConfiguration {
>         private String serviceUrl;
>         private String tlsTrustCertsFilePath;
>
>         private String authPluginClassName;
>         private String authParamsString;
>     }
> }
> ```
> The configuration returned by the third-party URL provider is defined
> as a Java bean in JSON format. It includes authentication-related
> parameters to support clusters that have different authentication
> configurations. These authentication-related parameters can support
> all current authentication plugin types.
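The polling step of `ControlledClusterFailover` can be sketched as follows, assuming the JSON returned by the URL provider has already been decoded into an object with the same four fields as `ControlledConfiguration`. `ControlledConfig`, `ClientUpdater`, and `ControlledPoller` are hypothetical names used only for illustration; in Pulsar the updates would go through the client's own update methods, whose signatures differ (for example, `updateAuthentication` takes an `Authentication` object rather than strings).

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical record mirroring the fields of ControlledConfiguration. */
record ControlledConfig(String serviceUrl, String tlsTrustCertsFilePath,
                        String authPluginClassName, String authParamsString) {}

/** Hypothetical stand-in for the client-side update hooks. */
interface ClientUpdater {
    void updateServiceUrl(String serviceUrl);
    void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath);
    void updateAuthentication(String authPluginClassName, String authParamsString);
}

/** Hypothetical sketch of one ControlledClusterFailover polling round. */
final class ControlledPoller {
    private final Supplier<ControlledConfig> fetcher; // e.g. HTTP GET + JSON decode
    private final ClientUpdater client;
    private String currentServiceUrl;

    ControlledPoller(String defaultServiceUrl, Supplier<ControlledConfig> fetcher,
                     ClientUpdater client) {
        this.currentServiceUrl = defaultServiceUrl;
        this.fetcher = fetcher;
        this.client = client;
    }

    /** Fetches the configuration once; returns true if the client was switched. */
    boolean pollOnce() {
        ControlledConfig cfg = fetcher.get();
        if (cfg == null || cfg.serviceUrl() == null
                || Objects.equals(cfg.serviceUrl(), currentServiceUrl)) {
            return false; // nothing served, or the service URL is unchanged
        }
        // Apply credentials and TLS settings before pointing at the new cluster.
        if (cfg.authPluginClassName() != null) {
            client.updateAuthentication(cfg.authPluginClassName(), cfg.authParamsString());
        }
        if (cfg.tlsTrustCertsFilePath() != null) {
            client.updateTlsTrustCertsFilePath(cfg.tlsTrustCertsFilePath());
        }
        client.updateServiceUrl(cfg.serviceUrl());
        currentServiceUrl = cfg.serviceUrl();
        return true;
    }

    String currentServiceUrl() {
        return currentServiceUrl;
    }
}
```

Applying the authentication and TLS settings before the service URL is a design choice of this sketch: it means the first connection to the new cluster already uses that cluster's configuration. Only a change in `serviceUrl` triggers a switch here.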
>
> To create a `ControlledClusterFailover` instance, we use the
> `ControlledClusterFailoverBuilder` interface to build the target
> instance. The `ControlledClusterFailoverBuilder` interface is located
> in the `pulsar-client-api` package.
>
> ### API Changes
> For the current `ServiceUrlProvider` interface, we should add a
> `close` method to release allocated resources, such as a timer thread,
> along with methods to update the authentication and TLS trust settings
> the client uses.
> ```Java
> public interface ServiceUrlProvider {
>     // existing initialize(PulsarClient) and getServiceUrl() methods omitted
>
>     /**
>      * Close the resources that the provider allocated.
>      */
>     default void close() {
>         // do nothing
>     }
>
>     /**
>      * Update the authentication this client is using.
>      *
>      * @param authentication the new authentication
>      * @throws IOException if the new authentication cannot be applied
>      */
>     void updateAuthentication(Authentication authentication)
>             throws IOException;
>
>     /**
>      * Update the tlsTrustCertsFilePath this client is using.
>      *
>      * @param tlsTrustCertsFilePath the new trust certificates file path
>      */
>     void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath);
>
>     /**
>      * Update the tlsTrustStorePath and tlsTrustStorePassword this client is using.
>      *
>      * @param tlsTrustStorePath the new trust store path
>      * @param tlsTrustStorePassword the new trust store password
>      */
>     void updateTlsTrustStorePathAndPassword(String tlsTrustStorePath,
>             String tlsTrustStorePassword);
> }
> ```
>
> ### Tests
> Add tests for the two service URL provider implementations.
>
> For `AutoClusterFailover`, when the primary cluster shuts down, it
> should switch to the secondary cluster. Then, when the primary cluster
> comes back, it should switch back.
>
> For `ControlledClusterFailover`, when the service URL is switched on
> the HTTP service side, it should switch to the newest service URL.
>
> ### Implementation
> Prototype implementation PR: https://github.com/apache/pulsar/pull/13316