philipnee commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1388398462
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -147,6 +151,10 @@ public class PrototypeAsyncConsumer<K, V> implements
Consumer<K, V> {
// to keep from repeatedly scanning subscriptions in poll(), cache the
result during metadata updates
private boolean cachedSubscriptionHasAllFetchPositions;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
+ private volatile boolean isFenced = false;
Review Comment:
Thanks, we don't need this to be volatile. In terms of exception throwing,
I think once the instance is fenced, the we want to keep throwing if the user
tries to commit or poll, because the expectation is that the user should close
and reconfigure the instance. See the original implementation
```
void invokeCompletedOffsetCommitCallbacks() {
if (asyncCommitFenced.get()) {
throw new FencedInstanceIdException("Get fenced exception for
group.instance.id "
+ rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
+ ", current member.id is " + memberId());
}
...
}
```
Here asyncCommitFenced is set during the callback failure in the original
implementation.
```
if (commitException instanceof FencedInstanceIdException) {
asyncCommitFenced.set(true);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]