[ https://issues.apache.org/jira/browse/KAFKA-18160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chia-Ping Tsai updated KAFKA-18160:
-----------------------------------
    Summary: Interrupting or waking up onPartitionsAssigned in AsyncConsumer can cause the ConsumerRebalanceListenerCallbackCompletedEvent to be skipped  (was: Interrupting or waking up onPartitionsAssigned in AsyncConsumer can cause the ConsumerRebalanceListenerCallbackCompletedEvent to be skipped, potentially leading to corrupted added partitions)

> Interrupting or waking up onPartitionsAssigned in AsyncConsumer can cause the ConsumerRebalanceListenerCallbackCompletedEvent to be skipped
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18160
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18160
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Chia-Ping Tsai
>            Assignee: Kuan Po Tseng
>            Priority: Blocker
>              Labels: consumer-threading-refactor
>             Fix For: 4.0.0
>
>
> I noticed this issue when testing KAFKA-17962. It includes two bugs listed
> below.
>
> *ConsumerRebalanceListenerCallbackCompletedEvent is skipped*
> `invokeRebalanceCallbacks` could throw WakeupException/InterruptException [0]
> and they are NOT handled. Hence, the event
> `ConsumerRebalanceListenerCallbackCompletedEvent` is NOT sent to the
> background thread.
> *Solution*: We should use try-catch blocks to propagate both
> InterruptedException and WakeupException to the background thread.
>
> The second issue is expected behavior, so please ignore it :)
>
> -*corrupted added partitions*-
> -In the next iteration of invokeRebalanceCallbacks, non-fetchable assigned
> partitions are treated as owned partitions [1]. This results in "empty"
> partitions being passed to the listener, meaning that the listener never
> receives the correctly added partitions after the first execution fails.
> Consequently, this causes the test_pause_and_resume_sink (KAFKA-17962) to
> become unstable when using AsyncConsumer.-
> -*Solution*: We should add only partitions where pendingOnAssignedCallback is
> false to the owned partitions.-
>
> [0]
> https://github.com/apache/kafka/blob/2d39d5be64d4f5b6446f4b9ec3f32b039707d9d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L2046
> [1]
> https://github.com/apache/kafka/blob/2d39d5be64d4f5b6446f4b9ec3f32b039707d9d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java#L828
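
For illustration only, here is a minimal sketch of the try-catch approach proposed for the first bug. It does not use the real Kafka classes: `RebalanceCallbackSketch`, `SimulatedWakeupException`, `CallbackCompletedEvent`, `invokeAndReport`, and the plain `Queue` standing in for the channel to the background thread are all hypothetical names. The assumption is that the completion event can carry an optional error, so the background thread is always notified, and that the exception is still rethrown to the caller afterwards.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical stand-ins only; none of these are the real Kafka classes.
class RebalanceCallbackSketch {

    /** Stand-in for an unchecked exception such as Kafka's WakeupException. */
    static class SimulatedWakeupException extends RuntimeException {
        SimulatedWakeupException(String message) {
            super(message);
        }
    }

    /** Stand-in for the completion event; carries the failure, if any. */
    static final class CallbackCompletedEvent {
        final Optional<RuntimeException> error;

        CallbackCompletedEvent(Optional<RuntimeException> error) {
            this.error = error;
        }
    }

    /**
     * Runs the listener callback and always enqueues a completion event,
     * even when the callback throws. The failure is attached to the event
     * and then rethrown so the caller still observes it (an assumption of
     * this sketch, not a statement about the actual fix).
     */
    static void invokeAndReport(Runnable listenerCallback,
                                Queue<CallbackCompletedEvent> backgroundQueue) {
        RuntimeException failure = null;
        try {
            listenerCallback.run();
        } catch (RuntimeException e) {
            // Without this catch, the add() below would be skipped.
            failure = e;
        }
        backgroundQueue.add(new CallbackCompletedEvent(Optional.ofNullable(failure)));
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) {
        Queue<CallbackCompletedEvent> queue = new ArrayDeque<>();
        try {
            invokeAndReport(() -> {
                throw new SimulatedWakeupException("woken up inside callback");
            }, queue);
        } catch (SimulatedWakeupException e) {
            // The caller sees the exception, and the event was still queued.
            System.out.println("events queued: " + queue.size());
        }
    }
}
```

The point of the sketch is only the ordering: the event is enqueued before the exception leaves the method, so the background side is never left waiting for a completion that will not arrive.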