BewareMyPower commented on code in PR #551:
URL: https://github.com/apache/pulsar-client-cpp/pull/551#discussion_r2923048276


##########
lib/ClientConnection.cc:
##########
@@ -581,73 +576,68 @@ void ClientConnection::tcpConnectAsync() {
     Url service_url;
     std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
     if (!Url::parse(hostUrl, service_url)) {
-        LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " 
<< err.message());
+        LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " 
" << err.message());
         close();
         return;
     }
 
     if (service_url.protocol() != "pulsar" && service_url.protocol() != 
"pulsar+ssl") {
-        LOG_ERROR(cnxString_ << "Invalid Url protocol '" << 
service_url.protocol()
-                             << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
+        LOG_ERROR(cnxString() << "Invalid Url protocol '" << 
service_url.protocol()
+                              << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
         close();
         return;
     }
 
-    LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << 
service_url.port());
+    LOG_DEBUG(cnxString() << "Resolving " << service_url.host() << ":" << 
service_url.port());
 
-    auto weakSelf = weak_from_this();
-    resolver_->async_resolve(service_url.host(), 
std::to_string(service_url.port()),
-                             [weakSelf](auto err, const auto& results) {
-                                 auto self = weakSelf.lock();
-                                 if (self) {
-                                     self->handleResolve(err, results);
-                                 }
-                             });
+    resolver_->async_resolve(
+        service_url.host(), std::to_string(service_url.port()),
+        [this, self{shared_from_this()}](auto err, const auto& results) { 
handleResolve(err, results); });
 }
 
 void ClientConnection::handleResolve(ASIO_ERROR err, const 
tcp::resolver::results_type& results) {
     if (err) {
-        std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
+        std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_;
         LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << 
err.message());
         close();
         return;
     }
 
     if (!results.empty()) {
-        LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
+        LOG_DEBUG(cnxString() << "Resolved " << results.size() << " 
endpoints");
         for (const auto& entry : results) {
             const auto& ep = entry.endpoint();
-            LOG_DEBUG(cnxString_ << "  " << ep.address().to_string() << ":" << 
ep.port());
+            LOG_DEBUG(cnxString() << "  " << ep.address().to_string() << ":" 
<< ep.port());
         }
     }
 
-    auto weakSelf = weak_from_this();
-    connectTimeoutTask_->setCallback([weakSelf, results = 
tcp::resolver::results_type(results)](
-                                         const PeriodicTask::ErrorCode& ec) {
-        ClientConnectionPtr ptr = weakSelf.lock();
-        if (!ptr) {
-            LOG_DEBUG("Connect timeout callback skipped: connection was 
already destroyed");
-            return;
-        }
-
-        if (ptr->state_ != Ready) {
-            LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was 
not established in "
-                                      << 
ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
-            PeriodicTask::ErrorCode err;
-            ptr->socket_->close(err);
-            if (err) {
-                LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << 
err.message());
+    // Acquire the lock to prevent the race:
+    // 1. thread 1: isClosed() returns false
+    // 2. thread 2: call `connectTimeoutTask_->stop()` and 
`connectTimeoutTask_.reset()` in `close()`
+    // 3. thread 1: call `connectTimeoutTask_->setCallback()` and 
`connectTimeoutTask_->start()`
+    // Then the self captured in the callback of `connectTimeoutTask_` would 
be kept alive unexpectedly and
+    // cannot be cancelled until the executor is destroyed.
+    std::lock_guard lock{mutex_};
+    if (isClosed() || !connectTimeoutTask_) {
+        return;
+    }
+    connectTimeoutTask_->setCallback(
+        [this, self{shared_from_this()},
+         results = tcp::resolver::results_type(results)](const 
PeriodicTask::ErrorCode& ec) {
+            if (state_ != Ready) {
+                LOG_ERROR(cnxString() << "Connection to " << results << " was 
not established in "
+                                      << connectTimeoutTask_->getPeriodMs() << 
" ms");
+                close();
+            } else {
+                connectTimeoutTask_->stop();
             }

Review Comment:
   Technically it should exit quickly after `close()`, so the suggested 
optimization here might not really make sense.
   
   The timeout is added just in case of the extreme scenario where a 
connection's pending operations have not yet completed.



##########
lib/ConnectionPool.cc:
##########
@@ -54,16 +54,43 @@ bool ConnectionPool::close() {
         return false;
     }
 
+    std::vector<ClientConnectionPtr> connectionsToClose;
+    // ClientConnection::close() will remove the connection from the pool, 
which is not allowed when iterating
+    // over a map, so we store the connections to close in a vector first and 
don't iterate the pool when
+    // closing the connections.
     std::unique_lock<std::recursive_mutex> lock(mutex_);
+    connectionsToClose.reserve(pool_.size());
+    for (auto&& kv : pool_) {
+        connectionsToClose.emplace_back(kv.second);
+    }
+    pool_.clear();
+    lock.unlock();
 
-    for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
-        auto& cnx = cnxIt->second;
+    for (auto&& cnx : connectionsToClose) {
         if (cnx) {
-            // The 2nd argument is false because removing a value during the 
iteration will cause segfault
-            cnx->close(ResultDisconnected, false);
+            // Close with a fatal error to not let client retry
+            auto& future = cnx->close(ResultAlreadyClosed);
+            using namespace std::chrono_literals;
+            if (auto status = future.wait_for(5s); status != 
std::future_status::ready) {
+                LOG_WARN("Connection close timed out for " << 
cnx.get()->cnxString());
+            }
+            if (cnx.use_count() > 1) {
+                // There are some asynchronous operations that hold the 
reference on the connection, we should
+                // wait until them to finish. Otherwise, `io_context::stop()` 
will be called in
+                // `ClientImpl::shutdown()` when closing the 
`ExecutorServiceProvider`. Then
+                // `io_context::run()` will return and the `io_context` object 
will be destroyed. In this
+                // case, if there is any pending handler, it will crash.
+                for (int i = 0; i < 500 && cnx.use_count() > 1; i++) {
+                    std::this_thread::sleep_for(10ms);
+                }
+                if (cnx.use_count() > 1) {
+                    LOG_WARN("Connection still has " << (cnx.use_count() - 1)
+                                                     << " references after 
waiting for 5 seconds for "
+                                                     << 
cnx.get()->cnxString());
+                }

Review Comment:
   Technically it should exit quickly after `close()`, so the suggested 
optimization here might not really make sense.
   
   The timeout is added just in case of the extreme scenario where a 
connection's pending operations have not yet completed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to