[ https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13750580#comment-13750580 ]
Jun Rao commented on KAFKA-1012:
--------------------------------

Thanks for the patch. This looks promising. Some comments:

20. ZookeeperConsumerConnector:
20.1 Currently, this class manages an OffsetFetchChannel and an OffsetCommitProducer directly. Could we use an OffsetClientManager to manage them together?
20.2 commitOffsets(): Instead of doing topicRegistry.flatMap(t => ...), you can do topicRegistry.flatMap{case(topic, streamToPartitionMap) => ...} and reference the named fields directly.
20.3 The patch added a couple of locks to synchronize the offset commits. In KAFKA-989, we changed shutdown() to synchronize on the rebalance lock. I am wondering if we can just synchronize on that same lock for the offset-related operations.
20.4 addPartitionTopicInfo(): If getFetchOffset returns an OffsetLoadingCode error, should we keep retrying until we get a valid offset? If so, we could put that logic in a utility function. Also, the correlationId should change.

21. KafkaApis:
21.1 offsetFetchChannelPool: The controller also manages a socket connection channel pool. Could we reuse the same code, or have a shared broker channel pool between the controller and KafkaApis?
21.2 handleProducerRequest(): Since producer requests to the OffsetTopic always use ack = -1, do we need to handle the case when ack is not -1?
21.3 handleTopicMetadataRequest(): During topic creation, it seems that we can put the following line in a finally clause of the try: topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
21.4 handleOffsetFetchRequest(): I am not sure this request should trigger the creation of the OffsetTopic. It seems to me that the OffsetTopic should only be created at commit time. If the OffsetTopic doesn't exist, we should probably just return an offset of -1 to the caller.

22. OffsetManager:
22.1 OffsetAndMetadata: The timestamp in the offset value is not being used.
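A minimal, self-contained sketch of the refactoring suggested in 20.2 (the map contents here are made-up stand-ins, not the actual topicRegistry structure): pattern-matching each entry gives the fields readable names instead of opaque tuple accessors.

```scala
// Illustrative sketch for comment 20.2; topicRegistry here is a toy map,
// not the real ZookeeperConsumerConnector field.
object FlatMapStyleExample {
  def main(args: Array[String]): Unit = {
    val topicRegistry = Map(
      "topicA" -> Map("stream-0" -> 0, "stream-1" -> 1),
      "topicB" -> Map("stream-0" -> 2)
    )

    // Before: tuple accessors obscure what t._1 and t._2 are
    val partitionsBefore = topicRegistry.flatMap(t => t._2.values)

    // After: named fields via a case pattern, as suggested
    val partitionsAfter = topicRegistry.flatMap {
      case (topic, streamToPartitionMap) => streamToPartitionMap.values
    }

    assert(partitionsBefore.toSet == partitionsAfter.toSet)
    println(partitionsAfter.toList.sorted)  // prints List(0, 1, 2)
  }
}
```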
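The retry-until-loaded utility proposed in 20.4 could look roughly like the following hedged sketch; the FetchResult type, error codes, and backoff parameters are illustrative stand-ins, not the actual consumer client API.

```scala
// Hedged sketch for comment 20.4: keep retrying an offset fetch while the
// broker reports that offsets are still being loaded. All names here are
// hypothetical stand-ins.
object RetryOffsetFetchExample {
  val OffsetsLoadingCode: Short = 5   // hypothetical error code
  val NoError: Short = 0

  case class FetchResult(error: Short, offset: Long)

  // Call `fetch` until it stops returning OffsetsLoadingCode, sleeping
  // `backoffMs` between attempts, bounded by `maxRetries`.
  def fetchOffsetWithRetry(fetch: () => FetchResult,
                           backoffMs: Long = 200L,
                           maxRetries: Int = 10): FetchResult = {
    var attempts = 0
    var result = fetch()
    while (result.error == OffsetsLoadingCode && attempts < maxRetries) {
      Thread.sleep(backoffMs)
      attempts += 1
      result = fetch()
    }
    result
  }

  def main(args: Array[String]): Unit = {
    // Simulate a broker that is still loading offsets on the first two calls.
    var calls = 0
    val simulatedFetch = () => {
      calls += 1
      if (calls <= 2) FetchResult(OffsetsLoadingCode, -1L)
      else FetchResult(NoError, 42L)
    }
    val result = fetchOffsetWithRetry(simulatedFetch, backoffMs = 1L)
    assert(result.error == NoError && result.offset == 42L)
    println(s"fetched offset ${result.offset} after $calls calls")
  }
}
```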
22.2 triggerLoadOffsets(): Instead of doing currOffset = currOffset + 1, we should probably do currOffset = m.offset + 1, because offsets are not guaranteed to be consecutive when dedupe (log compaction) is enabled.
22.3 Should we move common statements like the following into a finally clause? loading.remove(offsetsPartition) // unlock to prevent blocking offset fetch requests

23. ErrorMapping: Do we need a corresponding exception for OffsetsLoadingCode?

A couple of high-level questions:

24. How do we clean up the offsets of consumers that are gone? For example, there could be many instances of console consumers that come and go. We probably don't want to keep their offsets in either the in-memory table or the offset log forever.

25. How do we plan to migrate existing consumers to this new offset storage? Do we need to stop all consumer instances and do an offset import first, or can we do this online?

> Implement an Offset Manager and hook offset requests to it
> ----------------------------------------------------------
>
>                 Key: KAFKA-1012
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1012
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Tejas Patil
>            Assignee: Tejas Patil
>            Priority: Minor
>         Attachments: KAFKA-1012.patch, KAFKA-1012-v2.patch
>
>
> After KAFKA-657, we have a protocol for consumers to commit and fetch offsets from brokers. Currently, consumers are not using this API and are talking directly to Zookeeper.
> This Jira will involve the following:
> 1. Add a special topic in Kafka for storing offsets
> 2. Add an OffsetManager interface which would handle storing, accessing, loading and maintaining consumer offsets
> 3. Implement offset managers for both of these 2 choices: the existing ZK-based storage, or inbuilt storage for offsets
> 4. Leader brokers would now maintain an additional hash table of offsets for the group-topic-partitions that they lead
> 5.
> Consumers should now use the OffsetCommit and OffsetFetch API

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira
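As a concrete illustration of comments 22.2 and 22.3 above, here is a hedged sketch of loading the offsets table from a compacted log. The Message type, read() helper, and loading set are simplified stand-ins for the structures in the patch; the point is that the scan position must advance to m.offset + 1 (offsets can have gaps under compaction) and that the unlock belongs in a finally clause.

```scala
// Sketch only: simplified stand-ins, not the OffsetManager from the patch.
import scala.collection.mutable

object LoadOffsetsSketch {
  case class Message(offset: Long, key: String, value: Long)

  // Partitions currently being loaded (comment 22.3's lock stand-in).
  val loading = mutable.Set[Int]()

  // A compacted log: returns surviving messages with offset >= from.
  def read(log: Seq[Message], from: Long): Seq[Message] =
    log.filter(_.offset >= from)

  def loadOffsets(log: Seq[Message], offsetsPartition: Int): Map[String, Long] = {
    loading += offsetsPartition
    val table = mutable.Map[String, Long]()
    try {
      var currOffset = 0L
      var batch = read(log, currOffset)
      while (batch.nonEmpty) {
        for (m <- batch) {
          table(m.key) = m.value
          currOffset = m.offset + 1  // 22.2: not currOffset + 1; offsets may skip
        }
        batch = read(log, currOffset)
      }
      table.toMap
    } finally {
      // 22.3: always unlock, so offset fetch requests are not blocked
      loading.remove(offsetsPartition)
    }
  }

  def main(args: Array[String]): Unit = {
    // Compacted log: only offsets 0, 3, 7 survive
    val log = Seq(Message(0, "g1-t-0", 10), Message(3, "g1-t-1", 20), Message(7, "g1-t-0", 30))
    val table = loadOffsets(log, offsetsPartition = 0)
    assert(table == Map("g1-t-0" -> 30, "g1-t-1" -> 20))
    assert(loading.isEmpty)
    println(table)
  }
}
```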