[ https://issues.apache.org/jira/browse/KAFKA-18160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai updated KAFKA-18160:
-----------------------------------
    Description: 
I noticed this issue when testing KAFKA-17962. It includes two bugs listed 
below.

*ConsumerRebalanceListenerCallbackCompletedEvent is skipped*

`invokeRebalanceCallbacks` can throw WakeupException or InterruptException [0], 
and neither is handled. As a result, the 
`ConsumerRebalanceListenerCallbackCompletedEvent` is never sent to the 
background thread.

*Solution*: We should use try-catch blocks to propagate both 
InterruptException and WakeupException to the background thread.
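A minimal sketch of the proposed try-catch follows. It rests on several assumptions. The exception classes, `CallbackCompletedEvent`, the `toBackground` queue and the `processCallbackNeeded` helper are all simplified, hypothetical stand-ins and are not the actual `AsyncKafkaConsumer` code. The sketch catches the wakeup or interrupt and records it in the completion event. It then always enqueues that event for the background thread. Only after that does it rethrow the exception to the application thread.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
class WakeupException extends RuntimeException {}

// Hypothetical stand-in for org.apache.kafka.common.errors.InterruptException.
class InterruptException extends RuntimeException {}

// Hypothetical, simplified stand-in for ConsumerRebalanceListenerCallbackCompletedEvent:
// it only records which callback ran and whether it failed.
final class CallbackCompletedEvent {
    final String callbackName;
    final Optional<RuntimeException> error;

    CallbackCompletedEvent(String callbackName, Optional<RuntimeException> error) {
        this.callbackName = callbackName;
        this.error = error;
    }
}

final class RebalanceCallbackSketch {
    // Stand-in for the queue the application thread uses to reach the background thread.
    final BlockingQueue<CallbackCompletedEvent> toBackground = new LinkedBlockingQueue<>();

    // Hypothetical helper: runs the user's rebalance callback on the application thread.
    void processCallbackNeeded(String callbackName, Runnable userCallback) {
        RuntimeException error = null;
        try {
            userCallback.run();
        } catch (WakeupException | InterruptException e) {
            // Do not let the exception escape yet; remember it instead.
            error = e;
        }
        // Always notify the background thread, so it is never left waiting
        // for a completion event that will not arrive.
        toBackground.add(new CallbackCompletedEvent(callbackName, Optional.ofNullable(error)));
        // Only now surface the wakeup/interrupt to the application thread.
        if (error != null) {
            throw error;
        }
    }
}
```

Because the exception is rethrown after the event is enqueued, the application thread still observes the wakeup or interrupt. The background thread, for its part, always learns that the callback finished and whether it failed.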

The second issue is expected behavior, so please ignore it :)

-*corrupted added partitions*-

-In the next iteration of invokeRebalanceCallbacks, non-fetchable assigned 
partitions are treated as owned partitions [1]. This results in "empty" 
partitions being passed to the listener, meaning that the listener never 
receives the correctly added partitions after the first execution fails. 
Consequently, this causes the test_pause_and_resume_sink (KAFKA-17962) to 
become unstable when using AsyncConsumer.-

-*Solution*: We should add only partitions where pendingOnAssignedCallback is 
false to the owned partitions.-


[0] 
https://github.com/apache/kafka/blob/2d39d5be64d4f5b6446f4b9ec3f32b039707d9d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L2046

[1] 
https://github.com/apache/kafka/blob/2d39d5be64d4f5b6446f4b9ec3f32b039707d9d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java#L828


  was:
I noticed this issue when testing KAFKA-17962. It includes two bugs listed 
below.

*ConsumerRebalanceListenerCallbackCompletedEvent is skipped*

`invokeRebalanceCallbacks` can throw WakeupException or InterruptException [0], 
and neither is handled. As a result, the 
`ConsumerRebalanceListenerCallbackCompletedEvent` is never sent to the 
background thread.

*Solution*: We should use try-catch blocks to propagate both 
InterruptException and WakeupException to the background thread.


*corrupted added partitions*

In the next iteration of invokeRebalanceCallbacks, non-fetchable assigned 
partitions are treated as owned partitions [1]. This results in "empty" 
partitions being passed to the listener, meaning that the listener never 
receives the correctly added partitions after the first execution fails. 
Consequently, this causes the test_pause_and_resume_sink (KAFKA-17962) to 
become unstable when using AsyncConsumer.

*Solution*: We should add only partitions where pendingOnAssignedCallback is 
false to the owned partitions.


[0] 
https://github.com/apache/kafka/blob/2d39d5be64d4f5b6446f4b9ec3f32b039707d9d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L2046

[1] 
https://github.com/apache/kafka/blob/2d39d5be64d4f5b6446f4b9ec3f32b039707d9d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java#L828



> Interrupting or waking up onPartitionsAssigned in AsyncConsumer can cause the 
> ConsumerRebalanceListenerCallbackCompletedEvent to be skipped, potentially 
> leading to corrupted added partitions
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18160
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18160
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Chia-Ping Tsai
>            Assignee: Kuan Po Tseng
>            Priority: Blocker
>              Labels: consumer-threading-refactor
>             Fix For: 4.0.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
