[ https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen resolved KAFKA-7018.
--------------------------------
Fix Version/s: 2.4.0
Resolution: Fixed

persist memberId for consumer restart
-------------------------------------

Key: KAFKA-7018
URL: https://issues.apache.org/jira/browse/KAFKA-7018
Project: Kafka
Issue Type: Improvement
Components: consumer, streams
Reporter: Boyang Chen
Assignee: Boyang Chen
Priority: Major
Fix For: 2.4.0

In the group coordinator, there is logic that does not trigger a rebalance for JoinGroup requests from existing follower consumers whose metadata has not changed:

{code:java}
case Empty | Stable =>
  if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
    // if the member id is unknown, register the member to the group
    addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId,
      clientHost, protocolType, protocols, group, responseCallback)
  } else {
    val member = group.get(memberId)
    if (group.isLeader(memberId) || !member.matches(protocols)) {
      // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
      // The latter allows the leader to trigger rebalances for changes affecting assignment
      // which do not affect the member metadata (such as topic metadata changes for the consumer)
      updateMemberAndRebalance(group, member, protocols, responseCallback)
    } else {
      // for followers with no actual change to their metadata, just return group information
      // for the current generation which will allow them to issue SyncGroup
      responseCallback(JoinGroupResult(
        members = Map.empty,
        memberId = memberId,
        generationId = group.generationId,
        subProtocol = group.protocolOrNull,
        leaderId = group.leaderOrNull,
        error = Errors.NONE))
    }
  }
{code}

While looking at AbstractCoordinator, I found that the generation is hard-coded as NO_GENERATION on restart, which means we send UNKNOWN_MEMBER_ID in the first JoinGroup request.
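To make the branch above concrete, here is a minimal, self-contained Java model of it. It is a sketch, not Kafka code: JoinGroupModel, Group, handleJoin and JoinDecision are hypothetical names, member ids are generated as member-N purely for illustration, the first member to join becomes leader, rejecting a non-empty id the group does not know is an added assumption, and there are no session timeouts. It shows that a follower rejoining with its old id and unchanged metadata just gets the current generation back, while the same follower rejoining with UNKNOWN_MEMBER_ID after a restart is added as a new member and triggers a rebalance.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only: class, enum and method names are hypothetical,
// simplified from the Empty | Stable branch quoted above. Not Kafka APIs.
final class JoinGroupModel {
    // Kafka's JoinGroupRequest.UNKNOWN_MEMBER_ID is the empty string.
    static final String UNKNOWN_MEMBER_ID = "";

    enum JoinDecision {
        ADD_MEMBER_AND_REBALANCE,    // unknown member: register it and rebalance
        UPDATE_MEMBER_AND_REBALANCE, // leader rejoined or metadata changed
        RETURN_CURRENT_GENERATION,   // unchanged follower: no rebalance
        REJECT_UNKNOWN_MEMBER        // assumption: non-empty id the group does not know
    }

    static final class Group {
        // memberId -> protocol metadata, in join order
        final Map<String, String> protocolsByMember = new LinkedHashMap<>();
        String leaderId;
        private int nextId = 0;

        String newMemberId() {
            return "member-" + nextId++;
        }
    }

    static JoinDecision handleJoin(Group group, String memberId, String protocols) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            String assigned = group.newMemberId();
            group.protocolsByMember.put(assigned, protocols);
            if (group.leaderId == null) {
                group.leaderId = assigned; // in this model the first member becomes leader
            }
            return JoinDecision.ADD_MEMBER_AND_REBALANCE;
        }
        String current = group.protocolsByMember.get(memberId);
        if (current == null) {
            return JoinDecision.REJECT_UNKNOWN_MEMBER;
        }
        if (memberId.equals(group.leaderId) || !current.equals(protocols)) {
            group.protocolsByMember.put(memberId, protocols);
            return JoinDecision.UPDATE_MEMBER_AND_REBALANCE;
        }
        return JoinDecision.RETURN_CURRENT_GENERATION;
    }

    public static void main(String[] args) {
        Group group = new Group();
        handleJoin(group, UNKNOWN_MEMBER_ID, "range"); // member-0 (leader)
        handleJoin(group, UNKNOWN_MEMBER_ID, "range"); // member-1 (follower)

        // Follower rejoins with its old id and unchanged metadata: no rebalance.
        System.out.println(handleJoin(group, "member-1", "range")); // prints RETURN_CURRENT_GENERATION
        // Same follower after a restart, with its id reset to UNKNOWN_MEMBER_ID.
        System.out.println(handleJoin(group, UNKNOWN_MEMBER_ID, "range")); // prints ADD_MEMBER_AND_REBALANCE
        // No session timeouts in this model, so the stale member-1 entry stays next to member-2.
        System.out.println(group.protocolsByMember.keySet()); // prints [member-0, member-1, member-2]
    }
}
{code}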
As a result, the coordinator treats the restarted consumer as a new member, so a rebalance is triggered and the group does not stabilize until the old member's session times out.

Before we take the discussion further, I would like to clarify the following:
# Whether my understanding of the above logic is correct (I hope [~mjsax] can help me double-check).
# Whether it makes sense to persist the memberId from the last round for consumers. We currently only need this feature in Streams applications, but it would do no harm to use it for consumers in general as well. This would be a nice-to-have feature on consumer restart when loading-previous-memberId is configured to true. If loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
# The behavior could also be changed on the broker side, but I suspect that is very risky. So far, a client-side change looks like the least effort. The end goal is to avoid excessive rebalances when the same consumer restarts, so if you feel a server-side change could also help, we can discuss it further.

Thank you for helping out! [~mjsax] [~guozhang]

--
This message was sent by Atlassian Jira
(v8.3.2#803003)