This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 967529b  [fix][client-cpp] Fix Reader segfault when messageListenerThreads=0 (#553)
967529b is described below

commit 967529b79f8fae55902b679b7e2248ff8e8f98c3
Author: zhanglistar <[email protected]>
AuthorDate: Tue Mar 17 19:57:26 2026 +0800

    [fix][client-cpp] Fix Reader segfault when messageListenerThreads=0 (#553)
---
 lib/ConsumerImpl.cc                  | 20 ++++++++++++----
 lib/ExecutorService.cc               |  7 +++++-
 tests/ReaderTest.cc                  | 45 ++++++++++++++++++++++++++++++++++++
 tests/RetryableOperationCacheTest.cc | 13 +++++++++++
 4 files changed, 80 insertions(+), 5 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index c3b839a..7cb4821 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -622,6 +622,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
     if (state == Closing || state == Closed) {
         return;
     }
+    if (!listenerExecutor_) {
+        LOG_ERROR(getName() << " listenerExecutor_ is null, discarding message 
to avoid null dereference");
+        increaseAvailablePermits(cnx);
+        return;
+    }
     uint32_t numOfMessageReceived = m.impl_->metadata.num_messages_in_batch();
     if (ackGroupingTrackerPtr_->isDuplicate(m.getMessageId())) {
         LOG_DEBUG(getName() << " Ignoring message as it was ACKed earlier by 
same consumer.");
@@ -663,8 +668,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
             return;
         }
         // Trigger message listener callback in a separate thread
-        while (numOfMessageReceived--) {
-            
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, 
get_shared_this_ptr()));
+        if (listenerExecutor_) {
+            while (numOfMessageReceived--) {
+                listenerExecutor_->postWork(
+                    std::bind(&ConsumerImpl::internalListener, 
get_shared_this_ptr()));
+            }
         }
     }
 }
@@ -713,8 +721,12 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) {
 
     // has pending receive, direct callback.
     if (asyncReceivedWaiting) {
-        
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                              get_shared_this_ptr(), ResultOk, 
msg, callback));
+        if (listenerExecutor_) {
+            
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                                  get_shared_this_ptr(), 
ResultOk, msg, callback));
+        } else {
+            notifyPendingReceivedCallback(ResultOk, msg, callback);
+        }
         return;
     }
 
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index eba7486..8bd8972 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -18,6 +18,8 @@
  */
 #include "ExecutorService.h"
 
+#include <algorithm>
+
 #include "LogUtils.h"
 #include "TimeUtils.h"
 DECLARE_LOG_OBJECT()
@@ -128,9 +130,12 @@ void ExecutorService::close(long timeoutMs) {
 /////////////////////
 
 ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
-    : executors_(nthreads), executorIdx_(0), mutex_() {}
+    : executors_(std::max(1, nthreads)), executorIdx_(0), mutex_() {}
 
 ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
+    if (executors_.empty()) {
+        return nullptr;
+    }
     idx %= executors_.size();
     Lock lock(mutex_);
 
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 77719a1..3462b55 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -18,6 +18,7 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
+#include <pulsar/ClientConfiguration.h>
 #include <pulsar/Reader.h>
 #include <time.h>
 
@@ -982,5 +983,49 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
     assertStartMessageId(false, secondMsgId);
 }
 
+// Regression test for segfault when Reader is used with messageListenerThreads=0.
+// Verifies ExecutorServiceProvider(0) does not cause undefined behavior and
+// ConsumerImpl::messageReceived does not dereference null listenerExecutor_.
+TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
+    ClientConfiguration clientConf;
+    clientConf.setMessageListenerThreads(0);
+    Client client(serviceUrl, clientConf);
+
+    const std::string topicName = "testReaderWithZeroMessageListenerThreads-" 
+ std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    ReaderConfiguration readerConf;
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), 
readerConf, reader));
+
+    constexpr int numMessages = 5;
+    for (int i = 0; i < numMessages; i++) {
+        Message msg = MessageBuilder().setContent("msg-" + 
std::to_string(i)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    int received = 0;
+    for (int i = 0; i < numMessages + 2; i++) {
+        bool hasMessageAvailable = false;
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+        if (!hasMessageAvailable) {
+            break;
+        }
+        Message msg;
+        Result res = reader.readNext(msg, 3000);
+        ASSERT_EQ(ResultOk, res) << "readNext failed at iteration " << i;
+        std::string content = msg.getDataAsString();
+        EXPECT_EQ("msg-" + std::to_string(received), content);
+        ++received;
+    }
+    EXPECT_EQ(received, numMessages);
+
+    producer.close();
+    reader.close();
+    client.close();
+}
+
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, 
false));
diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc
index c9b8a1d..2daaf3f 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -22,6 +22,7 @@
 #include <chrono>
 #include <stdexcept>
 
+#include "lib/ExecutorService.h"
 #include "lib/RetryableOperationCache.h"
 
 namespace pulsar {
@@ -82,6 +83,18 @@ class RetryableOperationCacheTest : public ::testing::Test {
 
 using namespace pulsar;
 
+// Regression test: ExecutorServiceProvider(0) must not cause undefined behavior (e.g. idx % 0).
+// After fix, nthreads is clamped to at least 1, so get() returns a valid executor.
+TEST(ExecutorServiceProviderTest, ZeroThreadsReturnsValidExecutor) {
+    ExecutorServiceProviderPtr provider = 
std::make_shared<ExecutorServiceProvider>(0);
+    for (int i = 0; i < 3; i++) {
+        ExecutorServicePtr executor = provider->get();
+        ASSERT_NE(executor, nullptr)
+            << "get() must not return null when created with 0 threads 
(clamped to 1)";
+    }
+    provider->close();
+}
+
 TEST_F(RetryableOperationCacheTest, testRetry) {
     auto cache = RetryableOperationCache<int>::create(provider_, 
std::chrono::seconds(30));
     for (int i = 0; i < 10; i++) {

Reply via email to