hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1042773463


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {

Review Comment:
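   Do we need to move this `GROUP_AUTHORIZATION_FAILED` check before the
   non-retriable check above? `GroupAuthorizationException` is not a
   `RetriableException`, so as written the earlier branch returns first and
   this one looks unreachable.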



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", 
response.errorMessage());

Review Comment:
   This debug log seems redundant given the "FindCoordinator request failed
   due to {}" message already logged at the top of this method.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", 
response.errorMessage());
+        errorHandler.handle(error.exception());
+
+        log.debug("Coordinator discovery failed, refreshing metadata", 
error.exception());
+    }
+
+    public void onResponse(final FindCoordinatorResponse response, Throwable 
t) {
+        long currentTimeMs = time.milliseconds();

Review Comment:
   To be consistent, why don't we pass the current time into `onResponse` as a
   parameter instead of calling `time.milliseconds()` here? We can use the
   `receivedTimeMs` from the `ClientResponse` object.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {

Review Comment:
   nit: rename the parameter to `currentTimeMs` (lowercase `s`), to match the 
naming used elsewhere in this class?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", 
response.errorMessage());
+        errorHandler.handle(error.exception());
+
+        log.debug("Coordinator discovery failed, refreshing metadata", 
error.exception());
+    }
+
+    public void onResponse(final FindCoordinatorResponse response, Throwable 
t) {
+        long currentTimeMs = time.milliseconds();
+        // handle Runtime exception
+        if (t != null) {
+            log.error("FindCoordinator request failed due to {}", 
t.getMessage());
+            markCoordinatorUnknown("coordinator unavailable", currentTimeMs);
+            return;
+        }
+
+        List<FindCoordinatorResponseData.Coordinator> coordinators = 
response.getCoordinatorByKey(this.groupId);
+        if (coordinators.size() != 1) {
+            coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+            String msg = String.format("Group coordinator lookup failed: 
Response should contain exactly one " +
+                    "coordinator, it has %d", coordinators.size());
+            log.error(msg);
+            errorHandler.handle(new IllegalStateException(msg));

Review Comment:
   In the error case, we don't need the response, just an `Exception`. Why 
don't we simplify this a little:
   ```java
    private void handleFailedCoordinatorResponse(
               final Exception exception,
               final long currentTimeMS
   )
   ```
   
   Then here, we can just pass the `IllegalStateException` to 
`handleFailedCoordinatorResponse`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", 
response.errorMessage());
+        errorHandler.handle(error.exception());
+
+        log.debug("Coordinator discovery failed, refreshing metadata", 
error.exception());
+    }
+
+    public void onResponse(final FindCoordinatorResponse response, Throwable 
t) {
+        long currentTimeMs = time.milliseconds();
+        // handle Runtime exception
+        if (t != null) {
+            log.error("FindCoordinator request failed due to {}", 
t.getMessage());
+            markCoordinatorUnknown("coordinator unavailable", currentTimeMs);
+            return;
+        }
+
+        List<FindCoordinatorResponseData.Coordinator> coordinators = 
response.getCoordinatorByKey(this.groupId);
+        if (coordinators.size() != 1) {
+            coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+            String msg = String.format("Group coordinator lookup failed: 
Response should contain exactly one " +
+                    "coordinator, it has %d", coordinators.size());
+            log.error(msg);
+            errorHandler.handle(new IllegalStateException(msg));
+            return;
+        }
+
+        FindCoordinatorResponseData.Coordinator node = coordinators.get(0);
+        if (node.errorCode() != Errors.NONE.code()) {
+            coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+            handleFailedCoordinatorResponse(node, currentTimeMs);
+            return;
+        }
+        handleSuccessFindCoordinatorResponse(node, currentTimeMs);
+    }
+
+    public Node coordinator() {
+        return this.coordinator;
+    }
+
+    // Visible for testing
+    static class CoordinatorRequestState {
+        final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+        final static double RECONNECT_BACKOFF_JITTER = 0.2;
+        private final ExponentialBackoff exponentialBackoff;
+        private long lastSentMs = -1;
+        private long lastReceivedMs = -1;
+        private int numAttempts = 0;
+        private long backoffMs = 0;
+
+        public CoordinatorRequestState(ConsumerConfig config) {
+            this.exponentialBackoff = new ExponentialBackoff(
+                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                    RECONNECT_BACKOFF_EXP_BASE,
+                    
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                    RECONNECT_BACKOFF_JITTER);
+        }
+
+        // Visible for testing
+        CoordinatorRequestState(final int reconnectBackoffMs,
+                                final int reconnectBackoffExpBase,
+                                final int reconnectBackoffMaxMs,
+                                final int jitter) {
+            this.exponentialBackoff = new ExponentialBackoff(
+                    reconnectBackoffMs,
+                    reconnectBackoffExpBase,
+                    reconnectBackoffMaxMs,
+                    jitter);
+        }
+
+        public void reset() {
+            this.lastSentMs = -1;
+            this.lastReceivedMs = -1;
+            this.numAttempts = 0;
+            this.backoffMs = exponentialBackoff.backoff(0);
+        }
+
+        public boolean canSendRequest(final long currentTimeMs) {
+            if (this.lastSentMs == -1) {
+                // no request has been sent
+                return true;
+            }
+
+            // TODO: I think there's a case when we want to resend the 
FindCoordinator when we haven't received

Review Comment:
   What case is this?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator as unknown (null) and record the time at which 
it was marked.
+     * If the coordinator is already unknown, only log how long the disconnect 
has lasted once it exceeds the rebalance timeout.
+     * @param cause why the coordinator is marked unknown
+     * @param currentTimeMs the current time in milliseconds
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", 
response.errorMessage());
+        errorHandler.handle(error.exception());
+
+        log.debug("Coordinator discovery failed, refreshing metadata", 
error.exception());
+    }
+
+    public void onResponse(final FindCoordinatorResponse response, Throwable 
t) {
+        long currentTimeMs = time.milliseconds();
+        // handle Runtime exception
+        if (t != null) {
+            log.error("FindCoordinator request failed due to {}", 
t.getMessage());
+            markCoordinatorUnknown("coordinator unavailable", currentTimeMs);

Review Comment:
   Is this necessary?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator as unknown (null) and record the time at which 
it was marked.
+     * If the coordinator is already unknown, only log how long the disconnect 
has lasted once it exceeds the rebalance timeout.
+     * @param cause why the coordinator is marked unknown
+     * @param currentTimeMs the current time in milliseconds
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", 
response.errorMessage());
+        errorHandler.handle(error.exception());
+
+        log.debug("Coordinator discovery failed, refreshing metadata", 
error.exception());
+    }
+
+    public void onResponse(final FindCoordinatorResponse response, Throwable 
t) {
+        long currentTimeMs = time.milliseconds();
+        // handle Runtime exception
+        if (t != null) {
+            log.error("FindCoordinator request failed due to {}", 
t.getMessage());
+            markCoordinatorUnknown("coordinator unavailable", currentTimeMs);
+            return;
+        }
+
+        List<FindCoordinatorResponseData.Coordinator> coordinators = 
response.getCoordinatorByKey(this.groupId);
+        if (coordinators.size() != 1) {
+            coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+            String msg = String.format("Group coordinator lookup failed: 
Response should contain exactly one " +
+                    "coordinator, it has %d", coordinators.size());
+            log.error(msg);
+            errorHandler.handle(new IllegalStateException(msg));
+            return;
+        }
+
+        FindCoordinatorResponseData.Coordinator node = coordinators.get(0);
+        if (node.errorCode() != Errors.NONE.code()) {
+            coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+            handleFailedCoordinatorResponse(node, currentTimeMs);
+            return;
+        }
+        handleSuccessFindCoordinatorResponse(node, currentTimeMs);
+    }
+
+    public Node coordinator() {
+        return this.coordinator;
+    }
+
+    // Visible for testing
+    static class CoordinatorRequestState {
+        final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+        final static double RECONNECT_BACKOFF_JITTER = 0.2;
+        private final ExponentialBackoff exponentialBackoff;
+        private long lastSentMs = -1;
+        private long lastReceivedMs = -1;
+        private int numAttempts = 0;
+        private long backoffMs = 0;
+
+        public CoordinatorRequestState(ConsumerConfig config) {
+            this.exponentialBackoff = new ExponentialBackoff(
+                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                    RECONNECT_BACKOFF_EXP_BASE,
+                    
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                    RECONNECT_BACKOFF_JITTER);
+        }
+
+        // Visible for testing
+        CoordinatorRequestState(final int reconnectBackoffMs,
+                                final int reconnectBackoffExpBase,
+                                final int reconnectBackoffMaxMs,
+                                final int jitter) {
+            this.exponentialBackoff = new ExponentialBackoff(
+                    reconnectBackoffMs,
+                    reconnectBackoffExpBase,
+                    reconnectBackoffMaxMs,
+                    jitter);
+        }
+
+        public void reset() {
+            this.lastSentMs = -1;
+            this.lastReceivedMs = -1;
+            this.numAttempts = 0;
+            this.backoffMs = exponentialBackoff.backoff(0);
+        }
+
+        public boolean canSendRequest(final long currentTimeMs) {
+            if (this.lastSentMs == -1) {
+                // no request has been sent
+                return true;
+            }
+
+            // TODO: I think there's a case when we want to resend the 
FindCoordinator when we haven't received
+            //  anything yet.
+            if (this.lastReceivedMs == -1 ||
+                    this.lastReceivedMs < this.lastSentMs) {
+                // there is an inflight request
+                return false;
+            }
+
+            return requestBackoffExpired(currentTimeMs);
+        }
+
+        public void updateLastSend(final long currentTimeMs) {
+            // Here we update the timer everytime we try to send a request. 
Also increment number of attempts.
+            this.lastSentMs = currentTimeMs;
+        }
+
+        public void updateLastFailedAttempt(final long currentTimeMs) {
+            this.lastReceivedMs = currentTimeMs;
+            this.backoffMs = exponentialBackoff.backoff(numAttempts);
+            this.numAttempts++;
+        }
+
+        private boolean requestBackoffExpired(final long currentTimeMs) {
+            return (currentTimeMs - this.lastReceivedMs) >= this.backoffMs;

Review Comment:
   I think this overflows if `currentTimeMs` is less than `lastReceivedMs`. 
Could we just write it like this instead:
   ```java
   return remainingBackoffMs(currentTimeMs) <= 0;
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +134,102 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't 
be interrupted");
+                    // swallow the wakeup exception to prevent killing the 
background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", 
t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Process event from a single poll. It performs the following tasks 
sequentially:
+     *  1. Try to poll an event from the queue, and try to process it.
+     *  2. Try to find Coordinator if needed
+     *  3. Try to send fetches.
+     *  4. Poll the networkClient for outstanding requests. If none, poll until 
next
+     *  iteration.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
+        Optional<ApplicationEvent> event = maybePollEvent();
+
+        if (event.isPresent()) {
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event.get());
         }
-        if (this.inflightEvent.isPresent() && 
maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+
+        final long currentTimeMs = time.milliseconds();
+        long pollWaitTimeMs = timeToNextHeartbeatMs();
+
+        // TODO: Add a condition here, like shouldFindCoordinator in the 
future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorManager.isPresent()) {
+            pollWaitTimeMs = Math.min(pollWaitTimeMs,
+                    
handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
         }
 
         // if there are pending events to process, poll then continue without
         // blocking.
-        if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-            networkClient.poll(time.timer(0));
+        if (!applicationEventQueue.isEmpty()) {
+            networkClientDelegate.poll(time.timer(0), false);
             return;
         }
         // if there are no events to process, poll until timeout. The timeout
         // will be the minimum of the requestTimeoutMs, nextHeartBeatMs, and
         // nextMetadataUpdate. See NetworkClient.poll impl.
-        
networkClient.poll(time.timer(timeToNextHeartbeatMs(time.milliseconds())));
+        networkClientDelegate.poll(time.timer(pollWaitTimeMs), false);
+    }
+
+    long handlePollResult(NetworkClientDelegate.PollResult res) {
+        Objects.requireNonNull(res);
+        if (!res.unsentRequests.isEmpty()) {
+            networkClientDelegate.addAll(res.unsentRequests);
+            return Long.MAX_VALUE;

Review Comment:
   Why don't we just always return `timeMsTillNextPoll`? When we start handling 
fetches, we may be backing off fetching from some brokers while still fetching 
from others.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to 
handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {

Review Comment:
   Can we get rid of `disableWakeup`? We don't really need it when the polling 
is done in the background. Also, why don't we just use `long timeoutMs` instead 
of `Timer`?



##########
clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java:
##########
@@ -50,6 +52,22 @@ public FindCoordinatorResponse(FindCoordinatorResponseData 
data) {
         this.data = data;
     }
 
+    public List<Coordinator> getCoordinatorByKey(String key) {

Review Comment:
   Why don't we return `Optional<Coordinator>`? I don't think it can match more 
than one.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", 
response.errorMessage());
+        errorHandler.handle(error.exception());

Review Comment:
   Do we need to pass through retriable errors? I had thought this would only 
be for fatal errors.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator coordinator,
+            final long currentTimeMS) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+        return;
+    }
+
+    private void handleFailedCoordinatorResponse(
+            final FindCoordinatorResponseData.Coordinator response,
+            final long currentTimeMS) {
+        Errors error = Errors.forCode(response.errorCode());
+        log.debug("FindCoordinator request failed due to {}", 
error.toString());
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMS);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMS);
+        if (!(error.exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", 
error.exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(error.exception());
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", 
response.errorMessage());
+        errorHandler.handle(error.exception());
+
+        log.debug("Coordinator discovery failed, refreshing metadata", 
error.exception());
+    }
+
+    public void onResponse(final FindCoordinatorResponse response, Throwable 
t) {
+        long currentTimeMs = time.milliseconds();
+        // handle Runtime exception
+        if (t != null) {
+            log.error("FindCoordinator request failed due to {}", 
t.getMessage());
+            markCoordinatorUnknown("coordinator unavailable", currentTimeMs);
+            return;
+        }
+
+        List<FindCoordinatorResponseData.Coordinator> coordinators = 
response.getCoordinatorByKey(this.groupId);
+        if (coordinators.size() != 1) {
+            coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+            String msg = String.format("Group coordinator lookup failed: 
Response should contain exactly one " +
+                    "coordinator, it has %d", coordinators.size());
+            log.error(msg);
+            errorHandler.handle(new IllegalStateException(msg));
+            return;
+        }
+
+        FindCoordinatorResponseData.Coordinator node = coordinators.get(0);
+        if (node.errorCode() != Errors.NONE.code()) {
+            coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);

Review Comment:
   We already do this in `handleFailedCoordinatorResponse`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Handles the timing of the next FindCoordinatorRequest based on the {@link 
CoordinatorRequestState}. It checks for:
+ * 1. If there's an existing coordinator.
+ * 2. If there is an inflight request
+ * 3. If the backoff timer has expired
+ *
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait
+ * timer, or a singleton list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The FindCoordinatorResponse will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exceptions and 
responses. Note that, the coordinator node
+ * will be marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState 
coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(0, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. 
Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void handleSuccessFindCoordinatorResponse(

Review Comment:
   I think we can make these names more concise. The request type is already 
clear in the context, so we could call this `onSuccessfulResponse` for example.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -50,57 +49,83 @@ public class DefaultBackgroundThread extends KafkaThread {
     private final Logger log;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
-    private final ConsumerNetworkClient networkClient;
-    private final SubscriptionState subscriptions;
     private final ConsumerMetadata metadata;
-    private final Metrics metrics;
     private final ConsumerConfig config;
+    // empty if groupId is null
+    private final Optional<CoordinatorRequestManager> coordinatorManager;

Review Comment:
   Instead of using `Optional`, could we use a NOOP implementation?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +134,102 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't 
be interrupted");
+                    // swallow the wakeup exception to prevent killing the 
background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", 
t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Process event from a single poll. It performs the following tasks 
sequentially:
+     *  1. Try to poll and event from the queue, and try to process it.
+     *  2. Try to find Coordinator if needed
+     *  3. Try to send fetches.
+     *  4. Poll the networkClient for outstanding requests. If non: poll until 
next
+     *  iteration.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
+        Optional<ApplicationEvent> event = maybePollEvent();
+
+        if (event.isPresent()) {
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event.get());
         }
-        if (this.inflightEvent.isPresent() && 
maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+
+        final long currentTimeMs = time.milliseconds();
+        long pollWaitTimeMs = timeToNextHeartbeatMs();
+
+        // TODO: Add a condition here, like shouldFindCoordinator in the 
future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorManager.isPresent()) {
+            pollWaitTimeMs = Math.min(pollWaitTimeMs,
+                    
handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
         }
 
         // if there are pending events to process, poll then continue without
         // blocking.
-        if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-            networkClient.poll(time.timer(0));
+        if (!applicationEventQueue.isEmpty()) {
+            networkClientDelegate.poll(time.timer(0), false);

Review Comment:
   We can just set `pollWaitTimeMs` to 0.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +134,102 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't 
be interrupted");
+                    // swallow the wakeup exception to prevent killing the 
background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", 
t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Process event from a single poll. It performs the following tasks 
sequentially:
+     *  1. Try to poll and event from the queue, and try to process it.
+     *  2. Try to find Coordinator if needed
+     *  3. Try to send fetches.
+     *  4. Poll the networkClient for outstanding requests. If non: poll until 
next
+     *  iteration.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
+        Optional<ApplicationEvent> event = maybePollEvent();
+
+        if (event.isPresent()) {
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event.get());
         }
-        if (this.inflightEvent.isPresent() && 
maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+
+        final long currentTimeMs = time.milliseconds();
+        long pollWaitTimeMs = timeToNextHeartbeatMs();
+
+        // TODO: Add a condition here, like shouldFindCoordinator in the 
future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorManager.isPresent()) {
+            pollWaitTimeMs = Math.min(pollWaitTimeMs,
+                    
handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
         }
 
         // if there are pending events to process, poll then continue without
         // blocking.
-        if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-            networkClient.poll(time.timer(0));
+        if (!applicationEventQueue.isEmpty()) {
+            networkClientDelegate.poll(time.timer(0), false);
             return;
         }
         // if there are no events to process, poll until timeout. The timeout
         // will be the minimum of the requestTimeoutMs, nextHeartBeatMs, and
         // nextMetadataUpdate. See NetworkClient.poll impl.
-        
networkClient.poll(time.timer(timeToNextHeartbeatMs(time.milliseconds())));
+        networkClientDelegate.poll(time.timer(pollWaitTimeMs), false);
+    }
+
+    long handlePollResult(NetworkClientDelegate.PollResult res) {
+        Objects.requireNonNull(res);
+        if (!res.unsentRequests.isEmpty()) {
+            networkClientDelegate.addAll(res.unsentRequests);
+            return Long.MAX_VALUE;
+        }
+        return res.timeMsTillNextPoll;
     }
 
-    private long timeToNextHeartbeatMs(final long nowMs) {
+    private long timeToNextHeartbeatMs() {

Review Comment:
   Let's get rid of `timeToNextHeartbeatMs` for now. I imagine we'll have a
   `HeartbeatManager` or something like that later.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to 
handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+        final long currentTimeMs = time.milliseconds();
+        // 1. Try to send request in the unsentRequests queue. It is either 
caused by timeout or network error (node
+        // not available)
+        // 2. poll for the results if there's any.
+        // 3. Check connection status for each node, disconnect ones that are 
not reachable.
+        client.wakeup();

Review Comment:
   Why do we call `client.wakeup()` here? It will ensure that `poll` returns
   immediately regardless of the timer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to