[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shawn Wang updated KAFKA-14024:
-------------------------------
    Description: 
Hi,

I think this was introduced in https://issues.apache.org/jira/browse/KAFKA-13310.
 

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]

 

We don't wait for the client to receive the offset commit response here, so 
onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance, and 
the client will keep looping, re-invoking onJoinPrepare.

I think EAGER mode doesn't have this problem simply because it revokes 
the partitions even if onJoinPrepareAsyncCommitCompleted=false, so it will not 
try to commit in the next round.
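
The behaviour described above can be sketched with a toy model. This is not the ConsumerCoordinator code: the class JoinPrepareLoopSketch, the method prepareCallsUntilDone and the cap maxCalls are hypothetical names, and the model simply assumes that the commit response is never observed within a single onJoinPrepare call. It only illustrates why, under that assumption, EAGER terminates and COOPERATIVE does not.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the retry loop described in this issue; not the real client code. */
final class JoinPrepareLoopSketch {
    enum Protocol { EAGER, COOPERATIVE }

    /**
     * Returns the number of modelled onJoinPrepare calls needed before the join
     * may proceed, or -1 if the cap maxCalls is reached first.
     */
    static int prepareCallsUntilDone(Protocol protocol, int maxCalls) {
        Set<String> owned = new HashSet<>();
        owned.add("topic1-4"); // partition name taken from the client log in this issue

        for (int call = 1; call <= maxCalls; call++) {
            // Nothing owned means nothing to commit, which counts as completed.
            // Otherwise a new async commit is sent and (assumption) its response
            // is not received before this call returns.
            boolean commitCompleted = owned.isEmpty();

            if (protocol == Protocol.EAGER) {
                // Eager revokes every partition regardless of the commit result.
                owned.clear();
            }
            // Cooperative keeps its partitions, so the next call commits again.

            if (commitCompleted) {
                return call;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("EAGER: " + prepareCallsUntilDone(Protocol.EAGER, 1000));
        System.out.println("COOPERATIVE: " + prepareCallsUntilDone(Protocol.COOPERATIVE, 1000));
    }
}
```

Run as written, this prints "EAGER: 2" (one incomplete round, then nothing left to commit) and "COOPERATIVE: -1" (the cap is reached because every round sends a fresh commit).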

To reproduce:
 * single-node Kafka version 3.2.0 and client version 3.2.0
 * topic1 has 5 partitions
 * start consumer1 (cooperative rebalance)
 * start another consumer, consumer2 (same consumer group)
 * consumer1 hangs for a long time before re-joining
 * the server log shows consumer1 hitting the rebalance timeout before its JoinGroup 
and then re-joining with another memberId
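
For the consumers in the steps above, a configuration along these lines should select the cooperative protocol. This is a sketch under assumptions: the report does not say which assignor, deserializers or broker address were used. CooperativeStickyAssignor is the cooperative assignor shipped with the Java client, auto-commit is enabled because the client log shows an asynchronous auto-commit, and the class and method names below are hypothetical.

```java
import java.util.Properties;

/** Builds consumer properties for a cooperative-rebalance consumer (illustrative only). */
final class CooperativeConsumerConfigSketch {

    static Properties cooperativeConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Deserializers are an assumption; the report does not mention them.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Selects the cooperative (incremental) rebalance protocol.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // The issue concerns the auto-commit performed during onJoinPrepare.
        props.setProperty("enable.auto.commit", "true");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is a placeholder address; "xxx" is the group id from the logs.
        Properties props = cooperativeConsumerProps("localhost:9092", "xxx");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

The keys are written as plain strings so the sketch compiles without the kafka-clients dependency. With that dependency on the classpath, the same Properties object can be passed to the KafkaConsumer constructor for both consumer1 and consumer2.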

consumer1's log keeps printing:

16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 
and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 
(ConsumerCoordinator.java:739)
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of 
offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} 
(ConsumerCoordinator.java:1143)

 

and coordinator's log:

[2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance 
group xxx in state PreparingRebalance with old generation 56 
(__consumer_offsets-30) (reason: Adding new member 
consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id 
None; client reason: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic 
members who haven't joined: 
Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx 
generation 57 (__consumer_offsets-30) with 3 members 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with 
unknown member id joins group xxx in CompletingRebalance state. Created a new 
member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the 
member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from 
leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for 
generation 57. The group has 3 members, 0 of which are static. 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance 
group xxx in state PreparingRebalance with old generation 57 
(__consumer_offsets-30) (reason: Adding new member 
consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id 
None; client reason: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)

  was:
Hi,

I think this was introduced in https://issues.apache.org/jira/browse/KAFKA-13310.
 

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]

 

We don't wait for the client to receive the offset commit response here, so 
onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance.

I think EAGER mode doesn't have this problem simply because it revokes 
the partitions even if onJoinPrepareAsyncCommitCompleted=false, so it will not 
try to commit in the next round.

To reproduce:
 * single-node Kafka version 3.2.0 and client version 3.2.0
 * topic1 has 5 partitions
 * start consumer1 (cooperative rebalance)
 * start another consumer, consumer2 (same consumer group)
 * consumer1 hangs for a long time before re-joining
 * the server log shows consumer1 hitting the rebalance timeout before its JoinGroup 
and then re-joining with another memberId

consumer1's log keeps printing:

16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 
and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 
(ConsumerCoordinator.java:739)
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of 
offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} 
(ConsumerCoordinator.java:1143)

 

and coordinator's log:

[2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance 
group xxx in state PreparingRebalance with old generation 56 
(__consumer_offsets-30) (reason: Adding new member 
consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id 
None; client reason: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic 
members who haven't joined: 
Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx 
generation 57 (__consumer_offsets-30) with 3 members 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with 
unknown member id joins group xxx in CompletingRebalance state. Created a new 
member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the 
member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from 
leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for 
generation 57. The group has 3 members, 0 of which are static. 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance 
group xxx in state PreparingRebalance with old generation 57 
(__consumer_offsets-30) (reason: Adding new member 
consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id 
None; client reason: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)


> Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14024
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.2.0
>            Reporter: Shawn Wang
>            Priority: Major
>
> Hi,
> I think this was introduced in 
> https://issues.apache.org/jira/browse/KAFKA-13310.
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]
>  
> We don't wait for the client to receive the offset commit response here, so 
> onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance, and 
> the client will keep looping, re-invoking onJoinPrepare.
> I think EAGER mode doesn't have this problem simply because it revokes 
> the partitions even if onJoinPrepareAsyncCommitCompleted=false, so it will not 
> try to commit in the next round.
> To reproduce:
>  * single-node Kafka version 3.2.0 and client version 3.2.0
>  * topic1 has 5 partitions
>  * start consumer1 (cooperative rebalance)
>  * start another consumer, consumer2 (same consumer group)
>  * consumer1 hangs for a long time before re-joining
>  * the server log shows consumer1 hitting the rebalance timeout before its JoinGroup 
> and then re-joining with another memberId
> consumer1's log keeps printing:
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 
> 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 
> (ConsumerCoordinator.java:739)
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of 
> offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} 
> (ConsumerCoordinator.java:1143)
>  
> and coordinator's log:
> [2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance 
> group xxx in state PreparingRebalance with old generation 56 
> (__consumer_offsets-30) (reason: Adding new member 
> consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id 
> None; client reason: rebalance failed due to 'The group member needs to have 
> a valid member id before actually entering a consumer group.' 
> (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed 
> dynamic members who haven't joined: 
> Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) 
> (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx 
> generation 57 (__consumer_offsets-30) with 3 members 
> (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with 
> unknown member id joins group xxx in CompletingRebalance state. Created a new 
> member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the 
> member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from 
> leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for 
> generation 57. The group has 3 members, 0 of which are static. 
> (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance 
> group xxx in state PreparingRebalance with old generation 57 
> (__consumer_offsets-30) (reason: Adding new member 
> consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id 
> None; client reason: rebalance failed due to 'The group member needs to have 
> a valid member id before actually entering a consumer group.' 
> (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
