lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1831724350
########## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ##########

@@ -111,7 +111,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>group.protocol</code>
      */
     public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
-    public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT);
+    public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT);

Review Comment:
we should leave this change of the default group protocol to a separate PR; there is already one for it: https://github.com/apache/kafka/pull/17107

########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##########

@@ -1574,7 +1583,11 @@ private boolean updateFetchPositions(final Timer timer) {
         try {
             CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
             wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
-            cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+            applicationEventHandler.add(checkAndUpdatePositionsEvent);
+            cachedSubscriptionHasAllFetchPositions = processBackgroundEvents(

Review Comment:
Hey @m1a2st, thanks for the reply. The trick here is that `processBackgroundEvents` is kind of a generic communication channel between the app thread and the background thread, designed as a way to communicate from the background to the app thread when there is no direct triggering API call:

1. heartbeat/coordinator errors received in responses to requests that are triggered internally in the background (HB and FindCoordinator), not coming from app thread events.
2. callbacks needed as part of a reconciliation triggered by the broker (not an app thread event).

Using this generic channel when we do have an event that started in the app thread seems to bring complexity and loose ends:

1. Complexity, because API calls would need to sort through the generic queue/channel to find the response to the specific event they are interested in, processing all background events (callbacks, errors that have no event, errors that have an event), and filter out / re-enqueue the rest.
2. We would leave events lingering uncompleted in the background that will never really complete (eventually expired, I expect).

On the other hand, we have a "direct" way of communicating between the 2 threads: via events that get completed. So if we have an app event handy, it seems sensible to just complete it with metadata errors if they happen (just like the classic consumer achieves, given that on every poll it will throw and bubble up the metadata exception, effectively failing the triggering call). Ex. `consumer.position` (the one that started this PR) sends a CheckAndUpdatePositions event to the background, so I was expecting that we could simply complete it with the TopicAuthorizationException as a way to fail the consumer call.
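Something along these lines, as a stand-alone sketch of the idea (`CheckPositionsEvent` here is just a made-up stand-in for the real `CheckAndUpdatePositionsEvent`; the shapes are illustrative, not the actual internals):

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.errors.TopicAuthorizationException;

public class CompleteEventExceptionallySketch {

    // Made-up stand-in for CheckAndUpdatePositionsEvent: an object carrying the future
    // that the application thread blocks on.
    static class CheckPositionsEvent {
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
    }

    public static void main(String[] args) throws InterruptedException {
        CheckPositionsEvent event = new CheckPositionsEvent();

        // Background thread: a metadata error is detected while processing the event, so the
        // event is failed directly instead of being routed through the background event queue.
        Thread background = new Thread(() ->
            event.future.completeExceptionally(
                new TopicAuthorizationException(Set.of("secured-topic"))));
        background.start();

        // Application thread: consumer.position(...) would be blocked here waiting on the event,
        // so the exception bubbles up and fails the triggering call, like the classic consumer.
        try {
            event.future.get();
        } catch (ExecutionException e) {
            System.out.println("position() would fail with: " + e.getCause());
        }
        background.join();
    }
}
```

The event the app thread is already waiting on doubles as the error channel, so nothing has to go through the generic background event queue.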
Those are the concepts, but I totally hear you about the challenges on the implementation. Could you elaborate more on that, to see if we can sort them out together maybe? This is a more detailed description of what I have in mind (rough sketch at the end of this comment):

1. Catch metadata errors in `ConsumerNetworkThread.runOnce` and completeExceptionally all the requests it has (the requests returned by the managers' poll here: https://github.com/apache/kafka/blob/a0d4cbec402c6c09e601c76a7332747e80e518ca/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L145). This would propagate the failure to the app thread via the event.
2. Notify the managers of the metadata error too (i.e. onMetadataError), in case they have requests generated that did not make it to point 1 but still need to fail (e.g. the commit request manager may have unsentOffsetCommitRequests/unsentOffsetFetchRequests; I don't think any other manager keeps requests without sending them to the network client, but let's double check).

What do you think? Would it work? Thanks!
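To make points 1 and 2 a bit more concrete, here is a rough stand-alone sketch (`FakeRequestManager`, `PendingRequest` and the `onMetadataError` hook are illustrative names for the idea, not the actual internals API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.TopicAuthorizationException;

public class MetadataErrorPropagationSketch {

    /** Stand-in for a request whose future the application thread is waiting on. */
    static class PendingRequest {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    /** Stand-in for a request manager: poll() returns the requests ready to send. */
    static class FakeRequestManager {
        final List<PendingRequest> readyToSend = new ArrayList<>();
        final List<PendingRequest> heldBack = new ArrayList<>(); // e.g. unsent offset commit/fetch requests

        List<PendingRequest> poll() {
            List<PendingRequest> out = new ArrayList<>(readyToSend);
            readyToSend.clear();
            return out;
        }

        // Hypothetical hook for point 2: fail requests that never made it out of the manager.
        void onMetadataError(Exception error) {
            heldBack.forEach(r -> r.future.completeExceptionally(error));
            heldBack.clear();
        }
    }

    public static void main(String[] args) {
        FakeRequestManager manager = new FakeRequestManager();
        PendingRequest sendable = new PendingRequest();
        PendingRequest queued = new PendingRequest();
        manager.readyToSend.add(sendable);
        manager.heldBack.add(queued);

        Exception metadataError = new TopicAuthorizationException("not authorized to access topic");

        // Point 1: on a metadata error, fail the requests returned by the managers' poll,
        // which propagates the error to the app thread through the futures/events.
        for (PendingRequest request : manager.poll()) {
            request.future.completeExceptionally(metadataError);
        }
        // Point 2: notify the manager so the requests it is still holding fail as well.
        manager.onMetadataError(metadataError);

        System.out.println("sendable request failed: " + sendable.future.isCompletedExceptionally());
        System.out.println("queued request failed: " + queued.future.isCompletedExceptionally());
    }
}
```

Point 1 covers the requests the managers hand out on the current `runOnce` iteration; point 2 is only there for requests a manager is still holding back (like the unsent offset commit/fetch requests mentioned above).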