shibd commented on code in PR #551:
URL: https://github.com/apache/pulsar-client-cpp/pull/551#discussion_r2915471635


##########
lib/ClientConnection.h:
##########
@@ -298,25 +301,41 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
 
     template <typename ConstBufferSequence, typename WriteHandler>
     inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler handler) {
+        Lock lock{mutex_};
         if (isClosed()) {
             return;
         }
+        lock.unlock();
+        pendingOperations_++;

Review Comment:
   **TOCTOU race: `pendingOperations_++` must be moved before `lock.unlock()`**
   
   There is a narrow but real window between `lock.unlock()` (line 308) and 
`pendingOperations_++` (line 309) where a concurrent `close()` call can observe 
a zero counter and proceed to destroy the `io_context`, while this thread is 
about to start an async operation on it.
   
   **Sequence of events exposing the window:**
   
   ```
   Thread A (event loop)               Thread B (caller of client.close())
   ─────────────────────────────────   ───────────────────────────────────────────
   [1] Lock lock{mutex_}
   [2] isClosed() → false  (OK)
   [3] lock.unlock()                ← window opens here
                                       [4] close() → state_ = Disconnected
                                       [5] ConnectionPool::close() reads
                                           pendingOperations() == 0
                                       [6] io_context destroyed ✓
   [7] pendingOperations_++
   [8] async_write(*socket_, ...)   ← starts op on destroyed io_context
   [9] callback fires into           ← use-after-free / crash
       freed memory
   ```
   
   This is exactly the crash scenario this PR is fixing. The fix is to move 
`pendingOperations_++` **before** `lock.unlock()`, so the counter is always 
incremented while the mutex is still held:
   
   ```cpp
   Lock lock{mutex_};
   if (isClosed()) { return; }
   pendingOperations_++;   // ← increment under lock
   lock.unlock();
   // ... start async op
   ```
   
   With this ordering, and assuming `close()` changes the connection state while holding `mutex_`, only two interleavings are possible:
   - The counter was incremented before `close()` ran → Thread B sees a non-zero counter and waits for it to drain before destroying the `io_context`.
   - `close()` ran before the increment → `isClosed()` returns `true` and the operation is skipped entirely, so the counter is never touched.
   
   The same issue exists in `asyncReceive` at lines 328–329 and should be fixed 
identically.
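   
   For illustration, here is a minimal, self-contained sketch of the increment-under-lock ordering. It is not the actual `ClientConnection` code: the `Connection` class, `tryBeginOperation()`, `endOperation()` and the `closed_` flag are hypothetical stand-ins, and it assumes `close()` takes the same mutex when it flips the state.
   
   ```cpp
   #include <atomic>
   #include <cassert>
   #include <mutex>
   
   // Hypothetical stand-in for the connection; not the real Pulsar class.
   class Connection {
      public:
       // Registers a pending operation unless the connection is already closed.
       // Returns true if the caller may go on to start its async operation.
       bool tryBeginOperation() {
           std::unique_lock<std::mutex> lock{mutex_};
           if (closed_) {
               return false;  // closed first: the counter is never touched
           }
           ++pendingOperations_;  // increment while the mutex is still held
           lock.unlock();
           // ... the real code would start the async operation here
           return true;
       }
   
       // Called from the completion handler of the async operation.
       void endOperation() {
           assert(pendingOperations_.load() > 0);
           --pendingOperations_;
       }
   
       // Assumption: the state change happens under the same mutex.
       void close() {
           std::lock_guard<std::mutex> lock{mutex_};
           closed_ = true;
       }
   
       int pendingOperations() const { return pendingOperations_.load(); }
   
      private:
       std::mutex mutex_;
       bool closed_{false};
       std::atomic<int> pendingOperations_{0};
   };
   ```
   
   Under these assumptions, a thread that calls `close()` and then reads `pendingOperations()` sees either a non-zero counter (the operation registered first) or a connection on which `tryBeginOperation()` returns `false`; it cannot see zero while an operation is about to start.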


