zhanglistar opened a new pull request, #549:
URL: https://github.com/apache/pulsar-client-cpp/pull/549

   <!--
   ### Contribution Checklist
     
     - PR title format should be *[type][component] summary*. For details, see 
*[Guideline - Pulsar PR Naming 
Convention](https://docs.google.com/document/d/1d8Pw6ZbWk-_pCKdOmdvx9rnhPiyuxwq60_TrD68d7BA/edit#heading=h.trs9rsex3xom)*.
 
   
     - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from 
multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   -->
   
   Fixes #<xyz>
   
   ### Motivation
   
   When a reader seeks to `MessageId::latest()`, the broker may close the 
consumer and trigger a reconnect. In `seekAsyncInternal`, when the seek 
succeeds but the connection is already expired or `reconnectionPending_` is 
true, we only set `seekStatus_ = COMPLETED` and did not clear the local receive 
queue or update `startMessageId_`. As a result, `incomingMessages_` still held 
prefetched messages from before the seek until `connectionOpened()` later 
called `clearReceiveQueue()`. During that window, `hasMessageAvailable()` saw 
`!incomingMessages_.empty()` and returned true, causing the flaky test 
`ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd`.
   
   ### Modifications
   
   - In `ConsumerImpl::seekAsyncInternal`, when seek result is OK and 
`getCnx().expired() || reconnectionPending_` (reconnection path), we now 
perform the same local state updates as the non-reconnection path:
     - `ackGroupingTrackerPtr_->flushAndClean()`
     - `incomingMessages_.clear()`
     - If `lastSeekArg_` holds a `MessageId`, set `startMessageId_` from it
   
   So after the seek is acknowledged, stale prefetched messages are cleared 
immediately and `hasMessageAvailable()` no longer returns true from old queue 
content before reconnect completes.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change is already covered by existing tests:
   
   - `ReaderSeekTest.testHasMessageAvailableAfterSeekToEnd` (parameterized) – 
asserts that after seek to latest, `hasMessageAvailable()` becomes false, then 
after producing a new message it becomes true and the message can be read; the 
fix removes the flakiness when the broker triggers reconnect on seek-to-end.
   
   ### Documentation
   
   - [x] `doc-not-needed`
   
   Bug fix in C++ client implementation only; no API or behavior contract 
change and no user-facing docs update required.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to