This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new e80f65e  Fix crash due to asio object lifetime and thread safety issue 
(#551)
e80f65e is described below

commit e80f65ef6db05eacda6ff02ca8fcba9963a38a93
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Mar 12 17:58:29 2026 +0800

    Fix crash due to asio object lifetime and thread safety issue (#551)
---
 .github/workflows/ci-pr-validation.yaml |   9 +
 lib/ClientConnection.cc                 | 535 +++++++++++++++-----------------
 lib/ClientConnection.h                  |  52 +++-
 lib/ConnectionPool.cc                   |  37 ++-
 lib/ExecutorService.cc                  |   2 -
 lib/ExecutorService.h                   |  19 +-
 lib/PeriodicTask.h                      |   2 +-
 tests/BasicEndToEndTest.cc              |  44 ++-
 tests/ClientTest.cc                     | 107 ++++++-
 tests/MultiTopicsConsumerTest.cc        |   9 +-
 tests/PulsarFriend.h                    |   5 +
 11 files changed, 494 insertions(+), 327 deletions(-)

diff --git a/.github/workflows/ci-pr-validation.yaml 
b/.github/workflows/ci-pr-validation.yaml
index b5a0973..d209b53 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -260,6 +260,15 @@ jobs:
             Pop-Location
           }
 
+      - name: Ensure vcpkg has full history(windows)
+        if: runner.os == 'Windows'
+        shell: pwsh
+        run: |
+          $isShallow = (git -C "${{ env.VCPKG_ROOT }}" rev-parse 
--is-shallow-repository).Trim()
+          if ($isShallow -eq "true") {
+            git -C "${{ env.VCPKG_ROOT }}" fetch --unshallow
+          }
+
       - name: remove system vcpkg(windows)
         if: runner.os == 'Windows'
         run: rm -rf "$VCPKG_INSTALLATION_ROOT"
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 0a850ed..c373c25 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -189,13 +189,12 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
       executor_(executor),
       resolver_(executor_->createTcpResolver()),
       socket_(executor_->createSocket()),
-      strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
       logicalAddress_(logicalAddress),
       physicalAddress_(physicalAddress),
-      cnxString_("[<none> -> " + physicalAddress + "] "),
+      cnxStringPtr_(std::make_shared<std::string>("[<none> -> " + 
physicalAddress + "] ")),
       incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
-      connectTimeoutTask_(
-          std::make_shared<PeriodicTask>(*executor_, 
clientConfiguration.getConnectionTimeout())),
+      
connectTimeout_(std::chrono::milliseconds(clientConfiguration.getConnectionTimeout())),
+      connectTimer_(executor_->createDeadlineTimer()),
       outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
       
keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()),
       consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
