[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14979146#comment-14979146 ]
Guozhang Wang commented on KAFKA-2017:
--------------------------------------

[~onurkaraman] [~becket_qin] [~jjkoshy] I have discussed various options for 0.9.0 with [~junrao] and [~hachikuji]:

1. We realized that relaxing the generation id check for offset commits, while not persisting group state, does not fully solve the problem. When the coordinator migrates, consumers will 1) discover the new coordinator and 2) send heartbeat (HB) requests as scheduled, without stopping their fetchers. With a group of more than one consumer, it is likely that one consumer member will find the new coordinator first, send its HB request, get an error back, and rejoin the group. Since today the coordinator immediately creates the group when it receives a join-group request from an unknown group for the first time, and finishes the join-group phase immediately, the other members' commit requests right after that will be rejected, still causing duplicates during coordinator migration. We talked about delaying the creation of the group for up to the session timeout on the first-ever join-group request, or relaxing the offset commit check further to completely ignore the group id and blindly accept all requests. But those solutions have their own problems: the former could delay the creation of the group by 30 seconds (the default session timeout), and the latter cannot distinguish consumers that use Kafka for coordination from consumers that do their own assignment. So we think it is still necessary to have this feature in the 0.9.0 release.

2. We also went through the implementation details of enforcing persistence in Kafka, and felt that there are still many tricky cases to get right, for example:

a) If we use two topics, one for offsets and one for group metadata, then we need to make sure these two topics ALWAYS have the same leader (i.e. the coordinator) for their partitions. However, with the current reassignment mechanism, consecutive reassignments from bouncing brokers / broker failures cannot easily guarantee this. We could of course refactor the offset manager into a general key-value store backed by multiple topics, but that is a much larger feature, well beyond the scope of 0.9.0.

b) If we use the same topic with the new message format as Joel proposed, it is not clear how log compaction can delete the old-format messages, since they will have different keys. If we keep messages of both versions, that further increases the latency of loading the whole log of consumer group metadata upon coordinator migration, and we also need to change the caching layer to be able to override values while loading offsets from the log.

c) Instead, what we can do with the same topic is use the key version as a "type indicator": since both keys and values have their own versions, we can use the key version number to indicate the message type, 0 for an offset message and 1 for a group metadata message. The value versions of offset and group metadata messages can still evolve separately, and we will never evolve key versions going forward (we cannot do this even today because of log compaction); if we ever had to, we would change the topic instead.
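To make c) concrete, here is a rough sketch (not the actual key schemas, which are still to be decided) of how the key version could drive encoding / decoding when loading the log; all names and field layouts below are hypothetical:

{code}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical in-memory key types; the real schemas may carry more fields.
sealed trait GroupTopicKey
case class OffsetKey(group: String, topic: String, partition: Int) extends GroupTopicKey
case class GroupMetadataKey(group: String) extends GroupTopicKey

object GroupTopicKey {
  // Key version 0 -> offset message, key version 1 -> group metadata message,
  // as proposed above; key versions are never evolved afterwards.
  val OffsetKeyVersion: Short = 0
  val GroupMetadataKeyVersion: Short = 1

  private def writeString(buf: ByteBuffer, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }

  private def readString(buf: ByteBuffer): String = {
    val bytes = new Array[Byte](buf.getShort.toInt)
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def encode(key: GroupTopicKey): Array[Byte] = {
    val buf = ByteBuffer.allocate(512)
    key match {
      case OffsetKey(group, topic, partition) =>
        buf.putShort(OffsetKeyVersion)
        writeString(buf, group)
        writeString(buf, topic)
        buf.putInt(partition)
      case GroupMetadataKey(group) =>
        buf.putShort(GroupMetadataKeyVersion)
        writeString(buf, group)
    }
    buf.flip()
    val out = new Array[Byte](buf.remaining())
    buf.get(out)
    out
  }

  // On log load the coordinator inspects the key version first, then parses
  // the rest of the key (and picks the matching value schema) accordingly.
  def decode(bytes: Array[Byte]): GroupTopicKey = {
    val buf = ByteBuffer.wrap(bytes)
    buf.getShort match {
      case OffsetKeyVersion        => OffsetKey(readString(buf), readString(buf), buf.getInt)
      case GroupMetadataKeyVersion => GroupMetadataKey(readString(buf))
      case v                       => throw new IllegalArgumentException(s"unknown key version $v")
    }
  }
}
{code}

Since compaction only compares raw key bytes, old and new records for the same logical key keep compacting correctly as long as the key encoding never changes, which is why the key versions themselves must stay frozen.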
With this proposal:

1) OffsetManager will become ConsumerGroupManager, though its related config names will still be "offsetXXX" for now since they are public,
2) loading a message from the log will return either an offset object or a group metadata object, both of which will be kept inside ConsumerGroupManager's cache (a rough sketch of such a cache is at the end of this message),
3) we will store the assignment along with the metadata only after the sync phase is complete; for MM this assignment could be large, so we may want to reconfigure "offsetMaxRecordSize" to handle it,
4) we will still need KIP-40 for querying the group metadata / assignment from ConsumerGroupManager's cache.

Thoughts?

> Persist Coordinator State for Coordinator Failover
> --------------------------------------------------
>
>                 Key: KAFKA-2017
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2017
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: Onur Karaman
>            Assignee: Guozhang Wang
>             Fix For: 0.9.0.0
>
>         Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to fail over to a new coordinator without forcing all the consumers to rejoin their groups. This is possible if the coordinator persists its state so that the state can be transferred during coordinator failover. This state consists of most of the information in GroupRegistry and ConsumerRegistry.
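And here is the rough sketch of point 2) above (purely an illustration, not the planned implementation): a ConsumerGroupManager-style cache keeping whatever the log load returns, offsets and group metadata side by side; all names are hypothetical:

{code}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical cache entries; the real objects would carry much more state.
case class OffsetAndMetadata(offset: Long, metadata: String)
case class GroupMetadata(generationId: Int,
                         protocol: String,
                         // memberId -> serialized assignment; only written after the sync phase completes
                         assignments: Map[String, Array[Byte]])

// Minimal sketch of the cache: offsets keyed by (group, topic, partition),
// group metadata keyed by group id.
class ConsumerGroupCache {
  private val offsets = new ConcurrentHashMap[(String, String, Int), OffsetAndMetadata]()
  private val groups  = new ConcurrentHashMap[String, GroupMetadata]()

  // Called while replaying the log after a coordinator move: each decoded
  // message updates exactly one of the two maps, depending on its key version.
  def putOffset(group: String, topic: String, partition: Int, value: OffsetAndMetadata): Unit =
    offsets.put((group, topic, partition), value)

  def putGroup(group: String, value: GroupMetadata): Unit =
    groups.put(group, value)

  def offset(group: String, topic: String, partition: Int): Option[OffsetAndMetadata] =
    Option(offsets.get((group, topic, partition)))

  def group(groupId: String): Option[GroupMetadata] =
    Option(groups.get(groupId))
}
{code}

A KIP-40 style describe request could then be answered directly from this cache instead of re-reading the log.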