Ryan Leslie created KAFKA-13435:
-----------------------------------

             Summary: Group won't consume partitions added after static member restart
                 Key: KAFKA-13435
                 URL: https://issues.apache.org/jira/browse/KAFKA-13435
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 2.7.0
            Reporter: Ryan Leslie


When using consumer groups with static membership, if the consumer acting as 
leader has been restarted, metadata changes such as a partition increase do not 
trigger the expected rebalance.

To reproduce this issue, simply:
 # Create a static consumer subscribed to a single topic
 # Close the consumer and create a new one with the same group instance id
 # Increase partitions for the topic
 # Observe that no rebalance occurs and the new partitions are not assigned
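
Steps 1 and 2 depend on the restarted consumer reusing the same _group.instance.id_. Below is a minimal configuration sketch that uses only _java.util.Properties_. It assumes a broker at _localhost:9092_ and a hypothetical instance id _static-member-1_. In an actual reproduction, these properties would be passed to _new KafkaConsumer<>(props)_ before subscribing to the topic.

{code:java}
import java.util.Properties;

final class StaticConsumerConfig {
    // Builds consumer properties for a static member. Reusing the same
    // instanceId across restarts is what makes the member "static" (KIP-345).
    static Properties staticConsumerProps(String bootstrap, String groupId, String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrap);
        props.setProperty("group.id", groupId);
        // With a group instance id, a rejoining consumer can replace its old
        // member entry without triggering a rebalance.
        props.setProperty("group.instance.id", instanceId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values: the first instance and its restarted replacement
        // share the same group id and group instance id.
        Properties first = staticConsumerProps("localhost:9092", "ryan_test", "static-member-1");
        Properties restarted = staticConsumerProps("localhost:9092", "ryan_test", "static-member-1");
        System.out.println(first.getProperty("group.instance.id") + " / " + restarted.getProperty("group.instance.id"));
    }
}
{code}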

I have only tested this in 2.7, but it may apply to newer versions as well.
h3. Analysis

In _ConsumerCoordinator_, one responsibility of the leader consumer is to track 
metadata and trigger a rebalance when it changes, for example when partitions 
are added:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
{code:java}
if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
    ...
    requestRejoinIfNecessary(reason);
    return true;
}
{code}
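The quoted condition compares the per-topic partition counts captured when the leader computed the assignment against the latest metadata. Below is a simplified, stdlib-only model of that comparison. It assumes a snapshot is just a topic-to-partition-count map. _Snapshot_ and _shouldRejoin_ are hypothetical names, not the client's actual classes.

{code:java}
import java.util.HashMap;
import java.util.Map;

final class SnapshotCheck {
    // Hypothetical, simplified snapshot: topic name -> partition count.
    static final class Snapshot {
        final Map<String, Integer> partitionsPerTopic;

        Snapshot(Map<String, Integer> partitionsPerTopic) {
            this.partitionsPerTopic = new HashMap<>(partitionsPerTopic);
        }

        boolean matches(Snapshot other) {
            return partitionsPerTopic.equals(other.partitionsPerTopic);
        }
    }

    // Mirrors the quoted condition: only a member that still holds an
    // assignment snapshot (i.e. the leader) can detect a metadata change.
    static boolean shouldRejoin(Snapshot assignmentSnapshot, Snapshot metadataSnapshot) {
        return assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot);
    }

    public static void main(String[] args) {
        Snapshot atAssignment = new Snapshot(Map.of("test-topic", 3));
        Snapshot afterIncrease = new Snapshot(Map.of("test-topic", 6));
        System.out.println(shouldRejoin(atAssignment, afterIncrease)); // true: the leader notices
        System.out.println(shouldRejoin(null, afterIncrease));         // false: a non-leader never does
    }
}
{code}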
Note that _assignmentSnapshot_ is currently only set if the consumer is the 
leader:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
{code:java}
// Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader)
    assignmentSnapshot = null;
{code}
_isLeader_, in turn, becomes true only when this consumer performs the assignment during a rebalance:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]

That is, when a consumer group forms, exactly one consumer in the group should 
have _isLeader == true_ and be responsible for triggering rebalances on 
metadata changes.

However, with static membership, if the leader is restarted and rejoins the 
group, the group effectively no longer has a leader. The restarted instance 
starts with _isLeader_ false, and because its rejoin does not trigger a 
rebalance, it never performs an assignment. _isLeader_ is therefore false for 
all members, so even though the metadata changes are fetched, no rebalance is 
triggered.
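The client-side half of the problem can be illustrated with a toy model. It assumes a two-member group and reduces _assignmentSnapshot_ to a single partition count. _Member_, _completeRebalance_ and _anyDetects_ are hypothetical names, not part of _ConsumerCoordinator_.

{code:java}
import java.util.ArrayList;
import java.util.List;

final class LeaderLifecycle {
    // Hypothetical stand-in for one consumer's coordinator state.
    static final class Member {
        final String instanceId;
        boolean isLeader = false;               // a fresh instance starts as a follower
        Integer assignedPartitionCount = null;  // stand-in for assignmentSnapshot

        Member(String instanceId) {
            this.instanceId = instanceId;
        }

        // Mimics the end of a rebalance: only the member that performed the
        // assignment becomes leader and keeps a snapshot; others clear it.
        void completeRebalance(boolean performedAssignment, int partitions) {
            isLeader = performedAssignment;
            if (performedAssignment) {
                assignedPartitionCount = partitions;
            } else {
                assignedPartitionCount = null;
            }
        }

        // Same shape as the quoted check: no snapshot means no detection.
        boolean detectsChange(int currentPartitions) {
            return assignedPartitionCount != null && assignedPartitionCount.intValue() != currentPartitions;
        }
    }

    // True if any member would request a rebalance for the given partition count.
    static boolean anyDetects(List<Member> members, int partitions) {
        for (Member m : members) {
            if (m.detectsChange(partitions)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Member> group = new ArrayList<>();
        group.add(new Member("a"));
        group.add(new Member("b"));
        // Initial rebalance: "a" performs the assignment for a 3-partition topic.
        group.get(0).completeRebalance(true, 3);
        group.get(1).completeRebalance(false, 3);

        // Static restart of the leader: a new instance with the same id replaces
        // it and no rebalance runs, so completeRebalance is never called on it.
        group.set(0, new Member("a"));

        // The topic grows from 3 to 6 partitions.
        System.out.println(anyDetects(group, 6)); // false: no member holds a snapshot
    }
}
{code}

The replacement instance never goes through _completeRebalance_, so neither member holds a snapshot when the partition count changes.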

This issue persists until an actual group change causes a proper rebalance. To 
safely increase partitions when using static membership, consumers must either 
be stopped and allowed to time out, or be forcibly removed with 
_AdminClient.removeMembersFromConsumerGroup()_.

Correcting this in the client probably also requires help from the broker. 
Currently, when a static consumer that is the leader restarts, the coordinator 
does recognize the replacement:

For example, after leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted:
{noformat}
[2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with unknown member id rejoins, assigning new member id 353WV-1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, while old member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff will be removed. (kafka.coordinator.group.GroupCoordinator){noformat}
However, it does not update the leader id, since this is not a new rebalance, 
and the JOIN_GROUP response continues to return the now-stale member id as the 
leader:
{noformat}
2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer instanceId=353WV-1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, protocolType='consumer', protocolName='range', leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', members=[]){noformat}
This means it is not easy for a restarted member to tell that it should 
consider itself the leader and handle metadata changes.
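In effect, a consumer treats itself as leader only when the JoinGroup response names its own member id as _leader_. The toy model below replays the logged replacement using the member ids from the DEBUG log. _Group_, _replaceStaticMember_, _groupLedBy_ and the _updateLeader_ flag are hypothetical. The flag only sketches the missing step described above (moving the leader id to the new member id); it is not a verified fix.

{code:java}
import java.util.HashMap;
import java.util.Map;

final class StaticLeaderModel {
    // Hypothetical, much-simplified group state kept by a coordinator.
    static final class Group {
        String leaderId;
        final Map<String, String> memberIdByInstanceId = new HashMap<>();

        // A static member rejoins with an unknown member id: the new id replaces
        // the old one without a rebalance. Only when updateLeader is true does
        // the leader id follow the replacement (illustrative, not broker behavior).
        String replaceStaticMember(String instanceId, String newMemberId, boolean updateLeader) {
            String oldMemberId = memberIdByInstanceId.put(instanceId, newMemberId);
            if (updateLeader && oldMemberId != null && oldMemberId.equals(leaderId)) {
                leaderId = newMemberId;
            }
            return oldMemberId;
        }
    }

    // Hypothetical helper: a group whose only member is also its leader.
    static Group groupLedBy(String instanceId, String memberId) {
        Group g = new Group();
        g.memberIdByInstanceId.put(instanceId, memberId);
        g.leaderId = memberId;
        return g;
    }

    // Client-side view of a JoinGroup response: leader only if the ids match.
    static boolean isLeader(String responseMemberId, String responseLeader) {
        return responseMemberId.equals(responseLeader);
    }

    public static void main(String[] args) {
        String oldId = "1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff";
        String newId = "1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74";

        // Behavior reported above: the leader id still points at the removed member.
        Group current = groupLedBy("instance-1", oldId);
        current.replaceStaticMember("instance-1", newId, false);
        System.out.println(isLeader(newId, current.leaderId)); // false

        // Illustrative adjustment: the leader id follows the replaced member.
        Group adjusted = groupLedBy("instance-1", oldId);
        adjusted.replaceStaticMember("instance-1", newId, true);
        System.out.println(isLeader(newId, adjusted.leaderId)); // true
    }
}
{code}

This only addresses identification. As the analysis notes, a member that regains leadership would still need to build an _assignmentSnapshot_ before it could detect metadata changes.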

KAFKA-7728 refers to the difficulty of leader restarts, but its focus seemed to 
be mainly on avoiding needless rebalances for static members. That goal was 
accomplished, but this issue appears to be a side effect of both not 
rebalancing AND not having the rejoined member reclaim its leadership status.

Also, I noticed that KAFKA-12759 has been opened as well, though I have not 
verified whether it is strictly related or valid.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
