This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b47e63d PIP-121: Introduce ServiceInfoProvider to update service info
dynamically (#541)
b47e63d is described below
commit b47e63dcb21949a0ac7da18d2336a8b92d569cc6
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Mar 13 14:49:04 2026 +0800
PIP-121: Introduce ServiceInfoProvider to update service info dynamically
(#541)
---
include/pulsar/Client.h | 24 +++++
include/pulsar/ClientConfiguration.h | 29 +-----
include/pulsar/ServiceInfo.h | 57 ++++++++++
include/pulsar/ServiceInfoProvider.h | 62 +++++++++++
include/pulsar/c/client_configuration.h | 7 --
lib/AtomicSharedPtr.h | 41 ++++++++
lib/BinaryProtoLookupService.h | 5 +-
lib/Client.cc | 19 ++--
lib/ClientConfiguration.cc | 13 ---
lib/ClientConfigurationImpl.h | 7 ++
lib/ClientConnection.cc | 27 +++--
lib/ClientConnection.h | 8 +-
lib/ClientImpl.cc | 160 +++++++++++++++++++----------
lib/ClientImpl.h | 58 +++++++++--
lib/ConnectionPool.cc | 36 +++----
lib/ConnectionPool.h | 21 +++-
lib/ConsumerImpl.cc | 17 ++-
lib/ConsumerImpl.h | 7 ++
lib/DefaultServiceInfoProvider.h | 42 ++++++++
lib/HTTPLookupService.cc | 13 ++-
lib/HTTPLookupService.h | 4 +-
lib/MultiTopicsConsumerImpl.cc | 22 ++--
lib/MultiTopicsConsumerImpl.h | 7 +-
lib/PartitionedProducerImpl.cc | 8 +-
lib/PartitionedProducerImpl.h | 3 -
lib/PatternMultiTopicsConsumerImpl.cc | 13 +--
lib/PatternMultiTopicsConsumerImpl.h | 1 -
lib/ReaderImpl.cc | 1 -
lib/ServiceInfo.cc | 35 +++++++
lib/c/c_ClientConfiguration.cc | 12 ---
perf/PerfConsumer.cc | 5 -
perf/PerfProducer.cc | 5 -
tests/AuthTokenTest.cc | 2 +-
tests/BasicEndToEndTest.cc | 3 +-
tests/LookupServiceTest.cc | 91 ++++++++--------
tests/ServiceInfoProviderTest.cc | 177 ++++++++++++++++++++++++++++++++
36 files changed, 787 insertions(+), 255 deletions(-)
diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index 5613066..e9813e3 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -29,9 +29,12 @@
#include <pulsar/Reader.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
+#include <pulsar/ServiceInfo.h>
+#include <pulsar/ServiceInfoProvider.h>
#include <pulsar/TableView.h>
#include <pulsar/defines.h>
+#include <memory>
#include <string>
namespace pulsar {
@@ -68,6 +71,20 @@ class PULSAR_PUBLIC Client {
*/
Client(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration);
+ /**
+ * Create a Pulsar client object using the specified ServiceInfoProvider.
+ *
+ * The ServiceInfoProvider is responsible for providing the service information (such as service URL)
+ * dynamically. For example, if it detects a primary Pulsar service is down, it can switch to a secondary
+ * service and update the client with the new service information.
+ *
+ * The Client instance takes ownership of the given ServiceInfoProvider. The provider will be destroyed
+ * as part of the client's shutdown lifecycle, for example when `Client::close()` or
+ * `Client::closeAsync()` is called, ensuring that its lifetime is properly managed.
+ *
+ * @param serviceInfoProvider the provider that supplies the service information; ownership is
+ * transferred to the created client
+ * @param clientConfiguration the client configuration to use
+ * @return the created Client object
+ */
+ static Client create(std::unique_ptr<ServiceInfoProvider>
serviceInfoProvider,
+ const ClientConfiguration& clientConfiguration);
+
/**
* Create a producer with default configuration
*
@@ -414,6 +431,13 @@ class PULSAR_PUBLIC Client {
void getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)>
callback);
+ /**
+ * Get the current service information of the client.
+ *
+ * @return the current service information
+ */
+ ServiceInfo getServiceInfo() const;
+
private:
Client(const std::shared_ptr<ClientImpl>&);
diff --git a/include/pulsar/ClientConfiguration.h
b/include/pulsar/ClientConfiguration.h
index 98ccff7..b37b7c6 100644
--- a/include/pulsar/ClientConfiguration.h
+++ b/include/pulsar/ClientConfiguration.h
@@ -70,15 +70,12 @@ class PULSAR_PUBLIC ClientConfiguration {
/**
* Set the authentication method to be used with the broker
*
+ * You can get the configured authentication data in `ServiceInfo`
returned by `Client::getServiceInfo`.
+ *
* @param authentication the authentication data to use
*/
ClientConfiguration& setAuth(const AuthenticationPtr& authentication);
- /**
- * @return the authentication data
- */
- Authentication& getAuth() const;
-
/**
* Set timeout on client operations (subscribe, create producer, close,
unsubscribe)
* Default is 30 seconds.
@@ -202,20 +199,6 @@ class PULSAR_PUBLIC ClientConfiguration {
*/
ClientConfiguration& setLogger(LoggerFactory* loggerFactory);
- /**
- * Configure whether to use the TLS encryption on the connections.
- *
- * The default value is false.
- *
- * @param useTls
- */
- ClientConfiguration& setUseTls(bool useTls);
-
- /**
- * @return whether the TLS encryption is used on the connections
- */
- bool isUseTls() const;
-
/**
* Set the path to the TLS private key file.
*
@@ -243,15 +226,13 @@ class PULSAR_PUBLIC ClientConfiguration {
/**
* Set the path to the trusted TLS certificate file.
*
+ * You can get the configured trusted TLS certificate file path in
`ServiceInfo` returned by
+ * `Client::getServiceInfo`.
+ *
* @param tlsTrustCertsFilePath
*/
ClientConfiguration& setTlsTrustCertsFilePath(const std::string&
tlsTrustCertsFilePath);
- /**
- * @return the path to the trusted TLS certificate file
- */
- const std::string& getTlsTrustCertsFilePath() const;
-
/**
* Configure whether the Pulsar client accepts untrusted TLS certificates
from brokers.
*
diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h
new file mode 100644
index 0000000..1f63ce3
--- /dev/null
+++ b/include/pulsar/ServiceInfo.h
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_SERVICE_INFO_H_
+#define PULSAR_SERVICE_INFO_H_
+
+#include <pulsar/Authentication.h>
+
+#include <optional>
+#include <string>
+
+namespace pulsar {
+
+/**
+ * ServiceInfo encapsulates the information of a Pulsar service, which is used by the client to
+ * connect to the service. It includes the service URL, authentication information, and TLS
+ * configuration.
+ *
+ * The service URL, the authentication and the trusted TLS certificate file path are constructor
+ * arguments (the latter two are optional and default to `AuthFactory::Disabled()` and
+ * `std::nullopt`). `useTls()` has no corresponding constructor argument.
+ * NOTE(review): `useTls_` is presumably derived from the service URL scheme by the constructor
+ * defined in ServiceInfo.cc - confirm there.
+ *
+ * `operator==` compares all four fields. NOTE(review): the authentication is compared through
+ * `AuthenticationPtr`; if that is a shared pointer, the comparison is by pointer identity, so two
+ * distinct but equivalent authentication objects compare unequal - confirm this is intended.
+ */
+class PULSAR_PUBLIC ServiceInfo final {
+ public:
+ ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication =
AuthFactory::Disabled(),
+ std::optional<std::string> tlsTrustCertsFilePath =
std::nullopt);
+
+ auto& serviceUrl() const noexcept { return serviceUrl_; }  // const reference to the stored URL
+ auto useTls() const noexcept { return useTls_; }  // returned by value
+ auto& authentication() const noexcept { return authentication_; }  // const reference to the stored pointer
+ auto& tlsTrustCertsFilePath() const noexcept { return
tlsTrustCertsFilePath_; }  // const reference; std::nullopt when no path was given
+
+ bool operator==(const ServiceInfo& other) const noexcept {
+ return serviceUrl_ == other.serviceUrl_ && useTls_ == other.useTls_ &&
+ authentication_ == other.authentication_ &&
+ tlsTrustCertsFilePath_ == other.tlsTrustCertsFilePath_;
+ }
+
+ private:
+ std::string serviceUrl_;
+ bool useTls_;
+ AuthenticationPtr authentication_;
+ std::optional<std::string> tlsTrustCertsFilePath_;
+};
+
+} // namespace pulsar
+#endif
diff --git a/include/pulsar/ServiceInfoProvider.h
b/include/pulsar/ServiceInfoProvider.h
new file mode 100644
index 0000000..1b518da
--- /dev/null
+++ b/include/pulsar/ServiceInfoProvider.h
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_SERVICE_INFO_PROVIDER_H_
+#define PULSAR_SERVICE_INFO_PROVIDER_H_
+
+#include <pulsar/ServiceInfo.h>
+
+#include <functional>
+
+namespace pulsar {
+
+/**
+ * ServiceInfoProvider is the interface through which a client obtains its `ServiceInfo`.
+ *
+ * An implementation supplies the initial `ServiceInfo` via {@link initialServiceInfo} and may
+ * deliver replacement values later through the callback passed to {@link initialize}.
+ * A provider is handed to `Client::create()`, which takes ownership of it.
+ */
+class PULSAR_PUBLIC ServiceInfoProvider {
+ public:
+    /**
+     * The destructor will be called when `Client::close()` is invoked, and the provider should stop
+     * any ongoing work and release the resources in the destructor.
+     */
+    virtual ~ServiceInfoProvider() = default;
+
+    /**
+     * Get the initial `ServiceInfo` for the client.
+     *
+     * This method is called **only once** internally in `Client::create()` to get the initial
+     * `ServiceInfo` for the client to connect to the Pulsar service, typically before
+     * {@link initialize} is invoked.
+     * Since it's only called once, it's legal to return a moved `ServiceInfo` object to avoid
+     * unnecessary copying.
+     *
+     * @return the `ServiceInfo` the client uses for its initial connection
+     */
+    virtual ServiceInfo initialServiceInfo() = 0;
+
+    /**
+     * Initialize the ServiceInfoProvider.
+     *
+     * After the client has obtained the initial `ServiceInfo` via {@link initialServiceInfo}, this
+     * method is called to allow the provider to start any background work (for example, service
+     * discovery or watching configuration changes) and to report subsequent updates to the service
+     * information.
+     *
+     * Implementations may choose not to invoke `onServiceInfoUpdate` if the `ServiceInfo` never
+     * changes.
+     *
+     * @param onServiceInfoUpdate the callback to deliver updated `ServiceInfo` values to the client
+     * after the initial connection has been established
+     */
+    virtual void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) = 0;
+};
+
+}  // namespace pulsar
+
+#endif
diff --git a/include/pulsar/c/client_configuration.h
b/include/pulsar/c/client_configuration.h
index 9e15453..1be7c1f 100644
--- a/include/pulsar/c/client_configuration.h
+++ b/include/pulsar/c/client_configuration.h
@@ -147,16 +147,9 @@ PULSAR_PUBLIC void
pulsar_client_configuration_set_logger(pulsar_client_configur
PULSAR_PUBLIC void
pulsar_client_configuration_set_logger_t(pulsar_client_configuration_t *conf,
pulsar_logger_t
logger);
-PULSAR_PUBLIC void
pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf,
int useTls);
-
-PULSAR_PUBLIC int
pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf);
-
PULSAR_PUBLIC void pulsar_client_configuration_set_tls_trust_certs_file_path(
pulsar_client_configuration_t *conf, const char *tlsTrustCertsFilePath);
-PULSAR_PUBLIC const char
*pulsar_client_configuration_get_tls_trust_certs_file_path(
- pulsar_client_configuration_t *conf);
-
PULSAR_PUBLIC void
pulsar_client_configuration_set_tls_allow_insecure_connection(
pulsar_client_configuration_t *conf, int allowInsecure);
diff --git a/lib/AtomicSharedPtr.h b/lib/AtomicSharedPtr.h
new file mode 100644
index 0000000..30dea07
--- /dev/null
+++ b/lib/AtomicSharedPtr.h
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <memory>
+namespace pulsar {
+
+// C++17 does not have std::atomic<std::shared_ptr<T>>, so we have to manually implement it
+// on top of the std::atomic_load/std::atomic_store overloads for std::shared_ptr.
+template <typename T>
+class AtomicSharedPtr {  // wraps a shared_ptr<const T> that is read and replaced via std::atomic_load/std::atomic_store
+ public:
+ using Pointer = std::shared_ptr<const T>;  // the pointee is const, so a loaded value cannot be modified through it
+
+ AtomicSharedPtr() = default;  // ptr_ is null until store() is called, so load() then returns a null Pointer
+ explicit AtomicSharedPtr(T value) : ptr_(std::make_shared<const
T>(std::move(value))) {}  // takes `value` by value and moves it into a newly allocated const T
+
+ auto load() const { return std::atomic_load(&ptr_); }  // returns a copy of the currently stored pointer
+
+ void store(Pointer&& newPtr) { std::atomic_store(&ptr_,
std::move(newPtr)); }  // replaces the stored pointer; copies obtained from earlier load() calls still own the old object
+
+ private:
+ std::shared_ptr<const T> ptr_;  // only accessed through std::atomic_load/std::atomic_store after construction
+};
+
+} // namespace pulsar
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index 948c7f1..35dcb16 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -22,6 +22,7 @@
#include <pulsar/Authentication.h>
#include <pulsar/ClientConfiguration.h>
#include <pulsar/Schema.h>
+#include <pulsar/ServiceInfo.h>
#include <mutex>
@@ -38,9 +39,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result,
SchemaInfo>>;
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
- BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool&
pool,
+ BinaryProtoLookupService(const ServiceInfo& serviceInfo, ConnectionPool&
pool,
const ClientConfiguration& clientConfiguration)
- : serviceNameResolver_(serviceUrl),
+ : serviceNameResolver_(serviceInfo.serviceUrl()),
cnxPool_(pool),
listenerName_(clientConfiguration.getListenerName()),
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
diff --git a/lib/Client.cc b/lib/Client.cc
index 39a5948..48e92dd 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -17,6 +17,7 @@
* under the License.
*/
#include <pulsar/Client.h>
+#include <pulsar/ServiceInfoProvider.h>
#include <iostream>
#include <memory>
@@ -33,13 +34,17 @@ DECLARE_LOG_OBJECT()
namespace pulsar {
-Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) {}
+Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) {
impl_->initialize(); }
-Client::Client(const std::string& serviceUrl)
- : impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration())) {}
+Client::Client(const std::string& serviceUrl) : Client(serviceUrl,
ClientConfiguration()) {}
Client::Client(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration)
- : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
+ : Client(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
+
+Client Client::create(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,  // factory: builds a ClientImpl that owns the provider
+ const ClientConfiguration& clientConfiguration) {
+ return Client(std::make_shared<ClientImpl>(std::move(serviceInfoProvider),  // ownership of the provider moves into the ClientImpl
clientConfiguration));  // goes through Client(const std::shared_ptr<ClientImpl>&), which calls impl_->initialize()
+}
Result Client::createProducer(const std::string& topic, Producer& producer) {
return createProducer(topic, ProducerConfiguration(), producer);
@@ -193,8 +198,10 @@ uint64_t Client::getNumberOfConsumers() { return
impl_->getNumberOfConsumers();
void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)>
callback) {
- impl_->getLookup()
- ->getSchema(TopicName::get(topic), (version >= 0) ?
toBigEndianBytes(version) : "")
+ impl_->getSchema(TopicName::get(topic), (version >= 0) ?
toBigEndianBytes(version) : "")
.addListener(std::move(callback));
}
+
+ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }  // forwards to ClientImpl::getServiceInfo() and returns the result by value
+
} // namespace pulsar
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index b99c5d2..c59dd43 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -57,8 +57,6 @@ ClientConfiguration& ClientConfiguration::setAuth(const
AuthenticationPtr& authe
return *this;
}
-Authentication& ClientConfiguration::getAuth() const { return
*impl_->authenticationPtr; }
-
const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return
impl_->authenticationPtr; }
ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int
timeout) {
@@ -94,13 +92,6 @@ ClientConfiguration&
ClientConfiguration::setMessageListenerThreads(int threads)
int ClientConfiguration::getMessageListenerThreads() const { return
impl_->messageListenerThreads; }
-ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) {
- impl_->useTls = useTls;
- return *this;
-}
-
-bool ClientConfiguration::isUseTls() const { return impl_->useTls; }
-
ClientConfiguration& ClientConfiguration::setValidateHostName(bool
validateHostName) {
impl_->validateHostName = validateHostName;
return *this;
@@ -131,10 +122,6 @@ ClientConfiguration&
ClientConfiguration::setTlsTrustCertsFilePath(const std::st
return *this;
}
-const std::string& ClientConfiguration::getTlsTrustCertsFilePath() const {
- return impl_->tlsTrustCertsFilePath;
-}
-
ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool
allowInsecure) {
impl_->tlsAllowInsecureConnection = allowInsecure;
return *this;
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index e7a83a1..45b2aa3 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -20,8 +20,10 @@
#define LIB_CLIENTCONFIGURATIONIMPL_H_
#include <pulsar/ClientConfiguration.h>
+#include <pulsar/ServiceInfo.h>
#include <chrono>
+#include <optional>
namespace pulsar {
@@ -53,6 +55,11 @@ struct ClientConfigurationImpl {
ClientConfiguration::ProxyProtocol proxyProtocol;
std::unique_ptr<LoggerFactory> takeLogger() { return
std::move(loggerFactory); }
+
+ ServiceInfo toServiceInfo(const std::string& serviceUrl) const {  // builds a ServiceInfo from serviceUrl plus this configuration's auth and trust-certs path
+ return {serviceUrl, authenticationPtr,
+ tlsTrustCertsFilePath.empty() ? std::nullopt :
std::make_optional(tlsTrustCertsFilePath)};  // an empty path is passed on as std::nullopt rather than as an empty string
+ }
};
} // namespace pulsar
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index c373c25..cc7e1f6 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -19,7 +19,9 @@
#include "ClientConnection.h"
#include <openssl/x509.h>
+#include <pulsar/Authentication.h>
#include <pulsar/MessageIdBuilder.h>
+#include <pulsar/ServiceInfo.h>
#include <chrono>
#include <fstream>
@@ -37,6 +39,8 @@
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
+#include "ServiceNameResolver.h"
+#include "ServiceURI.h"
#include "Url.h"
#include "auth/AuthOauth2.h"
#include "auth/InitialAuthData.h"
@@ -179,12 +183,11 @@ static bool file_exists(const std::string& path) {
std::atomic<int32_t>
ClientConnection::maxMessageSize_{Commands::DefaultMaxMessageSize};
ClientConnection::ClientConnection(const std::string& logicalAddress, const
std::string& physicalAddress,
- const ExecutorServicePtr& executor,
+ const ServiceInfo& serviceInfo, const
ExecutorServicePtr& executor,
const ClientConfiguration&
clientConfiguration,
- const AuthenticationPtr& authentication,
const std::string& clientVersion,
- ConnectionPool& pool, size_t poolIndex)
+ const std::string& clientVersion,
ConnectionPool& pool, size_t poolIndex)
: operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)),
- authentication_(authentication),
+ authentication_(serviceInfo.authentication()),
serverProtocolVersion_(proto::ProtocolVersion_MIN),
executor_(executor),
resolver_(executor_->createTcpResolver()),
@@ -210,15 +213,14 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
return;
}
- auto oauth2Auth = std::dynamic_pointer_cast<AuthOauth2>(authentication_);
- if (oauth2Auth) {
+ if (auto oauth2Auth =
std::dynamic_pointer_cast<AuthOauth2>(authentication_)) {
// Configure the TLS trust certs file for Oauth2
auto authData = std::dynamic_pointer_cast<AuthenticationDataProvider>(
-
std::make_shared<InitialAuthData>(clientConfiguration.getTlsTrustCertsFilePath()));
+
std::make_shared<InitialAuthData>(serviceInfo.tlsTrustCertsFilePath().value_or("")));
oauth2Auth->getAuthData(authData);
}
- if (clientConfiguration.isUseTls()) {
+ if (serviceInfo.useTls()) {
ASIO::ssl::context ctx(ASIO::ssl::context::sslv23_client);
ctx.set_options(ASIO::ssl::context::default_workarounds |
ASIO::ssl::context::no_sslv2 |
ASIO::ssl::context::no_sslv3 |
ASIO::ssl::context::no_tlsv1 |
@@ -239,8 +241,8 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
} else {
ctx.set_verify_mode(ASIO::ssl::context::verify_peer);
- const auto& trustCertFilePath =
clientConfiguration.getTlsTrustCertsFilePath();
- if (!trustCertFilePath.empty()) {
+ if (serviceInfo.tlsTrustCertsFilePath()) {
+ const auto& trustCertFilePath =
*serviceInfo.tlsTrustCertsFilePath();
if (file_exists(trustCertFilePath)) {
ctx.load_verify_file(trustCertFilePath);
} else {
@@ -1247,7 +1249,7 @@ void ClientConnection::handleConsumerStatsTimeout(const
ASIO_ERROR& ec,
startConsumerStatsTimer(consumerStatsRequests);
}
-const std::future<void>& ClientConnection::close(Result result) {
+const std::future<void>& ClientConnection::close(Result result, bool
switchCluster) {
Lock lock(mutex_);
if (closeFuture_) {
connectPromise_.setFailed(result);
@@ -1332,6 +1334,9 @@ const std::future<void>& ClientConnection::close(Result
result) {
for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end();
++it) {
auto consumer = it->second.lock();
if (consumer) {
+ if (switchCluster) {
+ consumer->onClusterSwitching();
+ }
consumer->handleDisconnection(result, self);
}
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index b9880ee..75e4bca 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -20,6 +20,7 @@
#define _PULSAR_CLIENT_CONNECTION_HEADER_
#include <pulsar/ClientConfiguration.h>
+#include <pulsar/ServiceInfo.h>
#include <pulsar/defines.h>
#include <any>
@@ -145,8 +146,8 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
*
*/
ClientConnection(const std::string& logicalAddress, const std::string&
physicalAddress,
- const ExecutorServicePtr& executor, const
ClientConfiguration& clientConfiguration,
- const AuthenticationPtr& authentication, const
std::string& clientVersion,
+ const ServiceInfo& serviceInfo, const ExecutorServicePtr&
executor,
+ const ClientConfiguration& clientConfiguration, const
std::string& clientVersion,
ConnectionPool& pool, size_t poolIndex);
~ClientConnection();
@@ -160,8 +161,9 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
* Close the connection.
*
* @param result all pending futures will complete with this result
+ * @param switchCluster whether the close is triggered by cluster switching
*/
- const std::future<void>& close(Result result = ResultConnectError);
+ const std::future<void>& close(Result result = ResultConnectError, bool
switchCluster = false);
bool isClosed() const;
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index eec3b34..b84c14c 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -24,7 +24,9 @@
#include <algorithm>
#include <chrono>
#include <iterator>
+#include <mutex>
#include <random>
+#include <shared_mutex>
#include <sstream>
#include "BinaryProtoLookupService.h"
@@ -32,6 +34,7 @@
#include "Commands.h"
#include "ConsumerImpl.h"
#include "ConsumerInterceptors.h"
+#include "DefaultServiceInfoProvider.h"
#include "ExecutorService.h"
#include "HTTPLookupService.h"
#include "LogUtils.h"
@@ -74,20 +77,17 @@ std::string generateRandomName() {
return randomName;
}
-typedef std::unique_lock<std::mutex> Lock;
-
typedef std::vector<std::string> StringList;
-static LookupServicePtr defaultLookupServiceFactory(const std::string&
serviceUrl,
+static LookupServicePtr defaultLookupServiceFactory(const ServiceInfo&
serviceInfo,
const ClientConfiguration&
clientConfiguration,
- ConnectionPool& pool,
const AuthenticationPtr& auth) {
- if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
+ ConnectionPool& pool) {
+ if (ServiceNameResolver::useHttp(ServiceURI(serviceInfo.serviceUrl()))) {
LOG_DEBUG("Using HTTP Lookup");
- return std::make_shared<HTTPLookupService>(serviceUrl,
std::cref(clientConfiguration),
- std::cref(auth));
+ return std::make_shared<HTTPLookupService>(std::cref(serviceInfo),
std::cref(clientConfiguration));
} else {
LOG_DEBUG("Using Binary Lookup");
- return std::make_shared<BinaryProtoLookupService>(serviceUrl,
std::ref(pool),
+ return
std::make_shared<BinaryProtoLookupService>(std::cref(serviceInfo),
std::ref(pool),
std::cref(clientConfiguration));
}
}
@@ -97,17 +97,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration&
ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration& clientConfiguration,
LookupServiceFactory&& lookupServiceFactory)
- : mutex_(),
+ :
ClientImpl(std::make_unique<DefaultServiceInfoProvider>(std::cref(serviceUrl),
+
std::cref(*clientConfiguration.impl_)),
+ clientConfiguration, std::move(lookupServiceFactory)) {}
+
+ClientImpl::ClientImpl(std::unique_ptr<ServiceInfoProvider>
serviceInfoProvider,
+ const ClientConfiguration& clientConfiguration)
+ : ClientImpl(std::move(serviceInfoProvider), clientConfiguration,
&defaultLookupServiceFactory) {}
+
+ClientImpl::ClientImpl(std::unique_ptr<ServiceInfoProvider>
serviceInfoProvider,
+ const ClientConfiguration& clientConfiguration,
+ LookupServiceFactory&& lookupServiceFactory)
+ : serviceInfoProvider_(std::move(serviceInfoProvider)),
state_(Open),
- clientConfiguration_(ClientConfiguration(clientConfiguration)
-
.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))),
+ clientConfiguration_(clientConfiguration),
+ serviceInfo_(serviceInfoProvider_->initialServiceInfo()),
memoryLimitController_(clientConfiguration.getMemoryLimit()),
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
listenerExecutorProvider_(
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
partitionListenerExecutorProvider_(
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
- pool_(clientConfiguration_, ioExecutorProvider_,
clientConfiguration_.getAuthPtr(),
+ pool_(serviceInfo_, clientConfiguration_, ioExecutorProvider_,
ClientImpl::getClientVersion(clientConfiguration)),
producerIdGenerator_(0),
consumerIdGenerator_(0),
@@ -119,14 +130,24 @@ ClientImpl::ClientImpl(const std::string& serviceUrl,
const ClientConfiguration&
if (loggerFactory) {
LogUtils::setLoggerFactory(std::move(loggerFactory));
}
- lookupServicePtr_ = createLookup(serviceUrl);
+
+ lookupServicePtr_ = createLookup(*serviceInfo_.load());
}
ClientImpl::~ClientImpl() { shutdown(); }
-LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
+void ClientImpl::initialize() {  // registers the update callback with the provider; invoked from Client(const std::shared_ptr<ClientImpl>&)
+ auto weakSelf = weak_from_this();  // weak reference, so the callback held by the provider does not keep this ClientImpl alive
+ serviceInfoProvider_->initialize([weakSelf](ServiceInfo serviceInfo) {
+ if (auto self = weakSelf.lock()) {  // the update is dropped when the ClientImpl has already been destroyed
+ self->updateServiceInfo(std::move(serviceInfo));
+ }
+ });
+}
+
+LookupServicePtr ClientImpl::createLookup(ServiceInfo serviceInfo) {
auto lookupServicePtr = RetryableLookupService::create(
- lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_,
clientConfiguration_.getAuthPtr()),
+ lookupServiceFactory_(std::move(serviceInfo), clientConfiguration_,
pool_),
clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
return lookupServicePtr;
}
@@ -144,19 +165,26 @@ ExecutorServiceProviderPtr
ClientImpl::getPartitionListenerExecutorProvider() {
}
LookupServicePtr ClientImpl::getLookup(const std::string&
redirectedClusterURI) {
+ std::shared_lock readLock(mutex_);
if (redirectedClusterURI.empty()) {
return lookupServicePtr_;
}
- Lock lock(mutex_);
- auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
- if (it == redirectedClusterLookupServicePtrs_.end()) {
- auto lookup = createLookup(redirectedClusterURI);
- redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI,
lookup);
- return lookup;
+ if (auto it =
redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
+ it != redirectedClusterLookupServicePtrs_.end()) {
+ return it->second;
}
+ readLock.unlock();
- return it->second;
+ std::unique_lock writeLock(mutex_);
+ // Double check in case another thread acquires the lock and inserts a
pair first
+ if (auto it =
redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
+ it != redirectedClusterLookupServicePtrs_.end()) {
+ return it->second;
+ }
+ auto lookup = createRedirectedLookup(redirectedClusterURI);
+ redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup);
+ return lookup;
}
void ClientImpl::createProducerAsync(const std::string& topic, const
ProducerConfiguration& conf,
@@ -166,7 +194,7 @@ void ClientImpl::createProducerAsync(const std::string&
topic, const ProducerCon
}
TopicNamePtr topicName;
{
- Lock lock(mutex_);
+ std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Producer());
@@ -180,7 +208,7 @@ void ClientImpl::createProducerAsync(const std::string&
topic, const ProducerCon
if (autoDownloadSchema) {
auto self = shared_from_this();
- lookupServicePtr_->getSchema(topicName).addListener(
+ getSchema(topicName).addListener(
[self, topicName, callback](Result res, const SchemaInfo&
topicSchema) {
if (res != ResultOk) {
callback(res, Producer());
@@ -188,12 +216,12 @@ void ClientImpl::createProducerAsync(const std::string&
topic, const ProducerCon
}
ProducerConfiguration conf;
conf.setSchema(topicSchema);
-
self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+ self->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, self,
std::placeholders::_1,
std::placeholders::_2, topicName, conf,
callback));
});
} else {
- lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+ getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(),
std::placeholders::_1,
std::placeholders::_2, topicName, conf, callback));
}
@@ -253,7 +281,7 @@ void ClientImpl::createReaderAsync(const std::string&
topic, const MessageId& st
const ReaderConfiguration& conf, const
ReaderCallback& callback) {
TopicNamePtr topicName;
{
- Lock lock(mutex_);
+ std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Reader());
@@ -266,7 +294,7 @@ void ClientImpl::createReaderAsync(const std::string&
topic, const MessageId& st
}
MessageId msgId(startMessageId);
- lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+ getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(),
std::placeholders::_1,
std::placeholders::_2, topicName, msgId, conf, callback));
}
@@ -275,7 +303,7 @@ void ClientImpl::createTableViewAsync(const std::string&
topic, const TableViewC
const TableViewCallback& callback) {
TopicNamePtr topicName;
{
- Lock lock(mutex_);
+ std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, TableView());
@@ -341,7 +369,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string&
regexPattern, const
const SubscribeCallback& callback) {
TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
- Lock lock(mutex_);
+ std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Consumer());
@@ -379,7 +407,7 @@ void ClientImpl::subscribeWithRegexAsync(const std::string&
regexPattern, const
return;
}
-
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(),
mode)
+ getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
.addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer,
shared_from_this(),
std::placeholders::_1, std::placeholders::_2,
regexPattern, mode,
subscriptionName, conf, callback));
@@ -401,9 +429,8 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result
result, const Namespace
auto interceptors =
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
- consumer =
std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(),
regexPattern, mode,
-
*matchTopics, subscriptionName, conf,
-
lookupServicePtr_, interceptors);
+ consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(
+ shared_from_this(), regexPattern, mode, *matchTopics,
subscriptionName, conf, interceptors);
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(),
std::placeholders::_1,
@@ -426,7 +453,7 @@ void ClientImpl::subscribeAsync(const
std::vector<std::string>& originalTopics,
auto it = std::unique(topics.begin(), topics.end());
auto newSize = std::distance(topics.begin(), it);
topics.resize(newSize);
- Lock lock(mutex_);
+ std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Consumer());
@@ -450,7 +477,7 @@ void ClientImpl::subscribeAsync(const
std::vector<std::string>& originalTopics,
auto interceptors =
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
- shared_from_this(), topics, subscriptionName, topicNamePtr, conf,
lookupServicePtr_, interceptors);
+ shared_from_this(), topics, subscriptionName, topicNamePtr, conf,
interceptors);
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
@@ -462,7 +489,7 @@ void ClientImpl::subscribeAsync(const std::string& topic,
const std::string& sub
const ConsumerConfiguration& conf, const
SubscribeCallback& callback) {
TopicNamePtr topicName;
{
- Lock lock(mutex_);
+ std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Consumer());
@@ -480,7 +507,7 @@ void ClientImpl::subscribeAsync(const std::string& topic,
const std::string& sub
}
}
- lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+ getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleSubscribe, shared_from_this(),
std::placeholders::_1,
std::placeholders::_2, topicName, subscriptionName, conf,
callback));
}
@@ -503,9 +530,9 @@ void ClientImpl::handleSubscribe(Result result, const
LookupDataResultPtr& parti
callback(ResultInvalidConfiguration, Consumer());
return;
}
- consumer = std::make_shared<MultiTopicsConsumerImpl>(
- shared_from_this(), topicName,
partitionMetadata->getPartitions(), subscriptionName, conf,
- lookupServicePtr_, interceptors);
+ consumer =
std::make_shared<MultiTopicsConsumerImpl>(shared_from_this(), topicName,
+
partitionMetadata->getPartitions(),
+
subscriptionName, conf, interceptors);
} else {
auto consumerImpl =
std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
subscriptionName, conf,
@@ -647,7 +674,7 @@ void ClientImpl::handleGetPartitions(Result result, const
LookupDataResultPtr& p
void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const
GetPartitionsCallback& callback) {
TopicNamePtr topicName;
{
- Lock lock(mutex_);
+ std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, StringList());
@@ -658,13 +685,16 @@ void ClientImpl::getPartitionsForTopicAsync(const
std::string& topic, const GetP
return;
}
}
- lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
- std::bind(&ClientImpl::handleGetPartitions, shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2, topicName, callback));
+
getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions,
+
shared_from_this(), std::placeholders::_1,
+
std::placeholders::_2, topicName, callback));
}
void ClientImpl::closeAsync(const CloseCallback& callback) {
+ serviceInfoProvider_.reset();
+ std::unique_lock lock(mutex_);
if (state_ != Open) {
+ lock.unlock();
if (callback) {
callback(ResultAlreadyClosed);
}
@@ -678,6 +708,8 @@ void ClientImpl::closeAsync(const CloseCallback& callback) {
for (const auto& it : redirectedClusterLookupServicePtrs_) {
it.second->close();
}
+ redirectedClusterLookupServicePtrs_.clear();
+ lock.unlock();
auto producers = producers_.move();
auto consumers = consumers_.move();
@@ -726,7 +758,7 @@ void ClientImpl::handleClose(Result result, const
SharedInt& numberOfOpenHandler
--(*numberOfOpenHandlers);
}
if (*numberOfOpenHandlers == 0) {
- Lock lock(mutex_);
+ std::unique_lock lock(mutex_);
if (state_ == Closed) {
LOG_DEBUG("Client is already shutting down, possible race
condition in handleClose");
return;
@@ -776,7 +808,9 @@ void ClientImpl::shutdown() {
<< " consumers have been shutdown.");
}
+ std::shared_lock lock(mutex_);
lookupServicePtr_->close();
+ lock.unlock();
if (!pool_.close()) {
// pool_ has already been closed. It means shutdown() has been called
before.
return;
@@ -805,15 +839,9 @@ void ClientImpl::shutdown() {
lookupCount_ = 0;
}
-uint64_t ClientImpl::newProducerId() {
- Lock lock(mutex_);
- return producerIdGenerator_++;
-}
+uint64_t ClientImpl::newProducerId() { return producerIdGenerator_++; }
-uint64_t ClientImpl::newConsumerId() {
- Lock lock(mutex_);
- return consumerIdGenerator_++;
-}
+uint64_t ClientImpl::newConsumerId() { return consumerIdGenerator_++; }
uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; }
@@ -854,4 +882,28 @@ std::chrono::nanoseconds
ClientImpl::getOperationTimeout(const ClientConfigurati
return clientConfiguration.impl_->operationTimeout;
}
+void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) {
+ std::unique_lock lock{mutex_};
+ if (state_ != Open) {
+ LOG_ERROR("Client is not open, cannot update service info");
+ return;
+ }
+
+ serviceInfo_.store(std::make_shared<const ServiceInfo>(serviceInfo));
+ pool_.closeAllConnectionsForNewCluster();
+ if (lookupServicePtr_) {
+ lookupServicePtr_->close();
+ }
+ lookupServicePtr_ = createLookup(serviceInfo);
+
+ for (auto&& it : redirectedClusterLookupServicePtrs_) {
+ it.second->close();
+ }
+ redirectedClusterLookupServicePtrs_.clear();
+ useProxy_ = false;
+ lookupCount_ = 0;
+}
+
+ServiceInfo ClientImpl::getServiceInfo() const { return
*(serviceInfo_.load()); }
+
} /* namespace pulsar */
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 0b4d596..7772b15 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -20,14 +20,20 @@
#define LIB_CLIENTIMPL_H_
#include <pulsar/Client.h>
+#include <pulsar/ServiceInfo.h>
+#include <pulsar/ServiceInfoProvider.h>
#include <atomic>
#include <cstdint>
#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include "AtomicSharedPtr.h"
#include "ConnectionPool.h"
#include "Future.h"
#include "LookupDataResult.h"
+#include "LookupService.h"
#include "MemoryLimitController.h"
#include "ProtoApiEnums.h"
#include "SynchronizedHashMap.h"
@@ -52,10 +58,8 @@ typedef std::weak_ptr<ConsumerImplBase>
ConsumerImplBaseWeakPtr;
class ClientConnection;
using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
-class LookupService;
-using LookupServicePtr = std::shared_ptr<LookupService>;
-using LookupServiceFactory = std::function<LookupServicePtr(const
std::string&, const ClientConfiguration&,
- ConnectionPool&
pool, const AuthenticationPtr&)>;
+using LookupServiceFactory = std::function<LookupServicePtr(
+ ServiceInfo&& serviceInfo, const ClientConfiguration&, ConnectionPool&
pool)>;
class ProducerImplBase;
using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
@@ -71,12 +75,17 @@ std::string generateRandomName();
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
public:
+ ClientImpl(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
+ const ClientConfiguration& clientConfiguration);
+ ClientImpl(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
+ const ClientConfiguration& clientConfiguration,
LookupServiceFactory&& lookupServiceFactory);
ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration);
// only for tests
ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration,
LookupServiceFactory&& lookupServiceFactory);
+ void initialize();
virtual ~ClientImpl();
/**
@@ -128,7 +137,6 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr getIOExecutorProvider();
ExecutorServiceProviderPtr getListenerExecutorProvider();
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
- LookupServicePtr getLookup(const std::string& redirectedClusterURI = "");
void cleanupProducer(ProducerImplBase* address) {
producers_.remove(address); }
@@ -139,6 +147,26 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
ConnectionPool& getConnectionPool() noexcept { return pool_; }
uint64_t getLookupCount() { return lookupCount_; }
+ void updateServiceInfo(ServiceInfo&& serviceInfo);
+ ServiceInfo getServiceInfo() const;
+
+ // Since the underlying `lookupServicePtr_` can be modified by
`updateServiceInfo`, we should not expose
+ // it to other classes, otherwise the update might not be visible.
+ auto getPartitionMetadataAsync(const TopicNamePtr& topicName) {
+ std::shared_lock lock(mutex_);
+ return lookupServicePtr_->getPartitionMetadataAsync(topicName);
+ }
+
+ auto getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName,
CommandGetTopicsOfNamespace_Mode mode) {
+ std::shared_lock lock(mutex_);
+ return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode);
+ }
+
+ auto getSchema(const TopicNamePtr& topicName, const std::string& version =
"") {
+ std::shared_lock lock(mutex_);
+ return lookupServicePtr_->getSchema(topicName, version);
+ }
+
static std::chrono::nanoseconds getOperationTimeout(const
ClientConfiguration& clientConfiguration);
friend class PulsarFriend;
@@ -177,7 +205,17 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
const std::string& getPhysicalAddress(const std::string&
redirectedClusterURI,
const std::string& logicalAddress);
- LookupServicePtr createLookup(const std::string& serviceUrl);
+ // This overload is only used for blue-green migration, where only the
service URL is modified, the other
+ // parameters remain the same
+ LookupServicePtr createRedirectedLookup(const std::string& redirectedUrl) {
+ auto serviceInfo = serviceInfo_.load();
+ return createLookup(
+ ServiceInfo{redirectedUrl, serviceInfo->authentication(),
serviceInfo->tlsTrustCertsFilePath()});
+ }
+
+ LookupServicePtr createLookup(ServiceInfo serviceInfo);
+
+ LookupServicePtr getLookup(const std::string& redirectedClusterURI);
static std::string getClientVersion(const ClientConfiguration&
clientConfiguration);
@@ -188,10 +226,12 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
Closed
};
- std::mutex mutex_;
+ std::unique_ptr<ServiceInfoProvider> serviceInfoProvider_;
+ mutable std::shared_mutex mutex_;
State state_;
ClientConfiguration clientConfiguration_;
+ AtomicSharedPtr<ServiceInfo> serviceInfo_;
MemoryLimitController memoryLimitController_;
ExecutorServiceProviderPtr ioExecutorProvider_;
@@ -202,8 +242,8 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
std::unordered_map<std::string, LookupServicePtr>
redirectedClusterLookupServicePtrs_;
ConnectionPool pool_;
- uint64_t producerIdGenerator_;
- uint64_t consumerIdGenerator_;
+ std::atomic_uint64_t producerIdGenerator_;
+ std::atomic_uint64_t consumerIdGenerator_;
std::shared_ptr<std::atomic<uint64_t>>
requestIdGenerator_{std::make_shared<std::atomic<uint64_t>>(0)};
SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index c814cf8..6465ff7 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -38,12 +38,13 @@ DECLARE_LOG_OBJECT()
namespace pulsar {
-ConnectionPool::ConnectionPool(const ClientConfiguration& conf,
+ConnectionPool::ConnectionPool(const AtomicSharedPtr<ServiceInfo>& serviceInfo,
+ const ClientConfiguration& conf,
const ExecutorServiceProviderPtr&
executorProvider,
- const AuthenticationPtr& authentication, const
std::string& clientVersion)
- : clientConfiguration_(conf),
+ const std::string& clientVersion)
+ : serviceInfo_(serviceInfo),
+ clientConfiguration_(conf),
executorProvider_(executorProvider),
- authentication_(authentication),
clientVersion_(clientVersion),
randomDistribution_(0, conf.getConnectionsPerBroker() - 1),
randomEngine_(std::chrono::high_resolution_clock::now().time_since_epoch().count())
{}
@@ -54,19 +55,8 @@ bool ConnectionPool::close() {
return false;
}
- std::vector<ClientConnectionPtr> connectionsToClose;
- // ClientConnection::close() will remove the connection from the pool,
which is not allowed when iterating
- // over a map, so we store the connections to close in a vector first and
don't iterate the pool when
- // closing the connections.
- std::unique_lock<std::recursive_mutex> lock(mutex_);
- connectionsToClose.reserve(pool_.size());
- for (auto&& kv : pool_) {
- connectionsToClose.emplace_back(kv.second);
- }
- pool_.clear();
- lock.unlock();
-
- for (auto&& cnx : connectionsToClose) {
+ for (auto&& kv : releaseConnections()) {
+ auto& cnx = kv.second;
if (cnx) {
// Close with a fatal error to not let client retry
auto& future = cnx->close(ResultAlreadyClosed);
@@ -94,6 +84,12 @@ bool ConnectionPool::close() {
return true;
}
+void ConnectionPool::closeAllConnectionsForNewCluster() {
+ for (auto&& kv : releaseConnections()) {
+ kv.second->close(ResultDisconnected, true);
+ }
+}
+
static const std::string getKey(const std::string& logicalAddress, const
std::string& physicalAddress,
size_t keySuffix) {
std::stringstream ss;
@@ -134,9 +130,9 @@ Future<Result, ClientConnectionWeakPtr>
ConnectionPool::getConnectionAsync(const
// No valid or pending connection found in the pool, creating a new one
ClientConnectionPtr cnx;
try {
- cnx.reset(new ClientConnection(logicalAddress, physicalAddress,
executorProvider_->get(keySuffix),
- clientConfiguration_, authentication_,
clientVersion_, *this,
- keySuffix));
+ cnx.reset(new ClientConnection(logicalAddress, physicalAddress,
*serviceInfo_.load(),
+ executorProvider_->get(keySuffix),
clientConfiguration_,
+ clientVersion_, *this, keySuffix));
} catch (Result result) {
Promise<Result, ClientConnectionWeakPtr> promise;
promise.setFailed(result);
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index 0e3a6d0..f828ac6 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -21,6 +21,7 @@
#include <pulsar/ClientConfiguration.h>
#include <pulsar/Result.h>
+#include <pulsar/ServiceInfo.h>
#include <pulsar/defines.h>
#include <atomic>
@@ -31,6 +32,7 @@
#include <string>
#include "Future.h"
+#include "lib/AtomicSharedPtr.h"
namespace pulsar {
class ClientConnection;
@@ -41,8 +43,8 @@ using ExecutorServiceProviderPtr =
std::shared_ptr<ExecutorServiceProvider>;
class PULSAR_PUBLIC ConnectionPool {
public:
- ConnectionPool(const ClientConfiguration& conf, const
ExecutorServiceProviderPtr& executorProvider,
- const AuthenticationPtr& authentication, const std::string&
clientVersion);
+ ConnectionPool(const AtomicSharedPtr<ServiceInfo>& serviceInfo, const
ClientConfiguration& conf,
+ const ExecutorServiceProviderPtr& executorProvider, const
std::string& clientVersion);
/**
* Close the connection pool.
@@ -51,6 +53,12 @@ class PULSAR_PUBLIC ConnectionPool {
*/
bool close();
+ /**
+ * Close all existing connections and notify the connection that a new
cluster will be used.
+ * Unlike close(), the pool remains open for new connections.
+ */
+ void closeAllConnectionsForNewCluster();
+
void remove(const std::string& logicalAddress, const std::string&
physicalAddress, size_t keySuffix,
ClientConnection* value);
@@ -90,9 +98,9 @@ class PULSAR_PUBLIC ConnectionPool {
size_t generateRandomIndex() { return randomDistribution_(randomEngine_); }
private:
+ const AtomicSharedPtr<ServiceInfo>& serviceInfo_;
ClientConfiguration clientConfiguration_;
ExecutorServiceProviderPtr executorProvider_;
- AuthenticationPtr authentication_;
typedef std::map<std::string, std::shared_ptr<ClientConnection>> PoolMap;
PoolMap pool_;
const std::string clientVersion_;
@@ -102,6 +110,13 @@ class PULSAR_PUBLIC ConnectionPool {
std::uniform_int_distribution<> randomDistribution_;
std::mt19937 randomEngine_;
+ auto releaseConnections() {
+ decltype(pool_) pool;
+ std::lock_guard lock{mutex_};
+ pool.swap(pool_);
+ return pool;
+ }
+
friend class PulsarFriend;
};
} // namespace pulsar
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 757b6e8..026f146 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -125,7 +125,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client,
const std::string& topic
negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client,
*this, conf)),
ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)),
readCompacted_(conf.isReadCompacted()),
- startMessageId_(pulsar::getStartMessageId(startMessageId,
conf.isStartMessageIdInclusive())),
+ startMessageIdFromConfig_(pulsar::getStartMessageId(startMessageId,
conf.isStartMessageIdInclusive())),
+ startMessageId_(startMessageIdFromConfig_),
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -1134,6 +1135,20 @@ void ConsumerImpl::messageProcessed(Message& msg, bool
track) {
}
}
+void ConsumerImpl::onClusterSwitching() {
+ {
+ LockGuard lock{mutex_};
+ incomingMessages_.clear();
+ startMessageId_ = startMessageIdFromConfig_;
+ lastDequedMessageId_ = MessageId::earliest();
+ lastMessageIdInBroker_ = MessageId::earliest();
+ seekStatus_ = SeekStatus::NOT_STARTED;
+ lastSeekArg_.reset();
+ }
+ setRedirectedClusterURI("");
+ ackGroupingTrackerPtr_->flushAndClean();
+}
+
/**
* Clear the internal receiver queue and returns the message id of what was
the 1st message in the queue that
* was
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 0da82a2..6f287aa 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -162,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase {
void doImmediateAck(const MessageId& msgId, const ResultCallback&
callback, CommandAck_AckType ackType);
void doImmediateAck(const std::set<MessageId>& msgIds, const
ResultCallback& callback);
+ void onClusterSwitching();
+
protected:
// overrided methods from HandlerBase
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx)
override;
@@ -266,6 +268,11 @@ class ConsumerImpl : public ConsumerImplBase {
MessageId lastDequedMessageId_{MessageId::earliest()};
MessageId lastMessageIdInBroker_{MessageId::earliest()};
+
+ // When the consumer switches to a new cluster, we should reset
`startMessageId_` to the original value,
+ // otherwise, the message id of the old cluster might be passed in the
Subscribe request on the new
+ // cluster.
+ const optional<MessageId> startMessageIdFromConfig_;
optional<MessageId> startMessageId_;
SeekStatus seekStatus_{SeekStatus::NOT_STARTED};
diff --git a/lib/DefaultServiceInfoProvider.h b/lib/DefaultServiceInfoProvider.h
new file mode 100644
index 0000000..6479bf9
--- /dev/null
+++ b/lib/DefaultServiceInfoProvider.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/ServiceInfoProvider.h>
+
+#include <string>
+
+#include "ClientConfigurationImpl.h"
+
+namespace pulsar {
+
+class DefaultServiceInfoProvider : public ServiceInfoProvider {
+ public:
+ DefaultServiceInfoProvider(const std::string& serviceUrl, const
ClientConfigurationImpl& config)
+ : serviceInfo_(config.toServiceInfo(serviceUrl)) {}
+
+ ServiceInfo initialServiceInfo() override { return
std::move(serviceInfo_); }
+
+ void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate)
override {}
+
+ private:
+ ServiceInfo serviceInfo_;
+};
+
+} // namespace pulsar
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 79d9e94..0be9713 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -47,18 +47,17 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;
-HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
- const ClientConfiguration
&clientConfiguration,
- const AuthenticationPtr &authData)
+HTTPLookupService::HTTPLookupService(const ServiceInfo &serviceInfo,
+ const ClientConfiguration
&clientConfiguration)
:
executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
- serviceNameResolver_(serviceUrl),
- authenticationPtr_(authData),
+ serviceNameResolver_(serviceInfo.serviceUrl()),
+ authenticationPtr_(serviceInfo.authentication()),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),
tlsPrivateFilePath_(clientConfiguration.getTlsPrivateKeyFilePath()),
tlsCertificateFilePath_(clientConfiguration.getTlsCertificateFilePath()),
- tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()),
- isUseTls_(clientConfiguration.isUseTls()),
+ tlsTrustCertsFilePath_(serviceInfo.tlsTrustCertsFilePath().value_or("")),
+ isUseTls_(serviceInfo.useTls()),
tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
tlsValidateHostname_(clientConfiguration.isValidateHostName()) {}
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index d17edd5..61a0615 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -19,6 +19,8 @@
#ifndef PULSAR_CPP_HTTPLOOKUPSERVICE_H
#define PULSAR_CPP_HTTPLOOKUPSERVICE_H
+#include <pulsar/ServiceInfo.h>
+
#include <cstdint>
#include "ClientImpl.h"
@@ -67,7 +69,7 @@ class HTTPLookupService : public LookupService, public
std::enable_shared_from_t
Result sendHTTPRequest(const std::string& completeUrl, std::string&
responseData, long& responseCode);
public:
- HTTPLookupService(const std::string&, const ClientConfiguration&, const
AuthenticationPtr&);
+ HTTPLookupService(const ServiceInfo& serviceInfo, const
ClientConfiguration& config);
LookupResultFuture getBroker(const TopicName& topicName) override;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 0799eb6..a569978 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -44,20 +44,19 @@ using std::chrono::seconds;
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client,
const TopicNamePtr& topicName,
int numPartitions, const
std::string& subscriptionName,
const ConsumerConfiguration&
conf,
- const LookupServicePtr&
lookupServicePtr,
const
ConsumerInterceptorsPtr& interceptors,
Commands::SubscriptionMode
subscriptionMode,
const optional<MessageId>&
startMessageId)
: MultiTopicsConsumerImpl(client, {topicName->toString()},
subscriptionName, topicName, conf,
- lookupServicePtr, interceptors,
subscriptionMode, startMessageId) {
+ interceptors, subscriptionMode, startMessageId) {
topicsPartitions_[topicName->toString()] = numPartitions;
}
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
const ClientImplPtr& client, const std::vector<std::string>& topics, const
std::string& subscriptionName,
const TopicNamePtr& topicName, const ConsumerConfiguration& conf,
- const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr&
interceptors,
- Commands::SubscriptionMode subscriptionMode, const optional<MessageId>&
startMessageId)
+ const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode
subscriptionMode,
+ const optional<MessageId>& startMessageId)
: ConsumerImplBase(client, topicName ? topicName->toString() :
"EmptyTopics",
Backoff(milliseconds(100), seconds(60),
milliseconds(0)), conf,
client->getListenerExecutorProvider()->get()),
@@ -66,7 +65,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
conf_(conf),
incomingMessages_(conf.getReceiverQueueSize()),
messageListener_(conf.getMessageListener()),
- lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
topics_(topics),
subscriptionMode_(subscriptionMode),
@@ -93,7 +91,6 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
if (partitionsUpdateInterval > 0) {
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
partitionsUpdateInterval_ = seconds(partitionsUpdateInterval);
- lookupServicePtr_ = client->getLookup();
}
state_ = Pending;
@@ -185,7 +182,12 @@ Future<Result, Consumer>
MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
auto entry = topicsPartitions_.find(topic);
if (entry == topicsPartitions_.end()) {
lock.unlock();
- lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+ auto client = client_.lock();
+ if (!client) {
+ topicPromise->setFailed(ResultAlreadyClosed);
+ return topicPromise->getFuture();
+ }
+ client->getPartitionMetadataAsync(topicName).addListener(
[this, topicName, topicPromise](Result result, const
LookupDataResultPtr& lookupDataResult) {
if (result != ResultOk) {
LOG_ERROR("Error Checking/Getting Partition Metadata while
MultiTopics Subscribing- "
@@ -1003,7 +1005,11 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
auto topicName = TopicName::get(item.first);
auto currentNumPartitions = item.second;
auto weakSelf = weak_from_this();
- lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+ auto client = client_.lock();
+ if (!client) {
+ return;
+ }
+ client->getPartitionMetadataAsync(topicName).addListener(
[this, weakSelf, topicName, currentNumPartitions](Result result,
const
LookupDataResultPtr& lookupDataResult) {
auto self = weakSelf.lock();
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index dc62865..38a44cd 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -46,23 +46,19 @@ class MultiTopicsBrokerConsumerStatsImpl;
using MultiTopicsBrokerConsumerStatsPtr =
std::shared_ptr<MultiTopicsBrokerConsumerStatsImpl>;
class UnAckedMessageTrackerInterface;
using UnAckedMessageTrackerPtr =
std::shared_ptr<UnAckedMessageTrackerInterface>;
-class LookupService;
-using LookupServicePtr = std::shared_ptr<LookupService>;
class MultiTopicsConsumerImpl;
class MultiTopicsConsumerImpl : public ConsumerImplBase {
public:
MultiTopicsConsumerImpl(const ClientImplPtr& client, const TopicNamePtr&
topicName, int numPartitions,
const std::string& subscriptionName, const
ConsumerConfiguration& conf,
- const LookupServicePtr& lookupServicePtr,
const ConsumerInterceptorsPtr& interceptors,
Commands::SubscriptionMode =
Commands::SubscriptionModeDurable,
const optional<MessageId>& startMessageId =
optional<MessageId>{});
MultiTopicsConsumerImpl(const ClientImplPtr& client, const
std::vector<std::string>& topics,
const std::string& subscriptionName, const
TopicNamePtr& topicName,
- const ConsumerConfiguration& conf, const
LookupServicePtr& lookupServicePtr_,
- const ConsumerInterceptorsPtr& interceptors,
+ const ConsumerConfiguration& conf, const
ConsumerInterceptorsPtr& interceptors,
Commands::SubscriptionMode =
Commands::SubscriptionModeDurable,
const optional<MessageId>& startMessageId =
optional<MessageId>{});
@@ -119,7 +115,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
MessageListener messageListener_;
DeadlineTimerPtr partitionsUpdateTimer_;
TimeDuration partitionsUpdateInterval_;
- LookupServicePtr lookupServicePtr_;
std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
std::atomic<Result> failedResult{ResultOk};
Promise<Result, ConsumerImplBaseWeakPtr>
multiTopicsConsumerCreatedPromise_;
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 4a92366..1aa5c87 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -23,7 +23,6 @@
#include "ClientImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
-#include "LookupService.h"
#include "ProducerImpl.h"
#include "RoundRobinMessageRouter.h"
#include "SinglePartitionMessageRouter.h"
@@ -59,7 +58,6 @@ PartitionedProducerImpl::PartitionedProducerImpl(const
ClientImplPtr& client, co
listenerExecutor_ = client->getListenerExecutorProvider()->get();
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
partitionsUpdateInterval_ =
std::chrono::seconds(partitionsUpdateInterval);
- lookupServicePtr_ = client->getLookup();
}
}
@@ -433,7 +431,11 @@ void PartitionedProducerImpl::runPartitionUpdateTask() {
void PartitionedProducerImpl::getPartitionMetadata() {
using namespace std::placeholders;
auto weakSelf = weak_from_this();
- lookupServicePtr_->getPartitionMetadataAsync(topicName_)
+ auto client = client_.lock();
+ if (!client) {
+ return;
+ }
+ client->getPartitionMetadataAsync(topicName_)
.addListener([weakSelf](Result result, const LookupDataResultPtr&
lookupDataResult) {
auto self = weakSelf.lock();
if (self) {
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 40f2d34..94ba717 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -38,8 +38,6 @@ using ClientImplPtr = std::shared_ptr<ClientImpl>;
using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
-class LookupService;
-using LookupServicePtr = std::shared_ptr<LookupService>;
class ProducerImpl;
using ProducerImplPtr = std::shared_ptr<ProducerImpl>;
class TopicName;
@@ -133,7 +131,6 @@ class PartitionedProducerImpl : public ProducerImplBase,
ExecutorServicePtr listenerExecutor_;
DeadlineTimerPtr partitionsUpdateTimer_;
TimeDuration partitionsUpdateInterval_;
- LookupServicePtr lookupServicePtr_;
ProducerInterceptorsPtr interceptors_;
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc
b/lib/PatternMultiTopicsConsumerImpl.cc
index fd48fee..4b5aab7 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -21,7 +21,6 @@
#include "ClientImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
-#include "LookupService.h"
DECLARE_LOG_OBJECT()
@@ -32,10 +31,8 @@ using std::chrono::seconds;
PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
const ClientImplPtr& client, const std::string& pattern,
CommandGetTopicsOfNamespace_Mode getTopicsMode,
const std::vector<std::string>& topics, const std::string&
subscriptionName,
- const ConsumerConfiguration& conf, const LookupServicePtr&
lookupServicePtr_,
- const ConsumerInterceptorsPtr& interceptors)
- : MultiTopicsConsumerImpl(client, topics, subscriptionName,
TopicName::get(pattern), conf,
- lookupServicePtr_, interceptors),
+ const ConsumerConfiguration& conf, const ConsumerInterceptorsPtr&
interceptors)
+ : MultiTopicsConsumerImpl(client, topics, subscriptionName,
TopicName::get(pattern), conf, interceptors),
patternString_(pattern),
pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))),
getTopicsMode_(getTopicsMode),
@@ -84,7 +81,11 @@ void
PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& er
// already get namespace from pattern.
assert(namespaceName_);
- lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_,
getTopicsMode_)
+ auto client = client_.lock();
+ if (!client) {
+ return;
+ }
+ client->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_)
.addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace,
this,
std::placeholders::_1, std::placeholders::_2));
}
diff --git a/lib/PatternMultiTopicsConsumerImpl.h
b/lib/PatternMultiTopicsConsumerImpl.h
index 6352796..796abcc 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -52,7 +52,6 @@ class PatternMultiTopicsConsumerImpl : public
MultiTopicsConsumerImpl {
CommandGetTopicsOfNamespace_Mode
getTopicsMode,
const std::vector<std::string>& topics,
const std::string& subscriptionName, const
ConsumerConfiguration& conf,
- const LookupServicePtr& lookupServicePtr_,
const ConsumerInterceptorsPtr&
interceptors);
~PatternMultiTopicsConsumerImpl() override;
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index 7fa7e8b..754137c 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -90,7 +90,6 @@ void ReaderImpl::start(const MessageId& startMessageId,
if (partitions_ > 0) {
auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
client_.lock(), TopicName::get(topic_), partitions_, subscription,
consumerConf,
- client_.lock()->getLookup(),
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
Commands::SubscriptionModeNonDurable, startMessageId);
consumer_ = consumerImpl;
diff --git a/lib/ServiceInfo.cc b/lib/ServiceInfo.cc
new file mode 100644
index 0000000..642b39a
--- /dev/null
+++ b/lib/ServiceInfo.cc
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/ServiceInfo.h>
+
+#include <utility>
+
+#include "ServiceNameResolver.h"
+#include "ServiceURI.h"
+
+namespace pulsar {
+
+ServiceInfo::ServiceInfo(std::string serviceUrl, AuthenticationPtr
authentication,
+ std::optional<std::string> tlsTrustCertsFilePath)
+ : serviceUrl_(std::move(serviceUrl)),
+ useTls_(ServiceNameResolver::useTls(ServiceURI(serviceUrl_))),
+ authentication_(std::move(authentication)),
+ tlsTrustCertsFilePath_(std::move(tlsTrustCertsFilePath)) {}
+
+} // namespace pulsar
diff --git a/lib/c/c_ClientConfiguration.cc b/lib/c/c_ClientConfiguration.cc
index 483c74e..6b11a2a 100644
--- a/lib/c/c_ClientConfiguration.cc
+++ b/lib/c/c_ClientConfiguration.cc
@@ -115,14 +115,6 @@ void
pulsar_client_configuration_set_logger_t(pulsar_client_configuration_t *con
conf->conf.setLogger(new PulsarCLoggerFactory(logger));
}
-void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t
*conf, int useTls) {
- conf->conf.setUseTls(useTls);
-}
-
-int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t
*conf) {
- return conf->conf.isUseTls();
-}
-
void
pulsar_client_configuration_set_validate_hostname(pulsar_client_configuration_t
*conf,
int validateHostName) {
conf->conf.setValidateHostName(validateHostName);
@@ -155,10 +147,6 @@ void
pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_con
conf->conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
-const char
*pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t
*conf) {
- return conf->conf.getTlsTrustCertsFilePath().c_str();
-}
-
void
pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t
*conf,
int
allowInsecure) {
conf->conf.setTlsAllowInsecureConnection(allowInsecure);
diff --git a/perf/PerfConsumer.cc b/perf/PerfConsumer.cc
index 7a707a1..88c4f5d 100644
--- a/perf/PerfConsumer.cc
+++ b/perf/PerfConsumer.cc
@@ -57,7 +57,6 @@ static int64_t currentTimeMillis() {
struct Arguments {
std::string authParams;
std::string authPlugin;
- bool isUseTls;
bool isTlsAllowInsecureConnection;
std::string tlsTrustCertsFilePath;
std::string topic;
@@ -155,7 +154,6 @@ void handleSubscribe(Result result, Consumer consumer,
Latch latch) {
void startPerfConsumer(const Arguments& args) {
ClientConfiguration conf;
- conf.setUseTls(args.isUseTls);
conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
if (!args.tlsTrustCertsFilePath.empty()) {
std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
@@ -262,9 +260,6 @@ int main(int argc, char** argv) {
("auth-plugin,a",
po::value<std::string>(&args.authPlugin)->default_value(""),
"Authentication plugin class library path") //
- ("use-tls,b", po::value<bool>(&args.isUseTls)->default_value(false),
- "Whether tls connection is used") //
-
("allow-insecure,d",
po::value<bool>(&args.isTlsAllowInsecureConnection)->default_value(true),
"Whether insecure tls connection is allowed") //
diff --git a/perf/PerfProducer.cc b/perf/PerfProducer.cc
index 17e70cd..784c651 100644
--- a/perf/PerfProducer.cc
+++ b/perf/PerfProducer.cc
@@ -47,7 +47,6 @@ typedef std::shared_ptr<pulsar::RateLimiter> RateLimiterPtr;
struct Arguments {
std::string authParams;
std::string authPlugin;
- bool isUseTls;
bool isTlsAllowInsecureConnection;
std::string tlsTrustCertsFilePath;
std::string topic;
@@ -223,9 +222,6 @@ int main(int argc, char** argv) {
("auth-plugin,a",
po::value<std::string>(&args.authPlugin)->default_value(""),
"Authentication plugin class library path") //
- ("use-tls,b", po::value<bool>(&args.isUseTls)->default_value(false),
- "Whether tls connection is used") //
-
("allow-insecure,d",
po::value<bool>(&args.isTlsAllowInsecureConnection)->default_value(true),
"Whether insecure tls connection is allowed") //
@@ -366,7 +362,6 @@ int main(int argc, char** argv) {
pulsar::ClientConfiguration conf;
conf.setConnectionsPerBroker(args.connectionsPerBroker);
conf.setMemoryLimit(args.memoryLimitMb * 1024 * 1024);
- conf.setUseTls(args.isUseTls);
conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
if (!args.tlsTrustCertsFilePath.empty()) {
std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index 84e8572..4bfd808 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -42,7 +42,7 @@ static const std::string serviceUrlHttp =
"http://localhost:8080";
static const std::string tokenPath = TOKEN_PATH;
-static std::string getToken() {
+std::string getToken() {
std::ifstream file(tokenPath);
std::string str((std::istreambuf_iterator<char>(file)),
std::istreambuf_iterator<char>());
return str;
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index d3c6e61..df2c161 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -700,7 +700,8 @@ TEST(BasicEndToEndTest, testConfigurationFile) {
ClientConfiguration config2 = config1;
AuthenticationDataPtr authData;
- ASSERT_EQ(ResultOk, config1.getAuth().getAuthData(authData));
+ Client client(lookupUrl, config1);
+ ASSERT_EQ(ResultOk,
client.getServiceInfo().authentication()->getAuthData(authData));
ASSERT_EQ(100, config2.getOperationTimeoutSeconds());
ASSERT_EQ(10, config2.getIOThreads());
ASSERT_EQ(1, config2.getMessageListenerThreads());
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 92aa820..53cb76e 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include <pulsar/Authentication.h>
#include <pulsar/Client.h>
+#include <pulsar/ServiceInfo.h>
#include <algorithm>
#include <boost/exception/all.hpp>
@@ -30,6 +31,7 @@
#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "PulsarWrapper.h"
+#include "lib/AtomicSharedPtr.h"
#include "lib/BinaryProtoLookupService.h"
#include "lib/ClientConnection.h"
#include "lib/ConnectionPool.h"
@@ -79,11 +81,12 @@ using namespace pulsar;
TEST(LookupServiceTest, basicLookup) {
ExecutorServiceProviderPtr service =
std::make_shared<ExecutorServiceProvider>(1);
- AuthenticationPtr authData = AuthFactory::Disabled();
std::string url = "pulsar://localhost:6650";
ClientConfiguration conf;
ExecutorServiceProviderPtr
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
- ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
+ AtomicSharedPtr<ServiceInfo> serviceInfo;
+ serviceInfo.store(std::make_shared<const ServiceInfo>(url));
+ ConnectionPool pool_(serviceInfo, conf, ioExecutorProvider_, "");
BinaryProtoLookupService lookupService(url, pool_, conf);
TopicNamePtr topicName = TopicName::get("topic");
@@ -146,24 +149,30 @@ static void testMultiAddresses(LookupService&
lookupService) {
}
TEST(LookupServiceTest, testMultiAddresses) {
- ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1),
AuthFactory::Disabled(), "");
+ AtomicSharedPtr<ServiceInfo> serviceInfo;
+ serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+ ConnectionPool pool(serviceInfo, {},
std::make_shared<ExecutorServiceProvider>(1), "");
ClientConfiguration conf;
- BinaryProtoLookupService
binaryLookupService("pulsar://localhost,localhost:9999", pool, conf);
+ BinaryProtoLookupService
binaryLookupService(ServiceInfo{"pulsar://localhost,localhost:9999"}, pool,
+ conf);
testMultiAddresses(binaryLookupService);
// HTTPLookupService calls shared_from_this() internally, we must create a
shared pointer to test
auto httpLookupServicePtr = std::make_shared<HTTPLookupService>(
- "http://localhost,localhost:9999", ClientConfiguration{},
AuthFactory::Disabled());
+ ServiceInfo{"http://localhost,localhost:9999"}, ClientConfiguration{});
testMultiAddresses(*httpLookupServicePtr);
}
TEST(LookupServiceTest, testRetry) {
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
- ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+ AtomicSharedPtr<ServiceInfo> serviceInfo;
+ serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+ ConnectionPool pool(serviceInfo, {}, executorProvider, "");
ClientConfiguration conf;
- auto lookupService = RetryableLookupService::create(
-
std::make_shared<BinaryProtoLookupService>("pulsar://localhost:9999,localhost",
pool, conf),
- std::chrono::seconds(30), executorProvider);
+ auto lookupService =
+
RetryableLookupService::create(std::make_shared<BinaryProtoLookupService>(
+
ServiceInfo{"pulsar://localhost:9999,localhost"}, pool, conf),
+ std::chrono::seconds(30),
executorProvider);
ServiceNameResolver& serviceNameResolver =
lookupService->getServiceNameResolver();
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
@@ -192,13 +201,17 @@ TEST(LookupServiceTest, testRetry) {
TEST(LookupServiceTest, testTimeout) {
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
- ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+ AtomicSharedPtr<ServiceInfo> serviceInfo;
+ serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+ ConnectionPool pool(serviceInfo, {}, executorProvider, "");
ClientConfiguration conf;
constexpr int timeoutInSeconds = 2;
auto lookupService = RetryableLookupService::create(
-
std::make_shared<BinaryProtoLookupService>("pulsar://localhost:9990,localhost:9902,localhost:9904",
- pool, conf),
+ std::make_shared<BinaryProtoLookupService>(
+
ServiceInfo{"pulsar://localhost:9990,localhost:9902,localhost:9904",
AuthFactory::Disabled(),
+ std::nullopt},
+ pool, conf),
std::chrono::seconds(timeoutInSeconds), executorProvider);
auto topicNamePtr = TopicName::get("lookup-service-test-retry");
@@ -259,7 +272,7 @@ TEST_P(LookupServiceTest, basicGetNamespaceTopics) {
ASSERT_EQ(ResultOk, result);
// 2. verify getTopicsOfNamespace by regex mode.
- auto lookupServicePtr =
PulsarFriend::getClientImplPtr(client_)->getLookup();
+ auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_);
auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode,
const std::set<std::string>& expectedTopics) {
Future<Result, NamespaceTopicsPtr> getTopicsFuture =
@@ -292,11 +305,8 @@ TEST_P(LookupServiceTest, testGetSchema) {
Producer producer;
ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration,
producer));
- auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
- auto lookup = clientImplPtr->getLookup();
-
SchemaInfo schemaInfo;
- auto future = lookup->getSchema(TopicName::get(topic));
+ auto future =
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
ASSERT_EQ(ResultOk, future.get(schemaInfo));
ASSERT_EQ(jsonSchema, schemaInfo.getSchema());
ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType());
@@ -310,11 +320,8 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) {
Producer producer;
ASSERT_EQ(ResultOk, client_.createProducer(topic, producer));
- auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
- auto lookup = clientImplPtr->getLookup();
-
SchemaInfo schemaInfo;
- auto future = lookup->getSchema(TopicName::get(topic));
+ auto future =
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo));
}
@@ -335,11 +342,8 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) {
Producer producer;
ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration,
producer));
- auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
- auto lookup = clientImplPtr->getLookup();
-
SchemaInfo schemaInfo;
- auto future = lookup->getSchema(TopicName::get(topic));
+ auto future =
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
ASSERT_EQ(ResultOk, future.get(schemaInfo));
ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema());
ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType());
@@ -464,9 +468,9 @@ INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest,
::testing::Values(binaryLook
class BinaryProtoLookupServiceRedirectTestHelper : public
BinaryProtoLookupService {
public:
- BinaryProtoLookupServiceRedirectTestHelper(const std::string& serviceUrl,
ConnectionPool& pool,
+ BinaryProtoLookupServiceRedirectTestHelper(const ServiceInfo& serviceInfo,
ConnectionPool& pool,
const ClientConfiguration&
clientConfiguration)
- : BinaryProtoLookupService(serviceUrl, pool, clientConfiguration) {}
+ : BinaryProtoLookupService(serviceInfo, pool, clientConfiguration) {}
LookupResultFuture findBroker(const std::string& address, bool
authoritative, const std::string& topic,
size_t redirectCount) {
@@ -476,13 +480,14 @@ class BinaryProtoLookupServiceRedirectTestHelper : public
BinaryProtoLookupServi
TEST(LookupServiceTest, testRedirectionLimit) {
const auto redirect_limit = 5;
- AuthenticationPtr authData = AuthFactory::Disabled();
ClientConfiguration conf;
conf.setMaxLookupRedirects(redirect_limit);
ExecutorServiceProviderPtr
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
- ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
- string url = "pulsar://localhost:6650";
- BinaryProtoLookupServiceRedirectTestHelper lookupService(url, pool_, conf);
+ AtomicSharedPtr<ServiceInfo> serviceInfo;
+ serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+ ConnectionPool pool_(serviceInfo, conf, ioExecutorProvider_, "");
+ const ServiceInfo lookupServiceInfo{"pulsar://localhost:6650"};
+ BinaryProtoLookupServiceRedirectTestHelper
lookupService(lookupServiceInfo, pool_, conf);
const auto topicNamePtr = TopicName::get("topic");
for (auto idx = 0; idx < redirect_limit + 5; ++idx) {
@@ -493,8 +498,8 @@ TEST(LookupServiceTest, testRedirectionLimit) {
if (idx <= redirect_limit) {
ASSERT_EQ(ResultOk, result);
- ASSERT_EQ(url, lookupResult.logicalAddress);
- ASSERT_EQ(url, lookupResult.physicalAddress);
+ ASSERT_EQ(lookupServiceInfo.serviceUrl(),
lookupResult.logicalAddress);
+ ASSERT_EQ(lookupServiceInfo.serviceUrl(),
lookupResult.physicalAddress);
} else {
ASSERT_EQ(ResultTooManyLookupRequestException, result);
}
@@ -522,12 +527,12 @@ class MockLookupService : public BinaryProtoLookupService
{
};
TEST(LookupServiceTest, testAfterClientShutdown) {
- auto client = std::make_shared<ClientImpl>("pulsar://localhost:6650",
ClientConfiguration{},
- [](const std::string&
serviceUrl, const ClientConfiguration&,
- ConnectionPool& pool, const
AuthenticationPtr&) {
- return
std::make_shared<MockLookupService>(
- serviceUrl, pool,
ClientConfiguration{});
- });
+ auto client = std::make_shared<ClientImpl>(
+ "pulsar://localhost:6650", ClientConfiguration{},
+ [](const ServiceInfo& serviceInfo, const ClientConfiguration&,
ConnectionPool& pool) {
+ return std::make_shared<MockLookupService>(serviceInfo, pool,
ClientConfiguration{});
+ });
+
std::promise<Result> promise;
client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub",
ConsumerConfiguration{},
[&promise](Result result, const Consumer&) {
promise.set_value(result); });
@@ -545,10 +550,12 @@ TEST(LookupServiceTest, testAfterClientShutdown) {
TEST(LookupServiceTest, testRetryAfterDestroyed) {
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
- ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+ AtomicSharedPtr<ServiceInfo> serviceInfo;
+ serviceInfo.store(std::make_shared<const ServiceInfo>(binaryLookupUrl));
+ ConnectionPool pool(serviceInfo, {}, executorProvider, "");
- auto internalLookupService =
- std::make_shared<MockLookupService>("pulsar://localhost:6650", pool,
ClientConfiguration{});
+ auto internalLookupService =
std::make_shared<MockLookupService>(ServiceInfo{"pulsar://localhost:6650"},
+ pool,
ClientConfiguration{});
auto lookupService =
RetryableLookupService::create(internalLookupService,
std::chrono::seconds(30), executorProvider);
diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc
new file mode 100644
index 0000000..82f5f6f
--- /dev/null
+++ b/tests/ServiceInfoProviderTest.cc
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <thread>
+
+#include "PulsarFriend.h"
+#include "WaitUtils.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+using namespace std::chrono_literals;
+
+class ServiceInfoHolder {
+ public:
+ ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {}
+
+ std::optional<ServiceInfo> getUpdatedValue() {
+ std::lock_guard lock(mutex_);
+ if (!owned_) {
+ return std::nullopt;
+ }
+ owned_ = false;
+ return std::move(serviceInfo_);
+ }
+
+ void updateValue(ServiceInfo info) {
+ std::lock_guard lock(mutex_);
+ serviceInfo_ = std::move(info);
+ owned_ = true;
+ }
+
+ private:
+ ServiceInfo serviceInfo_;
+ bool owned_{true};
+
+ mutable std::mutex mutex_;
+};
+
+class TestServiceInfoProvider : public ServiceInfoProvider {
+ public:
+ TestServiceInfoProvider(ServiceInfoHolder &serviceInfo) :
serviceInfo_(serviceInfo) {}
+
+ ServiceInfo initialServiceInfo() override { return
serviceInfo_.getUpdatedValue().value(); }
+
+ void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate)
override {
+ thread_ = std::thread([this, onServiceInfoUpdate] {
+ while (running_) {
+ auto updatedValue = serviceInfo_.getUpdatedValue();
+ if (updatedValue) {
+ onServiceInfoUpdate(std::move(*updatedValue));
+ }
+ // Use a tight wait loop for tests
+ std::this_thread::sleep_for(10ms);
+ }
+ });
+ }
+
+ ~TestServiceInfoProvider() override {
+ running_ = false;
+ if (thread_.joinable()) {
+ thread_.join();
+ }
+ }
+
+ private:
+ std::thread thread_;
+ ServiceInfoHolder &serviceInfo_;
+ std::atomic_bool running_{true};
+ mutable std::mutex mutex_;
+};
+
+TEST(ServiceInfoProviderTest, testSwitchCluster) {
+ extern std::string getToken(); // from tests/AuthTokenTest.cc
+ // Access "private/auth" namespace in cluster 1
+ ServiceInfo info1{"pulsar://localhost:6650",
AuthToken::createWithToken(getToken())};
+ // Access "private/auth" namespace in cluster 2
+ ServiceInfo info2{"pulsar+ssl://localhost:6653",
+ AuthTls::create(TEST_CONF_DIR "/client-cert.pem",
TEST_CONF_DIR "/client-key.pem"),
+ TEST_CONF_DIR "/hn-verification/cacert.pem"};
+ // Access "public/default" namespace in cluster 1, which doesn't require
authentication
+ ServiceInfo info3{"pulsar://localhost:6650"};
+
+ ServiceInfoHolder serviceInfo{info1};
+ auto client =
Client::create(std::make_unique<TestServiceInfoProvider>(serviceInfo), {});
+
+ const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" +
std::to_string(time(nullptr));
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer));
+
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicRequiredAuth,
MessageId::earliest(), {}, reader));
+
+ auto sendAndReceive = [&](const std::string &value) {
+ MessageId msgId;
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent(value).build(), msgId));
+ LOG_INFO("Sent " << value << " to " << msgId);
+
+ Message msg;
+ ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+ LOG_INFO("Read " << msg.getDataAsString() << " from " << msgId);
+ ASSERT_EQ(value, msg.getDataAsString());
+ };
+
+ sendAndReceive("msg-0");
+
+ // Switch to cluster 2 (started by
./build-support/start-mim-test-service-inside-container.sh)
+ ASSERT_FALSE(PulsarFriend::getConnections(client).empty());
+ serviceInfo.updateValue(info2);
+ ASSERT_TRUE(waitUntil(1s, [&] {
+ return PulsarFriend::getConnections(client).empty() &&
client.getServiceInfo() == info2;
+ }));
+
+ // Now the same producer and reader will access the same topic in cluster 2
+ sendAndReceive("msg-1");
+
+ // Switch back to cluster 1 without any authentication; the previous
authentication info configured for
+ // cluster 2 will be cleared.
+ ASSERT_FALSE(PulsarFriend::getConnections(client).empty());
+ serviceInfo.updateValue(info3);
+ ASSERT_TRUE(waitUntil(1s, [&] {
+ return PulsarFriend::getConnections(client).empty() &&
client.getServiceInfo() == info3;
+ }));
+
+ const auto topicNoAuth = "testUpdateConnectionInfo-" +
std::to_string(time(nullptr));
+ producer.close();
+ ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("msg-2").build()));
+
+ client.close();
+
+ // Verify messages sent to cluster 1 and cluster 2 can be consumed
successfully with correct
+ // authentication info.
+ auto verify = [](Client &client, const std::string &topic, const
std::string &value) {
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(),
{}, reader));
+ Message msg;
+ ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+ ASSERT_EQ(value, msg.getDataAsString());
+ };
+ Client client1{info1.serviceUrl(),
ClientConfiguration().setAuth(info1.authentication())};
+ verify(client1, topicRequiredAuth, "msg-0");
+ client1.close();
+
+ Client client2{info2.serviceUrl(), ClientConfiguration()
+ .setAuth(info2.authentication())
+
.setTlsTrustCertsFilePath(*info2.tlsTrustCertsFilePath())};
+ verify(client2, topicRequiredAuth, "msg-1");
+ client2.close();
+
+ Client client3{info3.serviceUrl()};
+ verify(client3, topicNoAuth, "msg-2");
+ client3.close();
+}