[ https://issues.apache.org/jira/browse/KAFKA-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173850#comment-17173850 ]

Dibyendu Bhattacharya commented on KAFKA-9752:
----------------------------------------------

Hi [~ijuma] [~hachikuji], I am seeing a different issue now with this fix.
Earlier the ConsumerGroup was stuck in the "PreparingRebalance" state, which no
longer happens, but now I see members unable to join the group. The logs below
show members being removed after the session timeout expires.

 

[2020-08-09 09:29:00,558] INFO [GroupCoordinator 5]: *Pending member* XXX in group YYY *has been removed after session timeout expiration*. (kafka.coordinator.group.GroupCoordinator)

[2020-08-09 09:29:55,856] INFO [GroupCoordinator 5]: *Pending member* ZZZ in group YYY *has been removed after session timeout expiration*. (kafka.coordinator.group.GroupCoordinator)

 

Looking at the GroupCoordinator code, when a new member tries to join for the
first time, the GroupCoordinator also schedules an addPendingMemberExpiration
(in the doUnknownJoinGroup call) with the session timeout:

{code}
addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
{code}

 

If for some reason the addMemberAndRebalance call takes longer and the member
is still in the "Pending" state, the addPendingMemberExpiration task above can
remove the pending member, and it then cannot join the group. I think that is
what is happening.
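To make the suspected race concrete, here is a minimal, hypothetical Scala model (not the GroupCoordinator's real code). It assumes a pending member is dropped if its expiration, scheduled at registration time plus `sessionTimeoutMs`, fires before the member leaves the pending state. The names `PendingMemberRace` and `outcome`, and the 10000 ms session timeout, are illustrative assumptions.

```scala
// Hypothetical, simplified model of the suspected race; not Kafka's code.
object PendingMemberRace {
  sealed trait Outcome
  case object Joined extends Outcome
  case object RemovedAsPending extends Outcome

  /** A pending member registered at `registeredAtMs` gets an expiration at
    * `registeredAtMs + sessionTimeoutMs` (as with addPendingMemberExpiration).
    * If the join that moves it out of the pending set only lands at or after
    * that deadline, the expiration removes the member first.
    */
  def outcome(registeredAtMs: Long, sessionTimeoutMs: Long, joinCompletesAtMs: Long): Outcome = {
    val pendingDeadlineMs = registeredAtMs + sessionTimeoutMs
    if (joinCompletesAtMs < pendingDeadlineMs) Joined else RemovedAsPending
  }

  def main(args: Array[String]): Unit = {
    val sessionTimeoutMs = 10000L // assumed value, for illustration only
    println(outcome(0L, sessionTimeoutMs, 2000L))  // join lands well before the deadline
    println(outcome(0L, sessionTimeoutMs, 15000L)) // join delayed past the session timeout
  }
}
```

Under this model, any delay that pushes the member's join past the session timeout leaves it removed as a pending member, which would be consistent with the log lines above.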

For a new member, the coordinator already sets a timeout via:

{code}
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
{code}

 

What is the need for an additional addPendingMemberExpiration task?

 

> Consumer rebalance can be stuck after new member timeout with old JoinGroup version
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9752
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9752
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.2, 2.3.1, 2.4.1
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Blocker
>             Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.2
>
>
> For older versions of the JoinGroup protocol (v3 and below), there was no way 
> for new consumer group members to get their memberId until the first 
> rebalance completed. If the JoinGroup request timed out and the client 
> disconnected, the member would nevertheless be left in the group until the 
> rebalance completed and the session timeout expired. 
> In order to prevent this situation from causing the group size to grow 
> unboundedly, we added logic in KAFKA-7610 to limit the maximum time a new 
> member will be left in the group before it would be kicked out (in spite of 
> rebalance state). 
> In KAFKA-9232, we addressed one issue with this solution. Basically, the new 
> member expiration logic did not properly get cancelled after the rebalance 
> completed, which means that in certain cases, a successfully joined member 
> might get kicked out of the group unnecessarily. 
> Unfortunately, this patch introduced a regression in the normal session 
> expiration logic following completion of the initial rebalance. Basically the 
> expiration task fails to get scheduled properly. The issue is in this 
> function:
> {code}
>   def shouldKeepAlive(deadlineMs: Long): Boolean = {
>     if (isNew) {
>       // New members are expired after the static join timeout
>       latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
>     } else if (isAwaitingJoin || isAwaitingSync) {
>       // Don't remove members as long as they have a request in purgatory
>       true
>     } else {
>       // Otherwise check for session expiration
>       latestHeartbeat + sessionTimeoutMs > deadlineMs
>     }
>   }
> {code}
> We use this logic in order to check for session expiration. On the surface, 
> there is nothing wrong with it, but it has an odd interaction with the 
> purgatory. When the heartbeat is first scheduled with `tryCompleteElseWatch`, 
> the code relies on `shouldKeepAlive` returning false so that the heartbeat 
> task is not immediately completed. This only works because we update 
> `latestHeartbeat` just prior to calling `tryCompleteElseWatch`, which means 
> that the first or third checks will fail, `shouldKeepAlive` will return 
> false, and the heartbeat expiration task will not be immediately completed. 
> The bug has to do with the case when `isNew` is true. When we 
> schedule the session expiration task, the `isNew` flag is still set to true, 
> which means we will hit the first check above. Since in most cases, the 
> session timeout is less than the new member timeout of 5 minutes, the check 
> is very likely to return true. This seems like what we would want, but as 
> noted above, we rely on this function returning false when the expiration 
> task is passed to `tryCompleteElseWatch`. Since it returns true instead, the 
> task completes immediately, which means we cannot rely on its expiration.
> The impact of this bug in the worst case is that a consumer group can be left 
> in the `PreparingRebalance` state indefinitely. This state will persist until 
> there is a coordinator change (e.g. as a result of restarting the broker). 
> Note that this is only possible if 1) we have a consumer using an old 
> JoinGroup version, 2) the consumer times out and disconnects from its initial 
> JoinGroup request. 
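The interaction between `shouldKeepAlive` and `tryCompleteElseWatch` described in the issue can be sketched as a small, hypothetical Scala model. `shouldKeepAlive` follows the quoted function; `KeepAliveModel`, `scheduleSessionExpiration`, the `Scheduled` outcomes, and the 10000 ms session timeout are illustrative assumptions, and the purgatory is reduced to its first immediate-completion check.

```scala
// Simplified, hypothetical model of the KAFKA-9752 interaction.
// shouldKeepAlive mirrors the quoted function; everything else is illustrative.
object KeepAliveModel {
  val NewMemberJoinTimeoutMs: Long = 5 * 60 * 1000L // "new member timeout of 5 minutes"

  final case class Member(
      latestHeartbeat: Long,
      sessionTimeoutMs: Long,
      isNew: Boolean,
      isAwaitingJoin: Boolean = false,
      isAwaitingSync: Boolean = false) {

    def shouldKeepAlive(deadlineMs: Long): Boolean =
      if (isNew) latestHeartbeat + NewMemberJoinTimeoutMs > deadlineMs
      else if (isAwaitingJoin || isAwaitingSync) true
      else latestHeartbeat + sessionTimeoutMs > deadlineMs
  }

  sealed trait Scheduled
  case object CompletedImmediately extends Scheduled // task completes at once; its expiration never fires
  case object WatchedUntilDeadline extends Scheduled // task stays watched and can expire at the deadline

  /** Mimics the described pattern: refresh latestHeartbeat, then hand the
    * expiration task to a tryCompleteElseWatch-style call whose first step
    * is an immediate completion check via shouldKeepAlive.
    */
  def scheduleSessionExpiration(member: Member, nowMs: Long): Scheduled = {
    val refreshed = member.copy(latestHeartbeat = nowMs)
    val deadlineMs = nowMs + refreshed.sessionTimeoutMs
    if (refreshed.shouldKeepAlive(deadlineMs)) CompletedImmediately
    else WatchedUntilDeadline
  }

  def main(args: Array[String]): Unit = {
    val sessionTimeoutMs = 10000L // assumed value, well under 5 minutes
    // isNew still set when the session expiration is scheduled: the bug.
    println(scheduleSessionExpiration(Member(0L, sessionTimeoutMs, isNew = true), 1000L))
    // isNew already cleared: the task is watched and can expire normally.
    println(scheduleSessionExpiration(Member(0L, sessionTimeoutMs, isNew = false), 1000L))
  }
}
```

With `isNew` still set, the immediate check returns true and the task completes at once, so no session expiration remains in place; once `isNew` is cleared, the same call leaves the task watched until its deadline.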



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
