lucasbru commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1413890315
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -232,6 +233,12 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
             this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
             this.heartbeatRequestState.resetTimer();
             this.membershipManager.onHeartbeatResponseReceived(response.data());
+            this.backgroundEventHandler.add(new GroupMetadataUpdateEvent(

Review Comment:
   Could we only send this if the member epoch or member ID changes? I think it's somewhat inelegant that we enqueue this for every single heartbeat. For example, with the default `max.poll.interval.ms` of 300 seconds between polls and the default `heartbeat.interval.ms` of 3 seconds, we could have up to 100 metadata update events to go through in each poll.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
+     * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the
+     * {@link ConsumerNetworkThread network thread}.
+     * Those events are generally of two types:
+     *
+     * <ul>
+     *     <li>Errors that occur in the network thread that need to be propagated to the application thread</li>
+     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
+     * </ul>
+     */
+    public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
+
+        public BackgroundEventProcessor(final LogContext logContext,
+                                        final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+            super(logContext, backgroundEventQueue);
+        }
+
+        /**
+         * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
+         * It is possible that {@link org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an error}
+         * could occur when processing the events. In such cases, the processor will take a reference to the first
+         * error, continue to process the remaining events, and then throw the first error that occurred.
+         */
+        @Override
+        public void process() {
+            AtomicReference<RuntimeException> firstError = new AtomicReference<>();
+            process((event, error) -> firstError.compareAndSet(null, error));
+
+            if (firstError.get() != null) {
+                throw firstError.get();
+            }
+        }
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorBackgroundEvent) event);
+                    break;
+                case GROUP_METADATA_UPDATE:
+                    process((GroupMetadataUpdateEvent) event);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
+
+            }
+        }
+
+        @Override
+        protected Class<BackgroundEvent> getEventClass() {
+            return BackgroundEvent.class;
+        }
+
+        private void process(final ErrorBackgroundEvent event) {
+            throw event.error();
+        }
+
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new ConsumerGroupMetadata(
+                    event.groupId(),
+                    event.memberEpoch(),
+                    event.memberId() != null ? event.memberId() : currentGroupMetadata.memberId(),
+                    event.groupInstanceId()

Review Comment:
   Same as for `groupId`: we read it from the config. Does it ever change?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
+     * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the
+     * {@link ConsumerNetworkThread network thread}.
+     * Those events are generally of two types:
+     *
+     * <ul>
+     *     <li>Errors that occur in the network thread that need to be propagated to the application thread</li>
+     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
+     * </ul>
+     */
+    public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
+
+        public BackgroundEventProcessor(final LogContext logContext,
+                                        final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+            super(logContext, backgroundEventQueue);
+        }
+
+        /**
+         * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
+         * It is possible that {@link org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an error}
+         * could occur when processing the events. In such cases, the processor will take a reference to the first
+         * error, continue to process the remaining events, and then throw the first error that occurred.
+         */
+        @Override
+        public void process() {
+            AtomicReference<RuntimeException> firstError = new AtomicReference<>();
+            process((event, error) -> firstError.compareAndSet(null, error));
+
+            if (firstError.get() != null) {
+                throw firstError.get();
+            }
+        }
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorBackgroundEvent) event);
+                    break;
+                case GROUP_METADATA_UPDATE:
+                    process((GroupMetadataUpdateEvent) event);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
+
+            }
+        }
+
+        @Override
+        protected Class<BackgroundEvent> getEventClass() {
+            return BackgroundEvent.class;
+        }
+
+        private void process(final ErrorBackgroundEvent event) {
+            throw event.error();
+        }
+
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new ConsumerGroupMetadata(
+                    event.groupId(),
+                    event.memberEpoch(),
+                    event.memberId() != null ? event.memberId() : currentGroupMetadata.memberId(),

Review Comment:
   I'd slightly prefer that this logic be resolved on the background thread, making `CurrentGroupMetadataEvent.memberId` always non-null.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
+     * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the
+     * {@link ConsumerNetworkThread network thread}.
+     * Those events are generally of two types:
+     *
+     * <ul>
+     *     <li>Errors that occur in the network thread that need to be propagated to the application thread</li>
+     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
+     * </ul>
+     */
+    public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
+
+        public BackgroundEventProcessor(final LogContext logContext,
+                                        final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+            super(logContext, backgroundEventQueue);
+        }
+
+        /**
+         * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
+         * It is possible that {@link org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an error}
+         * could occur when processing the events. In such cases, the processor will take a reference to the first
+         * error, continue to process the remaining events, and then throw the first error that occurred.
+         */
+        @Override
+        public void process() {
+            AtomicReference<RuntimeException> firstError = new AtomicReference<>();
+            process((event, error) -> firstError.compareAndSet(null, error));
+
+            if (firstError.get() != null) {
+                throw firstError.get();
+            }
+        }
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorBackgroundEvent) event);
+                    break;
+                case GROUP_METADATA_UPDATE:
+                    process((GroupMetadataUpdateEvent) event);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
+
+            }
+        }
+
+        @Override
+        protected Class<BackgroundEvent> getEventClass() {
+            return BackgroundEvent.class;
+        }
+
+        private void process(final ErrorBackgroundEvent event) {
+            throw event.error();
+        }
+
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new ConsumerGroupMetadata(
+                    event.groupId(),

Review Comment:
   So `groupId` in `this.groupMetadata` comes from `GroupRebalanceConfig.groupId`, which comes from `config.getString(CommonClientConfigs.GROUP_ID_CONFIG)`, and `groupId` in `event.groupId` comes from `MembershipManagerImpl.groupId`, which comes from `GroupState.groupId`, which comes from `GroupRebalanceConfig.groupId`. So aren't you always overwriting the group ID with itself here?
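
To make the suggestions on the heartbeat path concrete (only enqueue an update when the member epoch or member ID changes, and resolve a missing member ID on the background side), here is a minimal, self-contained sketch. It does not use Kafka's classes: `GroupMetadataChangeFilter`, `MetadataUpdate`, `onHeartbeatResponse`, and `pendingEvents` are hypothetical names chosen for illustration, and the queue is a stand-in for the background event queue.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

// Hypothetical stand-in for the heartbeat response handling; not Kafka's actual classes.
final class GroupMetadataChangeFilter {

    // Hypothetical event carrying only the fields that can change between heartbeats.
    static final class MetadataUpdate {
        final String memberId;
        final int memberEpoch;

        MetadataUpdate(String memberId, int memberEpoch) {
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }
    }

    // Stand-in for the queue drained by the application thread.
    private final Queue<MetadataUpdate> backgroundEvents = new ArrayDeque<>();
    private String lastMemberId;      // null until a response carries a member ID
    private Integer lastMemberEpoch;  // null until the first response is seen

    // Enqueues an update only when the member ID or epoch differs from the last one seen.
    // Returns true if an event was enqueued.
    boolean onHeartbeatResponse(String memberId, int memberEpoch) {
        // A response without a member ID falls back to the last known ID here,
        // so the consumer side of the queue does not need its own null check.
        String resolvedId = memberId != null ? memberId : lastMemberId;
        boolean changed = !Objects.equals(resolvedId, lastMemberId)
                || !Objects.equals(memberEpoch, lastMemberEpoch);
        if (changed) {
            lastMemberId = resolvedId;
            lastMemberEpoch = memberEpoch;
            backgroundEvents.add(new MetadataUpdate(resolvedId, memberEpoch));
        }
        return changed;
    }

    int pendingEvents() {
        return backgroundEvents.size();
    }

    public static void main(String[] args) {
        GroupMetadataChangeFilter filter = new GroupMetadataChangeFilter();
        // 300 s between polls / 3 s per heartbeat = 100 heartbeats with unchanged metadata.
        for (int i = 0; i < 100; i++) {
            filter.onHeartbeatResponse("member-1", 5);
        }
        System.out.println(filter.pendingEvents()); // prints 1
    }
}
```

With a filter like this, the 100 heartbeats in the example from the first comment leave a single event for the next poll instead of 100.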