BewareMyPower commented on code in PR #551:
URL: https://github.com/apache/pulsar-client-cpp/pull/551#discussion_r2923048276


##########
lib/ClientConnection.cc:
##########
@@ -581,73 +576,68 @@ void ClientConnection::tcpConnectAsync() {
     Url service_url;
     std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
     if (!Url::parse(hostUrl, service_url)) {
-        LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " 
<< err.message());
+        LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " 
" << err.message());
         close();
         return;
     }
 
     if (service_url.protocol() != "pulsar" && service_url.protocol() != 
"pulsar+ssl") {
-        LOG_ERROR(cnxString_ << "Invalid Url protocol '" << 
service_url.protocol()
-                             << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
+        LOG_ERROR(cnxString() << "Invalid Url protocol '" << 
service_url.protocol()
+                              << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
         close();
         return;
     }
 
-    LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << 
service_url.port());
+    LOG_DEBUG(cnxString() << "Resolving " << service_url.host() << ":" << 
service_url.port());
 
-    auto weakSelf = weak_from_this();
-    resolver_->async_resolve(service_url.host(), 
std::to_string(service_url.port()),
-                             [weakSelf](auto err, const auto& results) {
-                                 auto self = weakSelf.lock();
-                                 if (self) {
-                                     self->handleResolve(err, results);
-                                 }
-                             });
+    resolver_->async_resolve(
+        service_url.host(), std::to_string(service_url.port()),
+        [this, self{shared_from_this()}](auto err, const auto& results) { 
handleResolve(err, results); });
 }
 
 void ClientConnection::handleResolve(ASIO_ERROR err, const 
tcp::resolver::results_type& results) {
     if (err) {
-        std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
+        std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_;
         LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << 
err.message());
         close();
         return;
     }
 
     if (!results.empty()) {
-        LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
+        LOG_DEBUG(cnxString() << "Resolved " << results.size() << " 
endpoints");
         for (const auto& entry : results) {
             const auto& ep = entry.endpoint();
-            LOG_DEBUG(cnxString_ << "  " << ep.address().to_string() << ":" << 
ep.port());
+            LOG_DEBUG(cnxString() << "  " << ep.address().to_string() << ":" 
<< ep.port());
         }
     }
 
-    auto weakSelf = weak_from_this();
-    connectTimeoutTask_->setCallback([weakSelf, results = 
tcp::resolver::results_type(results)](
-                                         const PeriodicTask::ErrorCode& ec) {
-        ClientConnectionPtr ptr = weakSelf.lock();
-        if (!ptr) {
-            LOG_DEBUG("Connect timeout callback skipped: connection was 
already destroyed");
-            return;
-        }
-
-        if (ptr->state_ != Ready) {
-            LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was 
not established in "
-                                      << 
ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
-            PeriodicTask::ErrorCode err;
-            ptr->socket_->close(err);
-            if (err) {
-                LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << 
err.message());
+    // Acquire the lock to prevent the race:
+    // 1. thread 1: isClosed() returns false
+    // 2. thread 2: call `connectTimeoutTask_->stop()` and 
`connectTimeoutTask_.reset()` in `close()`
+    // 3. thread 1: call `connectTimeoutTask_->setCallback()` and 
`connectTimeoutTask_->start()`
+    // Then the self captured in the callback of `connectTimeoutTask_` would 
be kept alive unexpectedly and
+    // cannot be cancelled until the executor is destroyed.
+    std::lock_guard lock{mutex_};
+    if (isClosed() || !connectTimeoutTask_) {
+        return;
+    }
+    connectTimeoutTask_->setCallback(
+        [this, self{shared_from_this()},
+         results = tcp::resolver::results_type(results)](const 
PeriodicTask::ErrorCode& ec) {
+            if (state_ != Ready) {
+                LOG_ERROR(cnxString() << "Connection to " << results << " was 
not established in "
+                                      << connectTimeoutTask_->getPeriodMs() << 
" ms");
+                close();
+            } else {
+                connectTimeoutTask_->stop();
             }

Review Comment:
   Technically it should exit quickly after `close()`, so the suggested 
optimization here might not really make sense.
   
   The timeout is added only as a safeguard for the extreme case where a 
connection's pending operations have not yet completed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to