Copilot commented on code in PR #551:
URL: https://github.com/apache/pulsar-client-cpp/pull/551#discussion_r2922515306


##########
tests/ClientTest.cc:
##########
@@ -42,6 +46,81 @@ using testing::AtLeast;
 static std::string lookupUrl = "pulsar://localhost:6650";
 static std::string adminUrl = "http://localhost:8080/";;
 
+namespace {
+
+class SilentTcpServer {
+   public:
+    SilentTcpServer()
+        : acceptor_(ioContext_, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), 
0)),
+          acceptedFuture_(acceptedPromise_.get_future()) {}
+
+    ~SilentTcpServer() { stop(); }
+
+    int getPort() const { return acceptor_.local_endpoint().port(); }
+
+    void start() {
+        serverThread_ = std::thread([this] {
+            socket_.reset(new ASIO::ip::tcp::socket(ioContext_));
+
+            ASIO_ERROR acceptError;
+            acceptor_.accept(*socket_, acceptError);
+            acceptedPromise_.set_value(acceptError);
+
+            std::unique_lock<std::mutex> lock(mutex_);
+            cond_.wait(lock, [this] { return stopped_; });
+            lock.unlock();
+
+            if (socket_) {
+                ASIO_ERROR closeError;
+                socket_->close(closeError);
+            }
+
+            ASIO_ERROR closeError;
+            acceptor_.close(closeError);
+        });
+    }
+
+    bool waitUntilAccepted(std::chrono::milliseconds timeout) const {
+        return acceptedFuture_.wait_for(timeout) == std::future_status::ready;
+    }
+
+    ASIO_ERROR acceptedError() const { return acceptedFuture_.get(); }
+
+    void stop() {
+        {
+            std::lock_guard<std::mutex> lock(mutex_);
+            if (stopped_) {
+                return;
+            }
+            stopped_ = true;
+        }
+
+        ASIO_ERROR closeError;
+        acceptor_.close(closeError);
+        if (socket_) {
+            socket_->close(closeError);
+        }

Review Comment:
   `SilentTcpServer::stop()` calls `acceptor_.close()` / `socket_->close()` 
from a different thread while the server thread may be blocked in 
`acceptor_.accept()` (and later also closes the same acceptor). Asio objects 
are not safe as shared objects, so this cross-thread close can itself be 
racy/flaky. Consider running accept/close on the same thread (e.g., use 
`async_accept` + `io_context::run()` in `serverThread_`, and 
`ASIO::post`/`dispatch` a shutdown that closes the acceptor/socket), or 
otherwise ensure only the server thread touches the acceptor/socket.



##########
lib/ExecutorService.h:
##########
@@ -62,7 +66,19 @@ class PULSAR_PUBLIC ExecutorService : public 
std::enable_shared_from_this<Execut
     TcpResolverPtr createTcpResolver();
     // throws std::runtime_error if failed
     DeadlineTimerPtr createDeadlineTimer();
-    void postWork(std::function<void(void)> task);
+
+    // Execute the task in the event loop thread asynchronously, i.e. the task 
will be put in the event loop
+    // queue and executed later.
+    template <typename T>
+    void postWork(T &&task) {
+        ASIO::post(io_context_, std::forward<T>(task));
+    }
+
+    // Different from `postWork`, if it's already in the event loop, execute 
the task immediately
+    template <typename T>
+    void dispatch(T &&task) {
+        ASIO::dispatch(io_context_, std::forward<T>(task));
+    }

Review Comment:
   `postWork()` / `dispatch()` use `std::forward`, but this header does not 
include `<utility>`. Relying on transitive includes can break builds on some 
toolchains/standard library combinations. Add `#include <utility>` (and 
similarly anywhere else `std::forward` is used) to make the header 
self-contained.



##########
lib/ExecutorService.h:
##########
@@ -62,7 +66,19 @@ class PULSAR_PUBLIC ExecutorService : public 
std::enable_shared_from_this<Execut
     TcpResolverPtr createTcpResolver();
     // throws std::runtime_error if failed
     DeadlineTimerPtr createDeadlineTimer();
-    void postWork(std::function<void(void)> task);
+
+    // Execute the task in the event loop thread asynchronously, i.e. the task 
will be put in the event loop
+    // queue and executed later.
+    template <typename T>
+    void postWork(T &&task) {
+        ASIO::post(io_context_, std::forward<T>(task));
+    }

Review Comment:
   `ExecutorService::postWork` was changed from a non-template member (with a 
compiled symbol) to a templated inline function. Since `ExecutorService` is 
marked `PULSAR_PUBLIC`, removing the old overload can break ABI for downstream 
code linking against a previously-built shared library. Consider keeping the 
original `void postWork(std::function<void()>)` as an overload (it can forward 
to `ASIO::post`) while adding the templated version for flexibility.



##########
lib/ClientConnection.h:
##########
@@ -297,26 +319,26 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     }
 
     template <typename ConstBufferSequence, typename WriteHandler>
