lucasbru commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413890315


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -232,6 +233,12 @@ private void onResponse(final 
ConsumerGroupHeartbeatResponse response, long curr
             this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
             this.heartbeatRequestState.resetTimer();
             
this.membershipManager.onHeartbeatResponseReceived(response.data());
+            this.backgroundEventHandler.add(new GroupMetadataUpdateEvent(

Review Comment:
   Could we only send this if the member epoch or member ID changes? I think 
it's somewhat inelegant that we enqueue this for every single heartbeat. For 
example, we had the default max.poll.interval.ms = 300 seconds between all 
polls and the default heartbeat.interval= 3 seconds for each heartbeat, means 
we have 100 metadata update events to go through in each poll.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the
+     * application thread for the purpose of processing {@link BackgroundEvent 
background events} generated by the
+     * {@link ConsumerNetworkThread network thread}.
+     * Those events are generally of two types:
+     *
+     * <ul>
+     *     <li>Errors that occur in the network thread that need to be 
propagated to the application thread</li>
+     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be 
executed on the application thread</li>
+     * </ul>
+     */
+    public class BackgroundEventProcessor extends 
EventProcessor<BackgroundEvent> {
+
+        public BackgroundEventProcessor(final LogContext logContext,
+                                        final BlockingQueue<BackgroundEvent> 
backgroundEventQueue) {
+            super(logContext, backgroundEventQueue);
+        }
+
+        /**
+         * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
+         * It is possible that {@link 
org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an 
error}
+         * could occur when processing the events. In such cases, the 
processor will take a reference to the first
+         * error, continue to process the remaining events, and then throw the 
first error that occurred.
+         */
+        @Override
+        public void process() {
+            AtomicReference<RuntimeException> firstError = new 
AtomicReference<>();
+            process((event, error) -> firstError.compareAndSet(null, error));
+
+            if (firstError.get() != null) {
+                throw firstError.get();
+            }
+        }
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorBackgroundEvent) event);
+                    break;
+                case GROUP_METADATA_UPDATE:
+                    process((GroupMetadataUpdateEvent) event);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+            }
+        }
+
+        @Override
+        protected Class<BackgroundEvent> getEventClass() {
+            return BackgroundEvent.class;
+        }
+
+        private void process(final ErrorBackgroundEvent event) {
+            throw event.error();
+        }
+
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = 
AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new 
ConsumerGroupMetadata(
+                    event.groupId(),
+                    event.memberEpoch(),
+                    event.memberId() != null ? event.memberId() : 
currentGroupMetadata.memberId(),
+                    event.groupInstanceId()

Review Comment:
   Same as above, we read it from the config, does it ever change?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the
+     * application thread for the purpose of processing {@link BackgroundEvent 
background events} generated by the
+     * {@link ConsumerNetworkThread network thread}.
+     * Those events are generally of two types:
+     *
+     * <ul>
+     *     <li>Errors that occur in the network thread that need to be 
propagated to the application thread</li>
+     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be 
executed on the application thread</li>
+     * </ul>
+     */
+    public class BackgroundEventProcessor extends 
EventProcessor<BackgroundEvent> {
+
+        public BackgroundEventProcessor(final LogContext logContext,
+                                        final BlockingQueue<BackgroundEvent> 
backgroundEventQueue) {
+            super(logContext, backgroundEventQueue);
+        }
+
+        /**
+         * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
+         * It is possible that {@link 
org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an 
error}
+         * could occur when processing the events. In such cases, the 
processor will take a reference to the first
+         * error, continue to process the remaining events, and then throw the 
first error that occurred.
+         */
+        @Override
+        public void process() {
+            AtomicReference<RuntimeException> firstError = new 
AtomicReference<>();
+            process((event, error) -> firstError.compareAndSet(null, error));
+
+            if (firstError.get() != null) {
+                throw firstError.get();
+            }
+        }
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorBackgroundEvent) event);
+                    break;
+                case GROUP_METADATA_UPDATE:
+                    process((GroupMetadataUpdateEvent) event);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+            }
+        }
+
+        @Override
+        protected Class<BackgroundEvent> getEventClass() {
+            return BackgroundEvent.class;
+        }
+
+        private void process(final ErrorBackgroundEvent event) {
+            throw event.error();
+        }
+
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = 
AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new 
ConsumerGroupMetadata(
+                    event.groupId(),
+                    event.memberEpoch(),
+                    event.memberId() != null ? event.memberId() : 
currentGroupMetadata.memberId(),

Review Comment:
   I'd slightly prefer this logic to be resolved on the background thread and 
make CurrentGroupMetadataEvent.memberId always non-null



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the
+     * application thread for the purpose of processing {@link BackgroundEvent 
background events} generated by the
+     * {@link ConsumerNetworkThread network thread}.
+     * Those events are generally of two types:
+     *
+     * <ul>
+     *     <li>Errors that occur in the network thread that need to be 
propagated to the application thread</li>
+     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be 
executed on the application thread</li>
+     * </ul>
+     */
+    public class BackgroundEventProcessor extends 
EventProcessor<BackgroundEvent> {
+
+        public BackgroundEventProcessor(final LogContext logContext,
+                                        final BlockingQueue<BackgroundEvent> 
backgroundEventQueue) {
+            super(logContext, backgroundEventQueue);
+        }
+
+        /**
+         * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
+         * It is possible that {@link 
org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an 
error}
+         * could occur when processing the events. In such cases, the 
processor will take a reference to the first
+         * error, continue to process the remaining events, and then throw the 
first error that occurred.
+         */
+        @Override
+        public void process() {
+            AtomicReference<RuntimeException> firstError = new 
AtomicReference<>();
+            process((event, error) -> firstError.compareAndSet(null, error));
+
+            if (firstError.get() != null) {
+                throw firstError.get();
+            }
+        }
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorBackgroundEvent) event);
+                    break;
+                case GROUP_METADATA_UPDATE:
+                    process((GroupMetadataUpdateEvent) event);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+            }
+        }
+
+        @Override
+        protected Class<BackgroundEvent> getEventClass() {
+            return BackgroundEvent.class;
+        }
+
+        private void process(final ErrorBackgroundEvent event) {
+            throw event.error();
+        }
+
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = 
AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new 
ConsumerGroupMetadata(
+                    event.groupId(),

Review Comment:
   So `groupId` in `this.groupMetadata` comes from 
`GroupRebalanceConfig.groupId` the 
`config.getString(CommonClientConfigs.GROUP_ID_CONFIG)`.
   
   and `groupId` in `event.groupId` comes from `MembershipManagerImpl.groupId` 
which comes from `GroupState.groupId` which comes from 
`GroupRebalanceConfig.groupId`.
   
   So, aren't you always overwriting the group Id with itself here?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to