[ https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-13563. ----------------------------------- Fix Version/s: 3.2.0 3.1.1 Resolution: Fixed > FindCoordinatorFuture never get cleared in non-group mode( consumer#assign) > --------------------------------------------------------------------------- > > Key: KAFKA-13563 > URL: https://issues.apache.org/jira/browse/KAFKA-13563 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 2.7.1, 3.0.0 > Reporter: Luke Chen > Assignee: Luke Chen > Priority: Major > Fix For: 3.2.0, 3.1.1 > > Attachments: kafka.zip > > > In KAFKA-10793, we fix the race condition when lookup coordinator by clearing > the _findCoordinatorFuture_ when handling the result, rather than in the > listener callbacks. It works well under consumer group mode (i.e. > Consumer#subscribe), but we found when user is using non consumer group mode > (i.e. Consumer#assign) with group id provided (for offset commitment, so that > there will be consumerCoordinator created), the _findCoordinatorFuture_ will > never be cleared in some situations, and cause the offset committing keeps > getting NOT_COORDINATOR error. > > After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places: > # heartbeat thread > # AbstractCoordinator#ensureCoordinatorReady > But in non consumer group mode with group id provided, there will be no > (1)heartbeat thread , and it only call > (2)AbstractCoordinator#ensureCoordinatorReady when 1st time consumer wants to > fetch committed offset position. That is, after 2nd lookupCoordinator call, > we have no chance to clear the _findCoordinatorFuture_ . > > To avoid the race condition as KAFKA-10793 mentioned, it's not safe to clear > the _findCoordinatorFuture_ in the future listener. So, I think we can fix > this issue by calling AbstractCoordinator#ensureCoordinatorReady when > coordinator unknown in non consumer group case, under each Consumer#poll. > > Reproduce steps: > > 1. Start a 3 Broker cluster with a Topic having Replicas=3. > 2. Start a Client with Producer and Consumer (with Consumer#assign(), not > subscribe, and provide a group id) communicating over the Topic. > 3. Stop the Broker that is acting as the Group Coordinator. > 4. Observe successful Rediscovery of new Group Coordinator. > 5. Restart the stopped Broker. > 6. Stop the Broker that became the new Group Coordinator at step 4. > 7. Observe "Rediscovery will be attempted" message but no "Discovered group > coordinator" message. > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)