[ https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14336985#comment-14336985 ]
Guozhang Wang edited comment on KAFKA-1910 at 3/3/15 12:45 AM:
---------------------------------------------------------------

The uploaded patch contains multiple fixes for the related JIRAs as well as a refactoring of the new consumer itself. I will summarize them here instead of in the RB:

1. Fix ConsumerTest.testXXXwithBrokerFailure: in RestartDeadBroker we need to call startup() on the old brokers instead of creating new ones, since the latter approach messes up the metadata and causes the test to hang (KAFKA-1948). Also make sure the "test" topic is created with the correct replication factor, to avoid hanging when the only replica broker is shut down. Also bounce the brokers in a background thread so that they are eventually restarted.
2. Fix ConsumerTest's __consumer_offsets topic: when we call partitionsFor(), the __consumer_offsets topic may be created with a replication factor of min(offsetsTopicReplicationFactor, aliveBrokers.size); see KAFKA-1864 for details (KAFKA-1975).
3. Add the IllegalGeneration logic to the coordinator. It is important for consumers to rebalance after rediscovering the coordinator, but the current stub always returns OK, so consumers migrating to the new coordinator will not trigger a rebalance (KAFKA-1964).
4. Create the Coordinator and FetchManager modules as KafkaConsumer internals. The Coordinator is responsible for assigning partitions (joining groups), committing offsets, and fetching offsets from the coordinator; the FetchManager is responsible for handling fetch requests / responses.
4.1 After the refactoring it is easier to detect and fix a bug where response callbacks were triggered multiple times, causing a coordinator NPE (KAFKA-1969).
4.2 Avoid always fetching offsets from the coordinator whenever the consumer decides to update fetch positions; a few new variables / APIs are introduced in SubscriptionState accordingly.
4.3 Move the serializer / deserializer configs and constructors to AbstractConfig.
4.4 Add missing error handling for commit offset / heartbeat responses. In general I think we should document the possible error codes for each response type to help write error-handling logic; I have filed KAFKA-1985 for that.

> Refactor KafkaConsumer
> ----------------------
>
>                 Key: KAFKA-1910
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1910
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>
> KafkaConsumer now contains all the logic on the consumer side, making it a
> very large class file; it would be better to refactor it into multiple layers
> on top of KafkaClient.
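Item 1 of the comment describes bouncing brokers on a background thread and restarting the old broker objects with startup() instead of constructing new ones. A minimal Java sketch of that test-harness pattern follows; `RestartableBroker` and `BrokerBouncer` are hypothetical names, not Kafka test utilities:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical handle on an in-process test broker (not a Kafka class). */
interface RestartableBroker {
    void shutdown();
    void startup();
}

/**
 * Hypothetical test helper: bounces brokers on a background thread so the test
 * thread can keep consuming, and restarts the SAME broker objects via startup()
 * rather than creating replacement instances.
 */
final class BrokerBouncer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<?> bounceInBackground(List<? extends RestartableBroker> brokers) {
        return executor.submit(() -> {
            for (RestartableBroker broker : brokers) {
                broker.shutdown();
                // Reuse the old instance; per item 1, new instances confused the metadata.
                broker.startup();
            }
        });
    }

    void close() {
        executor.shutdown();
    }
}
```

Returning the `Future` lets a test wait for the bounce to finish, so it can assert that every broker was eventually restarted.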
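Item 2 states that the auto-created __consumer_offsets topic may get a replication factor of min(offsetsTopicReplicationFactor, aliveBrokers.size). The rule is small enough to show directly; `OffsetsTopicReplication` is a hypothetical helper for illustration, not broker code:

```java
/** Hypothetical helper illustrating the rule quoted in item 2 (not actual broker code). */
final class OffsetsTopicReplication {
    private OffsetsTopicReplication() {}

    /**
     * Replication factor used when __consumer_offsets is auto-created:
     * the configured factor, capped by the number of brokers currently alive.
     */
    static int effectiveReplicationFactor(int offsetsTopicReplicationFactor, int aliveBrokers) {
        if (offsetsTopicReplicationFactor < 1 || aliveBrokers < 1) {
            throw new IllegalArgumentException("both values must be positive");
        }
        return Math.min(offsetsTopicReplicationFactor, aliveBrokers);
    }
}
```

For example, with a configured factor of 3 but only one live broker when partitionsFor() triggers creation, the topic ends up with a single replica.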
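Item 3 explains why the coordinator must reject stale generations instead of always answering OK. A minimal sketch of such a generation check, assuming one integer generation id per group; `GenerationCheckingCoordinator` and `HeartbeatResult` are hypothetical names, not the patch's classes:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical heartbeat outcomes for this sketch, not Kafka's actual error enum. */
enum HeartbeatResult { OK, ILLEGAL_GENERATION }

/**
 * Hypothetical, simplified coordinator that tracks one generation id per group.
 * Every completed rebalance bumps the generation, and a heartbeat carrying any
 * other generation is rejected instead of always being answered with OK.
 */
final class GenerationCheckingCoordinator {
    private final Map<String, Integer> generations = new HashMap<>();

    /** Completes a join round for the group and returns the new generation id (starting at 1). */
    int completeRebalance(String groupId) {
        return generations.merge(groupId, 1, Integer::sum);
    }

    HeartbeatResult heartbeat(String groupId, int generationId) {
        // A coordinator that has never seen the group treats its generation as 0,
        // so a consumer that migrated here with an old generation is told to rejoin.
        int current = generations.getOrDefault(groupId, 0);
        return generationId == current ? HeartbeatResult.OK : HeartbeatResult.ILLEGAL_GENERATION;
    }
}
```

In this sketch, a consumer holding generation 1 from its old coordinator gets ILLEGAL_GENERATION from a coordinator with no state for the group and therefore rejoins, which is the rebalance an always-OK stub never triggers.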
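Item 4.1 mentions response callbacks being triggered multiple times. The comment does not say how the bug was fixed; one generic way to make a double invocation harmless and detectable is a run-at-most-once wrapper, sketched here with a hypothetical `OnceCallback` class:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * Hypothetical wrapper that lets a response callback run at most once,
 * no matter how many times the networking layer tries to complete it.
 */
final class OnceCallback<T> {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final Consumer<T> delegate;

    OnceCallback(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /**
     * Runs the delegate on the first call and returns true; later calls skip the
     * delegate and return false so the caller can log the duplicate completion.
     */
    boolean complete(T response) {
        if (!completed.compareAndSet(false, true)) {
            return false;
        }
        delegate.accept(response);
        return true;
    }
}
```

Using `compareAndSet` keeps the guard correct even if two threads race to complete the same request.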
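Item 4.2 avoids going to the coordinator every time fetch positions are updated. A minimal sketch of the idea, assuming a per-partition "needs committed offset" flag that is set on assignment and cleared once the offset is fetched; `SimpleSubscriptionState` is hypothetical and does not reproduce the real SubscriptionState API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

/**
 * Hypothetical, simplified subscription state (not the real SubscriptionState API).
 * Partitions are plain strings such as "test-0" to keep the sketch self-contained.
 */
final class SimpleSubscriptionState {
    private final Map<String, Long> positions = new HashMap<>();
    // Assigned partitions whose committed offset has not been fetched yet.
    private final Set<String> needsCommittedOffset = new HashSet<>();

    /** Replaces the assignment; only newly added partitions are flagged for an offset fetch. */
    void assign(Set<String> partitions) {
        positions.keySet().retainAll(partitions);
        needsCommittedOffset.retainAll(partitions);
        for (String tp : partitions) {
            if (!positions.containsKey(tp)) {
                needsCommittedOffset.add(tp);
            }
        }
    }

    /**
     * Updates fetch positions, asking the coordinator (modelled here as a function)
     * only for flagged partitions. Returns the number of coordinator lookups made.
     */
    int updateFetchPositions(ToLongFunction<String> fetchCommittedOffset) {
        int lookups = 0;
        for (String tp : new HashSet<>(needsCommittedOffset)) {
            positions.put(tp, fetchCommittedOffset.applyAsLong(tp));
            needsCommittedOffset.remove(tp);
            lookups++;
        }
        return lookups;
    }

    Long position(String tp) {
        return positions.get(tp);
    }
}
```

A second call to `updateFetchPositions` with an unchanged assignment makes no coordinator lookups, and reassigning only triggers lookups for the newly added partitions.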
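Item 4.4 adds error handling for commit offset and heartbeat responses. A minimal sketch of mapping response errors to the consumer's next step follows; the error names below are hypothetical categories loosely modeled on the situations in this comment (coordinator moved, stale generation), not Kafka's exact error-code list, which is what KAFKA-1985 proposes to document:

```java
/** Hypothetical error categories for this sketch; not the exact Kafka error-code list. */
enum CoordinatorError { NONE, COORDINATOR_UNAVAILABLE, NOT_COORDINATOR, ILLEGAL_GENERATION, UNKNOWN_ERROR }

/** What the consumer should do next after a commit-offset or heartbeat response. */
enum NextStep { PROCEED, REDISCOVER_COORDINATOR, REJOIN_GROUP, RAISE }

/** Hypothetical handler shared by commit-offset and heartbeat responses. */
final class CoordinatorResponseHandler {
    private CoordinatorResponseHandler() {}

    static NextStep onCommitOrHeartbeatResponse(CoordinatorError error) {
        switch (error) {
            case NONE:
                return NextStep.PROCEED;
            case COORDINATOR_UNAVAILABLE:
            case NOT_COORDINATOR:
                // The coordinator moved or is down: look it up again, then retry.
                return NextStep.REDISCOVER_COORDINATOR;
            case ILLEGAL_GENERATION:
                // Our generation is stale (see item 3): rejoin so a rebalance happens.
                return NextStep.REJOIN_GROUP;
            default:
                // Anything not explicitly handled is surfaced instead of silently ignored.
                return NextStep.RAISE;
        }
    }
}
```

Centralizing the mapping in one place is one way to keep the two response types consistent as new error codes are added.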