[ https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen updated KAFKA-13563:
------------------------------
    Description:
In KAFKA-10793, we fixed the race condition in coordinator lookup by clearing the _findCoordinatorFuture_ when handling the result, rather than in the listener callbacks. That works well in consumer group mode (i.e. Consumer#subscribe). However, we found that when the user is in non consumer group mode (i.e. Consumer#assign) with a group id provided (for offset commits, so that a consumerCoordinator is created), the _findCoordinatorFuture_ is never cleared in some situations, and offset commits keep failing with a NOT_COORDINATOR error.

After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
 # heartbeat thread
 # AbstractCoordinator#ensureCoordinatorReady

But in non consumer group mode with a group id provided, there is no (1) heartbeat thread, and (2) AbstractCoordinator#ensureCoordinatorReady is only called the first time the consumer fetches the committed offset position. That is, after the 2nd lookupCoordinator call, we have no chance to clear the _findCoordinatorFuture_.

To avoid the race condition described in KAFKA-10793, it is not safe to clear the _findCoordinatorFuture_ in the future listener. So, I think we can fix this issue by calling AbstractCoordinator#ensureCoordinatorReady on each Consumer#poll when the coordinator is unknown in the non consumer group case.

Reproduce steps:
 1. Start a 3 Broker cluster with a Topic having Replicas=3.
 2. Start a Client with Producer and Consumer (with Consumer#assign(), not subscribe, and provide a group id) communicating over the Topic.
 3. Stop the Broker that is acting as the Group Coordinator.
 4. Observe successful Rediscovery of new Group Coordinator.
 5. Restart the stopped Broker.
 6. Stop the Broker that became the new Group Coordinator at step 4.
 7. Observe "Rediscovery will be attempted" message but no "Discovered group coordinator" message.
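
The failure sequence in the description can be illustrated with a small toy model. This is only a sketch and not Kafka's implementation: the class ToyCoordinator, the fields clusterAnswer and requestsSent, and the methods tryCommit and pollWithFix are hypothetical names made up for illustration, and an already-completed CompletableFuture stands in for the asynchronous FindCoordinator request. It assumes, as the description implies, that a lookup is only sent when no future is cached, and that the commit path triggers a lookup without ever clearing the future.

```java
import java.util.concurrent.CompletableFuture;

/** Toy model only: names echo the issue text, but this is NOT Kafka's code. */
class ToyCoordinator {
    String clusterAnswer = "broker-1";  // hypothetical: what a lookup would return right now
    String coordinator;                 // null means "coordinator unknown"
    CompletableFuture<String> findCoordinatorFuture; // null means "no lookup cached"
    int requestsSent = 0;               // hypothetical counter of lookups actually sent

    /** Sends a lookup only when no future is cached; otherwise reuses the cached one. */
    CompletableFuture<String> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            requestsSent++;
            coordinator = clusterAnswer; // the response handler records the coordinator
            findCoordinatorFuture = CompletableFuture.completedFuture(clusterAnswer);
        }
        return findCoordinatorFuture;
    }

    /** Waits for the lookup and clears the cached future while handling the result. */
    void ensureCoordinatorReady() {
        // Bounded retry loop: a stale future is cleared first, then a fresh lookup is sent.
        for (int attempt = 0; coordinator == null && attempt < 3; attempt++) {
            lookupCoordinator().join();
            findCoordinatorFuture = null;
        }
    }

    void markCoordinatorUnknown() {
        coordinator = null;
    }

    /** Commit path in assign() mode: triggers a lookup but never clears the future. */
    boolean tryCommit() {
        if (coordinator == null) {
            lookupCoordinator();
            return false; // stands in for a NOT_COORDINATOR failure
        }
        return true;
    }

    /** Proposed behaviour: each poll ensures the coordinator is ready when it is unknown. */
    void pollWithFix() {
        if (coordinator == null) {
            ensureCoordinatorReady();
        }
    }

    public static void main(String[] args) {
        ToyCoordinator c = new ToyCoordinator();
        c.ensureCoordinatorReady();   // first fetch of committed offsets

        c.clusterAnswer = "broker-2"; // first coordinator stops
        c.markCoordinatorUnknown();
        c.tryCommit();                // lookup sent; the future stays cached afterwards
        System.out.println("after 1st failover: " + c.coordinator);

        c.clusterAnswer = "broker-3"; // second coordinator stops
        c.markCoordinatorUnknown();
        c.tryCommit();                // stale future reused, no request sent
        System.out.println("after 2nd failover: " + c.coordinator);

        c.pollWithFix();              // clears the stale future, then looks up again
        System.out.println("after pollWithFix: " + c.coordinator);
    }
}
```

In this model the first failover recovers because no future is cached yet, the second one stays stuck on the stale future, and calling ensureCoordinatorReady from the poll path is what lets a new lookup go out.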

  was:
In KAFKA-10793, we fixed the race condition in coordinator lookup by clearing the _findCoordinatorFuture_ when handling the result, rather than in the listener callbacks. That works well in consumer group mode (i.e. Consumer#subscribe). However, we found that when the user is in non consumer group mode (i.e. Consumer#assign) with a group id provided (for offset commits, so that a consumerCoordinator is created), the _findCoordinatorFuture_ is never cleared in some situations, and offset commits keep failing with a NOT_COORDINATOR error.

After KAFKA-10793, we clear the _findCoordinatorFuture_ has 2 places:
 # heartbeat thread
 # AbstractCoordinator#ensureCoordinatorReady

But in non consumer group mode with a group id provided, there is no (1) heartbeat thread, and (2) AbstractCoordinator#ensureCoordinatorReady is only called the first time the consumer fetches the committed offset position. That is, after the 2nd lookupCoordinator call, we have no chance to clear the _findCoordinatorFuture_.

To avoid the race condition described in KAFKA-10793, it is not safe to clear the _findCoordinatorFuture_ in the future listener. So, I think we can fix this issue by calling AbstractCoordinator#ensureCoordinatorReady on each Consumer#poll when the coordinator is unknown in the non consumer group case.

Reproduce steps:
 1. Start a 3 Broker cluster with a Topic having Replicas=3.
 2. Start a Client with Producer and Consumer (with Consumer#assign(), not subscribe, and provide a group id) communicating over the Topic.
 3. Stop the Broker that is acting as the Group Coordinator.
 4. Observe successful Rediscovery of new Group Coordinator.
 5. Restart the stopped Broker.
 6. Stop the Broker that became the new Group Coordinator at step 4.
 7. Observe "Rediscovery will be attempted" message but no "Discovered group coordinator" message.

> FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-13563
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13563
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>         Attachments: kafka.zip
>
>
> In KAFKA-10793, we fixed the race condition in coordinator lookup by clearing
> the _findCoordinatorFuture_ when handling the result, rather than in the
> listener callbacks. That works well in consumer group mode (i.e.
> Consumer#subscribe). However, we found that when the user is in non consumer
> group mode (i.e. Consumer#assign) with a group id provided (for offset
> commits, so that a consumerCoordinator is created), the
> _findCoordinatorFuture_ is never cleared in some situations, and offset
> commits keep failing with a NOT_COORDINATOR error.
>
> After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
>  # heartbeat thread
>  # AbstractCoordinator#ensureCoordinatorReady
>
> But in non consumer group mode with a group id provided, there is no
> (1) heartbeat thread, and (2) AbstractCoordinator#ensureCoordinatorReady is
> only called the first time the consumer fetches the committed offset
> position. That is, after the 2nd lookupCoordinator call, we have no chance
> to clear the _findCoordinatorFuture_.
>
> To avoid the race condition described in KAFKA-10793, it is not safe to clear
> the _findCoordinatorFuture_ in the future listener. So, I think we can fix
> this issue by calling AbstractCoordinator#ensureCoordinatorReady on each
> Consumer#poll when the coordinator is unknown in the non consumer group case.
>
> Reproduce steps:
>  1. Start a 3 Broker cluster with a Topic having Replicas=3.
>  2. Start a Client with Producer and Consumer (with Consumer#assign(), not
> subscribe, and provide a group id) communicating over the Topic.
>  3. Stop the Broker that is acting as the Group Coordinator.
>  4. Observe successful Rediscovery of new Group Coordinator.
>  5. Restart the stopped Broker.
>  6. Stop the Broker that became the new Group Coordinator at step 4.
>  7. Observe "Rediscovery will be attempted" message but no "Discovered group
> coordinator" message.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)