-    inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler 
handler) {
+    inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler&& 
handler) {
         if (isClosed()) {
             return;
         }
         if (tlsSocket_) {
-            ASIO::async_write(*tlsSocket_, buffers, 
ASIO::bind_executor(strand_, handler));
+            ASIO::async_write(*tlsSocket_, buffers, 
std::forward<WriteHandler>(handler));
         } else {
             ASIO::async_write(*socket_, buffers, handler);
         }

Review Comment:
   `asyncWrite()` takes `WriteHandler&&` but the non-TLS branch passes 
`handler` as an lvalue (`ASIO::async_write(*socket_, ..., handler)`), which 
forces a copy and will fail for move-only handlers. Use 
`std::forward<WriteHandler>(handler)` in both branches. Also, since this uses 
`std::forward`, ensure this header includes `<utility>` instead of relying on 
transitive includes.



##########
lib/ConnectionPool.cc:
##########
@@ -54,16 +54,43 @@ bool ConnectionPool::close() {
         return false;
     }
 
+    std::vector<ClientConnectionPtr> connectionsToClose;
+    // ClientConnection::close() will remove the connection from the pool, 
which is not allowed when iterating
+    // over a map, so we store the connections to close in a vector first and 
don't iterate the pool when
+    // closing the connections.
     std::unique_lock<std::recursive_mutex> lock(mutex_);
+    connectionsToClose.reserve(pool_.size());
+    for (auto&& kv : pool_) {
+        connectionsToClose.emplace_back(kv.second);
+    }
+    pool_.clear();
+    lock.unlock();
 
-    for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
-        auto& cnx = cnxIt->second;
+    for (auto&& cnx : connectionsToClose) {
         if (cnx) {
-            // The 2nd argument is false because removing a value during the 
iteration will cause segfault
-            cnx->close(ResultDisconnected, false);
+            // Close with a fatal error to not let client retry
+            auto& future = cnx->close(ResultAlreadyClosed);
+            using namespace std::chrono_literals;
+            if (auto status = future.wait_for(5s); status != 
std::future_status::ready) {
+                LOG_WARN("Connection close timed out for " << 
cnx.get()->cnxString());
+            }
+            if (cnx.use_count() > 1) {
+                // There are some asynchronous operations that hold the 
reference on the connection, we should
+                // wait until them to finish. Otherwise, `io_context::stop()` 
will be called in
+                // `ClientImpl::shutdown()` when closing the 
`ExecutorServiceProvider`. Then
+                // `io_context::run()` will return and the `io_context` object 
will be destroyed. In this
+                // case, if there is any pending handler, it will crash.
+                for (int i = 0; i < 500 && cnx.use_count() > 1; i++) {
+                    std::this_thread::sleep_for(10ms);
+                }
+                if (cnx.use_count() > 1) {
+                    LOG_WARN("Connection still has " << (cnx.use_count() - 1)
+                                                     << " references after 
waiting for 5 seconds for "
+                                                     << 
cnx.get()->cnxString());
+                }

Review Comment:
   `ConnectionPool::close()` waits for references to drop by polling 
`cnx.use_count()` with `sleep_for(10ms)` up to 5 seconds per connection. With 
multiple connections this can make shutdown take O(N * 5s) in the worst case 
and still isn’t a precise indicator of “no pending handlers” (other long-lived 
refs may keep the count > 1). Consider replacing this with an explicit “pending 
async ops” counter / completion signal in `ClientConnection`, or applying a 
single global time budget across all connections rather than per-connection 
busy-waiting.



##########
lib/ClientConnection.cc:
##########
@@ -581,73 +576,68 @@ void ClientConnection::tcpConnectAsync() {
     Url service_url;
     std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
     if (!Url::parse(hostUrl, service_url)) {
-        LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " 
<< err.message());
+        LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " 
" << err.message());
         close();
         return;
     }
 
     if (service_url.protocol() != "pulsar" && service_url.protocol() != 
"pulsar+ssl") {
-        LOG_ERROR(cnxString_ << "Invalid Url protocol '" << 
service_url.protocol()
-                             << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
+        LOG_ERROR(cnxString() << "Invalid Url protocol '" << 
service_url.protocol()
+                              << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
         close();
         return;
     }
 
-    LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << 
service_url.port());
+    LOG_DEBUG(cnxString() << "Resolving " << service_url.host() << ":" << 
service_url.port());
 
-    auto weakSelf = weak_from_this();
-    resolver_->async_resolve(service_url.host(), 
std::to_string(service_url.port()),
-                             [weakSelf](auto err, const auto& results) {
-                                 auto self = weakSelf.lock();
-                                 if (self) {
-                                     self->handleResolve(err, results);
-                                 }
-                             });
+    resolver_->async_resolve(
+        service_url.host(), std::to_string(service_url.port()),
+        [this, self{shared_from_this()}](auto err, const auto& results) { 
handleResolve(err, results); });
 }
 
 void ClientConnection::handleResolve(ASIO_ERROR err, const 
tcp::resolver::results_type& results) {
     if (err) {
-        std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
+        std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_;
         LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << 
err.message());
         close();
         return;
     }
 
     if (!results.empty()) {
-        LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
+        LOG_DEBUG(cnxString() << "Resolved " << results.size() << " 
endpoints");
         for (const auto& entry : results) {
             const auto& ep = entry.endpoint();
-            LOG_DEBUG(cnxString_ << "  " << ep.address().to_string() << ":" << 
ep.port());
+            LOG_DEBUG(cnxString() << "  " << ep.address().to_string() << ":" 
<< ep.port());
         }
     }
 
-    auto weakSelf = weak_from_this();
-    connectTimeoutTask_->setCallback([weakSelf, results = 
tcp::resolver::results_type(results)](
-                                         const PeriodicTask::ErrorCode& ec) {
-        ClientConnectionPtr ptr = weakSelf.lock();
-        if (!ptr) {
-            LOG_DEBUG("Connect timeout callback skipped: connection was 
already destroyed");
-            return;
-        }
-
-        if (ptr->state_ != Ready) {
-            LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was 
not established in "
-                                      << 
ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
-            PeriodicTask::ErrorCode err;
-            ptr->socket_->close(err);
-            if (err) {
-                LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << 
err.message());
+    // Acquire the lock to prevent the race:
+    // 1. thread 1: isClosed() returns false
+    // 2. thread 2: call `connectTimeoutTask_->stop()` and 
`connectTimeoutTask_.reset()` in `close()`
+    // 3. thread 1: call `connectTimeoutTask_->setCallback()` and 
`connectTimeoutTask_->start()`
+    // Then the self captured in the callback of `connectTimeoutTask_` would 
be kept alive unexpectedly and
+    // cannot be cancelled until the executor is destroyed.
+    std::lock_guard lock{mutex_};
+    if (isClosed() || !connectTimeoutTask_) {
+        return;
+    }
+    connectTimeoutTask_->setCallback(
+        [this, self{shared_from_this()},
+         results = tcp::resolver::results_type(results)](const 
PeriodicTask::ErrorCode& ec) {
+            if (state_ != Ready) {
+                LOG_ERROR(cnxString() << "Connection to " << results << " was 
not established in "
+                                      << connectTimeoutTask_->getPeriodMs() << 
" ms");
+                close();
+            } else {
+                connectTimeoutTask_->stop();
             }

Review Comment:
   The connect-timeout callback set in `handleResolve()` dereferences 
`connectTimeoutTask_` (`getPeriodMs()` / `stop()`) without holding `mutex_`, 
but `close()` and `handlePulsarConnected()` now reset `connectTimeoutTask_` 
under the mutex. This creates a race where the timer callback can run 
concurrently with `connectTimeoutTask_.reset()` and crash on a null pointer. A 
safer approach is to capture the current `connectTimeoutTask_` (shared_ptr) or 
`periodMs` by value in the callback and operate on that captured object/value 
instead of re-reading the member pointer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to