@@ -203,7 +202,8 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
       clientVersion_(clientVersion),
       pool_(pool),
       poolIndex_(poolIndex) {
-    LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << 
clientConfiguration.getConnectionTimeout());
+    LOG_INFO(cnxString() << "Create ClientConnection, timeout="
+                         << clientConfiguration.getConnectionTimeout());
     if (!authentication_) {
         LOG_ERROR("Invalid authentication plugin");
         throw ResultAuthenticationError;
@@ -295,12 +295,12 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
 }
 
 ClientConnection::~ClientConnection() {
-    LOG_INFO(cnxString_ << "Destroyed connection to " << logicalAddress_ << 
"-" << poolIndex_);
+    LOG_INFO(cnxString() << "Destroyed connection to " << logicalAddress_ << 
"-" << poolIndex_);
 }
 
 void ClientConnection::handlePulsarConnected(const proto::CommandConnected& 
cmdConnected) {
     if (!cmdConnected.has_server_version()) {
-        LOG_ERROR(cnxString_ << "Server version is not set");
+        LOG_ERROR(cnxString() << "Server version is not set");
         close();
         return;
     }
@@ -314,11 +314,11 @@ void ClientConnection::handlePulsarConnected(const 
proto::CommandConnected& cmdC
     Lock lock(mutex_);
 
     if (isClosed()) {
-        LOG_INFO(cnxString_ << "Connection already closed");
+        LOG_INFO(cnxString() << "Connection already closed");
         return;
     }
+    cancelTimer(*connectTimer_);
     state_ = Ready;
-    connectTimeoutTask_->stop();
     serverProtocolVersion_ = cmdConnected.protocol_version();
 
     if (serverProtocolVersion_ >= proto::v1) {
@@ -326,13 +326,8 @@ void ClientConnection::handlePulsarConnected(const 
proto::CommandConnected& cmdC
         keepAliveTimer_ = executor_->createDeadlineTimer();
         if (keepAliveTimer_) {
             
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
-            auto weakSelf = weak_from_this();
-            keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
-                auto self = weakSelf.lock();
-                if (self) {
-                    self->handleKeepAliveTimeout();
-                }
-            });
+            keepAliveTimer_->async_wait(
+                [this, self{shared_from_this()}](const ASIO_ERROR& err) { 
handleKeepAliveTimeout(err); });
         }
     }
 
@@ -352,12 +347,12 @@ void 
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
     for (int i = 0; i < consumerStatsRequests.size(); i++) {
         PendingConsumerStatsMap::iterator it = 
pendingConsumerStatsMap_.find(consumerStatsRequests[i]);
         if (it != pendingConsumerStatsMap_.end()) {
-            LOG_DEBUG(cnxString_ << " removing request_id " << it->first
-                                 << " from the pendingConsumerStatsMap_");
+            LOG_DEBUG(cnxString() << " removing request_id " << it->first
+                                  << " from the pendingConsumerStatsMap_");
             consumerStatsPromises.push_back(it->second);
             pendingConsumerStatsMap_.erase(it);
         } else {
-            LOG_DEBUG(cnxString_ << "request_id " << it->first << " already 
fulfilled - not removing it");
+            LOG_DEBUG(cnxString() << "request_id " << it->first << " already 
fulfilled - not removing it");
         }
     }
 
@@ -371,19 +366,16 @@ void 
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
     // Check if we have a timer still before we set the request timer to pop 
again.
     if (consumerStatsRequestTimer_) {
         consumerStatsRequestTimer_->expires_after(operationsTimeout_);
-        auto weakSelf = weak_from_this();
-        consumerStatsRequestTimer_->async_wait([weakSelf, 
consumerStatsRequests](const ASIO_ERROR& err) {
-            auto self = weakSelf.lock();
-            if (self) {
-                self->handleConsumerStatsTimeout(err, consumerStatsRequests);
-            }
-        });
+        consumerStatsRequestTimer_->async_wait(
+            [this, self{shared_from_this()}, consumerStatsRequests](const 
ASIO_ERROR& err) {
+                handleConsumerStatsTimeout(err, consumerStatsRequests);
+            });
     }
     lock.unlock();
     // Complex logic since promises need to be fulfilled outside the lock
     for (int i = 0; i < consumerStatsPromises.size(); i++) {
         consumerStatsPromises[i].setFailed(ResultTimeout);
-        LOG_WARN(cnxString_ << " Operation timedout, didn't get response from 
broker");
+        LOG_WARN(cnxString() << " Operation timedout, didn't get response from 
broker");
     }
 }
 
@@ -416,23 +408,23 @@ void ClientConnection::handleTcpConnected(const 
ASIO_ERROR& err, const tcp::endp
         try {
             cnxStringStream << "[" << socket_->local_endpoint() << " -> " << 
socket_->remote_endpoint()
                             << "] ";
-            cnxString_ = cnxStringStream.str();
+            std::atomic_store(&cnxStringPtr_, 
std::make_shared<std::string>(cnxStringStream.str()));
         } catch (const ASIO_SYSTEM_ERROR& e) {
             LOG_ERROR("Failed to get endpoint: " << e.what());
             close(ResultRetryable);
             return;
         }
         if (logicalAddress_ == physicalAddress_) {
-            LOG_INFO(cnxString_ << "Connected to broker");
+            LOG_INFO(cnxString() << "Connected to broker");
         } else {
-            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical 
broker: " << logicalAddress_
-                                << ", proxy: " << proxyServiceUrl_
-                                << ", physical address:" << physicalAddress_);
+            LOG_INFO(cnxString() << "Connected to broker through proxy. 
Logical broker: " << logicalAddress_
+                                 << ", proxy: " << proxyServiceUrl_
+                                 << ", physical address:" << physicalAddress_);
         }
 
         Lock lock(mutex_);
         if (isClosed()) {
-            LOG_INFO(cnxString_ << "Connection already closed");
+            LOG_INFO(cnxString() << "Connection already closed");
             return;
         }
         state_ = TcpConnected;
@@ -441,12 +433,12 @@ void ClientConnection::handleTcpConnected(const 
ASIO_ERROR& err, const tcp::endp
         ASIO_ERROR error;
         socket_->set_option(tcp::no_delay(true), error);
         if (error) {
-            LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << 
error.message());
+            LOG_WARN(cnxString() << "Socket failed to set tcp::no_delay: " << 
error.message());
         }
 
         socket_->set_option(tcp::socket::keep_alive(true), error);
         if (error) {
-            LOG_WARN(cnxString_ << "Socket failed to set 
tcp::socket::keep_alive: " << error.message());
+            LOG_WARN(cnxString() << "Socket failed to set 
tcp::socket::keep_alive: " << error.message());
         }
 
         // Start TCP keep-alive probes after connection has been idle after 1 
minute. Ideally this
@@ -454,19 +446,19 @@ void ClientConnection::handleTcpConnected(const 
ASIO_ERROR& err, const tcp::endp
         // connection) every 30 seconds
         socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
         if (error) {
-            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: 
" << error.message());
+            LOG_DEBUG(cnxString() << "Socket failed to set 
tcp_keep_alive_idle: " << error.message());
         }
 
         // Send up to 10 probes before declaring the connection broken
         socket_->set_option(tcp_keep_alive_count(10), error);
         if (error) {
-            LOG_DEBUG(cnxString_ << "Socket failed to set 
tcp_keep_alive_count: " << error.message());
+            LOG_DEBUG(cnxString() << "Socket failed to set 
tcp_keep_alive_count: " << error.message());
         }
 
         // Interval between probes: 6 seconds
         socket_->set_option(tcp_keep_alive_interval(6), error);
         if (error) {
-            LOG_DEBUG(cnxString_ << "Socket failed to set 
tcp_keep_alive_interval: " << error.message());
+            LOG_DEBUG(cnxString() << "Socket failed to set 
tcp_keep_alive_interval: " << error.message());
         }
 
         if (tlsSocket_) {
@@ -474,29 +466,28 @@ void ClientConnection::handleTcpConnected(const 
ASIO_ERROR& err, const tcp::endp
                 ASIO_ERROR err;
                 Url service_url;
                 if (!Url::parse(physicalAddress_, service_url)) {
-                    LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " 
<< err << " " << err.message());
+                    LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " 
<< err << " " << err.message());
                     close();
                     return;
                 }
             }
-            auto weakSelf = weak_from_this();
-            auto socket = socket_;
-            auto tlsSocket = tlsSocket_;
             // socket and ssl::stream objects must exist until async_handshake 
is done, otherwise segmentation
             // fault might happen
-            auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& 
err) {
-                auto self = weakSelf.lock();
-                if (self) {
-                    self->handleHandshake(err);
-                }
-            };
-            tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
-                                        ASIO::bind_executor(strand_, 
callback));
+            tlsSocket_->async_handshake(
+                ASIO::ssl::stream<tcp::socket>::client,
+                [this, self{shared_from_this()}](const auto& err) { 
handleHandshake(err); });
         } else {
             handleHandshake(ASIO_SUCCESS);
         }
     } else {
-        LOG_ERROR(cnxString_ << "Failed to establish connection to " << 
endpoint << ": " << err.message());
+        LOG_ERROR(cnxString() << "Failed to establish connection to " << 
endpoint << ": " << err.message());
+        {
+            std::lock_guard lock{mutex_};
+            if (isClosed()) {
+                return;
+            }
+            cancelTimer(*connectTimer_);
+        }
         if (err == ASIO::error::operation_aborted) {
             close();
         } else {
@@ -508,10 +499,10 @@ void ClientConnection::handleTcpConnected(const 
ASIO_ERROR& err, const tcp::endp
 void ClientConnection::handleHandshake(const ASIO_ERROR& err) {
     if (err) {
         if (err.value() == ASIO::ssl::error::stream_truncated) {
-            LOG_WARN(cnxString_ << "Handshake failed: " << err.message());
+            LOG_WARN(cnxString() << "Handshake failed: " << err.message());
             close(ResultRetryable);
         } else {
-            LOG_ERROR(cnxString_ << "Handshake failed: " << err.message());
+            LOG_ERROR(cnxString() << "Handshake failed: " << err.message());
             close();
         }
         return;
@@ -524,12 +515,12 @@ void ClientConnection::handleHandshake(const ASIO_ERROR& 
err) {
         buffer = Commands::newConnect(authentication_, logicalAddress_, 
connectingThroughProxy,
                                       clientVersion_, result);
     } catch (const std::exception& e) {
-        LOG_ERROR(cnxString_ << "Failed to create Connect command: " << 
e.what());
+        LOG_ERROR(cnxString() << "Failed to create Connect command: " << 
e.what());
         close(ResultAuthenticationError);
         return;
     }
     if (result != ResultOk) {
-        LOG_ERROR(cnxString_ << "Failed to establish connection: " << result);
+        LOG_ERROR(cnxString() << "Failed to establish connection: " << result);
         close(result);
         return;
     }
@@ -546,7 +537,7 @@ void ClientConnection::handleSentPulsarConnect(const 
ASIO_ERROR& err, const Shar
         return;
     }
     if (err) {
-        LOG_ERROR(cnxString_ << "Failed to establish connection: " << 
err.message());
+        LOG_ERROR(cnxString() << "Failed to establish connection: " << 
err.message());
         close();
         return;
     }
@@ -560,7 +551,7 @@ void ClientConnection::handleSentAuthResponse(const 
ASIO_ERROR& err, const Share
         return;
     }
     if (err) {
-        LOG_WARN(cnxString_ << "Failed to send auth response: " << 
err.message());
+        LOG_WARN(cnxString() << "Failed to send auth response: " << 
err.message());
         close();
         return;
     }
@@ -581,73 +572,65 @@ void ClientConnection::tcpConnectAsync() {
     Url service_url;
     std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
     if (!Url::parse(hostUrl, service_url)) {
-        LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " 
<< err.message());
+        LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " 
" << err.message());
         close();
         return;
     }
 
     if (service_url.protocol() != "pulsar" && service_url.protocol() != 
"pulsar+ssl") {
-        LOG_ERROR(cnxString_ << "Invalid Url protocol '" << 
service_url.protocol()
-                             << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
+        LOG_ERROR(cnxString() << "Invalid Url protocol '" << 
service_url.protocol()
+                              << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
         close();
         return;
     }
 
-    LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << 
service_url.port());
+    LOG_DEBUG(cnxString() << "Resolving " << service_url.host() << ":" << 
service_url.port());
 
-    auto weakSelf = weak_from_this();
-    resolver_->async_resolve(service_url.host(), 
std::to_string(service_url.port()),
-                             [weakSelf](auto err, const auto& results) {
-                                 auto self = weakSelf.lock();
-                                 if (self) {
-                                     self->handleResolve(err, results);
-                                 }
-                             });
+    resolver_->async_resolve(
+        service_url.host(), std::to_string(service_url.port()),
+        [this, self{shared_from_this()}](auto err, const auto& results) { 
handleResolve(err, results); });
 }
 
 void ClientConnection::handleResolve(ASIO_ERROR err, const 
tcp::resolver::results_type& results) {
     if (err) {
-        std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
+        std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_;
         LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << 
err.message());
         close();
         return;
     }
 
     if (!results.empty()) {
-        LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
+        LOG_DEBUG(cnxString() << "Resolved " << results.size() << " 
endpoints");
         for (const auto& entry : results) {
             const auto& ep = entry.endpoint();
-            LOG_DEBUG(cnxString_ << "  " << ep.address().to_string() << ":" << 
ep.port());
+            LOG_DEBUG(cnxString() << "  " << ep.address().to_string() << ":" 
<< ep.port());
         }
     }
 
-    auto weakSelf = weak_from_this();
-    connectTimeoutTask_->setCallback([weakSelf, results = 
tcp::resolver::results_type(results)](
-                                         const PeriodicTask::ErrorCode& ec) {
-        ClientConnectionPtr ptr = weakSelf.lock();
-        if (!ptr) {
-            LOG_DEBUG("Connect timeout callback skipped: connection was 
already destroyed");
-            return;
-        }
+    std::lock_guard lock{mutex_};
+    if (isClosed()) {
+        return;
+    }
 
-        if (ptr->state_ != Ready) {
-            LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was 
not established in "
-                                      << 
ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
-            PeriodicTask::ErrorCode err;
-            ptr->socket_->close(err);
-            if (err) {
-                LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << 
err.message());
-            }
-        }
-        ptr->connectTimeoutTask_->stop();
-    });
-    connectTimeoutTask_->start();
-    ASIO::async_connect(*socket_, results, [weakSelf](const ASIO_ERROR& err, 
const tcp::endpoint& endpoint) {
-        auto self = weakSelf.lock();
-        if (self) {
-            self->handleTcpConnected(err, endpoint);
+    connectTimer_->expires_after(connectTimeout_);
+    connectTimer_->async_wait([this, results, self{shared_from_this()}](const 
auto& err) {
+        if (err) {
+            return;
         }
+        Lock lock{mutex_};
+        if (!isClosed() && state_ != Ready) {
+            LOG_ERROR(cnxString() << "Connection to " << results << " was not 
established in "
+                                  << connectTimeout_.count() << " ms");
+            lock.unlock();
+            close();
+        }  // else: the connection is closed or already established
     });
+
+    ASIO::async_connect(
+        *socket_, results,
+        [this, self{shared_from_this()}](const ASIO_ERROR& err, const 
tcp::endpoint& endpoint) {
+            handleTcpConnected(err, endpoint);
+        });
 }
 
 void ClientConnection::readNextCommand() {
@@ -668,11 +651,11 @@ void ClientConnection::handleRead(const ASIO_ERROR& err, 
size_t bytesTransferred
 
     if (err || bytesTransferred == 0) {
         if (err == ASIO::error::operation_aborted) {
-            LOG_DEBUG(cnxString_ << "Read operation was canceled: " << 
err.message());
+            LOG_DEBUG(cnxString() << "Read operation was canceled: " << 
err.message());
         } else if (bytesTransferred == 0 || err == ASIO::error::eof) {
-            LOG_DEBUG(cnxString_ << "Server closed the connection: " << 
err.message());
+            LOG_DEBUG(cnxString() << "Server closed the connection: " << 
err.message());
         } else {
-            LOG_ERROR(cnxString_ << "Read operation failed: " << 
err.message());
+            LOG_ERROR(cnxString() << "Read operation failed: " << 
err.message());
         }
         close(ResultDisconnected);
     } else if (bytesTransferred < minReadSize) {
@@ -724,7 +707,7 @@ void ClientConnection::processIncomingBuffer() {
         uint32_t cmdSize = incomingBuffer_.readUnsignedInt();
         proto::BaseCommand incomingCmd;
         if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
-            LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
+            LOG_ERROR(cnxString() << "Error parsing protocol buffer command");
             close(ResultDisconnected);
             return;
         }
@@ -744,11 +727,11 @@ void ClientConnection::processIncomingBuffer() {
                 // broker entry metadata is present
                 uint32_t brokerEntryMetadataSize = 
incomingBuffer_.readUnsignedInt();
                 if 
(!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), 
brokerEntryMetadataSize)) {
-                    LOG_ERROR(cnxString_ << "[consumer id " << 
incomingCmd.message().consumer_id()
-                                         << ", message ledger id "
-                                         << 
incomingCmd.message().message_id().ledgerid() << ", entry id "
-                                         << 
incomingCmd.message().message_id().entryid()
-                                         << "] Error parsing broker entry 
metadata");
+                    LOG_ERROR(cnxString()
+                              << "[consumer id " << 
incomingCmd.message().consumer_id()
+                              << ", message ledger id " << 
incomingCmd.message().message_id().ledgerid()
+                              << ", entry id " << 
incomingCmd.message().message_id().entryid()
+                              << "] Error parsing broker entry metadata");
                     close(ResultDisconnected);
                     return;
                 }
@@ -762,11 +745,11 @@ void ClientConnection::processIncomingBuffer() {
 
             uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
             if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), 
metadataSize)) {
-                LOG_ERROR(cnxString_ << "[consumer id " << 
incomingCmd.message().consumer_id()  //
-                                     << ", message ledger id "
-                                     << 
incomingCmd.message().message_id().ledgerid()  //
-                                     << ", entry id " << 
incomingCmd.message().message_id().entryid()
-                                     << "] Error parsing message metadata");
+                LOG_ERROR(cnxString()
+                          << "[consumer id " << 
incomingCmd.message().consumer_id()                   //
+                          << ", message ledger id " << 
incomingCmd.message().message_id().ledgerid()  //
+                          << ", entry id " << 
incomingCmd.message().message_id().entryid()
+                          << "] Error parsing message metadata");
                 close(ResultDisconnected);
                 return;
             }
@@ -839,8 +822,8 @@ bool ClientConnection::verifyChecksum(SharedBuffer& 
incomingBuffer_, uint32_t& r
 }
 
 void ClientConnection::handleActiveConsumerChange(const 
proto::CommandActiveConsumerChange& change) {
-    LOG_DEBUG(cnxString_ << "Received notification about active consumer 
change, consumer_id: "
-                         << change.consumer_id() << " isActive: " << 
change.is_active());
+    LOG_DEBUG(cnxString() << "Received notification about active consumer 
change, consumer_id: "
+                          << change.consumer_id() << " isActive: " << 
change.is_active());
     Lock lock(mutex_);
     ConsumersMap::iterator it = consumers_.find(change.consumer_id());
     if (it != consumers_.end()) {
@@ -851,19 +834,19 @@ void ClientConnection::handleActiveConsumerChange(const 
proto::CommandActiveCons
             consumer->activeConsumerChanged(change.is_active());
         } else {
             consumers_.erase(change.consumer_id());
-            LOG_DEBUG(cnxString_ << "Ignoring incoming message for already 
destroyed consumer "
-                                 << change.consumer_id());
+            LOG_DEBUG(cnxString() << "Ignoring incoming message for already 
destroyed consumer "
+                                  << change.consumer_id());
         }
     } else {
-        LOG_DEBUG(cnxString_ << "Got invalid consumer Id in " << 
change.consumer_id()
-                             << " -- isActive: " << change.is_active());
+        LOG_DEBUG(cnxString() << "Got invalid consumer Id in " << 
change.consumer_id()
+                              << " -- isActive: " << change.is_active());
     }
 }
 
 void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, 
bool isChecksumValid,
                                              proto::BrokerEntryMetadata& 
brokerEntryMetadata,
                                              proto::MessageMetadata& 
msgMetadata, SharedBuffer& payload) {
-    LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: 
" << msg.consumer_id());
+    LOG_DEBUG(cnxString() << "Received a message from the server for consumer: 
" << msg.consumer_id());
 
     Lock lock(mutex_);
     ConsumersMap::iterator it = consumers_.find(msg.consumer_id());
@@ -878,21 +861,21 @@ void ClientConnection::handleIncomingMessage(const 
proto::CommandMessage& msg, b
                                       msgMetadata, payload);
         } else {
             consumers_.erase(msg.consumer_id());
-            LOG_DEBUG(cnxString_ << "Ignoring incoming message for already 
destroyed consumer "
-                                 << msg.consumer_id());
+            LOG_DEBUG(cnxString() << "Ignoring incoming message for already 
destroyed consumer "
+                                  << msg.consumer_id());
         }
     } else {
-        LOG_DEBUG(cnxString_ << "Got invalid consumer Id in "  //
-                             << msg.consumer_id() << " -- msg: " << 
msgMetadata.sequence_id());
+        LOG_DEBUG(cnxString() << "Got invalid consumer Id in "  //
+                              << msg.consumer_id() << " -- msg: " << 
msgMetadata.sequence_id());
     }
 }
 
 void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
-    LOG_DEBUG(cnxString_ << "Handling incoming command: " << 
Commands::messageType(incomingCmd.type()));
+    LOG_DEBUG(cnxString() << "Handling incoming command: " << 
Commands::messageType(incomingCmd.type()));
 
     switch (state_.load()) {
         case Pending: {
-            LOG_ERROR(cnxString_ << "Connection is not ready yet");
+            LOG_ERROR(cnxString() << "Connection is not ready yet");
             break;
         }
 
@@ -908,7 +891,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& 
incomingCmd) {
         }
 
         case Disconnected: {
-            LOG_ERROR(cnxString_ << "Connection already disconnected");
+            LOG_ERROR(cnxString() << "Connection already disconnected");
             break;
         }
 
@@ -967,12 +950,12 @@ void ClientConnection::handleIncomingCommand(BaseCommand& 
incomingCmd) {
 
                 case BaseCommand::PING:
                     // Respond to ping request
-                    LOG_DEBUG(cnxString_ << "Replying to ping command");
+                    LOG_DEBUG(cnxString() << "Replying to ping command");
                     sendCommand(Commands::newPong());
                     break;
 
                 case BaseCommand::PONG:
-                    LOG_DEBUG(cnxString_ << "Received response to ping 
message");
+                    LOG_DEBUG(cnxString() << "Received response to ping 
message");
                     break;
 
                 case BaseCommand::AUTH_CHALLENGE:
@@ -1000,7 +983,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& 
incomingCmd) {
                     break;
 
                 default:
-                    LOG_WARN(cnxString_ << "Received invalid message from 
server");
+                    LOG_WARN(cnxString() << "Received invalid message from 
server");
                     close(ResultDisconnected);
                     break;
             }
@@ -1014,7 +997,7 @@ Future<Result, BrokerConsumerStatsImpl> 
ClientConnection::newConsumerStats(uint6
     Promise<Result, BrokerConsumerStatsImpl> promise;
     if (isClosed()) {
         lock.unlock();
-        LOG_ERROR(cnxString_ << " Client is not connected to the broker");
+        LOG_ERROR(cnxString() << " Client is not connected to the broker");
         promise.setFailed(ResultNotConnected);
         return promise.getFuture();
     }
@@ -1059,18 +1042,14 @@ void ClientConnection::newLookup(const SharedBuffer& 
cmd, uint64_t requestId, co
     requestData.promise = promise;
     requestData.timer = executor_->createDeadlineTimer();
     requestData.timer->expires_after(operationsTimeout_);
-    auto weakSelf = weak_from_this();
-    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
-        auto self = weakSelf.lock();
-        if (self) {
-            self->handleLookupTimeout(ec, requestData);
-        }
+    requestData.timer->async_wait([this, self{shared_from_this()}, 
requestData](const ASIO_ERROR& ec) {
+        handleLookupTimeout(ec, requestData);
     });
 
     pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
     numOfPendingLookupRequest_++;
     lock.unlock();
-    LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " 
(req_id: " << requestId << ")");
+    LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << " 
(req_id: " << requestId << ")");
     sendCommand(cmd);
 }
 
@@ -1079,18 +1058,7 @@ void ClientConnection::sendCommand(const SharedBuffer& 
cmd) {
 
     if (pendingWriteOperations_++ == 0) {
         // Write immediately to socket
-        if (tlsSocket_) {
-            auto weakSelf = weak_from_this();
-            auto callback = [weakSelf, cmd]() {
-                auto self = weakSelf.lock();
-                if (self) {
-                    self->sendCommandInternal(cmd);
-                }
-            };
-            ASIO::post(strand_, callback);
-        } else {
-            sendCommandInternal(cmd);
-        }
+        executor_->dispatch([this, cmd, self{shared_from_this()}] { 
sendCommandInternal(cmd); });
     } else {
         // Queue to send later
         pendingWriteBuffers_.push_back(cmd);
@@ -1122,11 +1090,7 @@ void ClientConnection::sendMessage(const 
std::shared_ptr<SendArguments>& args) {
                                    handleSendPair(err);
                                }));
     };
-    if (tlsSocket_) {
-        ASIO::post(strand_, sendMessageInternal);
-    } else {
-        sendMessageInternal();
-    }
+    executor_->dispatch(sendMessageInternal);
 }
 
 void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) {
@@ -1134,7 +1098,7 @@ void ClientConnection::handleSend(const ASIO_ERROR& err, 
const SharedBuffer&) {
         return;
     }
     if (err) {
-        LOG_WARN(cnxString_ << "Could not send message on connection: " << err 
<< " " << err.message());
+        LOG_WARN(cnxString() << "Could not send message on connection: " << 
err << " " << err.message());
         close(ResultDisconnected);
     } else {
         sendPendingCommands();
@@ -1146,7 +1110,7 @@ void ClientConnection::handleSendPair(const ASIO_ERROR& 
err) {
         return;
     }
     if (err) {
-        LOG_WARN(cnxString_ << "Could not send pair message on connection: " 
<< err << " " << err.message());
+        LOG_WARN(cnxString() << "Could not send pair message on connection: " 
<< err << " " << err.message());
         close(ResultDisconnected);
     } else {
         sendPendingCommands();
@@ -1194,8 +1158,8 @@ Future<Result, ResponseData> 
ClientConnection::sendRequestWithId(const SharedBuf
     if (isClosed()) {
         lock.unlock();
         Promise<Result, ResponseData> promise;
-        LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << 
requestId
-                             << ") to a closed connection");
+        LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << 
requestId
+                              << ") to a closed connection");
         promise.setFailed(ResultNotConnected);
         return promise.getFuture();
     }
@@ -1203,21 +1167,17 @@ Future<Result, ResponseData> 
ClientConnection::sendRequestWithId(const SharedBuf
     PendingRequestData requestData;
     requestData.timer = executor_->createDeadlineTimer();
     requestData.timer->expires_after(operationsTimeout_);
-    auto weakSelf = weak_from_this();
-    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
-        auto self = weakSelf.lock();
-        if (self) {
-            self->handleRequestTimeout(ec, requestData);
-        }
+    requestData.timer->async_wait([this, self{shared_from_this()}, 
requestData](const ASIO_ERROR& ec) {
+        handleRequestTimeout(ec, requestData);
     });
 
     pendingRequests_.insert(std::make_pair(requestId, requestData));
     lock.unlock();
 
-    LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " 
<< requestId << ")");
+    LOG_DEBUG(cnxString() << "Inserted request " << requestType << " (req_id: 
" << requestId << ")");
     if (mockingRequests_.load(std::memory_order_acquire)) {
         if (mockServer_ == nullptr) {
-            LOG_WARN(cnxString_ << "Mock server is unexpectedly null when 
processing " << requestType);
+            LOG_WARN(cnxString() << "Mock server is unexpectedly null when 
processing " << requestType);
             sendCommand(cmd);
         } else if (!mockServer_->sendRequest(requestType, requestId)) {
             sendCommand(cmd);
@@ -1231,7 +1191,7 @@ Future<Result, ResponseData> 
ClientConnection::sendRequestWithId(const SharedBuf
 void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
                                             const PendingRequestData& 
pendingRequestData) {
     if (!ec && !pendingRequestData.hasGotResponse->load()) {
-        LOG_WARN(cnxString_ << "Network request timeout to broker, remote: " 
<< physicalAddress_);
+        LOG_WARN(cnxString() << "Network request timeout to broker, remote: " 
<< physicalAddress_);
         pendingRequestData.promise.setFailed(ResultTimeout);
     }
 }
@@ -1239,7 +1199,7 @@ void ClientConnection::handleRequestTimeout(const 
ASIO_ERROR& ec,
 void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
                                            const LookupRequestData& 
pendingRequestData) {
     if (!ec) {
-        LOG_WARN(cnxString_ << "Lookup request timeout to broker, remote: " << 
physicalAddress_);
+        LOG_WARN(cnxString() << "Lookup request timeout to broker, remote: " 
<< physicalAddress_);
         pendingRequestData.promise->setFailed(ResultTimeout);
     }
 }
@@ -1247,22 +1207,22 @@ void ClientConnection::handleLookupTimeout(const 
ASIO_ERROR& ec,
 void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
                                                      const 
ClientConnection::LastMessageIdRequestData& data) {
     if (!ec) {
-        LOG_WARN(cnxString_ << "GetLastMessageId request timeout to broker, 
remote: " << physicalAddress_);
+        LOG_WARN(cnxString() << "GetLastMessageId request timeout to broker, 
remote: " << physicalAddress_);
         data.promise->setFailed(ResultTimeout);
     }
 }
 
-void ClientConnection::handleKeepAliveTimeout() {
-    if (isClosed()) {
+void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) {
+    if (isClosed() || ec) {
         return;
     }
 
     if (havePendingPingRequest_) {
-        LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive 
timeout");
+        LOG_WARN(cnxString() << "Forcing connection to close after keep-alive 
timeout");
         close(ResultDisconnected);
     } else {
         // Send keep alive probe to peer
-        LOG_DEBUG(cnxString_ << "Sending ping message");
+        LOG_DEBUG(cnxString() << "Sending ping message");
         havePendingPingRequest_ = true;
         sendCommand(Commands::newPing());
 
@@ -1271,13 +1231,8 @@ void ClientConnection::handleKeepAliveTimeout() {
         Lock lock(mutex_);
         if (keepAliveTimer_) {
             
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
-            auto weakSelf = weak_from_this();
-            keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
-                auto self = weakSelf.lock();
-                if (self) {
-                    self->handleKeepAliveTimeout();
-                }
-            });
+            keepAliveTimer_->async_wait(
+                [this, self{shared_from_this()}](const auto& err) { 
handleKeepAliveTimeout(err); });
         }
         lock.unlock();
     }
@@ -1286,39 +1241,32 @@ void ClientConnection::handleKeepAliveTimeout() {
 void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
                                                   const std::vector<uint64_t>& 
consumerStatsRequests) {
     if (ec) {
-        LOG_DEBUG(cnxString_ << " Ignoring timer cancelled event, code[" << ec 
<< "]");
+        LOG_DEBUG(cnxString() << " Ignoring timer cancelled event, code[" << 
ec << "]");
         return;
     }
     startConsumerStatsTimer(consumerStatsRequests);
 }
 
-void ClientConnection::close(Result result, bool detach) {
+const std::future<void>& ClientConnection::close(Result result) {
     Lock lock(mutex_);
-    if (isClosed()) {
-        return;
-    }
+    if (closeFuture_) {
+        connectPromise_.setFailed(result);
+        return *closeFuture_;
+    }
+    auto promise = std::make_shared<std::promise<void>>();
+    closeFuture_ = promise->get_future();
+    // The atomic update on state_ guarantees the previous modification on 
closeFuture_ is visible once the
+    // atomic read on state_ returns Disconnected `isClosed()`.
+    // However, it cannot prevent the race like:
+    // 1. thread 1: Check `isClosed()`, which returns false.
+    // 2. thread 2: call `close()`, now, `state_` becomes Disconnected, and 
`closeFuture_` is set.
+    // 3. thread 1: post the `async_write` to the `io_context`,
+    // 4. io thread: call `socket_->close()`
+    // 5. io thread: execute `async_write` on `socket_`, which has been closed
+    // However, even the race happens, it's still safe because all the socket 
operations happen in the same
+    // io thread, the `async_write` operation will simply fail with an error, 
no crash will happen.
     state_ = Disconnected;
 
-    if (socket_) {
-        ASIO_ERROR err;
-        socket_->shutdown(ASIO::socket_base::shutdown_both, err);
-        socket_->close(err);
-        if (err) {
-            LOG_WARN(cnxString_ << "Failed to close socket: " << 
err.message());
-        }
-    }
-    if (tlsSocket_) {
-        ASIO_ERROR err;
-        tlsSocket_->lowest_layer().close(err);
-        if (err) {
-            LOG_WARN(cnxString_ << "Failed to close TLS socket: " << 
err.message());
-        }
-    }
-
-    if (executor_) {
-        executor_.reset();
-    }
-
     // Move the internal fields to process them after `mutex_` was unlocked
     auto consumers = std::move(consumers_);
     auto producers = std::move(producers_);
@@ -1341,21 +1289,37 @@ void ClientConnection::close(Result result, bool 
detach) {
         consumerStatsRequestTimer_.reset();
     }
 
-    if (connectTimeoutTask_) {
-        connectTimeoutTask_->stop();
-    }
-
+    cancelTimer(*connectTimer_);
     lock.unlock();
     int refCount = weak_from_this().use_count();
     if (!isResultRetryable(result)) {
-        LOG_ERROR(cnxString_ << "Connection closed with " << result << " 
(refCnt: " << refCount << ")");
+        LOG_ERROR(cnxString() << "Connection closed with " << result << " 
(refCnt: " << refCount << ")");
     } else {
-        LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount 
<< ")");
+        LOG_INFO(cnxString() << "Connection disconnected (refCnt: " << 
refCount << ")");
     }
     // Remove the connection from the pool before completing any promise
-    if (detach) {
-        pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this);
-    }
+    pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this);
+
+    // Close the socket after removing itself from the pool so that other 
requests won't be able to acquire
+    // this connection after the socket is closed.
+    executor_->dispatch([this, promise, self{shared_from_this()}] {
+        // According to asio document, ip::tcp::socket and ssl::stream are 
unsafe as shared objects, so the
+        // methods must be called within the same implicit or explicit strand.
+        // The implementation of `ExecutorService` guarantees the internal 
`io_context::run()` is only called
+        // in one thread, so we can safely call the socket methods without 
posting to a strand instance.
+        ASIO_ERROR err;
+        socket_->shutdown(ASIO::socket_base::shutdown_both, err);
+        socket_->close(err);
+        if (err) {
+            LOG_WARN(cnxString() << "Failed to close socket: " << 
err.message());
+        }
+        if (tlsSocket_) {
+            auto tlsSocket = tlsSocket_;
+            tlsSocket->async_shutdown([promise, self, tlsSocket](const auto&) 
{ promise->set_value(); });
+        } else {
+            promise->set_value();
+        }
+    });
 
     auto self = shared_from_this();
     for (ProducersMap::iterator it = producers.begin(); it != producers.end(); 
++it) {
@@ -1377,24 +1341,25 @@ void ClientConnection::close(Result result, bool 
detach) {
 
     // Fail all pending requests, all these type are map whose value type 
contains the Promise object
     for (auto& kv : pendingRequests) {
-        kv.second.promise.setFailed(result);
+        kv.second.fail(result);
     }
     for (auto& kv : pendingLookupRequests) {
-        kv.second.promise->setFailed(result);
+        kv.second.fail(result);
     }
     for (auto& kv : pendingConsumerStatsMap) {
-        LOG_ERROR(cnxString_ << " Closing Client Connection, please try again 
later");
+        LOG_ERROR(cnxString() << " Closing Client Connection, please try again 
later");
         kv.second.setFailed(result);
     }
     for (auto& kv : pendingGetLastMessageIdRequests) {
-        kv.second.promise->setFailed(result);
+        kv.second.fail(result);
     }
     for (auto& kv : pendingGetNamespaceTopicsRequests) {
         kv.second.setFailed(result);
     }
     for (auto& kv : pendingGetSchemaRequests) {
-        kv.second.promise.setFailed(result);
+        kv.second.fail(result);
     }
+    return *closeFuture_;
 }
 
 bool ClientConnection::isClosed() const { return state_ == Disconnected; }
@@ -1425,8 +1390,6 @@ void ClientConnection::removeConsumer(int consumerId) {
 
 const std::string& ClientConnection::brokerAddress() const { return 
physicalAddress_; }
 
-const std::string& ClientConnection::cnxString() const { return cnxString_; }
-
 int ClientConnection::getServerProtocolVersion() const { return 
serverProtocolVersion_; }
 
 int32_t ClientConnection::getMaxMessageSize() { return 
maxMessageSize_.load(std::memory_order_acquire); }
@@ -1441,7 +1404,7 @@ Future<Result, GetLastMessageIdResponse> 
ClientConnection::newGetLastMessageId(u
     auto promise = 
std::make_shared<GetLastMessageIdResponsePromisePtr::element_type>();
     if (isClosed()) {
         lock.unlock();
-        LOG_ERROR(cnxString_ << " Client is not connected to the broker");
+        LOG_ERROR(cnxString() << " Client is not connected to the broker");
         promise->setFailed(ResultNotConnected);
         return promise->getFuture();
     }
@@ -1450,12 +1413,8 @@ Future<Result, GetLastMessageIdResponse> 
ClientConnection::newGetLastMessageId(u
     requestData.promise = promise;
     requestData.timer = executor_->createDeadlineTimer();
     requestData.timer->expires_after(operationsTimeout_);
-    auto weakSelf = weak_from_this();
-    requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& 
ec) {
-        auto self = weakSelf.lock();
-        if (self) {
-            self->handleGetLastMessageIdTimeout(ec, requestData);
-        }
+    requestData.timer->async_wait([this, self{shared_from_this()}, 
requestData](const ASIO_ERROR& ec) {
+        handleGetLastMessageIdTimeout(ec, requestData);
     });
     pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, 
requestData));
     lock.unlock();
@@ -1469,7 +1428,7 @@ Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(
     Promise<Result, NamespaceTopicsPtr> promise;
     if (isClosed()) {
         lock.unlock();
-        LOG_ERROR(cnxString_ << "Client is not connected to the broker");
+        LOG_ERROR(cnxString() << "Client is not connected to the broker");
         promise.setFailed(ResultNotConnected);
         return promise.getFuture();
     }
@@ -1487,7 +1446,7 @@ Future<Result, SchemaInfo> 
ClientConnection::newGetSchema(const std::string& top
     Promise<Result, SchemaInfo> promise;
     if (isClosed()) {
         lock.unlock();
-        LOG_ERROR(cnxString_ << "Client is not connected to the broker");
+        LOG_ERROR(cnxString() << "Client is not connected to the broker");
         promise.setFailed(ResultNotConnected);
         return promise.getFuture();
     }
@@ -1496,11 +1455,9 @@ Future<Result, SchemaInfo> 
ClientConnection::newGetSchema(const std::string& top
     pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise, 
timer});
     lock.unlock();
 
-    auto weakSelf = weak_from_this();
     timer->expires_after(operationsTimeout_);
-    timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
-        auto self = weakSelf.lock();
-        if (!self) {
+    timer->async_wait([this, self{shared_from_this()}, requestId](const 
ASIO_ERROR& ec) {
+        if (ec) {
             return;
         }
         Lock lock(mutex_);
@@ -1527,8 +1484,8 @@ void ClientConnection::handleSendReceipt(const 
proto::CommandSendReceipt& sendRe
     const proto::MessageIdData& messageIdData = sendReceipt.message_id();
     auto messageId = toMessageId(messageIdData);
 
-    LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId << " -- 
msg: " << sequenceId
-                         << "-- message id: " << messageId);
+    LOG_DEBUG(cnxString() << "Got receipt for producer: " << producerId << " 
-- msg: " << sequenceId
+                          << "-- message id: " << messageId);
 
     Lock lock(mutex_);
     auto it = producers_.find(producerId);
@@ -1544,13 +1501,13 @@ void ClientConnection::handleSendReceipt(const 
proto::CommandSendReceipt& sendRe
             }
         }
     } else {
-        LOG_ERROR(cnxString_ << "Got invalid producer Id in SendReceipt: "  //
-                             << producerId << " -- msg: " << sequenceId);
+        LOG_ERROR(cnxString() << "Got invalid producer Id in SendReceipt: "  //
+                              << producerId << " -- msg: " << sequenceId);
     }
 }
 
 void ClientConnection::handleSendError(const proto::CommandSendError& error) {
-    LOG_WARN(cnxString_ << "Received send error from server: " << 
error.message());
+    LOG_WARN(cnxString() << "Received send error from server: " << 
error.message());
     if (ChecksumError == error.error()) {
         long producerId = error.producer_id();
         long sequenceId = error.sequence_id();
@@ -1574,7 +1531,7 @@ void ClientConnection::handleSendError(const 
proto::CommandSendError& error) {
 }
 
 void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
-    LOG_DEBUG(cnxString_ << "Received success response from server. req_id: " 
<< success.request_id());
+    LOG_DEBUG(cnxString() << "Received success response from server. req_id: " 
<< success.request_id());
 
     Lock lock(mutex_);
     auto it = pendingRequests_.find(success.request_id());
@@ -1590,8 +1547,8 @@ void ClientConnection::handleSuccess(const 
proto::CommandSuccess& success) {
 
 void ClientConnection::handlePartitionedMetadataResponse(
     const proto::CommandPartitionedTopicMetadataResponse& 
partitionMetadataResponse) {
-    LOG_DEBUG(cnxString_ << "Received partition-metadata response from server. 
req_id: "
-                         << partitionMetadataResponse.request_id());
+    LOG_DEBUG(cnxString() << "Received partition-metadata response from 
server. req_id: "
+                          << partitionMetadataResponse.request_id());
 
     Lock lock(mutex_);
     auto it = 
pendingLookupRequests_.find(partitionMetadataResponse.request_id());
@@ -1607,16 +1564,16 @@ void 
ClientConnection::handlePartitionedMetadataResponse(
             (partitionMetadataResponse.response() ==
              proto::CommandPartitionedTopicMetadataResponse::Failed)) {
             if (partitionMetadataResponse.has_error()) {
-                LOG_ERROR(cnxString_ << "Failed partition-metadata lookup 
req_id: "
-                                     << partitionMetadataResponse.request_id()
-                                     << " error: " << 
partitionMetadataResponse.error()
-                                     << " msg: " << 
partitionMetadataResponse.message());
+                LOG_ERROR(cnxString() << "Failed partition-metadata lookup 
req_id: "
+                                      << partitionMetadataResponse.request_id()
+                                      << " error: " << 
partitionMetadataResponse.error()
+                                      << " msg: " << 
partitionMetadataResponse.message());
                 checkServerError(partitionMetadataResponse.error(), 
partitionMetadataResponse.message());
                 lookupDataPromise->setFailed(
                     getResult(partitionMetadataResponse.error(), 
partitionMetadataResponse.message()));
             } else {
-                LOG_ERROR(cnxString_ << "Failed partition-metadata lookup 
req_id: "
-                                     << partitionMetadataResponse.request_id() 
<< " with empty response: ");
+                LOG_ERROR(cnxString() << "Failed partition-metadata lookup 
req_id: "
+                                      << 
partitionMetadataResponse.request_id() << " with empty response: ");
                 lookupDataPromise->setFailed(ResultConnectError);
             }
         } else {
@@ -1632,9 +1589,9 @@ void ClientConnection::handlePartitionedMetadataResponse(
 
 void ClientConnection::handleConsumerStatsResponse(
     const proto::CommandConsumerStatsResponse& consumerStatsResponse) {
-    LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer 
stats "
-                            "response from server. req_id: "
-                         << consumerStatsResponse.request_id());
+    LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received 
consumer stats "
+                             "response from server. req_id: "
+                          << consumerStatsResponse.request_id());
     Lock lock(mutex_);
     auto it = 
pendingConsumerStatsMap_.find(consumerStatsResponse.request_id());
     if (it != pendingConsumerStatsMap_.end()) {
@@ -1644,15 +1601,15 @@ void ClientConnection::handleConsumerStatsResponse(
 
         if (consumerStatsResponse.has_error_code()) {
             if (consumerStatsResponse.has_error_message()) {
-                LOG_ERROR(cnxString_ << " Failed to get consumer stats - "
-                                     << consumerStatsResponse.error_message());
+                LOG_ERROR(cnxString()
+                          << " Failed to get consumer stats - " << 
consumerStatsResponse.error_message());
             }
             consumerStatsPromise.setFailed(
                 getResult(consumerStatsResponse.error_code(), 
consumerStatsResponse.error_message()));
         } else {
-            LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received 
consumer stats "
-                                    "response from server. req_id: "
-                                 << consumerStatsResponse.request_id() << " 
Stats: ");
+            LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received 
consumer stats "
+                                     "response from server. req_id: "
+                                  << consumerStatsResponse.request_id() << " 
Stats: ");
             BrokerConsumerStatsImpl brokerStats(
                 consumerStatsResponse.msgrateout(), 
consumerStatsResponse.msgthroughputout(),
                 consumerStatsResponse.msgrateredeliver(), 
consumerStatsResponse.consumername(),
@@ -1682,25 +1639,25 @@ void ClientConnection::handleLookupTopicRespose(
         if (!lookupTopicResponse.has_response() ||
             (lookupTopicResponse.response() == 
proto::CommandLookupTopicResponse::Failed)) {
             if (lookupTopicResponse.has_error()) {
-                LOG_ERROR(cnxString_ << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
-                                     << " error: " << 
lookupTopicResponse.error()
-                                     << " msg: " << 
lookupTopicResponse.message());
+                LOG_ERROR(cnxString() << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
+                                      << " error: " << 
lookupTopicResponse.error()
+                                      << " msg: " << 
lookupTopicResponse.message());
                 checkServerError(lookupTopicResponse.error(), 
lookupTopicResponse.message());
                 lookupDataPromise->setFailed(
                     getResult(lookupTopicResponse.error(), 
lookupTopicResponse.message()));
             } else {
-                LOG_ERROR(cnxString_ << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
-                                     << " with empty response: ");
+                LOG_ERROR(cnxString() << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
+                                      << " with empty response: ");
                 lookupDataPromise->setFailed(ResultConnectError);
             }
         } else {
-            LOG_DEBUG(cnxString_ << "Received lookup response from server. 
req_id: "
-                                 << lookupTopicResponse.request_id()  //
-                                 << " -- broker-url: " << 
lookupTopicResponse.brokerserviceurl()
-                                 << " -- broker-tls-url: "  //
-                                 << lookupTopicResponse.brokerserviceurltls()
-                                 << " authoritative: " << 
lookupTopicResponse.authoritative()  //
-                                 << " redirect: " << 
lookupTopicResponse.response());
+            LOG_DEBUG(cnxString() << "Received lookup response from server. 
req_id: "
+                                  << lookupTopicResponse.request_id()  //
+                                  << " -- broker-url: " << 
lookupTopicResponse.brokerserviceurl()
+                                  << " -- broker-tls-url: "  //
+                                  << lookupTopicResponse.brokerserviceurltls()
+                                  << " authoritative: " << 
lookupTopicResponse.authoritative()  //
+                                  << " redirect: " << 
lookupTopicResponse.response());
             LookupDataResultPtr lookupResultPtr = 
std::make_shared<LookupDataResult>();
 
             if (tlsSocket_) {
@@ -1723,17 +1680,18 @@ void ClientConnection::handleLookupTopicRespose(
 }
 
 void ClientConnection::handleProducerSuccess(const 
proto::CommandProducerSuccess& producerSuccess) {
-    LOG_DEBUG(cnxString_ << "Received success producer response from server. 
req_id: "
-                         << producerSuccess.request_id()  //
-                         << " -- producer name: " << 
producerSuccess.producer_name());
+    LOG_DEBUG(cnxString() << "Received success producer response from server. 
req_id: "
+                          << producerSuccess.request_id()  //
+                          << " -- producer name: " << 
producerSuccess.producer_name());
 
     Lock lock(mutex_);
     auto it = pendingRequests_.find(producerSuccess.request_id());
     if (it != pendingRequests_.end()) {
         PendingRequestData requestData = it->second;
         if (!producerSuccess.producer_ready()) {
-            LOG_INFO(cnxString_ << " Producer " << 
producerSuccess.producer_name()
-                                << " has been queued up at broker. req_id: " 
<< producerSuccess.request_id());
+            LOG_INFO(cnxString() << " Producer " << 
producerSuccess.producer_name()
+                                 << " has been queued up at broker. req_id: "
+                                 << producerSuccess.request_id());
             requestData.hasGotResponse->store(true);
             lock.unlock();
         } else {
@@ -1758,9 +1716,9 @@ void ClientConnection::handleProducerSuccess(const 
proto::CommandProducerSuccess
 
 void ClientConnection::handleError(const proto::CommandError& error) {
     Result result = getResult(error.error(), error.message());
-    LOG_WARN(cnxString_ << "Received error response from server: " << result
-                        << (error.has_message() ? (" (" + error.message() + 
")") : "")
-                        << " -- req_id: " << error.request_id());
+    LOG_WARN(cnxString() << "Received error response from server: " << result
+                         << (error.has_message() ? (" (" + error.message() + 
")") : "")
+                         << " -- req_id: " << error.request_id());
 
     Lock lock(mutex_);
 
@@ -1890,7 +1848,7 @@ void ClientConnection::handleCloseProducer(const 
proto::CommandCloseProducer& cl
             producer->disconnectProducer(assignedBrokerServiceUrl);
         }
     } else {
-        LOG_ERROR(cnxString_ << "Got invalid producer Id in closeProducer 
command: " << producerId);
+        LOG_ERROR(cnxString() << "Got invalid producer Id in closeProducer 
command: " << producerId);
     }
 }
 
@@ -1911,17 +1869,17 @@ void ClientConnection::handleCloseConsumer(const 
proto::CommandCloseConsumer& cl
             consumer->disconnectConsumer(assignedBrokerServiceUrl);
         }
     } else {
-        LOG_ERROR(cnxString_ << "Got invalid consumer Id in closeConsumer 
command: " << consumerId);
+        LOG_ERROR(cnxString() << "Got invalid consumer Id in closeConsumer 
command: " << consumerId);
     }
 }
 
 void ClientConnection::handleAuthChallenge() {
-    LOG_DEBUG(cnxString_ << "Received auth challenge from broker");
+    LOG_DEBUG(cnxString() << "Received auth challenge from broker");
 
     Result result;
     SharedBuffer buffer = Commands::newAuthResponse(authentication_, result);
     if (result != ResultOk) {
-        LOG_ERROR(cnxString_ << "Failed to send auth response: " << result);
+        LOG_ERROR(cnxString() << "Failed to send auth response: " << result);
         close(result);
         return;
     }
@@ -1934,8 +1892,8 @@ void ClientConnection::handleAuthChallenge() {
 
 void ClientConnection::handleGetLastMessageIdResponse(
     const proto::CommandGetLastMessageIdResponse& getLastMessageIdResponse) {
-    LOG_DEBUG(cnxString_ << "Received getLastMessageIdResponse from server. 
req_id: "
-                         << getLastMessageIdResponse.request_id());
+    LOG_DEBUG(cnxString() << "Received getLastMessageIdResponse from server. 
req_id: "
+                          << getLastMessageIdResponse.request_id());
 
     Lock lock(mutex_);
     auto it = 
pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
@@ -1961,8 +1919,8 @@ void ClientConnection::handleGetLastMessageIdResponse(
 
 void ClientConnection::handleGetTopicOfNamespaceResponse(
     const proto::CommandGetTopicsOfNamespaceResponse& response) {
-    LOG_DEBUG(cnxString_ << "Received GetTopicsOfNamespaceResponse from 
server. req_id: "
-                         << response.request_id() << " topicsSize" << 
response.topics_size());
+    LOG_DEBUG(cnxString() << "Received GetTopicsOfNamespaceResponse from 
server. req_id: "
+                          << response.request_id() << " topicsSize" << 
response.topics_size());
 
     Lock lock(mutex_);
     auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id());
@@ -2001,7 +1959,7 @@ void ClientConnection::handleGetTopicOfNamespaceResponse(
 }
 
 void ClientConnection::handleGetSchemaResponse(const 
proto::CommandGetSchemaResponse& response) {
-    LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: " 
<< response.request_id());
+    LOG_DEBUG(cnxString() << "Received GetSchemaResponse from server. req_id: 
" << response.request_id());
     Lock lock(mutex_);
     auto it = pendingGetSchemaRequests_.find(response.request_id());
     if (it != pendingGetSchemaRequests_.end()) {
@@ -2012,10 +1970,11 @@ void ClientConnection::handleGetSchemaResponse(const 
proto::CommandGetSchemaResp
         if (response.has_error_code()) {
             Result result = getResult(response.error_code(), 
response.error_message());
             if (response.error_code() != proto::TopicNotFound) {
-                LOG_WARN(cnxString_ << "Received error GetSchemaResponse from 
server " << result
-                                    << (response.has_error_message() ? (" (" + 
response.error_message() + ")")
-                                                                     : "")
-                                    << " -- req_id: " << 
response.request_id());
+                LOG_WARN(cnxString() << "Received error GetSchemaResponse from 
server " << result
+                                     << (response.has_error_message()
+                                             ? (" (" + 
response.error_message() + ")")
+                                             : "")
+                                     << " -- req_id: " << 
response.request_id());
             }
             getSchemaPromise.setFailed(result);
             return;
@@ -2039,7 +1998,7 @@ void ClientConnection::handleGetSchemaResponse(const 
proto::CommandGetSchemaResp
 }
 
 void ClientConnection::handleAckResponse(const proto::CommandAckResponse& 
response) {
-    LOG_DEBUG(cnxString_ << "Received AckResponse from server. req_id: " << 
response.request_id());
+    LOG_DEBUG(cnxString() << "Received AckResponse from server. req_id: " << 
response.request_id());
 
     Lock lock(mutex_);
     auto it = pendingRequests_.find(response.request_id());
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index b277000..b9880ee 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -25,6 +25,8 @@
 #include <any>
 #include <atomic>
 #include <cstdint>
+#include <future>
+#include <optional>
 #ifdef USE_ASIO
 #include <asio/bind_executor.hpp>
 #include <asio/io_context.hpp>
@@ -41,8 +43,10 @@
 #include <deque>
 #include <functional>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "AsioTimer.h"
@@ -156,11 +160,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      * Close the connection.
      *
      * @param result all pending futures will complete with this result
-     * @param detach remove it from the pool if it's true
-     *
-     * `detach` should only be false when the connection pool is closed.
      */
-    void close(Result result = ResultConnectError, bool detach = true);
+    const std::future<void>& close(Result result = ResultConnectError);
 
     bool isClosed() const;
 
@@ -193,7 +194,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     const std::string& brokerAddress() const;
 
-    const std::string& cnxString() const;
+    auto cnxString() const { return *std::atomic_load(&cnxStringPtr_); }
 
     int getServerProtocolVersion() const;
 
@@ -219,28 +220,48 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
         mockingRequests_.store(true, std::memory_order_release);
     }
 
-    void handleKeepAliveTimeout();
+    void handleKeepAliveTimeout(const ASIO_ERROR& ec);
 
    private:
     struct PendingRequestData {
         Promise<Result, ResponseData> promise;
         DeadlineTimerPtr timer;
         std::shared_ptr<std::atomic_bool> 
hasGotResponse{std::make_shared<std::atomic_bool>(false)};
+
+        void fail(Result result) {
+            cancelTimer(*timer);
+            promise.setFailed(result);
+        }
     };
 
     struct LookupRequestData {
         LookupDataResultPromisePtr promise;
         DeadlineTimerPtr timer;
+
+        void fail(Result result) {
+            cancelTimer(*timer);
+            promise->setFailed(result);
+        }
     };
 
     struct LastMessageIdRequestData {
         GetLastMessageIdResponsePromisePtr promise;
         DeadlineTimerPtr timer;
+
+        void fail(Result result) {
+            cancelTimer(*timer);
+            promise->setFailed(result);
+        }
     };
 
     struct GetSchemaRequest {
         Promise<Result, SchemaInfo> promise;
         DeadlineTimerPtr timer;
+
+        void fail(Result result) {
+            cancelTimer(*timer);
+            promise.setFailed(result);
+        }
     };
 
     /*
@@ -297,26 +318,26 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     }
 
     template <typename ConstBufferSequence, typename WriteHandler>
-    inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler 
handler) {
+    inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler&& 
handler) {
         if (isClosed()) {
             return;
         }
         if (tlsSocket_) {
-            ASIO::async_write(*tlsSocket_, buffers, 
ASIO::bind_executor(strand_, handler));
+            ASIO::async_write(*tlsSocket_, buffers, 
std::forward<WriteHandler>(handler));
         } else {
-            ASIO::async_write(*socket_, buffers, handler);
+            ASIO::async_write(*socket_, buffers, 
std::forward<WriteHandler>(handler));
         }
     }
 
     template <typename MutableBufferSequence, typename ReadHandler>
-    inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler 
handler) {
+    inline void asyncReceive(const MutableBufferSequence& buffers, 
ReadHandler&& handler) {
         if (isClosed()) {
             return;
         }
         if (tlsSocket_) {
-            tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, 
handler));
+            tlsSocket_->async_read_some(buffers, 
std::forward<ReadHandler>(handler));
         } else {
-            socket_->async_receive(buffers, handler);
+            socket_->async_receive(buffers, 
std::forward<ReadHandler>(handler));
         }
     }
 
@@ -337,7 +358,6 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      */
     SocketPtr socket_;
     TlsSocketPtr tlsSocket_;
-    ASIO::strand<ASIO::io_context::executor_type> strand_;
 
     const std::string logicalAddress_;
     /*
@@ -350,7 +370,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     ClientConfiguration::ProxyProtocol proxyProtocol_;
 
     // Represent both endpoint of the tcp connection. eg: [client:1234 -> 
server:6650]
-    std::string cnxString_;
+    std::shared_ptr<std::string> cnxStringPtr_;
 
     /*
      *  indicates if async connection establishment failed
@@ -360,7 +380,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     SharedBuffer incomingBuffer_;
 
     Promise<Result, ClientConnectionWeakPtr> connectPromise_;
-    std::shared_ptr<PeriodicTask> connectTimeoutTask_;
+    const std::chrono::milliseconds connectTimeout_;
+    const DeadlineTimerPtr connectTimer_;
 
     typedef std::map<long, PendingRequestData> PendingRequestsMap;
     PendingRequestsMap pendingRequests_;
@@ -419,6 +440,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     const std::string clientVersion_;
     ConnectionPool& pool_;
     const size_t poolIndex_;
+    std::optional<std::future<void>> closeFuture_;
 
     friend class PulsarFriend;
     friend class ConsumerTest;
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index df050b0..c814cf8 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -54,16 +54,43 @@ bool ConnectionPool::close() {
         return false;
     }
 
+    std::vector<ClientConnectionPtr> connectionsToClose;
+    // ClientConnection::close() will remove the connection from the pool, 
which is not allowed when iterating
+    // over a map, so we store the connections to close in a vector first and 
don't iterate the pool when
+    // closing the connections.
     std::unique_lock<std::recursive_mutex> lock(mutex_);
+    connectionsToClose.reserve(pool_.size());
+    for (auto&& kv : pool_) {
+        connectionsToClose.emplace_back(kv.second);
+    }
+    pool_.clear();
+    lock.unlock();
 
-    for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
-        auto& cnx = cnxIt->second;
+    for (auto&& cnx : connectionsToClose) {
         if (cnx) {
-            // The 2nd argument is false because removing a value during the 
iteration will cause segfault
-            cnx->close(ResultDisconnected, false);
+            // Close with a fatal error to not let client retry
+            auto& future = cnx->close(ResultAlreadyClosed);
+            using namespace std::chrono_literals;
+            if (auto status = future.wait_for(5s); status != 
std::future_status::ready) {
+                LOG_WARN("Connection close timed out for " << 
cnx.get()->cnxString());
+            }
+            if (cnx.use_count() > 1) {
+                // There are some asynchronous operations that hold the 
reference on the connection, we should
+                // wait until them to finish. Otherwise, `io_context::stop()` 
will be called in
+                // `ClientImpl::shutdown()` when closing the 
`ExecutorServiceProvider`. Then
+                // `io_context::run()` will return and the `io_context` object 
will be destroyed. In this
+                // case, if there is any pending handler, it will crash.
+                for (int i = 0; i < 500 && cnx.use_count() > 1; i++) {
+                    std::this_thread::sleep_for(10ms);
+                }
+                if (cnx.use_count() > 1) {
+                    LOG_WARN("Connection still has " << (cnx.use_count() - 1)
+                                                     << " references after 
waiting for 5 seconds for "
+                                                     << 
cnx.get()->cnxString());
+                }
+            }
         }
     }
-    pool_.clear();
     return true;
 }
 
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index 99e2393..eba7486 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -125,8 +125,6 @@ void ExecutorService::close(long timeoutMs) {
     }
 }
 
-void ExecutorService::postWork(std::function<void(void)> task) { 
ASIO::post(io_context_, std::move(task)); }
-
 /////////////////////
 
 ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index 626cb20..80659d4 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -23,12 +23,16 @@
 
 #include <atomic>
 #ifdef USE_ASIO
+#include <asio/dispatch.hpp>
 #include <asio/io_context.hpp>
 #include <asio/ip/tcp.hpp>
+#include <asio/post.hpp>
 #include <asio/ssl.hpp>
 #else
+#include <boost/asio/dispatch.hpp>
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/post.hpp>
 #include <boost/asio/ssl.hpp>
 #endif
 #include <chrono>
@@ -37,6 +41,7 @@
 #include <memory>
 #include <mutex>
 #include <thread>
+#include <utility>
 
 #include "AsioTimer.h"
 
@@ -62,7 +67,19 @@ class PULSAR_PUBLIC ExecutorService : public 
std::enable_shared_from_this<Execut
     TcpResolverPtr createTcpResolver();
     // throws std::runtime_error if failed
     DeadlineTimerPtr createDeadlineTimer();
-    void postWork(std::function<void(void)> task);
+
+    // Execute the task in the event loop thread asynchronously, i.e. the task 
will be put in the event loop
+    // queue and executed later.
+    template <typename T>
+    void postWork(T &&task) {
+        ASIO::post(io_context_, std::forward<T>(task));
+    }
+
+    // Different from `postWork`, if it's already in the event loop, execute 
the task immediately
+    template <typename T>
+    void dispatch(T &&task) {
+        ASIO::dispatch(io_context_, std::forward<T>(task));
+    }
 
     // See TimeoutProcessor for the semantics of the parameter.
     void close(long timeoutMs = 3000);
diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h
index bc18634..ee19182 100644
--- a/lib/PeriodicTask.h
+++ b/lib/PeriodicTask.h
@@ -53,7 +53,7 @@ class PeriodicTask : public 
std::enable_shared_from_this<PeriodicTask> {
 
     void stop() noexcept;
 
-    void setCallback(CallbackType callback) noexcept { callback_ = callback; }
+    void setCallback(CallbackType&& callback) noexcept { callback_ = 
std::move(callback); }
 
     State getState() const noexcept { return state_; }
     int getPeriodMs() const noexcept { return periodMs_; }
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 9a02df0..d3c6e61 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -3188,7 +3188,17 @@ static void expectTimeoutOnRecv(Consumer &consumer) {
     ASSERT_EQ(ResultTimeout, res);
 }
 
-void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
+static std::vector<std::string> expectedNegativeAckMessages(size_t 
numMessages) {
+    std::vector<std::string> expected;
+    expected.reserve(numMessages);
+    for (size_t i = 0; i < numMessages; i++) {
+        expected.emplace_back("test-" + std::to_string(i));
+    }
+    return expected;
+}
+
+void testNegativeAcks(const std::string &topic, bool batchingEnabled, bool 
expectOrdered = true) {
+    constexpr size_t numMessages = 10;
     Client client(lookupUrl);
     Consumer consumer;
     ConsumerConfiguration conf;
@@ -3202,22 +3212,32 @@ void testNegativeAcks(const std::string &topic, bool 
batchingEnabled) {
     result = client.createProducer(topic, producerConf, producer);
     ASSERT_EQ(ResultOk, result);
 
-    for (int i = 0; i < 10; i++) {
+    for (size_t i = 0; i < numMessages; i++) {
         Message msg = MessageBuilder().setContent("test-" + 
std::to_string(i)).build();
         producer.sendAsync(msg, nullptr);
     }
 
     producer.flush();
 
+    std::vector<std::string> receivedMessages;
+    receivedMessages.reserve(numMessages);
     std::vector<MessageId> toNeg;
-    for (int i = 0; i < 10; i++) {
+    for (size_t i = 0; i < numMessages; i++) {
         Message msg;
         consumer.receive(msg);
 
         LOG_INFO("Received message " << msg.getDataAsString());
-        ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
+        if (expectOrdered) {
+            ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
+        }
+        receivedMessages.emplace_back(msg.getDataAsString());
         toNeg.push_back(msg.getMessageId());
     }
+    if (!expectOrdered) {
+        auto expectedMessages = expectedNegativeAckMessages(numMessages);
+        std::sort(receivedMessages.begin(), receivedMessages.end());
+        ASSERT_EQ(expectedMessages, receivedMessages);
+    }
     // No more messages expected
     expectTimeoutOnRecv(consumer);
 
@@ -3228,15 +3248,25 @@ void testNegativeAcks(const std::string &topic, bool 
batchingEnabled) {
     }
     PulsarFriend::setNegativeAckEnabled(consumer, true);
 
-    for (int i = 0; i < 10; i++) {
+    std::vector<std::string> redeliveredMessages;
+    redeliveredMessages.reserve(numMessages);
+    for (size_t i = 0; i < numMessages; i++) {
         Message msg;
         consumer.receive(msg);
         LOG_INFO("-- Redelivery -- Received message " << 
msg.getDataAsString());
 
-        ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
+        if (expectOrdered) {
+            ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
+        }
+        redeliveredMessages.emplace_back(msg.getDataAsString());
 
         consumer.acknowledge(msg);
     }
+    if (!expectOrdered) {
+        auto expectedMessages = expectedNegativeAckMessages(numMessages);
+        std::sort(redeliveredMessages.begin(), redeliveredMessages.end());
+        ASSERT_EQ(expectedMessages, redeliveredMessages);
+    }
 
     // No more messages expected
     expectTimeoutOnRecv(consumer);
@@ -3262,7 +3292,7 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
     LOG_INFO("res = " << res);
     ASSERT_FALSE(res != 204 && res != 409);
 
-    testNegativeAcks(topicName, true);
+    testNegativeAcks(topicName, true, false);
 }
 
 void testNegativeAckPrecisionBitCnt(const std::string &topic, int 
precisionBitCnt) {
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index dd89268..6bd6cc8 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -21,14 +21,18 @@
 #include <pulsar/Version.h>
 
 #include <algorithm>
+#include <atomic>
 #include <chrono>
 #include <future>
 #include <sstream>
+#include <thread>
+#include <utility>
 
 #include "MockClientImpl.h"
 #include "PulsarAdminHelper.h"
 #include "PulsarFriend.h"
 #include "WaitUtils.h"
+#include "lib/AsioDefines.h"
 #include "lib/ClientConnection.h"
 #include "lib/LogUtils.h"
 #include "lib/checksum/ChecksumProvider.h"
@@ -42,6 +46,70 @@ using testing::AtLeast;
 static std::string lookupUrl = "pulsar://localhost:6650";
 static std::string adminUrl = "http://localhost:8080/";
 
+namespace {
+
+class SilentTcpServer {
+   public:
+    SilentTcpServer()
+        : acceptor_(ioContext_, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), 
0)),
+          acceptedFuture_(acceptedPromise_.get_future()),
+          port_(acceptor_.local_endpoint().port()),
+          workGuard_(ASIO::make_work_guard(ioContext_)) {}
+
+    ~SilentTcpServer() { stop(); }
+
+    int getPort() const noexcept { return port_; }
+
+    void start() {
+        serverThread_ = std::thread([this] {
+            socket_.reset(new ASIO::ip::tcp::socket(ioContext_));
+            acceptor_.async_accept(
+                *socket_, [this](const ASIO_ERROR &acceptError) { 
acceptedPromise_.set_value(acceptError); });
+
+            ioContext_.run();
+        });
+    }
+
+    bool waitUntilAccepted(std::chrono::milliseconds timeout) const {
+        return acceptedFuture_.wait_for(timeout) == std::future_status::ready;
+    }
+
+    auto acceptedError() const { return acceptedFuture_.get(); }
+
+    void stop() {
+        bool expected = false;
+        if (!stopped_.compare_exchange_strong(expected, true) || 
!serverThread_.joinable()) {
+            return;
+        }
+        ASIO::post(ioContext_, [this] {
+            ASIO_ERROR closeError;
+            if (socket_ && socket_->is_open()) {
+                socket_->close(closeError);
+            }
+            if (acceptor_.is_open()) {
+                acceptor_.close(closeError);
+            }
+            workGuard_.reset();
+        });
+        serverThread_.join();
+    }
+
+   private:
+    using WorkGuard = 
decltype(ASIO::make_work_guard(std::declval<ASIO::io_context &>()));
+
+    ASIO::io_context ioContext_;
+    ASIO::ip::tcp::acceptor acceptor_;
+    std::unique_ptr<ASIO::ip::tcp::socket> socket_;
+    std::promise<ASIO_ERROR> acceptedPromise_;
+    std::shared_future<ASIO_ERROR> acceptedFuture_;
+    const int port_;
+    WorkGuard workGuard_;
+    std::atomic_bool stopped_{false};
+    std::thread serverThread_;
+};
+
+}  // namespace
+
 TEST(ClientTest, testChecksumComputation) {
     std::string data = "test";
     std::string doubleData = "testtest";
@@ -137,6 +205,32 @@ TEST(ClientTest, testConnectTimeout) {
     ASSERT_EQ(futureDefault.get(), ResultDisconnected);
 }
 
+TEST(ClientTest, testConnectTimeoutAfterTcpConnected) {
+    std::unique_ptr<SilentTcpServer> server;
+    try {
+        server.reset(new SilentTcpServer);
+    } catch (const ASIO_SYSTEM_ERROR &e) {
+        GTEST_SKIP() << "Cannot bind local test server in this environment: " 
<< e.what();
+    }
+    server->start();
+
+    const std::string serviceUrl = "pulsar://127.0.0.1:" + 
std::to_string(server->getPort());
+    Client client(serviceUrl, ClientConfiguration().setConnectionTimeout(200));
+
+    std::promise<Result> promise;
+    auto future = promise.get_future();
+    client.createProducerAsync("test-connect-timeout-after-tcp-connected",
+                               [&promise](Result result, const Producer &) { 
promise.set_value(result); });
+
+    ASSERT_TRUE(server->waitUntilAccepted(std::chrono::seconds(1)));
+    ASSERT_FALSE(server->acceptedError());
+    ASSERT_EQ(future.wait_for(std::chrono::seconds(2)), 
std::future_status::ready);
+    ASSERT_EQ(future.get(), ResultConnectError);
+
+    client.close();
+    server->stop();
+}
+
 TEST(ClientTest, testGetNumberOfReferences) {
     Client client("pulsar://localhost:6650");
 
@@ -309,14 +403,19 @@ TEST(ClientTest, testMultiBrokerUrl) {
 TEST(ClientTest, testCloseClient) {
     const std::string topic = "client-test-close-client-" + 
std::to_string(time(nullptr));
 
+    using namespace std::chrono;
     for (int i = 0; i < 1000; ++i) {
         Client client(lookupUrl);
         client.createProducerAsync(topic, [](Result result, Producer producer) 
{ producer.close(); });
         // simulate different time interval before close
-        auto t0 = std::chrono::steady_clock::now();
-        while ((std::chrono::steady_clock::now() - t0) < 
std::chrono::microseconds(i)) {
+        auto t0 = steady_clock::now();
+        while ((steady_clock::now() - t0) < microseconds(i)) {
         }
-        client.close();
+
+        auto t1 = std::chrono::steady_clock::now();
+        ASSERT_EQ(ResultOk, client.close());
+        auto closeTimeMs = duration_cast<milliseconds>(steady_clock::now() - 
t1).count();
+        ASSERT_TRUE(closeTimeMs < 1000) << "close time: " << closeTimeMs << " 
ms";
     }
 }
 
@@ -413,7 +512,7 @@ TEST(ClientTest, testConnectionClose) {
             LOG_INFO("Connection refcnt: " << cnx.use_count() << " before 
close");
             auto executor = PulsarFriend::getExecutor(*cnx);
             // Simulate the close() happens in the event loop
-            executor->postWork([cnx, &client, numConnections] {
+            executor->dispatch([cnx, &client, numConnections] {
                 cnx->close();
                 ASSERT_EQ(PulsarFriend::getConnections(client).size(), 
numConnections - 1);
                 LOG_INFO("Connection refcnt: " << cnx.use_count() << " after 
close");
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
index db3bc96..57407fb 100644
--- a/tests/MultiTopicsConsumerTest.cc
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -162,11 +162,12 @@ TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) {
         BrokerConsumerStats stats;
         return consumer.getBrokerConsumerStats(stats);
     });
-    // Trigger the `getBrokerConsumerStats` in a new thread
-    future.wait_for(std::chrono::milliseconds(100));
-    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    const auto expectedRequests = topics.size();
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(1), [connection, 
expectedRequests] {
+        return PulsarFriend::getPendingConsumerStatsRequests(*connection) == 
expectedRequests;
+    }));
 
-    connection->handleKeepAliveTimeout();
+    connection->close(ResultDisconnected);
     ASSERT_EQ(ResultDisconnected, future.get());
 
     mockServer->close();
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index e708405..1f351d1 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -162,6 +162,11 @@ class PulsarFriend {
         return consumers;
     }
 
+    static size_t getPendingConsumerStatsRequests(const ClientConnection& cnx) 
{
+        std::lock_guard<std::mutex> lock(cnx.mutex_);
+        return cnx.pendingConsumerStatsMap_.size();
+    }
+
     static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
         consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
     }

Reply via email to