+1 (non-binding)

On Wed, Jan 12, 2022 at 9:52 AM Haiting Jiang <jianghait...@apache.org> wrote:
>
> +1
>
> On 2022/01/12 00:09:26 Matteo Merli wrote:
> > +1
> > --
> > Matteo Merli
> > <matteo.me...@gmail.com>
> >
> > On Tue, Jan 11, 2022 at 12:07 PM Neng Lu <nl...@apache.org> wrote:
> > >
> > > +1 (non-binding)
> > >
> > > On Mon, Jan 10, 2022 at 12:40 AM PengHui Li <peng...@apache.org> wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Penghui
> > > >
> > > > On Mon, Jan 10, 2022 at 4:38 PM Enrico Olivelli <eolive...@gmail.com>
> > > > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > Enrico
> > > > >
> > > > > On Mon, Jan 10, 2022 at 07:45, Hang Chen
> > > > > <chenh...@apache.org> wrote:
> > > > > >
> > > > > > This is the voting thread for PIP-121. It will stay open for at
> > > > > > least 48 hours.
> > > > > >
> > > > > > https://github.com/apache/pulsar/issues/13315
> > > > > >
> > > > > > Pasted below for quoting convenience.
> > > > > >
> > > > > > -----
> > > > > > ### Motivation
> > > > > > We have geo-replication to support Pulsar cluster-level failover.
> > > > > > We can set up Pulsar cluster A as the primary cluster in data
> > > > > > center A and Pulsar cluster B as the backup cluster in data
> > > > > > center B, then configure geo-replication between cluster A and
> > > > > > cluster B. All clients connect to the Pulsar cluster through DNS.
> > > > > > If cluster A goes down, we should switch the DNS to point to
> > > > > > cluster B instead of cluster A. Once the clients resolve to
> > > > > > cluster B, they can produce and consume messages normally. After
> > > > > > cluster A recovers, the administrator should switch the DNS back
> > > > > > to cluster A.
> > > > > >
> > > > > > However, the current method has two shortcomings.
> > > > > > 1. The administrator must monitor the status of all Pulsar
> > > > > > clusters and switch the DNS as soon as possible when cluster A
> > > > > > goes down. Neither the switch nor the recovery is automatic, and
> > > > > > the recovery time depends on the administrator, which puts the
> > > > > > administrator under heavy load.
> > > > > > 2. The Pulsar client and the DNS system both have caches. When
> > > > > > the administrator switches the DNS from cluster A to cluster B,
> > > > > > the caches take some time to expire, which delays client recovery
> > > > > > and causes produce/consume failures in the meantime.
> > > > > >
> > > > > > ### Goal
> > > > > > It is better to provide an automatic cluster-level failure
> > > > > > recovery mechanism to make Pulsar cluster failover more
> > > > > > effective. We should support Pulsar clients automatically
> > > > > > switching from cluster A to cluster B when they detect, according
> > > > > > to the configured detection policy, that cluster A is down, and
> > > > > > switching back to cluster A once it has recovered. The reason to
> > > > > > switch back to cluster A is that most applications may be
> > > > > > deployed in data center A, where communicating with Pulsar
> > > > > > cluster A has a low network cost. If they keep using Pulsar
> > > > > > cluster B, they incur a high network cost and high
> > > > > > produce/consume latency.
> > > > > >
> > > > > > To mitigate the DNS cache problem, we should provide an
> > > > > > administrator-controlled switch provider that lets administrators
> > > > > > update service URLs.
> > > > > >
> > > > > > In the end, we should provide an automatic service URL switch
> > > > > > provider and an administrator-controlled switch provider.
> > > > > >
> > > > > > ### Design
> > > > > > We already provide the `ServiceUrlProvider` interface to support
> > > > > > different service URLs. To support automatic cluster-level
> > > > > > failure recovery, we can provide different `ServiceUrlProvider`
> > > > > > implementations. For the current requirements, we can provide
> > > > > > `AutoClusterFailover` and `ControlledClusterFailover`.
> > > > > >
> > > > > > #### AutoClusterFailover
> > > > > > To support automatic switching from the primary cluster to the
> > > > > > secondary, we can provide a probe task that probes the
> > > > > > availability of the primary cluster and the secondary one. When
> > > > > > it finds that the primary cluster has been failing for more than
> > > > > > `failoverDelayMs`, it switches to the secondary cluster by
> > > > > > calling `updateServiceUrl`. After switching to the secondary
> > > > > > cluster, `AutoClusterFailover` continues to probe the primary
> > > > > > cluster. If the primary cluster comes back and remains active for
> > > > > > `switchBackDelayMs`, it switches back to the primary cluster.
> > > > > > The APIs are listed as follows.
> > > > > >
> > > > > > To support multiple secondary clusters, a List stores the
> > > > > > secondary cluster URLs. When the primary cluster probe has failed
> > > > > > for `failoverDelayMs`, it starts to probe the secondary clusters
> > > > > > in the list one by one; once it finds an active cluster, it
> > > > > > switches to that cluster. Notice: if you configure multiple
> > > > > > clusters, you should turn on cluster-level geo-replication to
> > > > > > keep the topic data in sync between the primary and all secondary
> > > > > > clusters. Otherwise, the topic data may be spread across
> > > > > > different clusters, and consumers won’t get the whole data of the
> > > > > > topic.
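
The probe-and-switch rule quoted above can be sketched as a small state machine. This is only an illustration under stated assumptions, not the code from the prototype PR: the class name `FailoverDecisionSketch`, the `tick` method, and the injected `probe` predicate are hypothetical, and "failed more than `failoverDelayMs`" is read here as "failing for at least that long". It also ignores the case where the active secondary itself goes down.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical, simplified model of the failover decision; not the Pulsar implementation. */
public class FailoverDecisionSketch {
    private final String primary;
    private final List<String> secondaries;
    private final long failoverDelayMs;
    private final long switchBackDelayMs;
    private final Predicate<String> probe; // assumed: returns true when the URL is reachable

    private String current;
    private long primaryFailedSince = -1;    // -1: primary is not in a failure streak
    private long primaryRecoveredSince = -1; // -1: primary is not in a recovery streak

    public FailoverDecisionSketch(String primary, List<String> secondaries,
                                  long failoverDelayMs, long switchBackDelayMs,
                                  Predicate<String> probe) {
        this.primary = primary;
        this.secondaries = secondaries;
        this.failoverDelayMs = failoverDelayMs;
        this.switchBackDelayMs = switchBackDelayMs;
        this.probe = probe;
        this.current = primary;
    }

    /** Runs one probe round at time nowMs and returns the URL clients should use. */
    public String tick(long nowMs) {
        boolean primaryUp = probe.test(primary);
        if (current.equals(primary)) {
            if (primaryUp) {
                primaryFailedSince = -1; // a successful probe resets the failure streak
            } else {
                if (primaryFailedSince < 0) {
                    primaryFailedSince = nowMs;
                }
                if (nowMs - primaryFailedSince >= failoverDelayMs) {
                    // probe the secondaries one by one and take the first active one
                    for (String candidate : secondaries) {
                        if (probe.test(candidate)) {
                            current = candidate;
                            primaryFailedSince = -1;
                            break;
                        }
                    }
                }
            }
        } else if (!primaryUp) {
            primaryRecoveredSince = -1; // a failed probe resets the recovery streak
        } else {
            if (primaryRecoveredSince < 0) {
                primaryRecoveredSince = nowMs;
            }
            if (nowMs - primaryRecoveredSince >= switchBackDelayMs) {
                current = primary;
                primaryRecoveredSince = -1;
            }
        }
        return current;
    }
}
```

Passing the time and the probe in as parameters keeps the rule testable without a scheduler or a network; in a real provider the result of each round would feed `updateServiceUrl`.
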
> > > > > >
> > > > > > To support different authentication configurations between
> > > > > > clusters, the authentication-related configuration is updated
> > > > > > together with the target cluster.
> > > > > >
> > > > > > ```Java
> > > > > > public class AutoClusterFailover implements ServiceUrlProvider {
> > > > > >
> > > > > >     private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) {
> > > > > >         //
> > > > > >     }
> > > > > >
> > > > > >     @Override
> > > > > >     public void initialize(PulsarClient client) {
> > > > > >         this.pulsarClient = client;
> > > > > >
> > > > > >         // start to probe whether the primary cluster is active
> > > > > >         executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
> > > > > >             // probe and switch
> > > > > >         }), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
> > > > > >     }
> > > > > >
> > > > > >     @Override
> > > > > >     public String getServiceUrl() {
> > > > > >         return this.currentPulsarServiceUrl;
> > > > > >     }
> > > > > >
> > > > > >     @Override
> > > > > >     public void close() {
> > > > > >         this.executor.shutdown();
> > > > > >     }
> > > > > >
> > > > > >     // probe whether the pulsar cluster is available
> > > > > >     private boolean probeAvailable(String url, int timeout) {
> > > > > >
> > > > > >     }
> > > > > > }
> > > > > > ```
> > > > > >
> > > > > > To create an `AutoClusterFailover` instance, we use the
> > > > > > `AutoClusterFailoverBuilder` interface to build the target
> > > > > > instance. The `AutoClusterFailoverBuilder` interface is located
> > > > > > in the `pulsar-client-api` package.
> > > > > >
> > > > > > In the `probeAvailable` method, we probe the Pulsar service port
> > > > > > and check whether the port is open. This probe method has several
> > > > > > disadvantages, such as:
> > > > > > 1. We're connecting to a Pulsar proxy, but there are no available
> > > > > > brokers.
> > > > > > 2. Istio is used on the server side and always accepts the
> > > > > > connection even if the broker is in a bad state.
> > > > > > 3. We might have deadlocks in (all) brokers, and while the
> > > > > > connections get accepted, the brokers are not able to serve them.
> > > > > >
> > > > > > To solve this problem, we’d better provide a health check command
> > > > > > on the broker side, just like ZooKeeper’s `ruok` command. We can
> > > > > > use the port probe method first and, in the next step, provide
> > > > > > the health check command on the broker side.
> > > > > >
> > > > > > #### ControlledClusterFailover
> > > > > > If users want to control the cluster switch operation themselves,
> > > > > > they can publish the current service URL through an HTTP service.
> > > > > > `ControlledClusterFailover` periodically fetches the newest
> > > > > > service URL from that HTTP service.
> > > > > > The APIs are listed as follows.
> > > > > > ```Java
> > > > > > public class ControlledClusterFailover implements ServiceUrlProvider {
> > > > > >
> > > > > >     private ControlledClusterFailover(String defaultServiceUrl, String urlProvider)
> > > > > >             throws IOException {
> > > > > >     }
> > > > > >
> > > > > >     @Override
> > > > > >     public void initialize(PulsarClient client) {
> > > > > >         this.pulsarClient = client;
> > > > > >
> > > > > >         // start to check service url every 30 seconds
> > > > > >         executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
> > > > > >             // fetch the newest service url and switch
> > > > > >         }), interval, interval, TimeUnit.MILLISECONDS);
> > > > > >     }
> > > > > >
> > > > > >     protected ControlledConfiguration fetchControlledConfiguration()
> > > > > >             throws IOException {
> > > > > >         // call the service to get controlled configuration
> > > > > >     }
> > > > > >
> > > > > >     @Override
> > > > > >     public String getServiceUrl() {
> > > > > >         return this.currentPulsarServiceUrl;
> > > > > >     }
> > > > > >
> > > > > >     @Override
> > > > > >     public void close() {
> > > > > >         this.executor.shutdown();
> > > > > >     }
> > > > > >
> > > > > >     protected static class ControlledConfiguration {
> > > > > >         private String serviceUrl;
> > > > > >         private String tlsTrustCertsFilePath;
> > > > > >
> > > > > >         private String authPluginClassName;
> > > > > >         private String authParamsString;
> > > > > >     }
> > > > > > }
> > > > > > ```
> > > > > > The configuration returned by the third-party URL provider is
> > > > > > JSON that maps to a Java bean. The configuration carries
> > > > > > authentication-related parameters so that clusters with different
> > > > > > authentication configurations are supported. These
> > > > > > authentication-related parameters can support all current
> > > > > > authentication plugin types.
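
One way to picture what each polling round does with a fetched configuration is the sketch below. It is an assumption-laden illustration rather than the prototype's code: `ControlledSwitchSketch`, `Config`, `onFetched`, and the `applier` callback are hypothetical names. `Config` mirrors the four fields of `ControlledConfiguration`, and the callback stands in for updating authentication, TLS settings, and the service URL on the client.

```java
import java.util.function.Consumer;

/** Hypothetical sketch of the "switch only when the URL changed" rule. */
public class ControlledSwitchSketch {

    /** Mirrors the fields of ControlledConfiguration in the proposal. */
    public static final class Config {
        public final String serviceUrl;
        public final String tlsTrustCertsFilePath;
        public final String authPluginClassName;
        public final String authParamsString;

        public Config(String serviceUrl, String tlsTrustCertsFilePath,
                      String authPluginClassName, String authParamsString) {
            this.serviceUrl = serviceUrl;
            this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
            this.authPluginClassName = authPluginClassName;
            this.authParamsString = authParamsString;
        }
    }

    private String currentServiceUrl;
    private final Consumer<Config> applier; // assumed hook that reconfigures the client

    public ControlledSwitchSketch(String defaultServiceUrl, Consumer<Config> applier) {
        this.currentServiceUrl = defaultServiceUrl;
        this.applier = applier;
    }

    /** Handles one polling result; returns true if a switch happened. */
    public boolean onFetched(Config fetched) {
        if (fetched == null || fetched.serviceUrl == null || fetched.serviceUrl.isEmpty()) {
            return false; // unusable response: keep the current URL
        }
        if (fetched.serviceUrl.equals(currentServiceUrl)) {
            return false; // nothing changed
        }
        applier.accept(fetched); // apply auth and TLS settings together with the new URL
        currentServiceUrl = fetched.serviceUrl;
        return true;
    }

    public String getServiceUrl() {
        return currentServiceUrl;
    }
}
```

Keeping the current URL when the HTTP service returns nothing usable is a design choice of this sketch; the proposal does not say how such responses are handled.
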
> > > > > >
> > > > > > To create a `ControlledClusterFailover` instance, we use the
> > > > > > `ControlledClusterFailoverBuilder` interface to build the target
> > > > > > instance. The `ControlledClusterFailoverBuilder` interface is
> > > > > > located in the `pulsar-client-api` package.
> > > > > >
> > > > > > ### API Changes
> > > > > > For the current `ServiceUrlProvider` interface, we should add a
> > > > > > `close` method to release allocated resources, such as a timer
> > > > > > thread.
> > > > > > ```Java
> > > > > > public interface ServiceUrlProvider {
> > > > > >     /**
> > > > > >      * Close the resource that the provider allocated.
> > > > > >      *
> > > > > >      */
> > > > > >     default void close() {
> > > > > >         // do nothing
> > > > > >     }
> > > > > >
> > > > > >     /**
> > > > > >      * Update the authentication this client is using.
> > > > > >      *
> > > > > >      * @param authentication
> > > > > >      *
> > > > > >      * @throws IOException
> > > > > >      */
> > > > > >     void updateAuthentication(Authentication authentication) throws IOException;
> > > > > >
> > > > > >     /**
> > > > > >      * Update the tlsTrustCertsFilePath this client is using.
> > > > > >      *
> > > > > >      * @param tlsTrustCertsFilePath
> > > > > >      */
> > > > > >     void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath);
> > > > > >
> > > > > >     /**
> > > > > >      * Update the tlsTrustStorePath and tlsTrustStorePassword this
> > > > > >      * client is using.
> > > > > >      *
> > > > > >      * @param tlsTrustStorePath
> > > > > >      * @param tlsTrustStorePassword
> > > > > >      */
> > > > > >     void updateTlsTrustStorePathAndPassword(String tlsTrustStorePath,
> > > > > >             String tlsTrustStorePassword);
> > > > > >
> > > > > > }
> > > > > > ```
> > > > > >
> > > > > > ### Tests
> > > > > > Add tests for the two service provider implementations.
> > > > > >
> > > > > > For `AutoClusterFailover`, when the primary cluster shuts down,
> > > > > > it should switch to the secondary cluster. When the primary
> > > > > > cluster comes back, it should switch back.
> > > > > >
> > > > > > For `ControlledClusterFailover`, when the service URL is switched
> > > > > > on the HTTP service side, it should switch to the newest service
> > > > > > URL.
> > > > > >
> > > > > > ### Implementation
> > > > > > Prototype implementation PR:
> > > > > > https://github.com/apache/pulsar/pull/13316

--
Zike Yang