lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1831724350


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -111,7 +111,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>group.protocol</code>
      */
     public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
-    public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT);
+    public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT);

Review Comment:
   we should leave this change of the default consumer in a separate PR, there 
is already one for it
   https://github.com/apache/kafka/pull/17107



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1574,7 +1583,11 @@ private boolean updateFetchPositions(final Timer timer) {
         try {
             CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new 
CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
             wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
-            cachedSubscriptionHasAllFetchPositions = 
applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+            applicationEventHandler.add(checkAndUpdatePositionsEvent);
+            cachedSubscriptionHasAllFetchPositions = processBackgroundEvents(

Review Comment:
   Hey @m1a2st , thanks for the reply. The trick here is that this 
`processBackgroundEvents` is kind of a generic communication channel between 
the app thread and background thread, that was designed as a way to communicate 
from background to app thread when there was no direct triggering api call:
   
   1. heartbeat/coordinator errors received in a response to requests that are 
triggered internally in the background (HB and FindCoordinator), not coming 
from app thread events.
   2. callbacks needed as part of a reconciliation triggered by the broker (not 
app thread event)
   
   Using this generic channel for when we do have an event that started in the 
app thread seems to bring in complexity and loose ends:
   
   1. Complexity because api calls would need to sort through the generic 
queue/channel to find the response to the specific event they are interested 
in, processing all background events (callbacks, errors that have no event, 
errors that have event), and filter out / re-enqueue...
   2. We would leave events lingering uncompleted in the background, that will 
never complete really (eventually expired I expect).
   
   On the other hand, we have a “direct” way of communicating  the 2 threads: 
via events that get completed. So if we hand an app event handy, seems sensible 
to just complete it with metadata errors if they happen (just like the classic 
achieves, given that on every poll it will throw and bubble up the metadata 
exception, effectively failing the triggering call). Ex. `consumer.position` 
(the one that started this PR), sends a CheckAndUpdatePositions event to the 
background, so I was expecting that we could simply complete it with the 
TopicAuthorizationException as a way to fail the consumer call.  
   
   Those are the concepts, but I totally hear you about the challenges on the 
implementation. Could you elaborate more on that to see if we can sort them out 
together maybe? This is a more detailed description of what I have in mind:
   
   1. Catch metadata errors on `ConsumerNetworkThread.runOnce` and 
completeExceptionally all the requests it has (requests returned by the 
managers poll here
   
https://github.com/apache/kafka/blob/a0d4cbec402c6c09e601c76a7332747e80e518ca/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L145)
   This would propagate the failure to the app thread via the event.
   2. Notify managers of the metadata error too (ie. onMetadataERror), in case 
they have requests generated that did not make it to point 1, but still need to 
fail (ie. Commit request manager may have 
unsentOffsetCommitRequests/unsentOffsetFetchRequests. I don't think any other 
keeps requests without sending them to the network client but let's double 
check)
   
   What do you think? Would it work? Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to