zhanglistar commented on code in PR #549:
URL: https://github.com/apache/pulsar-client-cpp/pull/549#discussion_r2943871818
##########
lib/ConsumerImpl.cc:
##########
@@ -1819,7 +1819,13 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
if (result == ResultOk) {
LockGuard lock(mutex_);
if (getCnx().expired() || reconnectionPending_) {
- // It's during reconnection, complete the seek future after connection is established
+ // It's during reconnection, complete the seek future after connection is established.
+ // Clear local state now so hasMessageAvailable() does not see stale prefetched messages.
+ ackGroupingTrackerPtr_->flushAndClean();
+ incomingMessages_.clear();
+ if (lastSeekArg_.has_value() && std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+     startMessageId_ = std::get<MessageId>(lastSeekArg_.value());
+ }
Review Comment:
Moved the clear in handleCreateConsumer() inside the lock (before unlock()),
so the queue is cleared while mutex_ is held. That way the seek callback and
hasMessageAvailable() always see the cleared state, which fixes the flaky test.
The lock order is unchanged, so there is no new deadlock risk; the only effect
is a slightly longer lock hold during reconnection.
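
For readers following along, here is a minimal standalone sketch of the pattern described above. It is not the actual ConsumerImpl code: ToyConsumer, ToyMessageId, ToySeekArg, enqueue(), rememberSeek() and onReconnected() are hypothetical names invented for illustration, and the real class has far more state. It only shows the idea of clearing the prefetch queue and resetting the start position while the mutex is still held, then unlocking before any callback would run.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <mutex>
#include <optional>
#include <variant>

// Hypothetical stand-ins; these are not pulsar-client-cpp types.
struct ToyMessageId {
    std::int64_t entryId;
};
// A seek targets either a timestamp or a message id.
using ToySeekArg = std::variant<std::uint64_t, ToyMessageId>;

class ToyConsumer {
   public:
    // Simulates a message being prefetched into the local queue.
    void enqueue(int msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        incoming_.push_back(msg);
    }

    // Records the argument of the most recent seek.
    void rememberSeek(ToySeekArg arg) {
        std::lock_guard<std::mutex> lock(mutex_);
        lastSeekArg_ = arg;
    }

    // Assumed to run once the connection is re-established.
    void onReconnected() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Clear stale prefetched messages while still holding the lock,
        // so no reader can observe the old queue after the seek completes.
        incoming_.clear();
        assert(incoming_.empty());
        // Only a seek by message id moves the start position.
        if (lastSeekArg_ && std::holds_alternative<ToyMessageId>(*lastSeekArg_)) {
            startEntryId_ = std::get<ToyMessageId>(*lastSeekArg_).entryId;
        }
        lock.unlock();
        // Completion callbacks would be invoked here, outside the lock.
    }

    bool hasPrefetched() {
        std::lock_guard<std::mutex> lock(mutex_);
        return !incoming_.empty();
    }

    std::optional<std::int64_t> startEntryId() {
        std::lock_guard<std::mutex> lock(mutex_);
        return startEntryId_;
    }

   private:
    std::mutex mutex_;
    std::deque<int> incoming_;
    std::optional<ToySeekArg> lastSeekArg_;
    std::optional<std::int64_t> startEntryId_;
};
```

In this sketch a caller that enqueues two messages, records a seek to ToyMessageId{42}, and then calls onReconnected() would find hasPrefetched() false and startEntryId() equal to 42. The cost is the same one noted above: the mutex is held slightly longer while the queue is cleared.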