[ https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17451922#comment-17451922 ]

Ryan Leslie commented on KAFKA-13435:
-------------------------------------

[~dajac] Yes, I was going to suggest something similar to this. The broker knows that the rejoining member is the leader based on its instance id, and can inform it of such in the JoinGroupResponse. And SyncGroup is also already responding with assignment data. So putting that together can possibly allow the client to rebuild enough state to continue monitoring metadata changes.

You are completely right that this won't handle partitions being added in between the leader exiting and coming back. However, that is usually not meant to be a long time period, or else it will time out or stall progress in the consumer group.

Though it is a very clever idea, I'm not 100% sure about taking it further to have the assignment actually recomputed instead of trying to have the broker return it. Users might define their own assignors, and we can't be sure whether they are side-effect free or not, regardless of whether or not they are deterministic. Rebuilding state in the consumer without invoking the assignor may require a separate code path in the client-side coordinator, however. Perhaps others can weigh in on this trade-off too.

This bug is already quite rare, and AFAIK it has only been reported once since the inception of static membership / internal.leave.group.on.close. It may be better to start with a 95% solution than a perfect one involving more frequent rebalancing or much more invasive changes that could lead to new bugs.

And sorry, when I said restart consumers above, I meant not only to close them but also to call _AdminClient.removeMembersFromConsumerGroup()_ so they are new members when coming back up.

To answer your other question, the main purpose of static membership is obviously to avoid rebalances when restarting, and we have to assume those who explicitly enable it are very interested in this for one reason or another.
Any solution that changes that guarantee and involves rebalancing every time a particular member restarts is a pretty serious negative and goes against the documented behavior. I feel it would have to be a very serious bug to justify that decision, not an edge case like this where we can be patient and explore workarounds. You are right that triggering the rebalance should never break any application, but it can result in the kind of slowdown that users aim to avoid.

In our case, we happen to have some processing decoupled from the actual Kafka consumer loop, and rebalances can occur asynchronously from processing. Because of this, we are not always able to wait and commit all processed offsets prior to a rebalance, which leads to duplicate messages being processed afterwards. That is why we use static membership: to help reduce that and limit duplicates to only certain, rarer situations. I haven't gone into full detail, but I hope that clarifies the concern a bit more.

> Group won't consume partitions added after static member restart
> ----------------------------------------------------------------
>
>                 Key: KAFKA-13435
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13435
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: Ryan Leslie
>            Assignee: David Jacot
>            Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as leader has restarted, then metadata changes such as a partition increase do not trigger the expected rebalances.
> To reproduce this issue, simply:
> # Create a static consumer subscribed to a single topic
> # Close the consumer and create a new one with the same group instance id
> # Increase partitions for the topic
> # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
>
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to track metadata and trigger a rebalance if there are changes such as new partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null &&
>         !assignmentSnapshot.matches(metadataSnapshot)) {
>     ...
>     requestRejoinIfNecessary(reason);
>     return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
> if (!isLeader)
>     assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group should have _isLeader == true_ and be responsible for triggering rebalances on metadata changes.
> However, in the case of static membership, if the leader has been restarted and has rejoined the group, the group essentially no longer has a current leader. Even though the metadata changes are fetched, no rebalance will be triggered, because _isLeader_ will be false for all members.
> This issue does not resolve until an actual group change causes a proper rebalance. In order to safely increase partitions when using static membership, consumers must either be stopped and allowed to time out, or be forcibly removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
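A minimal Java model of the leader-only check quoted above may make the failure mode easier to see. It is hypothetical and is not Kafka code:

* The class and method names are invented for illustration.
* Snapshots are plain topic-to-partition-count maps compared with _equals_, standing in for the _matches_ call.

The model follows the reproduction steps with a single static consumer. It runs once without a restart, and once after the leader restarts and rejoins without a rebalance.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the leader-only metadata check in ConsumerCoordinator.
// Not Kafka code: metadata "snapshots" are plain topic -> partition-count maps.
public class LeaderMetadataCheck {

    static final class Member {
        final String memberId;
        boolean isLeader;
        // Non-null only on the member that computed the most recent assignment.
        Map<String, Integer> assignmentSnapshot;

        Member(String memberId) {
            this.memberId = memberId;
        }

        // Called when a rebalance completes; only the leader keeps a snapshot.
        void onRebalanceCompleted(boolean leader, Map<String, Integer> metadata) {
            isLeader = leader;
            assignmentSnapshot = leader ? new HashMap<>(metadata) : null;
        }

        // Analogue of: assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)
        boolean wouldRequestRejoin(Map<String, Integer> metadata) {
            return assignmentSnapshot != null && !assignmentSnapshot.equals(metadata);
        }
    }

    // Single static consumer subscribed to one topic, as in the reproduction steps.
    public static boolean rejoinRequestedAfterPartitionIncrease(boolean restartLeader) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("test-topic", 1);

        Member member = new Member("consumer-1-bbfcb930");
        member.onRebalanceCompleted(true, metadata); // initial rebalance: becomes leader

        if (restartLeader) {
            // Same group.instance.id, new member id, and no rebalance: the new instance
            // never performs an assignment, so isLeader stays false and the snapshot null.
            member = new Member("consumer-1-af88ecf2");
        }

        metadata.put("test-topic", 2); // increase partitions for the topic
        return member.wouldRequestRejoin(metadata);
    }

    public static void main(String[] args) {
        System.out.println("no restart:       " + rejoinRequestedAfterPartitionIncrease(false));
        System.out.println("leader restarted: " + rejoinRequestedAfterPartitionIncrease(true));
    }
}
```

Running _main_ should print _true_ for the first case and _false_ for the second. The restarted instance never performs an assignment, so its snapshot stays null and the partition increase goes unnoticed.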
> Correcting this in the client probably also requires help from the broker. Currently, when a static consumer that is leader is restarted, the coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with unknown member id rejoins, assigning new member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, while old member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff will be removed. (kafka.coordinator.group.GroupCoordinator)
> {noformat}
> However, it does not attempt to update the leader id since this isn't a new rebalance, and JOIN_GROUP will continue returning the now stale member id as leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, protocolType='consumer', protocolName='range', leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', members=[])
> {noformat}
> This means that it's not easy for any particular restarted member to identify that it should consider itself leader and handle metadata changes.
> There is reference to the difficulty of leader restarts in KAFKA-7728, but the focus seemed mainly on avoiding needless rebalances for static members.
> That goal was accomplished, but this issue seems to be a side effect of both not rebalancing AND not having the rejoined member reclaim its leadership status.
> Also, I have not verified if it's strictly related or valid, but noticed this ticket has been opened too: KAFKA-12759.
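The leader hand-off suggested in the comment can be sketched with another hypothetical Java model. In that suggestion, the broker recognizes the rejoining static member as the former leader and says so in the JoinGroupResponse. The model rests on two assumptions, neither of which was agreed in this thread:

* The broker simply updates the group's leader id when it replaces the old member id.
* The rejoined leader rebuilds its state by snapshotting the metadata it currently sees, instead of re-running the assignor.

All names are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the leader hand-off discussed in this thread. Not Kafka's
// GroupCoordinator or ConsumerCoordinator; metadata is a topic -> partition-count map.
public class StaticLeaderHandoff {

    // Broker-side group state, reduced to what matters here.
    static final class Group {
        String leaderId;
        final Map<String, String> memberIdByInstanceId = new HashMap<>();

        // A static member rejoins with an unknown member id: assign a new id and
        // replace the old one, as in the GroupCoordinator log line above.
        String replaceStaticMember(String instanceId, boolean handOffLeadership) {
            String oldMemberId = memberIdByInstanceId.get(instanceId);
            String newMemberId = instanceId + "-" + UUID.randomUUID();
            memberIdByInstanceId.put(instanceId, newMemberId);
            // Assumed fix: if the replaced member was leader, the replacement becomes
            // leader. Without this, JoinGroup keeps returning the stale leader id.
            if (handOffLeadership && oldMemberId != null && oldMemberId.equals(leaderId)) {
                leaderId = newMemberId;
            }
            return newMemberId;
        }
    }

    // Would the single restarted consumer request a rejoin after the partition increase?
    public static boolean detectsNewPartitions(boolean handOffLeadership, boolean addWhileDown) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("test-topic", 1);

        Group group = new Group();
        group.memberIdByInstanceId.put("consumer-1", "consumer-1-old");
        group.leaderId = "consumer-1-old";

        if (addWhileDown) {
            metadata.put("test-topic", 2); // partitions added before the leader is back
        }

        String memberId = group.replaceStaticMember("consumer-1", handOffLeadership);

        // Assumed client-side rebuild: if the JoinGroup response names this member as
        // leader, snapshot the metadata it sees now (no assignor is invoked).
        Map<String, Integer> assignmentSnapshot =
                memberId.equals(group.leaderId) ? new HashMap<>(metadata) : null;

        if (!addWhileDown) {
            metadata.put("test-topic", 2); // partitions added after the restart
        }

        // Analogue of the leader-only check quoted in the issue description.
        return assignmentSnapshot != null && !assignmentSnapshot.equals(metadata);
    }

    public static void main(String[] args) {
        System.out.println("stale leader id:              " + detectsNewPartitions(false, false));
        System.out.println("leadership handed off:        " + detectsNewPartitions(true, false));
        System.out.println("handed off, added while down: " + detectsNewPartitions(true, true));
    }
}
```

The third case reproduces the limitation acknowledged in the comment. The snapshot is taken when the leader comes back, so partitions added while it was down are already in the snapshot and never register as a change.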