[
https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13750580#comment-13750580
]
Jun Rao commented on KAFKA-1012:
--------------------------------
Thanks for the patch. This looks promising. Some comments:
20. ZookeeperConsumerConnector:
20.1 Currently, this class manages an OffsetFetchChannel and
OffsetCommitProducer directly. Could we use an OffsetClientManager to manage
them together?
20.2 commitOffsets()
Instead of doing
topicRegistry.flatMap(t => )
you can do
topicRegistry.flatMap { case (topic, streamToPartitionMap) => }
and reference the named fields directly.
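The suggested pattern-match form can be sketched as follows. This uses a simplified stand-in for topicRegistry (the real field types in ZookeeperConsumerConnector differ); the point is only the difference between tuple accessors and named fields:

```scala
// Hypothetical simplified stand-in for topicRegistry:
// topic -> (stream id -> offset)
val topicRegistry: Map[String, Map[Int, Long]] = Map(
  "t1" -> Map(0 -> 5L, 1 -> 7L),
  "t2" -> Map(0 -> 3L)
)

// Anonymous-argument form: fields must be reached via tuple accessors (t._1, t._2).
val viaTuple = topicRegistry.flatMap(t => t._2.map { case (id, off) => (t._1, id, off) })

// Pattern-match form: named fields are read directly, as suggested above.
val viaCase = topicRegistry.flatMap { case (topic, streamToPartitionMap) =>
  streamToPartitionMap.map { case (id, off) => (topic, id, off) }
}
```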
20.3 The patch added a couple of locks to synchronize the offset commits. In
Kafka-989, we changed shutdown() to synchronize on the rebalance lock. I am
wondering if we can just synchronize on the same lock for offsets related stuff.
20.4 addPartitionTopicInfo: If getFetchOffset returns an OffsetLoadingCode
error, should we keep retrying until we get a valid offset? If so, we could
write the logic in a util function. Also, the correlationId should change on
each request.
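Such a util function might look like the following sketch. The fetch callback, error code value, and retry parameters are all hypothetical stand-ins, not the real Kafka API:

```scala
// Placeholder for the real OffsetsLoadingCode error code value.
val OffsetsLoadingCode: Short = 14

// Hypothetical retry helper: keeps fetching until the broker stops
// returning the offsets-loading error, or gives up after maxRetries.
def fetchOffsetWithRetry(fetch: () => (Short, Long),
                         backoffMs: Long = 200L,
                         maxRetries: Int = 10): Option[Long] = {
  var attempts = 0
  while (attempts < maxRetries) {
    val (error, offset) = fetch()
    if (error != OffsetsLoadingCode) return Some(offset)
    attempts += 1
    Thread.sleep(backoffMs)
  }
  None // offsets partition still loading after maxRetries
}
```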
21. KafkaApis:
21.1 offsetFetchChannelPool: The controller also manages a socket connection
channel pool. Could we reuse the same code or have a shared broker channel pool
between the controller and KafkaApis?
21.2 handleProducerRequest(): Since producer requests to OffsetTopic always use
ack = -1, do we need to handle it in the case when ack is not -1?
21.3 handleTopicMetadataRequest():
During topic creation, it seems that we can put the following line in a finally
clause of the try.
topicsMetadata += new TopicMetadata(topicMetadata.topic,
topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
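The restructuring in 21.3 amounts to moving the append into a finally block, so the LeaderNotAvailable metadata is returned whether or not topic creation throws. A simplified sketch, with stand-in types in place of the real TopicMetadata and admin code:

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-ins for the real metadata type and error code.
case class TopicMetadata(topic: String, errorCode: Short)
val LeaderNotAvailableCode: Short = 5

def handleMetadataFor(topic: String,
                      createTopic: String => Unit,
                      topicsMetadata: ListBuffer[TopicMetadata]): Unit = {
  try {
    createTopic(topic) // may throw, e.g. if creation races with another broker
  } catch {
    case e: Exception => () // log and fall through
  } finally {
    // Appended regardless of the outcome, as suggested in 21.3.
    topicsMetadata += TopicMetadata(topic, LeaderNotAvailableCode)
  }
}
```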
21.4 handleOffsetFetchRequest(): I am not sure if this should trigger the
creation of OffsetTopic. It seems to me that OffsetTopic should only be created
at CommitOffset time. If the OffsetTopic doesn't exist, we should probably just
return an offset of -1 to the caller.
22. OffsetManager:
22.1 OffsetAndMetadata: The timestamp in the offset value is not being used.
22.2 triggerLoadOffsets: Instead of doing
currOffset = currOffset + 1
we probably should do
currOffset = m.offset + 1
This is because offsets are not guaranteed to be consecutive when dedupe is
enabled.
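The distinction matters because a compacted (dedupe-enabled) log can have gaps in its offset sequence. A sketch of the two loop styles over a message set with holes, using a hypothetical simplified message type:

```scala
// Hypothetical message type; compaction has removed offsets 1 and 3.
case class MessageAndOffset(offset: Long, key: String)
val messages = Seq(
  MessageAndOffset(0L, "a"),
  MessageAndOffset(2L, "b"),
  MessageAndOffset(4L, "c"))

// Wrong: assumes consecutive offsets, so after processing the last message
// it points at offset 3, which the compacted log has already removed.
var naiveOffset = 0L
for (m <- messages) { /* process m */ naiveOffset = naiveOffset + 1 }

// Right: advance from the offset actually observed on each message.
var currOffset = 0L
for (m <- messages) { /* process m */ currOffset = m.offset + 1 }
```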
22.3 Should we do the common statements like the following in a finally clause?
loading.remove(offsetsPartition) // unlock to prevent blocking offset fetch requests
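Point 22.3 can be sketched as wrapping the load in try/finally, so the partition is always unlocked even if the load throws partway through. The loading set and load step below are simplified stand-ins for the real OffsetManager state:

```scala
import scala.collection.mutable

// Simplified stand-in for the set of offsets partitions currently loading.
val loading = mutable.Set[Int]()

def loadOffsetsPartition(offsetsPartition: Int, doLoad: () => Unit): Unit = {
  loading.add(offsetsPartition)
  try {
    doLoad() // may throw partway through the load
  } finally {
    // Always unlock, so offset fetch requests are never blocked forever.
    loading.remove(offsetsPartition)
  }
}
```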
23. ErrorMapping: Do we need a corresponding exception for OffsetsLoadingCode?
A couple of high level questions:
24. How do we clean up the offsets for those consumers that are gone? For
example, there could be many instances of console consumers that come and go.
We probably don't want to keep their offsets in either the in-memory table or
the offset log forever.
25. How do we plan to migrate existing consumers to this new offset storage? Do
we need to stop all consumer instances and do an offset import first or can we
do this online?
> Implement an Offset Manager and hook offset requests to it
> ----------------------------------------------------------
>
> Key: KAFKA-1012
> URL: https://issues.apache.org/jira/browse/KAFKA-1012
> Project: Kafka
> Issue Type: Sub-task
> Components: consumer
> Reporter: Tejas Patil
> Assignee: Tejas Patil
> Priority: Minor
> Attachments: KAFKA-1012.patch, KAFKA-1012-v2.patch
>
>
> After KAFKA-657, we have a protocol for consumers to commit and fetch offsets
> from brokers. Currently, consumers are not using this API and are instead
> talking directly to Zookeeper.
> This Jira will involve the following:
> 1. Add a special topic in kafka for storing offsets
> 2. Add an OffsetManager interface which would handle storing, accessing,
> loading and maintaining consumer offsets
> 3. Implement offset managers for both storage choices: the existing ZK-based
> storage and the new inbuilt offset storage.
> 4. Leader brokers would now maintain an additional hash table of offsets for
> the group-topic-partitions that they lead
> 5. Consumers should now use the OffsetCommit and OffsetFetch API
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira