This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 40259cc  PIP-121: Implement AutoClusterFailover (#547)
40259cc is described below

commit 40259cc7afb788ea4d27291f2df134a752c1a714
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Mar 17 19:18:04 2026 +0800

    PIP-121: Implement AutoClusterFailover (#547)
---
 include/pulsar/AutoClusterFailover.h | 116 ++++++++++
 lib/AutoClusterFailover.cc           | 418 +++++++++++++++++++++++++++++++++++
 tests/ServiceInfoProviderTest.cc     | 218 ++++++++++++++++++
 3 files changed, 752 insertions(+)

diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h
new file mode 100644
index 0000000..a9d7442
--- /dev/null
+++ b/include/pulsar/AutoClusterFailover.h
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_
+#define PULSAR_AUTO_CLUSTER_FAILOVER_H_
+
+#include <pulsar/ServiceInfoProvider.h>
+
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace pulsar {
+
+class Client;
+class AutoClusterFailoverImpl;
+
+class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider {
+   public:
+    struct Config {
+        const ServiceInfo primary;
+        const std::vector<ServiceInfo> secondary;
+        std::chrono::milliseconds checkInterval{5000};  // 5 seconds
+        uint32_t failoverThreshold{1};
+        uint32_t switchBackThreshold{1};
+
+        Config(ServiceInfo primary, std::vector<ServiceInfo> secondary)
+            : primary(std::move(primary)), secondary(std::move(secondary)) {}
+    };
+
+    /**
+     * Builder helps create an AutoClusterFailover configuration.
+     *
+     * Example:
+     *   ServiceInfo primary{...};
+     *   std::vector<ServiceInfo> secondaries{...};
+     *   AutoClusterFailover provider = AutoClusterFailover::Builder(primary, 
secondaries)
+     *       .withCheckInterval(std::chrono::seconds(5))
+     *       .withFailoverThreshold(3)
+     *       .withSwitchBackThreshold(3)
+     *       .build();
+     *
+     * Notes:
+     * - primary: the preferred cluster to use when available.
+     * - secondary: ordered list of fallback clusters.
+     * - checkInterval: frequency of health probes.
+     * - failoverThreshold: the number of consecutive failed probes required 
before switching away from
+     *   the current cluster.
+     * - switchBackThreshold: the number of consecutive successful probes to 
the primary required before
+     *   switching back from a secondary while that secondary remains 
available. If the active secondary
+     *   becomes unavailable and the primary is available, the implementation 
may switch back to the
+     *   primary immediately, regardless of this threshold.
+     */
+    class Builder {
+       public:
+        Builder(ServiceInfo primary, std::vector<ServiceInfo> secondary)
+            : config_(std::move(primary), std::move(secondary)) {}
+
+        // Set how frequently probes run against the active cluster(s). 
Default: 5 seconds.
+        Builder& withCheckInterval(std::chrono::milliseconds interval) {
+            config_.checkInterval = interval;
+            return *this;
+        }
+
+        // Set the number of consecutive failed probes required before 
attempting failover. Default: 1.
+        Builder& withFailoverThreshold(uint32_t threshold) {
+            config_.failoverThreshold = threshold;
+            return *this;
+        }
+
+        // Set the number of consecutive successful primary probes required 
before switching back from a
+        // healthy secondary. If the active secondary becomes unavailable and 
the primary is available,
+        // the implementation may switch back immediately regardless of this 
threshold. Default: 1.
+        Builder& withSwitchBackThreshold(uint32_t threshold) {
+            config_.switchBackThreshold = threshold;
+            return *this;
+        }
+
+        AutoClusterFailover build() { return 
AutoClusterFailover(std::move(config_)); }
+
+       private:
+        Config config_;
+    };
+
+    explicit AutoClusterFailover(Config&& config);
+
+    ~AutoClusterFailover() final;
+
+    ServiceInfo initialServiceInfo() final;
+
+    void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) 
final;
+
+   private:
+    std::shared_ptr<AutoClusterFailoverImpl> impl_;
+};
+
+}  // namespace pulsar
+
+#endif
diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc
new file mode 100644
index 0000000..4fdfc1e
--- /dev/null
+++ b/lib/AutoClusterFailover.cc
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
#include <pulsar/AutoClusterFailover.h>

#include <chrono>
#include <future>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>
+
+#include "AsioTimer.h"
+#include "LogUtils.h"
+#include "ServiceURI.h"
+#include "Url.h"
+
+#ifdef USE_ASIO
+#include <asio/connect.hpp>
+#include <asio/executor_work_guard.hpp>
+#include <asio/io_context.hpp>
+#include <asio/ip/tcp.hpp>
+#include <asio/post.hpp>
+#include <asio/steady_timer.hpp>
+#else
+#include <boost/asio/connect.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/steady_timer.hpp>
+#endif
+
+#include "AsioDefines.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+class AutoClusterFailoverImpl : public 
std::enable_shared_from_this<AutoClusterFailoverImpl> {
+   public:
+    AutoClusterFailoverImpl(AutoClusterFailover::Config&& config)
+        : config_(std::move(config)), currentServiceInfo_(&config_.primary) {}
+
+    ~AutoClusterFailoverImpl() {
+        using namespace std::chrono_literals;
+        if (!thread_.joinable()) {
+            return;
+        }
+
+        cancelTimer(*timer_);
+        workGuard_.reset();
+        ioContext_.stop();
+
+        if (future_.wait_for(3s) != std::future_status::ready) {
+            LOG_WARN("AutoClusterFailoverImpl is not stopped within 3 seconds, 
waiting for it to finish");
+        }
+        thread_.join();
+    }
+
+    auto primary() const noexcept { return config_.primary; }
+
+    void initialize(std::function<void(ServiceInfo)>&& onServiceInfoUpdate) {
+        onServiceInfoUpdate_ = std::move(onServiceInfoUpdate);
+        workGuard_.emplace(ASIO::make_work_guard(ioContext_));
+        timer_.emplace(ioContext_);
+
+        auto weakSelf = weak_from_this();
+        ASIO::post(ioContext_, [weakSelf] {
+            if (auto self = weakSelf.lock()) {
+                self->scheduleFailoverCheck();
+            }
+        });
+
+        // Capturing `this` is safe because the thread will be joined in the 
destructor
+        std::promise<void> promise;
+        future_ = promise.get_future();
+        thread_ = std::thread([this, promise{std::move(promise)}]() mutable {
+            ioContext_.run();
+            promise.set_value();
+        });
+    }
+
+   private:
+    static constexpr std::chrono::milliseconds probeTimeout_{30000};
+    using CompletionCallback = std::function<void()>;
+    using ProbeCallback = std::function<void(bool)>;
+
+    struct ProbeContext {
+        ASIO::ip::tcp::resolver resolver;
+        ASIO::ip::tcp::socket socket;
+        ASIO::steady_timer timer;
+        ProbeCallback callback;
+        bool done{false};
+        std::string hostUrl;
+
+        ProbeContext(ASIO::io_context& ioContext, std::string hostUrl, 
ProbeCallback callback)
+            : resolver(ioContext),
+              socket(ioContext),
+              timer(ioContext),
+              callback(std::move(callback)),
+              hostUrl(std::move(hostUrl)) {}
+    };
+
+    AutoClusterFailover::Config config_;
+    const ServiceInfo* currentServiceInfo_;
+    uint32_t consecutiveFailureCount_{0};
+    uint32_t consecutivePrimaryRecoveryCount_{0};
+
+    std::thread thread_;
+    std::future<void> future_;
+
+    ASIO::io_context ioContext_;
+    std::function<void(ServiceInfo)> onServiceInfoUpdate_;
+
+    std::optional<ASIO::executor_work_guard<ASIO::io_context::executor_type>> 
workGuard_;
+    std::optional<ASIO::steady_timer> timer_;
+
+    bool isUsingPrimary() const noexcept { return currentServiceInfo_ == 
&config_.primary; }
+
+    const ServiceInfo& current() const noexcept { return *currentServiceInfo_; 
}
+
+    void scheduleFailoverCheck() {
+        timer_->expires_after(config_.checkInterval);
+        auto weakSelf = weak_from_this();
+        timer_->async_wait([weakSelf](ASIO_ERROR error) {
+            if (error) {
+                return;
+            }
+            if (auto self = weakSelf.lock()) {
+                self->executeFailoverCheck();
+            }
+        });
+    }
+
+    void executeFailoverCheck() {
+        auto done = [weakSelf = weak_from_this()] {
+            if (auto self = weakSelf.lock()) {
+                self->scheduleFailoverCheck();
+            }
+        };
+
+        if (isUsingPrimary()) {
+            checkAndFailoverToSecondaryAsync(std::move(done));
+        } else {
+            checkSecondaryAndPrimaryAsync(std::move(done));
+        }
+    }
+
+    static void completeProbe(const std::shared_ptr<ProbeContext>& context, 
bool success,
+                              const ASIO_ERROR& error = ASIO_SUCCESS) {
+        if (context->done) {
+            return;
+        }
+
+        context->done = true;
+        ASIO_ERROR ignored;
+        context->resolver.cancel();
+        context->socket.close(ignored);
+        context->timer.cancel(ignored);
+
+        context->callback(success);
+    }
+
+    void probeHostAsync(const std::string& hostUrl, ProbeCallback callback) {
+        Url parsedUrl;
+        if (!Url::parse(hostUrl, parsedUrl)) {
+            LOG_WARN("Failed to parse service URL for probing: " << hostUrl);
+            callback(false);
+            return;
+        }
+
+        auto context = std::make_shared<ProbeContext>(ioContext_, hostUrl, 
std::move(callback));
+        context->timer.expires_after(probeTimeout_);
+        context->timer.async_wait([context](const ASIO_ERROR& error) {
+            if (!error) {
+                completeProbe(context, false, ASIO::error::timed_out);
+            }
+        });
+
+        context->resolver.async_resolve(
+            parsedUrl.host(), std::to_string(parsedUrl.port()),
+            [context](const ASIO_ERROR& error, const 
ASIO::ip::tcp::resolver::results_type& endpoints) {
+                if (error) {
+                    completeProbe(context, false, error);
+                    return;
+                }
+
+                ASIO::async_connect(
+                    context->socket, endpoints,
+                    [context](const ASIO_ERROR& connectError, const 
ASIO::ip::tcp::endpoint&) {
+                        completeProbe(context, !connectError, connectError);
+                    });
+            });
+    }
+
+    void probeHostsAsync(const std::shared_ptr<std::vector<std::string>>& 
hosts, size_t index,
+                         ProbeCallback callback) {
+        if (index >= hosts->size()) {
+            callback(false);
+            return;
+        }
+
+        auto hostUrl = (*hosts)[index];
+        auto weakSelf = weak_from_this();
+        probeHostAsync(hostUrl,
+                       [weakSelf, hosts, index, callback = 
std::move(callback)](bool available) mutable {
+                           if (available) {
+                               callback(true);
+                               return;
+                           }
+                           if (auto self = weakSelf.lock()) {
+                               self->probeHostsAsync(hosts, index + 1, 
std::move(callback));
+                           }
+                       });
+    }
+
+    void probeAvailableAsync(const ServiceInfo& serviceInfo, ProbeCallback 
callback) {
+        try {
+            ServiceURI serviceUri{serviceInfo.serviceUrl()};
+            auto hosts = 
std::make_shared<std::vector<std::string>>(serviceUri.getServiceHosts());
+            if (hosts->empty()) {
+                callback(false);
+                return;
+            }
+            probeHostsAsync(hosts, 0, std::move(callback));
+        } catch (const std::exception& e) {
+            LOG_WARN("Failed to probe service URL " << 
serviceInfo.serviceUrl() << ": " << e.what());
+            callback(false);
+        }
+    }
+
+    void switchTo(const ServiceInfo* serviceInfo) {
+        if (currentServiceInfo_ == serviceInfo) {
+            return;
+        }
+
+        LOG_INFO("Switch service URL from " << current().serviceUrl() << " to 
" << serviceInfo->serviceUrl());
+        currentServiceInfo_ = serviceInfo;
+        consecutiveFailureCount_ = 0;
+        consecutivePrimaryRecoveryCount_ = 0;
+        onServiceInfoUpdate_(current());
+    }
+
+    void probeSecondaryFrom(size_t index, const ServiceInfo* 
excludedServiceInfo, ProbeCallback callback) {
+        if (index >= config_.secondary.size()) {
+            callback(false);
+            return;
+        }
+
+        if (&config_.secondary[index] == excludedServiceInfo) {
+            probeSecondaryFrom(index + 1, excludedServiceInfo, 
std::move(callback));
+            return;
+        }
+
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(
+            config_.secondary[index],
+            [weakSelf, index, excludedServiceInfo, callback = 
std::move(callback)](bool available) mutable {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+
+                LOG_DEBUG("Detected secondary " << 
self->config_.secondary[index].serviceUrl()
+                                                << " availability: " << 
available);
+                if (available) {
+                    self->switchTo(&self->config_.secondary[index]);
+                    callback(true);
+                    return;
+                }
+
+                self->probeSecondaryFrom(index + 1, excludedServiceInfo, 
std::move(callback));
+            });
+    }
+
+    void checkAndFailoverToSecondaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool 
primaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            LOG_DEBUG("Detected primary " << self->current().serviceUrl()
+                                          << " availability: " << 
primaryAvailable);
+            if (primaryAvailable) {
+                self->consecutiveFailureCount_ = 0;
+                done();
+                return;
+            }
+
+            if (++self->consecutiveFailureCount_ < 
self->config_.failoverThreshold) {
+                done();
+                return;
+            }
+
+            self->probeSecondaryFrom(0, nullptr, [done = 
std::move(done)](bool) mutable { done(); });
+        });
+    }
+
+    void failoverFromUnavailableSecondaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(
+            config_.primary, [weakSelf, done = std::move(done)](bool 
primaryAvailable) mutable {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+
+                LOG_DEBUG("Detected primary while secondary is unavailable "
+                          << self->config_.primary.serviceUrl() << " 
availability: " << primaryAvailable);
+                if (primaryAvailable) {
+                    self->switchTo(&self->config_.primary);
+                    done();
+                    return;
+                }
+
+                self->probeSecondaryFrom(
+                    0, self->currentServiceInfo_,
+                    [weakSelf, done = std::move(done)](bool 
switchedToAnotherSecondary) mutable {
+                        auto self = weakSelf.lock();
+                        if (!self) {
+                            return;
+                        }
+
+                        if (switchedToAnotherSecondary) {
+                            done();
+                            return;
+                        }
+
+                        self->checkSwitchBackToPrimaryAsync(std::move(done), 
false);
+                    });
+            });
+    }
+
+    void checkSwitchBackToPrimaryAsync(CompletionCallback done, 
std::optional<bool> primaryAvailableHint) {
+        auto handlePrimaryAvailable = [weakSelf = weak_from_this(),
+                                       done = std::move(done)](bool 
primaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            if (!primaryAvailable) {
+                self->consecutivePrimaryRecoveryCount_ = 0;
+                done();
+                return;
+            }
+
+            if (++self->consecutivePrimaryRecoveryCount_ >= 
self->config_.switchBackThreshold) {
+                self->switchTo(&self->config_.primary);
+            }
+            done();
+        };
+
+        if (primaryAvailableHint.has_value()) {
+            handlePrimaryAvailable(*primaryAvailableHint);
+            return;
+        }
+
+        probeAvailableAsync(config_.primary, 
std::move(handlePrimaryAvailable));
+    }
+
+    void checkSecondaryAndPrimaryAsync(CompletionCallback done) {
+        auto weakSelf = weak_from_this();
+        probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool 
secondaryAvailable) mutable {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            LOG_DEBUG("Detected secondary " << self->current().serviceUrl()
+                                            << " availability: " << 
secondaryAvailable);
+            if (secondaryAvailable) {
+                self->consecutiveFailureCount_ = 0;
+                self->checkSwitchBackToPrimaryAsync(std::move(done), 
std::nullopt);
+                return;
+            }
+
+            if (++self->consecutiveFailureCount_ < 
self->config_.failoverThreshold) {
+                self->checkSwitchBackToPrimaryAsync(std::move(done), 
std::nullopt);
+                return;
+            }
+
+            self->failoverFromUnavailableSecondaryAsync(std::move(done));
+        });
+    }
+};
+
+AutoClusterFailover::AutoClusterFailover(Config&& config)
+    : impl_(std::make_shared<AutoClusterFailoverImpl>(std::move(config))) {}
+
+AutoClusterFailover::~AutoClusterFailover() {}
+
+ServiceInfo AutoClusterFailover::initialServiceInfo() { return 
impl_->primary(); }
+
+void AutoClusterFailover::initialize(std::function<void(ServiceInfo)> 
onServiceInfoUpdate) {
+    impl_->initialize(std::move(onServiceInfoUpdate));
+}
+
+}  // namespace pulsar
diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc
index 82f5f6f..175c531 100644
--- a/tests/ServiceInfoProviderTest.cc
+++ b/tests/ServiceInfoProviderTest.cc
@@ -17,16 +17,20 @@
  * under the License.
  */
 #include <gtest/gtest.h>
+#include <pulsar/AutoClusterFailover.h>
 #include <pulsar/Client.h>
 
 #include <atomic>
+#include <chrono>
 #include <memory>
 #include <mutex>
 #include <optional>
 #include <thread>
+#include <vector>
 
 #include "PulsarFriend.h"
 #include "WaitUtils.h"
+#include "lib/AsioDefines.h"
 #include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
@@ -34,6 +38,113 @@ DECLARE_LOG_OBJECT()
 using namespace pulsar;
 using namespace std::chrono_literals;
 
+namespace {
+
+class ProbeTcpServer {
+   public:
+    ProbeTcpServer() { start(); }
+
+    ~ProbeTcpServer() { stop(); }
+
+    void start() {
+        if (running_) {
+            return;
+        }
+
+        auto ioContext = std::unique_ptr<ASIO::io_context>(new 
ASIO::io_context);
+        auto acceptor = std::unique_ptr<ASIO::ip::tcp::acceptor>(new 
ASIO::ip::tcp::acceptor(*ioContext));
+        ASIO::ip::tcp::endpoint endpoint{ASIO::ip::tcp::v4(), 
static_cast<unsigned short>(port_)};
+        acceptor->open(endpoint.protocol());
+        acceptor->set_option(ASIO::ip::tcp::acceptor::reuse_address(true));
+        acceptor->bind(endpoint);
+        acceptor->listen();
+
+        port_ = acceptor->local_endpoint().port();
+        ioContext_ = std::move(ioContext);
+        acceptor_ = std::move(acceptor);
+        running_ = true;
+
+        scheduleAccept();
+        serverThread_ = std::thread([this] { ioContext_->run(); });
+    }
+
+    void stop() {
+        if (!running_.exchange(false)) {
+            return;
+        }
+
+        ASIO::post(*ioContext_, [this] {
+            ASIO_ERROR ignored;
+            if (acceptor_ && acceptor_->is_open()) {
+                acceptor_->close(ignored);
+            }
+        });
+
+        if (serverThread_.joinable()) {
+            serverThread_.join();
+        }
+
+        acceptor_.reset();
+        ioContext_.reset();
+    }
+
+    std::string getServiceUrl() const { return "pulsar://127.0.0.1:" + 
std::to_string(port_); }
+
+   private:
+    void scheduleAccept() {
+        if (!running_ || !acceptor_ || !acceptor_->is_open()) {
+            return;
+        }
+
+        auto socket = std::make_shared<ASIO::ip::tcp::socket>(*ioContext_);
+        acceptor_->async_accept(*socket, [this, socket](const ASIO_ERROR 
&error) {
+            if (!error) {
+                ASIO_ERROR ignored;
+                socket->close(ignored);
+            }
+
+            if (running_ && acceptor_ && acceptor_->is_open()) {
+                scheduleAccept();
+            }
+        });
+    }
+
+    int port_{0};
+    std::atomic_bool running_{false};
+    std::unique_ptr<ASIO::io_context> ioContext_;
+    std::unique_ptr<ASIO::ip::tcp::acceptor> acceptor_;
+    std::thread serverThread_;
+};
+
+class ServiceUrlObserver {
+   public:
+    void onUpdate(const ServiceInfo &serviceInfo) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        serviceUrls_.emplace_back(serviceInfo.serviceUrl());
+    }
+
+    size_t size() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return serviceUrls_.size();
+    }
+
+    std::string last() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return serviceUrls_.empty() ? std::string() : serviceUrls_.back();
+    }
+
+    std::vector<std::string> snapshot() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return serviceUrls_;
+    }
+
+   private:
+    mutable std::mutex mutex_;
+    std::vector<std::string> serviceUrls_;
+};
+
+}  // namespace
+
 class ServiceInfoHolder {
    public:
     ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {}
@@ -93,6 +204,113 @@ class TestServiceInfoProvider : public ServiceInfoProvider {
     mutable std::mutex mutex_;
 };
 
+TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) 
{
+    ProbeTcpServer availableSecondary;
+    ProbeTcpServer unavailableSecondary;
+    const auto primaryUrl = unavailableSecondary.getServiceUrl();
+    unavailableSecondary.stop();
+
+    ProbeTcpServer skippedSecondary;
+    const auto skippedSecondaryUrl = skippedSecondary.getServiceUrl();
+    skippedSecondary.stop();
+
+    const auto availableSecondaryUrl = availableSecondary.getServiceUrl();
+    ServiceUrlObserver observer;
+    AutoClusterFailover provider =
+        AutoClusterFailover::Builder(ServiceInfo(primaryUrl),
+                                     {ServiceInfo(skippedSecondaryUrl), 
ServiceInfo(availableSecondaryUrl)})
+            .withCheckInterval(20ms)
+            .withFailoverThreshold(6)
+            .withSwitchBackThreshold(6)
+            .build();
+
+    ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl);
+
+    observer.onUpdate(provider.initialServiceInfo());
+    provider.initialize([&observer](const ServiceInfo &serviceInfo) { 
observer.onUpdate(serviceInfo); });
+
+    ASSERT_FALSE(waitUntil(
+        80ms, [&observer, &availableSecondaryUrl] { return observer.last() == 
availableSecondaryUrl; }));
+    ASSERT_TRUE(waitUntil(
+        2s, [&observer, &availableSecondaryUrl] { return observer.last() == 
availableSecondaryUrl; }));
+
+    const auto updates = observer.snapshot();
+    ASSERT_EQ(updates.size(), 2u);
+    ASSERT_EQ(updates[0], primaryUrl);
+    ASSERT_EQ(updates[1], availableSecondaryUrl);
+}
+
+TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) {
+    ProbeTcpServer primary;
+    const auto primaryUrl = primary.getServiceUrl();
+    primary.stop();
+
+    ProbeTcpServer secondary;
+    const auto secondaryUrl = secondary.getServiceUrl();
+
+    ServiceUrlObserver observer;
+    AutoClusterFailover provider =
+        AutoClusterFailover::Builder(ServiceInfo(primaryUrl), 
{ServiceInfo(secondaryUrl)})
+            .withCheckInterval(20ms)
+            .withFailoverThreshold(4)
+            .withSwitchBackThreshold(6)
+            .build();
+
+    observer.onUpdate(provider.initialServiceInfo());
+    provider.initialize([&observer](const ServiceInfo &serviceInfo) { 
observer.onUpdate(serviceInfo); });
+
+    ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return 
observer.last() == secondaryUrl; }));
+
+    primary.start();
+
+    ASSERT_FALSE(waitUntil(80ms, [&observer, &primaryUrl] { return 
observer.last() == primaryUrl; }));
+    ASSERT_TRUE(waitUntil(2s, [&observer, &primaryUrl] { return 
observer.last() == primaryUrl; }));
+
+    const auto updates = observer.snapshot();
+    ASSERT_EQ(updates.size(), 3u);
+    ASSERT_EQ(updates[0], primaryUrl);
+    ASSERT_EQ(updates[1], secondaryUrl);
+    ASSERT_EQ(updates[2], primaryUrl);
+}
+
+TEST(AutoClusterFailoverTest, 
testFailoverToAnotherSecondaryWhenCurrentSecondaryIsUnavailable) {
+    ProbeTcpServer primary;
+    const auto primaryUrl = primary.getServiceUrl();
+    primary.stop();
+
+    ProbeTcpServer firstSecondary;
+    const auto firstSecondaryUrl = firstSecondary.getServiceUrl();
+
+    ProbeTcpServer secondSecondary;
+    const auto secondSecondaryUrl = secondSecondary.getServiceUrl();
+
+    ServiceUrlObserver observer;
+    AutoClusterFailover provider =
+        AutoClusterFailover::Builder(ServiceInfo(primaryUrl),
+                                     {ServiceInfo(firstSecondaryUrl), 
ServiceInfo(secondSecondaryUrl)})
+            .withCheckInterval(20ms)
+            .withFailoverThreshold(4)
+            .withSwitchBackThreshold(6)
+            .build();
+
+    observer.onUpdate(provider.initialServiceInfo());
+    provider.initialize([&observer](const ServiceInfo &serviceInfo) { 
observer.onUpdate(serviceInfo); });
+
+    ASSERT_TRUE(
+        waitUntil(2s, [&observer, &firstSecondaryUrl] { return observer.last() 
== firstSecondaryUrl; }));
+
+    firstSecondary.stop();
+
+    ASSERT_TRUE(
+        waitUntil(2s, [&observer, &secondSecondaryUrl] { return 
observer.last() == secondSecondaryUrl; }));
+
+    const auto updates = observer.snapshot();
+    ASSERT_EQ(updates.size(), 3u);
+    ASSERT_EQ(updates[0], primaryUrl);
+    ASSERT_EQ(updates[1], firstSecondaryUrl);
+    ASSERT_EQ(updates[2], secondSecondaryUrl);
+}
+
 TEST(ServiceInfoProviderTest, testSwitchCluster) {
     extern std::string getToken();  // from tests/AuthTokenTest.cc
     // Access "private/auth" namespace in cluster 1

Reply via email to