zhanglistar opened a new pull request, #553:
URL: https://github.com/apache/pulsar-client-cpp/pull/553
Fixes #<xyz>
### Motivation
When using a Reader with `ClientConfiguration::setMessageListenerThreads(0)`
(or when the listener executor is otherwise null), the application can crash
with a segmentation fault in `ConsumerImpl::messageReceived()`. Two root causes
were identified:
1. **ExecutorServiceProvider(0)** – When `getMessageListenerThreads()` is 0,
the provider is constructed with an empty executor list. In `get()`, the code
does `idx %= executors_.size()`, which is **undefined behavior** when
`size() == 0` (integer modulo by zero). This can lead to wrong or null executor
pointers.
2. **Null listenerExecutor_** – The Reader creates its internal consumer
with a null `ExecutorServicePtr`. The consumer then falls back to
`client->getListenerExecutorProvider()->get()`. If that returns null (e.g. due
to the above), `messageReceived` and `executeNotifyCallback` call
`listenerExecutor_->postWork(...)` without a null check, causing a null pointer
dereference (SIGSEGV at address 0x0).
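
Root cause 1 can be illustrated with a simplified model of a round-robin provider. This is a sketch under stated assumptions, not the library's code: `FakeExecutor`, `ProviderModel`, and `zeroThreadsReturnsValidExecutor` are hypothetical names. The model also assumes that executors are created lazily on first use, which is consistent with constructing `executors_` from a count. It shows why clamping the slot count with `std::max(1, nthreads)` keeps the modulo in `get()` well-defined.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in for Pulsar's ExecutorService.
struct FakeExecutor {};
using FakeExecutorPtr = std::shared_ptr<FakeExecutor>;

// Hypothetical, simplified round-robin provider that creates executors lazily.
class ProviderModel {
   public:
    // Clamp to at least one slot. With nthreads == 0 an unclamped vector would
    // be empty, and `next_ % executors_.size()` below would be a modulo by zero.
    explicit ProviderModel(int nthreads)
        : executors_(static_cast<std::size_t>(std::max(1, nthreads))) {}

    FakeExecutorPtr get() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (executors_.empty()) {
            return nullptr;  // defensive guard; unreachable after the clamp
        }
        std::size_t idx = next_++ % executors_.size();
        FakeExecutorPtr& executor = executors_[idx];
        if (!executor) {
            executor = std::make_shared<FakeExecutor>();  // lazy creation
        }
        return executor;
    }

    // Number of executor slots (fixed after construction).
    std::size_t slots() const { return executors_.size(); }

   private:
    std::vector<FakeExecutorPtr> executors_;
    std::size_t next_ = 0;
    std::mutex mutex_;
};

// Mirrors the intent of the ZeroThreadsReturnsValidExecutor test described
// below: three get() calls on a 0-thread provider all return non-null.
inline bool zeroThreadsReturnsValidExecutor() {
    ProviderModel provider(0);
    for (int i = 0; i < 3; ++i) {
        if (!provider.get()) {
            return false;
        }
    }
    return true;
}
```

With the clamp, a 0-thread provider behaves like a 1-thread provider, so every call shares a single lazily created executor. The `empty()` guard is kept only as a belt-and-braces check.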

This crash was observed in the wild with Reader usage (e.g. periodic
`hasMessageAvailable()` and `readNext()` calls) under moderate to high load or
after long runtimes, with stack traces pointing at
`pulsar::ConsumerImpl::messageReceived`.
### Modifications
- **lib/ExecutorService.cc**
  - Construct with `executors_(std::max(1, nthreads))` so the executor list
    is never empty when `nthreads == 0`, avoiding undefined behavior in `get()`.
  - In `get()`, add an early `if (executors_.empty()) return nullptr;` guard
    before using `executors_.size()`.
  - Add `#include <algorithm>` for `std::max`.
- **lib/ConsumerImpl.cc**
  - In `messageReceived()`, after the closing-state check, add a check for
    null `listenerExecutor_`: log an error, call `increaseAvailablePermits(cnx)`,
    and return to avoid dereferencing null.
  - In the `messageListener_` block, only call
    `listenerExecutor_->postWork(...)` when `listenerExecutor_` is non-null.
  - In `executeNotifyCallback()`, when notifying a pending async receive: if
    `listenerExecutor_` is non-null, post the callback as before; otherwise call
    `notifyPendingReceivedCallback(...)` on the current thread so the callback is
    still invoked and no null dereference occurs.
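
The null-executor handling in `lib/ConsumerImpl.cc` can be sketched with a heavily simplified consumer model. `QueueExecutor`, `ConsumerModel`, and the log text are hypothetical illustrations, not Pulsar's API. The queue-backed executor stands in for the real thread-backed one so that posted work can be observed deterministically. Only the control flow described above is modeled: return early from `messageReceived()` when the executor is null, and fall back to running the notify callback inline.

```cpp
#include <cassert>
#include <functional>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical executor that queues work instead of running it on a thread.
struct QueueExecutor {
    std::vector<std::function<void()>> queue;

    void postWork(std::function<void()> task) { queue.push_back(std::move(task)); }

    // Runs everything queued so far, in posting order.
    void runAll() {
        std::vector<std::function<void()>> tasks = std::move(queue);
        queue.clear();
        for (auto& task : tasks) {
            task();
        }
    }
};
using QueueExecutorPtr = std::shared_ptr<QueueExecutor>;

// Hypothetical consumer model: only the null-executor checks are modeled.
class ConsumerModel {
   public:
    explicit ConsumerModel(QueueExecutorPtr executor) : listenerExecutor_(std::move(executor)) {}

    // Models messageReceived(): return early instead of dereferencing null.
    bool messageReceived() {
        if (!listenerExecutor_) {
            std::cerr << "listener executor is null\n";  // illustrative log text
            ++availablePermits_;  // stands in for increaseAvailablePermits(cnx)
            return false;
        }
        // Listener dispatch is posted only when the executor exists.
        listenerExecutor_->postWork([this] { ++delivered_; });
        return true;
    }

    // Models executeNotifyCallback(): post when possible, else run inline.
    void executeNotifyCallback(std::function<void()> callback) {
        if (listenerExecutor_) {
            listenerExecutor_->postWork(std::move(callback));
        } else {
            callback();  // current thread, like notifyPendingReceivedCallback(...)
        }
    }

    int availablePermits() const { return availablePermits_; }
    int delivered() const { return delivered_; }

   private:
    QueueExecutorPtr listenerExecutor_;
    int availablePermits_ = 0;
    int delivered_ = 0;
};
```

Running the callback inline keeps a pending async receive from hanging forever. The trade-off is that user code then executes on whichever thread calls `executeNotifyCallback()`.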
### Verifying this change
- [ ] Make sure that the change passes the CI checks.

This change adds tests and can be verified as follows:
- **ExecutorServiceProviderTest.ZeroThreadsReturnsValidExecutor** (unit
test, no broker): Creates `ExecutorServiceProvider(0)`, calls `get()` three
times, and asserts that each returned executor is non-null. This confirms that
a 0-thread configuration no longer leads to undefined behavior and that a valid
executor is always returned.
- **ReaderTest.testReaderWithZeroMessageListenerThreads** (integration test,
requires Pulsar broker): Creates a `Client` with
`setMessageListenerThreads(0)`, creates a Reader, produces several messages,
then in a loop calls `hasMessageAvailable()` and `readNext()` until all
messages are consumed. Asserts the correct count and content. This verifies
that the Reader path with 0 listener threads does not segfault and delivers
messages correctly.

Run the unit test (no broker):
```bash
./tests/pulsar-tests \
    --gtest_filter='ExecutorServiceProviderTest.ZeroThreadsReturnsValidExecutor'
```
Run the Reader integration test (with broker at `pulsar://localhost:6650`):
```bash
./tests/pulsar-tests \
    --gtest_filter='ReaderTest.testReaderWithZeroMessageListenerThreads'
```
### Documentation
- [ ] `doc-required`
- [x] `doc-not-needed`
  (The behavior change is internal and backward-compatible: a setting of 0
  listener threads is now handled safely; no API or user-facing documentation
  update is required.)
- [ ] `doc`
- [ ] `doc-complete`