Copilot commented on code in PR #541:
URL: https://github.com/apache/pulsar-client-cpp/pull/541#discussion_r2922504670

##########
include/pulsar/ServiceInfoProvider.h:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_SERVICE_INFO_PROVIDER_H_
+#define PULSAR_SERVICE_INFO_PROVIDER_H_
+
+#include <pulsar/ClientConfiguration.h>

Review Comment:
`ServiceInfoProvider.h` uses `std::function` but doesn’t include `<functional>` directly, and it includes `ClientConfiguration.h` even though no types from that header are referenced here. Please include `<functional>` explicitly and drop the unused `ClientConfiguration.h` include to avoid relying on transitive includes and to reduce header coupling.

```suggestion
#include <functional>
```

##########
include/pulsar/Client.h:
##########
@@ -68,6 +71,19 @@ class PULSAR_PUBLIC Client {
 */
 Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
+ /**
+ * Create a Pulsar client object using the specified ServiceInfoProvider.
+ *
+ * The ServiceInfoProvider is responsible for providing the service information (such as service URL)
+ * dynamically. For example, if it detects a primary Pulsar service is down, it can switch to a secondary
+ * service and update the client with the new service information.
+ *
+ * When `close` is called, the client will call `ServiceInfoProvider::close` to guarantee the lifetime of
+ * the provider is properly managed.

Review Comment:
The docstring says the client will call `ServiceInfoProvider::close`, but `ServiceInfoProvider` only defines a destructor (no `close()` method). This is misleading for API users; please update the comment to describe the actual lifecycle (e.g., that the provider will be destroyed during `Client::close()` / `closeAsync()`).

```suggestion
 * The Client instance takes ownership of the given ServiceInfoProvider. The provider will be destroyed
 * as part of the client's shutdown lifecycle, for example when `Client::close()` or `Client::closeAsync()`
 * is called, ensuring that its lifetime is properly managed.
```

##########
include/pulsar/ServiceInfoProvider.h:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_SERVICE_INFO_PROVIDER_H_
+#define PULSAR_SERVICE_INFO_PROVIDER_H_
+
+#include <pulsar/ClientConfiguration.h>
+#include <pulsar/ServiceInfo.h>
+
+namespace pulsar {
+
+class PULSAR_PUBLIC ServiceInfoProvider {
+ public:
+ /**
+ * The destructor will be called when `Client::close()` is invoked, and the provider should stop any
+ * ongoing work and release the resources in the destructor.
+ */
+ virtual ~ServiceInfoProvider() = default;
+
+ /**
+ * Get the initial `ServiceInfo` connection for the client.
+ * This method is called **only once** internally in `Client::create()` to get the initial `ServiceInfo`
+ * for the client to connect to the Pulsar service. Since it's only called once, it's legal to return a
+ * moved `ServiceInfo` object to avoid unnecessary copying.
+ */
+ virtual ServiceInfo initialServiceInfo() = 0;
+
+ /**
+ * Initialize the ServiceInfoProvider.
+ *
+ * @param onServiceInfoUpdate the callback to update `client` with the new `ServiceInfo`
+ *
+ * Note: the implementation is responsible to invoke `onServiceInfoUpdate` at least once to provide the
+ * initial `ServiceInfo` for the client.

Review Comment:
The documentation for `initialServiceInfo()` and `initialize()` is inconsistent with the current usage pattern in this PR: `ClientImpl` obtains the initial `ServiceInfo` via `initialServiceInfo()` and (e.g. `DefaultServiceUrlProvider`) does not call `onServiceInfoUpdate` at all. Either adjust the docs to clarify that `onServiceInfoUpdate` is only for subsequent updates, or change the contract/implementations accordingly.

```suggestion
 * for the client to connect to the Pulsar service, typically before {@link initialize} is invoked.
 * Since it's only called once, it's legal to return a moved `ServiceInfo` object to avoid unnecessary
 * copying.
 */
 virtual ServiceInfo initialServiceInfo() = 0;
 /**
 * Initialize the ServiceInfoProvider.
 *
 * After the client has obtained the initial `ServiceInfo` via {@link initialServiceInfo}, this method is
 * called to allow the provider to start any background work (for example, service discovery or watching
 * configuration changes) and to report subsequent updates to the service information.
 *
 * @param onServiceInfoUpdate the callback to deliver updated `ServiceInfo` values to the client after
 * the initial connection has been established
 *
 * Implementations may choose not to invoke `onServiceInfoUpdate` if the `ServiceInfo` never changes.
```

##########
lib/ClientImpl.h:
##########
@@ -177,7 +205,17 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
 const std::string& getPhysicalAddress(const std::string& redirectedClusterURI, const std::string& logicalAddress);
- LookupServicePtr createLookup(const std::string& serviceUrl);
+ // This overload is only used for blue-green migration, where only the service URL is modified, the other
+ // paramters remain the same

Review Comment:
Typo in comment: `paramters` → `parameters`.

```suggestion
 // parameters remain the same
```

##########
tests/BasicEndToEndTest.cc:
##########
@@ -700,7 +700,8 @@ TEST(BasicEndToEndTest, testConfigurationFile) {
 ClientConfiguration config2 = config1;
 AuthenticationDataPtr authData;
- ASSERT_EQ(ResultOk, config1.getAuth().getAuthData(authData));
+ Client client(lookupUrl, config1);
+ ASSERT_EQ(ResultOk, client.getServiceInfo().authentication()->getAuthData(authData));

Review Comment:
This test now constructs a `Client` just to retrieve auth data, but the client is never closed and will keep background threads/resources alive. Prefer reading auth directly from the configuration (e.g., via `getAuthPtr()`) or ensure the temporary client is closed before the test exits.

##########
include/pulsar/ServiceInfo.h:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_SERVICE_INFO_H_
+#define PULSAR_SERVICE_INFO_H_
+
+#include <pulsar/Authentication.h>
+
+#include <optional>
+#include <string>
+
+namespace pulsar {
+
+/**
+ * ServiceInfo encapsulates the information of a Pulsar service, which is used by the client to connect to the
+ * service. It includes the service URL, authentication information, and TLS configuration.
+ */
+class PULSAR_PUBLIC ServiceInfo final {
+ public:
+ ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication = AuthFactory::Disabled(),
+ std::optional<std::string> tlsTrustCertsFilePath = std::nullopt);
+
+ auto& serviceUrl() const noexcept { return serviceUrl_; }
+ auto useTls() const noexcept { return useTls_; }
+ auto& authentication() const noexcept { return authentication_; }
+ auto& tlsTrustCertsFilePath() const noexcept { return tlsTrustCertsFilePath_; }
+
+ bool operator==(const ServiceInfo& other) const noexcept {
+ return serviceUrl_ == other.serviceUrl_ && useTls_ == other.useTls_ &&
+ authentication_ == other.authentication_ &&
+ tlsTrustCertsFilePath_ == other.tlsTrustCertsFilePath_;
+ }

Review Comment:
`ServiceInfo::operator==` compares `authentication_` via `AuthenticationPtr` equality, which is pointer-identity (not credential equivalence).
That can make equality checks unexpectedly fail if two equivalent authentication objects are created separately. Consider either documenting this explicitly or removing `authentication_` from `operator==` (or providing a different comparison helper) to avoid surprising semantics for a public API type.

##########
lib/DefaultServiceUrlProvider.h:
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/ServiceInfoProvider.h>
+
+#include <string>
+
+#include "ClientConfigurationImpl.h"
+
+namespace pulsar {
+
+class DefaultServiceUrlProvider : public ServiceInfoProvider {
+ public:
+ DefaultServiceUrlProvider(const std::string& serviceUrl, const ClientConfigurationImpl& config)
+ : serviceInfo_(config.toServiceInfo(serviceUrl)) {}
+
+ ServiceInfo initialServiceInfo() override { return std::move(serviceInfo_); }
+
+ void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {}

Review Comment:
The class name `DefaultServiceUrlProvider` is now providing a full `ServiceInfo` (URL + auth + TLS trust path), not just a URL.
Renaming to something like `DefaultServiceInfoProvider` (and matching the filename) would make the role clearer and avoid confusion with the old `ServiceUrlProvider` terminology.

##########
tests/ServiceInfoProviderTest.cc:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <thread>
+
+#include "PulsarFriend.h"
+#include "WaitUtils.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+using namespace std::chrono_literals;
+
+class ServiceInfoHolder {
+ public:
+ ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {}
+
+ std::optional<ServiceInfo> getUpdatedValue() {
+ std::lock_guard lock(mutex_);
+ if (!owned_) {
+ return std::nullopt;
+ }
+ owned_ = false;
+ return std::move(serviceInfo_);
+ }
+
+ void updateValue(ServiceInfo info) {
+ std::lock_guard lock(mutex_);
+ serviceInfo_ = std::move(info);
+ owned_ = true;
+ }
+
+ private:
+ ServiceInfo serviceInfo_;
+ bool owned_{true};
+
+ mutable std::mutex mutex_;
+};
+
+class TestServiceInfoProvider : public ServiceInfoProvider {
+ public:
+ TestServiceInfoProvider(ServiceInfoHolder &serviceInfo) : serviceInfo_(serviceInfo) {}
+
+ ServiceInfo initialServiceInfo() override { return serviceInfo_.getUpdatedValue().value(); }
+
+ void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {
+ thread_ = std::thread([this, onServiceInfoUpdate] {
+ while (running_) {
+ auto updatedValue = serviceInfo_.getUpdatedValue();
+ if (updatedValue) {
+ onServiceInfoUpdate(std::move(*updatedValue));
+ }
+ // Use a tight wait loop for tests
+ std::this_thread::sleep_for(10ms);
+ }
+ });
+ }
+
+ ~TestServiceInfoProvider() override {
+ running_ = false;
+ if (thread_.joinable()) {
+ thread_.join();
+ }
+ }
+
+ private:
+ std::thread thread_;
+ ServiceInfoHolder &serviceInfo_;
+ std::atomic_bool running_{true};
+ mutable std::mutex mutex_;
+};
+
+TEST(ServiceInfoProviderTest, testSwitchCluster) {
+ extern std::string getToken(); // from tests/AuthToken.cc
+ // Access "private/auth" namespace in cluster 1

Review Comment:
The comment `// from tests/AuthToken.cc` is outdated/inaccurate (the helper is
currently defined in `tests/AuthTokenTest.cc`). Please update the comment (or move the helper to a dedicated shared test utility) to avoid confusion for future readers.

##########
perf/PerfProducer.cc:
##########
@@ -366,7 +366,6 @@ int main(int argc, char** argv) {
 pulsar::ClientConfiguration conf;
 conf.setConnectionsPerBroker(args.connectionsPerBroker);
 conf.setMemoryLimit(args.memoryLimitMb * 1024 * 1024);
- conf.setUseTls(args.isUseTls);
 conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);

Review Comment:
`--use-tls` is still exposed as a CLI flag, but with `ClientConfiguration::setUseTls()` removed this flag no longer has any effect (TLS is determined by the URL scheme). Please either (a) rewrite `args.serviceURL` to `pulsar+ssl://...` when `--use-tls` is set, or (b) reject the combination with a clear error so users don’t silently run without TLS.

##########
lib/ClientConnection.h:
##########
@@ -157,10 +158,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
 *
 * @param result all pending futures will complete with this result
 * @param detach remove it from the pool if it's true
+ * @param switchCluster whether the close is triggered by cluster switching
 *
 * `detach` should only be false when the connection pool is closed.

Review Comment:
The comment says `detach` should only be false when the connection pool is closed, but `ConnectionPool::closeAllConnectionsForNewCluster()` now calls `close(..., detach=false, switchCluster=true)` while keeping the pool open. Please update this comment to reflect the new valid usage so future callers don’t treat it as a hard constraint.

```suggestion
 * @param detach remove it from the pool if it's true. When false, the connection remains
 * associated with the pool but is logically closed; this is currently used when the
 * pool itself is being closed or when switching clusters.
 * @param switchCluster whether the close is triggered by cluster switching
```
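
Regarding the review comment on `ServiceInfo::operator==`: the following is a minimal, self-contained sketch of why comparing two `std::shared_ptr` values with `==` may surprise callers. It assumes `AuthenticationPtr` behaves like a `std::shared_ptr`, as the comment implies. `FakeAuth`, `samePointer`, `sameCredentials`, and `demoEquality` are hypothetical names used only for illustration and are not part of the Pulsar API.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for an authentication object; not a Pulsar type.
struct FakeAuth {
    std::string token;
};
using FakeAuthPtr = std::shared_ptr<FakeAuth>;

// Pointer-identity comparison: this is what `==` on two shared_ptr values does.
inline bool samePointer(const FakeAuthPtr& a, const FakeAuthPtr& b) { return a == b; }

// Hypothetical value comparison: equal when both are null or both hold the same token.
inline bool sameCredentials(const FakeAuthPtr& a, const FakeAuthPtr& b) {
    if (!a || !b) return !a && !b;
    return a->token == b->token;
}

// Shows the difference for two separately created but equivalent objects.
inline void demoEquality() {
    auto a = std::make_shared<FakeAuth>(FakeAuth{"token"});
    auto b = std::make_shared<FakeAuth>(FakeAuth{"token"});
    assert(!samePointer(a, b));     // distinct objects, so `==` is false
    assert(sameCredentials(a, b));  // same token, so the value comparison is true
}
```

Whether a value comparison like `sameCredentials` is feasible for real authentication plugins is an open question; documenting the pointer-identity semantics may be the simpler option.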
