zhanglistar opened a new pull request, #553:
URL: https://github.com/apache/pulsar-client-cpp/pull/553

   
   Fixes #<xyz>
   
   
   ### Motivation
   
   When using a Reader with `ClientConfiguration::setMessageListenerThreads(0)` 
(or when the listener executor is otherwise null), the application can crash 
with a segmentation fault in `ConsumerImpl::messageReceived()`. Two root causes 
were identified:
   
   1. **ExecutorServiceProvider(0)** – When `getMessageListenerThreads()` is 0, 
the provider is constructed with an empty executor list. In `get()`, the code 
does `idx %= executors_.size()`, which is **undefined behavior** when 
`size() == 0` (modulo by zero). This can yield an invalid or null executor 
pointer.
   
   2. **Null listenerExecutor_** – The Reader creates its internal consumer 
with a null `ExecutorServicePtr`. The consumer then falls back to 
`client->getListenerExecutorProvider()->get()`. If that returns null (e.g. due 
to the above), `messageReceived` and `executeNotifyCallback` call 
`listenerExecutor_->postWork(...)` without a null check, causing a null pointer 
dereference (SIGSEGV at address 0x0).
   
   This crash was observed in the wild with Reader usage (e.g. periodic 
`hasMessageAvailable()` and `readNext()` calls) under moderate to high load or 
after long runtimes, with stack traces pointing at 
`pulsar::ConsumerImpl::messageReceived`.
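
To make the first root cause concrete, here is a minimal sketch of a round-robin provider with the two guards described in this PR. It is not the library's code: `SketchExecutor`, `SketchExecutorProvider`, and the `size()` helper are hypothetical names used only for illustration, and the lazy creation in `get()` is an assumption about the general shape of such a provider.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in for an executor; not the library's ExecutorService.
struct SketchExecutor {};
using SketchExecutorPtr = std::shared_ptr<SketchExecutor>;

// Simplified round-robin provider (illustrative only).
class SketchExecutorProvider {
   public:
    // Clamp to at least one slot so the list is never empty when nthreads == 0.
    explicit SketchExecutorProvider(int nthreads)
        : executors_(static_cast<std::size_t>(std::max(1, nthreads))) {}

    SketchExecutorPtr get() {
        std::lock_guard<std::mutex> lock(mutex_);
        // Defensive guard: a modulo by zero below would be undefined behavior.
        if (executors_.empty()) {
            return nullptr;
        }
        const std::size_t idx = next_++ % executors_.size();
        if (!executors_[idx]) {
            // Create the executor for this slot on first use.
            executors_[idx] = std::make_shared<SketchExecutor>();
        }
        return executors_[idx];
    }

    std::size_t size() const { return executors_.size(); }

   private:
    std::vector<SketchExecutorPtr> executors_;
    std::size_t next_ = 0;
    std::mutex mutex_;
};
```

With the clamp in place, `SketchExecutorProvider(0)` holds exactly one slot, so every `get()` call returns the same non-null executor instead of evaluating `idx % 0`.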
   
   ### Modifications
   
   - **lib/ExecutorService.cc**
     - Construct with `executors_(std::max(1, nthreads))` so the executor list 
is never empty when `nthreads == 0`, avoiding undefined behavior in `get()`.
     - In `get()`, add an early `if (executors_.empty()) return nullptr;` guard 
before using `executors_.size()`.
     - Add `#include <algorithm>` for `std::max`.
   
   - **lib/ConsumerImpl.cc**
     - In `messageReceived()`, after the closing-state check, add a check for 
null `listenerExecutor_`: log an error, call `increaseAvailablePermits(cnx)`, 
and return to avoid dereferencing null.
     - In the `messageListener_` block, only call 
`listenerExecutor_->postWork(...)` when `listenerExecutor_` is non-null.
     - In `executeNotifyCallback()`, when notifying a pending async receive: if 
`listenerExecutor_` is non-null, post the callback as before; otherwise call 
`notifyPendingReceivedCallback(...)` on the current thread so the callback is 
still invoked and no null dereference occurs.
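
The fallback described for `executeNotifyCallback()` can be sketched as a small "post or run inline" helper. This is an illustration under stated assumptions, not the code in `lib/ConsumerImpl.cc`: `SketchListenerExecutor`, `drain()`, `pending()`, and `postOrRunInline()` are hypothetical names, and the queue stands in for the real executor's worker thread. Note that in `messageReceived()` the PR takes a different path for a null executor (log, return permits, and return) rather than running the work inline.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical executor that queues work; stands in for the real ExecutorService.
class SketchListenerExecutor {
   public:
    void postWork(std::function<void()> task) { queue_.push_back(std::move(task)); }

    // Runs queued tasks on the calling thread (a real executor would use its own thread).
    void drain() {
        std::vector<std::function<void()>> tasks;
        tasks.swap(queue_);  // take ownership so tasks may safely post more work
        for (auto& task : tasks) {
            task();
        }
    }

    std::size_t pending() const { return queue_.size(); }

   private:
    std::vector<std::function<void()>> queue_;
};

// Posts the task when an executor is present; otherwise runs it on the current
// thread so the callback is still invoked. Returns true if the task was posted.
inline bool postOrRunInline(const std::shared_ptr<SketchListenerExecutor>& executor,
                            std::function<void()> task) {
    if (executor) {
        executor->postWork(std::move(task));
        return true;
    }
    task();
    return false;
}
```

Running the callback inline keeps pending receives from hanging when no executor exists, at the cost of executing user code on whichever thread delivered the message.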
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   - **ExecutorServiceProviderTest.ZeroThreadsReturnsValidExecutor** (unit 
test, no broker): Creates `ExecutorServiceProvider(0)`, calls `get()` three 
times, and asserts that each returned executor is non-null. Confirms that a 
0-thread configuration no longer leads to undefined behavior and that a valid 
executor is always returned.
   - **ReaderTest.testReaderWithZeroMessageListenerThreads** (integration test, 
requires Pulsar broker): Creates a `Client` with 
`setMessageListenerThreads(0)`, creates a Reader, produces several messages, 
then calls `hasMessageAvailable()` and `readNext()` in a loop until all 
messages are consumed. Asserts the correct message count and content. Verifies 
that the Reader path with 0 listener threads does not segfault and delivers 
messages correctly.
   
   Run the unit test (no broker):
   
   ```bash
   ./tests/pulsar-tests --gtest_filter='ExecutorServiceProviderTest.ZeroThreadsReturnsValidExecutor'
   ```
   
   Run the Reader integration test (with broker at `pulsar://localhost:6650`):
   
   ```bash
   ./tests/pulsar-tests --gtest_filter='ReaderTest.testReaderWithZeroMessageListenerThreads'
   ```
   
   ### Documentation
   
   - [ ] `doc-required`
   
   - [x] `doc-not-needed`  
     (Behavior change is internal and backward-compatible: 0 listener threads 
is now handled safely; no API or user-facing doc update required.)
   
   - [ ] `doc`
   
   - [ ] `doc-complete`
   

