hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1047701086


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
+ * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorHandler;

Review Comment:
   Perhaps we could call this `nonRetriableErrorHandler` or something like that 
to make the usage a little clearer?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final RequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // start logging a warning only after being unable to connect for a while
+    private Node coordinator;
+
+    public CoordinatorRequestManager(final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = new RequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final RequestState coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    /**
+     * Poll for the FindCoordinator request.
+     * If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list.
+     * If we are still backing off from a previous attempt, this method will return a PollResult with the remaining backoff time and an empty list.
+     * Otherwise, this will return a PollResult with a singleton list of UnsentRequest and Long.MAX_VALUE backoff time.
+     * Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not.
+     *
+     * @param currentTimeMs current time in ms.
+     * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler(),
+                null);
+    }
+
+    /**
+     * Mark the current coordinator null.
+     * @param cause why the coordinator is marked unknown.
+     * @param currentTimeMs the current time in ms.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+                            + "Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinator) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+    }
+
+    private void onFailedCoordinatorResponse(
+            final Exception exception,
+            final long currentTimeMs) {
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMs);
+
+        if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
+            log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
+            errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        if (exception instanceof RetriableException) {
+            log.debug("FindCoordinator request failed due to retriable exception", exception);
+            return;
+        }
+
+        log.warn("FindCoordinator request failed due to fatal exception", exception);
+        errorHandler.handle(exception);
+    }
+
+    /**
+     * Handles the response and exception upon completing the {@link FindCoordinatorRequest}. This is invoked in the callback
+     * {@link FindCoordinatorRequestHandler}. If the response was successful, a coordinator node will be updated. If the
+     * response failed due to errors, the current coordinator will be marked unknown.
+     * @param currentTimeMs current time in ms.
+     * @param response the response, can be null.
+     * @param e the exception, can be null.
+     */
+    public void onResponse(final long currentTimeMs, final FindCoordinatorResponse response, final Exception e) {
+        // handles Runtime exception
+        if (e != null) {
+            onFailedCoordinatorResponse(e, currentTimeMs);
+            return;
+        }
+
+        Optional<FindCoordinatorResponseData.Coordinator> coordinator = response.getCoordinatorByKey(this.groupId);
+        if (!coordinator.isPresent()) {
+            String msg = String.format("Coordinator not found for groupId: %s", this.groupId);
+            onFailedCoordinatorResponse(new IllegalStateException(msg), currentTimeMs);
+            return;
+        }
+
+        FindCoordinatorResponseData.Coordinator node = coordinator.get();
+        if (node.errorCode() != Errors.NONE.code()) {
+            onFailedCoordinatorResponse(Errors.forCode(node.errorCode()).exception(), currentTimeMs);
+            return;
+        }
+        onSuccessfulResponse(node);
+    }
+
+    /**
+     * Returns the current coordinator node. It can be {@code null}.

Review Comment:
   I wonder if it would be safer to return `Optional`.
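
A minimal sketch of what an `Optional`-returning accessor could look like. `SketchNode` and `CoordinatorHolderSketch` are hypothetical stand-ins (for `org.apache.kafka.common.Node` and for the coordinator field of `CoordinatorRequestManager`) so the example compiles on its own; the accessor name and shape are an assumption, not the PR's actual code.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.common.Node so the sketch compiles on its own.
final class SketchNode {
    final int id;
    final String host;
    final int port;

    SketchNode(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }
}

// Hypothetical holder that mirrors only the coordinator field of CoordinatorRequestManager.
class CoordinatorHolderSketch {
    private SketchNode coordinator; // null until a coordinator has been discovered

    // Returning Optional forces callers to handle the "coordinator unknown" case explicitly.
    Optional<SketchNode> coordinator() {
        return Optional.ofNullable(coordinator);
    }

    void onDiscovered(SketchNode node) {
        this.coordinator = node;
    }

    void markCoordinatorUnknown() {
        this.coordinator = null;
    }
}
```

With this shape, a caller would write something like `holder.coordinator().ifPresent(node -> ...)` instead of a null check.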



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@
+    private void onFailedCoordinatorResponse(
+            final Exception exception,
+            final long currentTimeMs) {
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMs);
+
+        if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {

Review Comment:
   nit: could we move the retriable case to the top here so that the fatal 
cases are not split?
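
Roughly what I have in mind, as a self-contained sketch. The exception classes and the `fatalErrors` list are hypothetical stand-ins for `RetriableException`, `GroupAuthorizationException`, and `errorHandler.handle(...)`; the `instanceof` check for the authorization case is also an assumption made for the sketch, not a claim about how the PR should detect it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.errors.RetriableException.
class RetriableSketchException extends RuntimeException {
    RetriableSketchException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for GroupAuthorizationException.
class GroupAuthSketchException extends RuntimeException {
    GroupAuthSketchException(String message) {
        super(message);
    }
}

class FailureHandlingSketch {
    // Stand-in for errorHandler.handle(...): records every error treated as fatal.
    final List<Exception> fatalErrors = new ArrayList<>();

    void onFailedCoordinatorResponse(Exception exception) {
        // Retriable case first: nothing is surfaced, the request is retried after backoff.
        if (exception instanceof RetriableSketchException) {
            return;
        }

        // From here on every branch is fatal, so the fatal cases stay together.
        if (exception instanceof GroupAuthSketchException) {
            fatalErrors.add(new GroupAuthSketchException("Not authorized to access group"));
            return;
        }

        fatalErrors.add(exception);
    }
}
```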



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@
+    /**
+     * Handles the response and exception upon completing the {@link FindCoordinatorRequest}. This is invoked in the callback
+     * {@link FindCoordinatorRequestHandler}. If the response was successful, a coordinator node will be updated. If the
+     * response failed due to errors, the current coordinator will be marked unknown.
+     * @param currentTimeMs current time in ms.
+     * @param response the response, can be null.

Review Comment:
   Perhaps we could clarify this? It looks like the code expects the response 
to be non-null if the exception is null?
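
One way the contract could be made explicit, sketched under assumptions: `ResponseContractSketch` is a hypothetical class, the response is modelled as a plain `String` rather than `FindCoordinatorResponse`, and `lastOutcome` exists only to make the behaviour observable. The idea is to document that the response may be null only when an exception is present, and to fail fast otherwise.

```java
import java.util.Objects;

class ResponseContractSketch {
    // Records what the last call did, so the behaviour can be observed in a test.
    String lastOutcome = "none";

    /**
     * @param currentTimeMs current time in ms.
     * @param response the response; must be non-null when {@code e} is null.
     * @param e the exception; null if the request completed without an exception.
     */
    void onResponse(long currentTimeMs, String response, Exception e) {
        if (e != null) {
            // Failure path: the response is ignored and may be null.
            lastOutcome = "failed";
            return;
        }

        // Success path: a missing response here is a programming error, so fail fast.
        Objects.requireNonNull(response, "response must be non-null when no exception is given");
        lastOutcome = "ok:" + response;
    }
}
```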



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +132,89 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't be interrupted");
+                    // swallow the wakeup exception to prevent killing the background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+     *  1. Try to poll an event from the queue, and try to process it using the corresponding {@link ApplicationEventProcessor}.
+     *  2. Try to find Coordinator if needed
+     *  3. Poll the networkClient for outstanding requests.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
+        Optional<ApplicationEvent> event = maybePollEvent();
+
+        if (event.isPresent()) {
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event.get());
         }
-        if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+
+        final long currentTimeMs = time.milliseconds();
+        // TODO: This is just a place holder value.
+        long pollWaitTimeMs = 100;

Review Comment:
   In `ConsumerNetworkClient`, we hard-code the following:
   ```java
       private static final int MAX_POLL_TIMEOUT_MS = 5000;
   ```
   Perhaps we can do the same here and drop the TODO? 
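
A minimal sketch of what that could look like, assuming the background thread caps whatever wait time the request managers report. `PollTimeoutSketch` and `pollWaitTimeMs` are hypothetical names; only the constant comes from `ConsumerNetworkClient`.

```java
// Hypothetical sketch: cap the poll wait with a hard-coded maximum instead of a placeholder.
class PollTimeoutSketch {
    // Same value as the constant quoted from ConsumerNetworkClient above.
    private static final long MAX_POLL_TIMEOUT_MS = 5000;

    /**
     * @param managerWaitMs the wait time suggested by a request manager
     *                      (Long.MAX_VALUE when it has nothing scheduled)
     * @return the time to block in the network poll, never more than the cap
     */
    static long pollWaitTimeMs(long managerWaitMs) {
        return Math.min(MAX_POLL_TIMEOUT_MS, managerWaitMs);
    }
}
```

The cap keeps the loop responsive even when every manager reports `Long.MAX_VALUE`.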



##########
clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java:
##########
@@ -50,6 +52,22 @@ public FindCoordinatorResponse(FindCoordinatorResponseData data) {
         this.data = data;
     }
 
+    public Optional<Coordinator> getCoordinatorByKey(String key) {

Review Comment:
   nit: usually we drop `get` from method names



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +132,89 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't be interrupted");
+                    // swallow the wakeup exception to prevent killing the background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+     *  1. Try to poll an event from the queue, and try to process it using the corresponding {@link ApplicationEventProcessor}.
+     *  2. Try to find Coordinator if needed
+     *  3. Poll the networkClient for outstanding requests.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
+        Optional<ApplicationEvent> event = maybePollEvent();
+
+        if (event.isPresent()) {
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event.get());
         }
-        if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+
+        final long currentTimeMs = time.milliseconds();
+        // TODO: This is just a place holder value.
+        long pollWaitTimeMs = 100;
+
+        // TODO: Add a condition here, like shouldFindCoordinator in the future.  Since we don't always need to find

Review Comment:
   Can we drop this? This is already built into the call to 
`CoordinatorManager.poll`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
+ * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final RequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+    private Node coordinator;
+
+    public CoordinatorRequestManager(final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = new RequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,

Review Comment:
   The dependence on the rebalance timeout is a little strange if the user is 
using simple partition assignment. I wonder if we could hard-code a value 
instead. Perhaps we can log a message once a minute (say) while the 
coordinator is unknown?
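
A rough sketch of that idea, assuming a fixed one-minute interval. Everything here is hypothetical: the class, the method names, and the constant are illustrative only and are not taken from the PR.

```java
// Hypothetical helper, not part of the PR: decides when to warn that the coordinator is unknown.
class CoordinatorUnknownLogSketch {
    // Assumed fixed interval, replacing the dependence on rebalanceTimeoutMs.
    static final long COORDINATOR_UNKNOWN_LOG_INTERVAL_MS = 60_000L;

    private long timeMarkedUnknownMs = -1L; // -1 means the coordinator is currently known
    private long lastWarnMs = -1L;

    void markCoordinatorUnknown(long currentTimeMs) {
        // Only record the first transition; repeated failures keep the original timestamp.
        if (timeMarkedUnknownMs == -1L) {
            timeMarkedUnknownMs = currentTimeMs;
            lastWarnMs = currentTimeMs;
        }
    }

    void markCoordinatorKnown() {
        timeMarkedUnknownMs = -1L;
        lastWarnMs = -1L;
    }

    /** Returns true at most once per interval while the coordinator stays unknown. */
    boolean shouldWarn(long currentTimeMs) {
        if (timeMarkedUnknownMs == -1L) {
            return false;
        }
        if (currentTimeMs - lastWarnMs < COORDINATOR_UNKNOWN_LOG_INTERVAL_MS) {
            return false;
        }
        lastWarnMs = currentTimeMs;
        return true;
    }
}
```

The caller would invoke `shouldWarn` on each poll and emit the warning only when it returns true, so simple-assignment users get the same behaviour without a rebalance timeout being involved.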



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -20,10 +20,23 @@
  * This is the abstract definition of the events created by the KafkaConsumer API
  */
 abstract public class ApplicationEvent {
+    public final Type type;
+
+    protected ApplicationEvent(Type type) {
+        this.type = type;
+    }
     /**
      * process the application event. Return true upon succesful execution,
      * false otherwise.
      * @return true if the event was successfully executed; false otherwise.
      */
-    public abstract boolean process();
+
+    @Override
+    public String toString() {
+        return type + " ApplicationEvent";
+    }
+    public enum Type {

Review Comment:
   Perhaps we can leave this for a follow-up since we are not implementing the 
COMMIT type here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
+ * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final RequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
+    private Node coordinator;
+
+    public CoordinatorRequestManager(final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = new RequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final RequestState coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    /**
+     * Poll for the FindCoordinator request.
+     * If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list.
+     * If we are still backing off from a previous attempt, this method will return a PollResult with the remaining backoff time and an empty list.
+     * Otherwise, this will return a PollResult with a singleton list of UnsentRequest and Long.MAX_VALUE backoff time.
+     * Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not.
+     *
+     * @param currentTimeMs current time in ms.
+     * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());

Review Comment:
   nit: we can use `Collections.emptyList()`?
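
For illustration, a small standalone sketch of the difference. `EmptyListSketch` and `noRequests` are hypothetical names; the `String` element type stands in for `UnsentRequest`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: return a shared immutable empty list instead of allocating one per poll.
class EmptyListSketch {
    static List<String> noRequests() {
        // Collections.emptyList() returns an immutable list and avoids a new ArrayList on every call.
        return Collections.emptyList();
    }
}
```

Because the returned list is immutable, this only fits if callers never try to add to the result, which appears to be the case for a poll result that is merely read.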



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {

Review Comment:
   Can this be private? Seems like we can test it indirectly through a call to 
`poll`?
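
As a rough illustration of testing the send path indirectly, here is a heavily simplified sketch. `DelegateSketch` is a hypothetical stand-in, not the real `NetworkClientDelegate`: requests are plain strings and "sending" just moves them to a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, heavily simplified stand-in for NetworkClientDelegate.
class DelegateSketch {
    private final Queue<String> unsentRequests = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>();

    void add(String request) {
        unsentRequests.add(request);
    }

    // Public entry point: drains the unsent queue, then reports everything sent so far.
    List<String> poll() {
        trySend();
        return new ArrayList<>(sent);
    }

    // Private: only reachable through poll(), so tests exercise it via the public API.
    private void trySend() {
        while (!unsentRequests.isEmpty()) {
            sent.add(unsentRequests.poll());
        }
    }
}
```

A test then only needs to enqueue a request, call `poll`, and assert on the observable result.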



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -50,57 +49,81 @@ public class DefaultBackgroundThread extends KafkaThread {
     private final Logger log;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
-    private final ConsumerNetworkClient networkClient;
-    private final SubscriptionState subscriptions;
     private final ConsumerMetadata metadata;
-    private final Metrics metrics;
     private final ConsumerConfig config;
+    // empty if groupId is null
+    private final Optional<CoordinatorRequestManager> coordinatorManager;
+    private final ApplicationEventProcessor applicationEventProcessor;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final ErrorEventHandler errorEventHandler;
 
-    private String clientId;
-    private long retryBackoffMs;
-    private int heartbeatIntervalMs;
     private boolean running;
-    private Optional<ApplicationEvent> inflightEvent = Optional.empty();
-    private final AtomicReference<Optional<RuntimeException>> exception =
-        new AtomicReference<>(Optional.empty());
 
+    // Visible for testing
+    DefaultBackgroundThread(final Time time,
+                            final ConsumerConfig config,
+                            final LogContext logContext,
+                            final BlockingQueue<ApplicationEvent> applicationEventQueue,
+                            final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                            final ErrorEventHandler errorEventHandler,
+                            final ApplicationEventProcessor processor,
+                            final ConsumerMetadata metadata,
+                            final NetworkClientDelegate networkClient,
+                            final CoordinatorRequestManager coordinatorManager) {
+        super(BACKGROUND_THREAD_NAME, true);
+        this.time = time;
+        this.running = true;
+        this.log = logContext.logger(getClass());
+        this.applicationEventQueue = applicationEventQueue;
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.applicationEventProcessor = processor;
+        this.config = config;
+        this.metadata = metadata;
+        this.networkClientDelegate = networkClient;
+        this.coordinatorManager = Optional.ofNullable(coordinatorManager);
+        this.errorEventHandler = errorEventHandler;
+    }
     public DefaultBackgroundThread(final Time time,
                                    final ConsumerConfig config,
+                                   final GroupRebalanceConfig rebalanceConfig,
                                    final LogContext logContext,
                                    final BlockingQueue<ApplicationEvent> applicationEventQueue,
                                    final BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                                   final SubscriptionState subscriptions,
                                    final ConsumerMetadata metadata,
-                                   final ConsumerNetworkClient networkClient,
-                                   final Metrics metrics) {
+                                   final KafkaClient networkClient) {
         super(BACKGROUND_THREAD_NAME, true);
         try {
             this.time = time;
-            this.log = logContext.logger(DefaultBackgroundThread.class);
+            this.log = logContext.logger(getClass());
             this.applicationEventQueue = applicationEventQueue;
             this.backgroundEventQueue = backgroundEventQueue;
             this.config = config;
-            setConfig();
-            this.inflightEvent = Optional.empty();
             // subscriptionState is initialized by the polling thread
-            this.subscriptions = subscriptions;
             this.metadata = metadata;
-            this.networkClient = networkClient;
-            this.metrics = metrics;
+            this.networkClientDelegate = new NetworkClientDelegate(
+                    this.time,
+                    this.config,
+                    logContext,
+                    networkClient);
             this.running = true;
+            this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
+            String groupId = rebalanceConfig.groupId;
+            // TODO: Maybe consider a NOOP implementation

Review Comment:
   Let's not leave TODOs in the code. We can file jiras instead.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +132,89 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't be interrupted");
+                    // swallow the wakeup exception to prevent killing the background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+     *  1. Try to poll an event from the queue, and try to process it using the corresponding {@link ApplicationEventProcessor}.
+     *  2. Try to find Coordinator if needed
+     *  3. Poll the networkClient for outstanding requests.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
+        Optional<ApplicationEvent> event = maybePollEvent();
+
+        if (event.isPresent()) {
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event.get());
         }
-        if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+
+        final long currentTimeMs = time.milliseconds();
+        // TODO: This is just a place holder value.
+        long pollWaitTimeMs = 100;
+
+        // TODO: Add a condition here, like shouldFindCoordinator in the future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorManager.isPresent()) {
+            pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
         }
 
         // if there are pending events to process, poll then continue without
         // blocking.
-        if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-            networkClient.poll(time.timer(0));
+        if (!applicationEventQueue.isEmpty()) {
+            networkClientDelegate.poll(0);
             return;
         }
-        // if there are no events to process, poll until timeout. The timeout
+        // if there are no pending application event, poll until timeout. The timeout
         // will be the minimum of the requestTimeoutMs, nextHeartBeatMs, and
         // nextMetadataUpdate. See NetworkClient.poll impl.
-        networkClient.poll(time.timer(timeToNextHeartbeatMs(time.milliseconds())));
+        networkClientDelegate.poll(pollWaitTimeMs);
     }
 
-    private long timeToNextHeartbeatMs(final long nowMs) {
-        // TODO: implemented when heartbeat is added to the impl
-        return 100;
+    long handlePollResult(NetworkClientDelegate.PollResult res) {
+        Objects.requireNonNull(res);

Review Comment:
   nit: seems like overkill. Was there any reason you wanted to be careful with 
the result?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                        "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);
+        }
+    }
+
+    // Visible for testing
+    boolean doSend(final UnsentRequest r,
+                   final long currentTimeMs,
+                   final Queue<UnsentRequest> unsentAndUnreadyRequests) {
+        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+        if (node == null || nodeUnavailable(node)) {
+            return false;
+        }
+        ClientRequest request = makeClientRequest(r, node);
+        if (client.isReady(node, currentTimeMs)) {
+            activeNodes.add(node);
+            client.send(request, currentTimeMs);
+        } else {
+            // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration
+            // of the event loop
+            log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
+            unsentAndUnreadyRequests.add(r);
+        }
+        return true;
+    }
+
+    private void checkDisconnects() {
+        // Check the connection status by all the nodes that are active. Disconnect the disconnected node if it is
+        // unable to be connected.
+        Iterator<Node> iter = activeNodes.iterator();
+        while (iter.hasNext()) {
+            Node node = iter.next();
+            iter.remove();
+            if (client.connectionFailed(node)) {
+                client.disconnect(node.idString());
+            }
+        }
+    }
+
+    private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node) {
+        return client.newClientRequest(
+                node.idString(),
+                unsent.abstractBuilder,
+                time.milliseconds(),

Review Comment:
   nit: we can pass through `currentTimeMs` from the poll call
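
   A rough sketch of the idea, assuming stand-in types rather than the real `KafkaClient` and `UnsentRequest`: the class name, the `LongSupplier` clock, and the `clockReads` counter are all hypothetical and exist only for illustration. `poll` reads the clock once and hands the same `currentTimeMs` down to `makeClientRequest`, so the request's creation time matches the time used for the rest of that poll.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for NetworkClientDelegate; not the real Kafka class.
class TimePassThroughSketch {
    private final LongSupplier clock;
    int clockReads = 0; // illustration only: counts how often the clock is read

    TimePassThroughSketch(final LongSupplier clock) {
        this.clock = clock;
    }

    // Reads the clock once per poll and passes the value down.
    long poll() {
        final long currentTimeMs = milliseconds();
        return makeClientRequest(currentTimeMs);
    }

    // Takes the timestamp as a parameter instead of reading the clock again.
    // Returns the creation time that would be stamped on the request.
    long makeClientRequest(final long currentTimeMs) {
        return currentTimeMs;
    }

    private long milliseconds() {
        clockReads++;
        return clock.getAsLong();
    }

    public static void main(final String[] args) {
        final TimePassThroughSketch delegate = new TimePassThroughSketch(System::currentTimeMillis);
        final long createdMs = delegate.poll();
        System.out.println("createdMs=" + createdMs + ", clockReads=" + delegate.clockReads);
    }
}
```

   In the actual class this would presumably mean adding a `currentTimeMs` parameter to `makeClientRequest` and passing it from `doSend`, which already receives it.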



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                        "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);
+        }
+    }
+
+    // Visible for testing
+    boolean doSend(final UnsentRequest r,
+                   final long currentTimeMs,
+                   final Queue<UnsentRequest> unsentAndUnreadyRequests) {
+        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+        if (node == null || nodeUnavailable(node)) {
+            return false;
+        }
+        ClientRequest request = makeClientRequest(r, node);
+        if (client.isReady(node, currentTimeMs)) {
+            activeNodes.add(node);
+            client.send(request, currentTimeMs);
+        } else {
+            // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration
+            // of the event loop
+            log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
+            unsentAndUnreadyRequests.add(r);
+        }
+        return true;
+    }
+
+    private void checkDisconnects() {
+        // Check the connection status by all the nodes that are active. Disconnect the disconnected node if it is
+        // unable to be connected.
+        Iterator<Node> iter = activeNodes.iterator();
+        while (iter.hasNext()) {
+            Node node = iter.next();
+            iter.remove();
+            if (client.connectionFailed(node)) {
+                client.disconnect(node.idString());
+            }
+        }
+    }
+
+    private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node) {
+        return client.newClientRequest(
+                node.idString(),
+                unsent.abstractBuilder,
+                time.milliseconds(),
+                true,
+                (int) unsent.timer.remainingMs(),
+                unsent.callback.orElse(new DefaultRequestFutureCompletionHandler()));
+    }
+
+    public void maybeTriggerWakeup() {
+        if (this.wakeup) {
+            this.wakeup = false;
+            throw new WakeupException();
+        }
+    }
+
+    public void wakeup() {
+        this.wakeup = true;
+        this.client.wakeup();
+    }
+
+    public Node leastLoadedNode() {
+        return this.client.leastLoadedNode(time.milliseconds());
+    }
+
+    public void add(final UnsentRequest r) {
+        r.setTimer(this.time, this.requestTimeoutMs);
+        unsentRequests.add(r);
+    }
+
+    public void ready(final Node node) {

Review Comment:
   nit: we should resist exposing all of these if we can. I think we want a really basic API.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                        "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);
+        }
+    }
+
+    // Visible for testing
+    boolean doSend(final UnsentRequest r,
+                   final long currentTimeMs,
+                   final Queue<UnsentRequest> unsentAndUnreadyRequests) {
+        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+        if (node == null || nodeUnavailable(node)) {
+            return false;
+        }
+        ClientRequest request = makeClientRequest(r, node);
+        if (client.isReady(node, currentTimeMs)) {
+            activeNodes.add(node);
+            client.send(request, currentTimeMs);
+        } else {
+            // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration
+            // of the event loop
+            log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
+            unsentAndUnreadyRequests.add(r);
+        }
+        return true;
+    }
+
+    private void checkDisconnects() {
+        // Check the connection status by all the nodes that are active. Disconnect the disconnected node if it is
+        // unable to be connected.
+        Iterator<Node> iter = activeNodes.iterator();
+        while (iter.hasNext()) {
+            Node node = iter.next();
+            iter.remove();
+            if (client.connectionFailed(node)) {
+                client.disconnect(node.idString());

Review Comment:
   Why do we do this?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                        "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);
+        }
+    }
+
+    // Visible for testing
+    boolean doSend(final UnsentRequest r,
+                   final long currentTimeMs,
+                   final Queue<UnsentRequest> unsentAndUnreadyRequests) {
+        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+        if (node == null || nodeUnavailable(node)) {
+            return false;
+        }
+        ClientRequest request = makeClientRequest(r, node);
+        if (client.isReady(node, currentTimeMs)) {
+            activeNodes.add(node);
+            client.send(request, currentTimeMs);
+        } else {
+            // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration
+            // of the event loop
+            log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
+            unsentAndUnreadyRequests.add(r);
+        }
+        return true;
+    }
+
+    private void checkDisconnects() {
+        // Check the connection status by all the nodes that are active. Disconnect the disconnected node if it is
+        // unable to be connected.
+        Iterator<Node> iter = activeNodes.iterator();
+        while (iter.hasNext()) {
+            Node node = iter.next();
+            iter.remove();
+            if (client.connectionFailed(node)) {
+                client.disconnect(node.idString());
+            }
+        }
+    }
+
+    private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node) {
+        return client.newClientRequest(
+                node.idString(),
+                unsent.abstractBuilder,
+                time.milliseconds(),
+                true,
+                (int) unsent.timer.remainingMs(),
+                unsent.callback.orElse(new DefaultRequestFutureCompletionHandler()));
+    }
+
+    public void maybeTriggerWakeup() {
+        if (this.wakeup) {
+            this.wakeup = false;
+            throw new WakeupException();
+        }
+    }
+
+    public void wakeup() {
+        this.wakeup = true;
+        this.client.wakeup();
+    }
+
+    public Node leastLoadedNode() {
+        return this.client.leastLoadedNode(time.milliseconds());
+    }
+
+    public void add(final UnsentRequest r) {
+        r.setTimer(this.time, this.requestTimeoutMs);
+        unsentRequests.add(r);
+    }
+
+    public void ready(final Node node) {
+        client.ready(node, time.milliseconds());
+    }
+
+    /**
+     * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in
+     * reconnect backoff window following the disconnect).
+     */
+    public boolean nodeUnavailable(final Node node) {
+        return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
+    }
+
+    public void close() throws IOException {
+        this.client.close();
+    }
+
+    public void addAll(final List<UnsentRequest> unsentRequests) {
+        unsentRequests.forEach(this::add);

Review Comment:
   nit: `addAll`?



##########
clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java:
##########
@@ -50,6 +52,22 @@ public FindCoordinatorResponse(FindCoordinatorResponseData data) {
         this.data = data;
     }
 
+    public Optional<Coordinator> getCoordinatorByKey(String key) {
+        Objects.requireNonNull(key);
+        if (this.data.coordinators().isEmpty()) {
+            // version <= 3

Review Comment:
   Can we write a test case which covers all versions?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +132,89 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't be interrupted");
+                    // swallow the wakeup exception to prevent killing the background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+     *  1. Try to poll an event from the queue, and try to process it using the corresponding {@link ApplicationEventProcessor}.
+     *  2. Try to find Coordinator if needed
+     *  3. Poll the networkClient for outstanding requests.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
+        Optional<ApplicationEvent> event = maybePollEvent();
+
+        if (event.isPresent()) {
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event.get());
         }
-        if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+
+        final long currentTimeMs = time.milliseconds();
+        // TODO: This is just a place holder value.
+        long pollWaitTimeMs = 100;
+
+        // TODO: Add a condition here, like shouldFindCoordinator in the future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorManager.isPresent()) {
+            pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs)));

Review Comment:
   nit: can we move the call to `handlePollResult` to a separate line? It makes the code easier to follow.
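
   Something along these lines, as a sketch only. The class and method names below are hypothetical stand-ins, and I am assuming `handlePollResult` returns the time to wait before the next poll as a `long`; here it takes a plain `long` instead of the poll result type so the example compiles on its own.

```java
// Hypothetical stand-ins; only the shape of the two statements matters.
class PollWaitSketch {
    // Stand-in for handlePollResult: assumed to return how long to wait before polling again.
    static long handlePollResult(final long timeUntilNextPollMs) {
        return timeUntilNextPollMs;
    }

    static long computePollWaitTimeMs(final long defaultWaitMs, final long coordinatorPollResultMs) {
        // Handle the poll result first, on its own line.
        final long coordinatorWaitMs = handlePollResult(coordinatorPollResultMs);
        // Then fold it into the wait time.
        return Math.min(defaultWaitMs, coordinatorWaitMs);
    }

    public static void main(final String[] args) {
        System.out.println(computePollWaitTimeMs(100L, 25L));
    }
}
```

   Splitting the statement also gives the intermediate value a name, which helps if more managers contribute their own wait times later.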



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This method will try to 
send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent 
requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which 
will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new 
TimeoutException(
+                        "Failed to send request after " + 
unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", 
unsent);
+                unsent.callback.ifPresent(v -> 
v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                        "No node available in the kafka cluster to send the 
request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);
+        }
+    }
+
+    // Visible for testing
+    boolean doSend(final UnsentRequest r,
+                   final long currentTimeMs,
+                   final Queue<UnsentRequest> unsentAndUnreadyRequests) {
+        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+        if (node == null || nodeUnavailable(node)) {
+            return false;
+        }
+        ClientRequest request = makeClientRequest(r, node);
+        if (client.isReady(node, currentTimeMs)) {
+            activeNodes.add(node);
+            client.send(request, currentTimeMs);
+        } else {
+            // enqueue the request again if the node isn't ready yet. The 
request will be handled in the next iteration
+            // of the event loop
+            log.debug("Node is not ready, handle the request in the next event 
loop: node={}, request={}", node, r);
+            unsentAndUnreadyRequests.add(r);
+        }
+        return true;
+    }
+
+    private void checkDisconnects() {
+        // Check the connection status by all the nodes that are active. 
Disconnect the disconnected node if it is
+        // unable to be connected.
+        Iterator<Node> iter = activeNodes.iterator();
+        while (iter.hasNext()) {
+            Node node = iter.next();
+            iter.remove();
+            if (client.connectionFailed(node)) {
+                client.disconnect(node.idString());
+            }
+        }
+    }
+
+    private ClientRequest makeClientRequest(final UnsentRequest unsent, final 
Node node) {
+        return client.newClientRequest(
+                node.idString(),
+                unsent.abstractBuilder,
+                time.milliseconds(),
+                true,
+                (int) unsent.timer.remainingMs(),
+                unsent.callback.orElse(new 
DefaultRequestFutureCompletionHandler()));
+    }
+
+    public void maybeTriggerWakeup() {
+        if (this.wakeup) {
+            this.wakeup = false;
+            throw new WakeupException();
+        }
+    }
+
+    public void wakeup() {
+        this.wakeup = true;
+        this.client.wakeup();
+    }
+
+    public Node leastLoadedNode() {
+        return this.client.leastLoadedNode(time.milliseconds());
+    }
+
+    public void add(final UnsentRequest r) {
+        r.setTimer(this.time, this.requestTimeoutMs);
+        unsentRequests.add(r);
+    }
+
+    public void ready(final Node node) {
+        client.ready(node, time.milliseconds());
+    }
+
+    /**
+     * Check if the node is disconnected and unavailable for immediate 
reconnection (i.e. if it is in
+     * reconnect backoff window following the disconnect).
+     */
+    public boolean nodeUnavailable(final Node node) {
+        return client.connectionFailed(node) && client.connectionDelay(node, 
time.milliseconds()) > 0;
+    }
+
+    public void close() throws IOException {
+        this.client.close();
+    }
+
+    public void addAll(final List<UnsentRequest> unsentRequests) {
+        unsentRequests.forEach(this::add);
+    }
+
+    public static class PollResult {
+        public final long timeMsTillNextPoll;

Review Comment:
   nit: how about `timeUntilNextPollMs`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for timing to send the next {@link 
FindCoordinatorRequest} based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait timer
+ * or a singleton list of {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and 
response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final RequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // start logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+    public CoordinatorRequestManager(final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = new RequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final RequestState coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    /**
+     * Poll for the FindCoordinator request.
+     * If we don't need to discover a coordinator, this method will return a 
PollResult with Long.MAX_VALUE backoff time and an empty list.
+     * If we are still backing off from a previous attempt, this method will 
return a PollResult with the remaining backoff time and an empty list.
+     * Otherwise, this method will return a PollResult with a singleton list 
of UnsentRequest and Long.MAX_VALUE backoff time.
+     * Note that this method does not involve any actual network IO, and it 
only determines if we need to send a new request or not.
+     *
+     * @param currentTimeMs current time in ms.
+     * @return {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. 
This will not be {@code null}.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler(),
+                null);
+    }
+
+    /**
+     * Mark the current coordinator null.
+     * @param cause why the coordinator is marked unknown.
+     * @param currentTimeMs the current time in ms.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void onSuccessfulResponse(final 
FindCoordinatorResponseData.Coordinator coordinator) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+    }
+
+    private void onFailedCoordinatorResponse(
+            final Exception exception,
+            final long currentTimeMs) {
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMs);
+
+        if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
+            log.debug("FindCoordinator request failed due to authorization 
error {}", exception.getMessage());
+            
errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
+            return;
+        }
+
+        if (exception instanceof RetriableException) {
+            log.debug("FindCoordinator request failed due to retriable 
exception", exception);
+            return;
+        }
+
+        log.warn("FindCoordinator request failed due to fatal exception", 
exception);
+        errorHandler.handle(exception);
+    }
+
+    /**
+     * Handles the response and exception upon completing the {@link 
FindCoordinatorRequest}. This is invoked in the callback
+     * {@link FindCoordinatorRequestHandler}. If the response was successful, 
a coordinator node will be updated. If the
+     * response failed due to errors, the current coordinator will be marked 
unknown.
+     * @param currentTimeMs current time in ms.
+     * @param response the response, can be null.
+     * @param e the exception, can be null.
+     */
+    public void onResponse(final long currentTimeMs, final 
FindCoordinatorResponse response, final Exception e) {
+        // handles Runtime exception
+        if (e != null) {
+            onFailedCoordinatorResponse(e, currentTimeMs);
+            return;
+        }
+
+        Optional<FindCoordinatorResponseData.Coordinator> coordinator = 
response.getCoordinatorByKey(this.groupId);
+        if (!coordinator.isPresent()) {
+            String msg = String.format("Coordinator not found for groupId: 
%s", this.groupId);

Review Comment:
   I think this message is a little too generic. How about something like this?
   > Response did not contain expected coordinator section for groupId: %s



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +132,89 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't 
be interrupted");
+                    // swallow the wakeup exception to prevent killing the 
background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", 
t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Poll and process an {@link ApplicationEvent}. It performs the following 
tasks:
+     *  1. Try to poll an event from the queue, and try to process it using 
the corresponding {@link ApplicationEventProcessor}.
+     *  2. Try to find Coordinator if needed
+     *  3. Poll the networkClient for outstanding requests.
      */
     void runOnce() {
-        this.inflightEvent = maybePollEvent();
-        if (this.inflightEvent.isPresent()) {
-            log.debug("processing application event: {}", this.inflightEvent);
+        Optional<ApplicationEvent> event = maybePollEvent();
+
+        if (event.isPresent()) {
+            log.debug("processing application event: {}", event);
+            consumeApplicationEvent(event.get());
         }
-        if (this.inflightEvent.isPresent() && 
maybeConsumeInflightEvent(this.inflightEvent.get())) {
-            // clear inflight event upon successful consumption
-            this.inflightEvent = Optional.empty();
+
+        final long currentTimeMs = time.milliseconds();
+        // TODO: This is just a place holder value.
+        long pollWaitTimeMs = 100;
+
+        // TODO: Add a condition here, like shouldFindCoordinator in the 
future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorManager.isPresent()) {
+            pollWaitTimeMs = Math.min(pollWaitTimeMs, 
handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
         }
 
         // if there are pending events to process, poll then continue without
         // blocking.
-        if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-            networkClient.poll(time.timer(0));
+        if (!applicationEventQueue.isEmpty()) {
+            networkClientDelegate.poll(0);

Review Comment:
   It may take a little while before we are able to send a request (e.g. the 
connection might have reached the max inflight requests allowed), and we do not 
want to busy loop while we wait. I'd suggest capping the poll timeout at 
`retryBackoffMs` when events are pending. We can structure the logic like this:
   
   ```java
   if (!applicationEventQueue.isEmpty()) {
     pollWaitTimeMs = Math.min(retryBackoffMs, pollWaitTimeMs);
   }
   
   networkClientDelegate.poll(pollWaitTimeMs);
   ```
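   
   To make the effect of that cap concrete, here is a minimal, self-contained 
sketch of the same logic. The names `PollWaitSketch` and `computePollWaitTimeMs` 
are hypothetical and not part of the PR, and `retryBackoffMs` is assumed to be 
read from the consumer's `retry.backoff.ms` setting.
   
   ```java
   final class PollWaitSketch {
       // Hypothetical helper: picks the timeout to pass to networkClientDelegate.poll().
       static long computePollWaitTimeMs(boolean hasPendingEvents,
                                         long retryBackoffMs,
                                         long pollWaitTimeMs) {
           if (hasPendingEvents) {
               // Cap the wait rather than polling with 0, so a request that
               // cannot be sent yet does not turn the loop into a busy loop.
               return Math.min(retryBackoffMs, pollWaitTimeMs);
           }
           return pollWaitTimeMs;
       }

       public static void main(String[] args) {
           System.out.println(computePollWaitTimeMs(true, 100L, 5000L));  // prints 100
           System.out.println(computePollWaitTimeMs(true, 100L, 20L));    // prints 20
           System.out.println(computePollWaitTimeMs(false, 100L, 5000L)); // prints 5000
       }
   }
   ```
   
   With pending events the thread still wakes up promptly, but never sooner 
than the smaller of the two timeouts.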
   
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This is responsible for timing to send the next {@link 
FindCoordinatorRequest} based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ * The {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
contains either a wait timer
+ * or a singleton list of {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link 
FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and 
response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final String groupId;
+
+    private final RequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // start logging a warning only 
after unable to connect for a while
+    private Node coordinator;
+
+    public CoordinatorRequestManager(final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final String groupId,
+                                     final long rebalanceTimeoutMs) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = new RequestState(config);
+    }
+
+    // Visible for testing
+    CoordinatorRequestManager(final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,
+                              final RequestState coordinatorRequestState) {
+        Objects.requireNonNull(groupId);
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    /**
+     * Poll for the FindCoordinator request.
+     * If we don't need to discover a coordinator, this method will return a 
PollResult with Long.MAX_VALUE backoff time and an empty list.
+     * If we are still backing off from a previous attempt, this method will 
return a PollResult with the remaining backoff time and an empty list.
+     * Otherwise, this method will return a PollResult with a singleton list 
of UnsentRequest and Long.MAX_VALUE backoff time.
+     * Note that this method does not involve any actual network IO, and it 
only determines if we need to send a new request or not.
+     *
+     * @param currentTimeMs current time in ms.
+     * @return {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. 
This will not be {@code null}.
+     */
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (this.coordinator != null) {
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
+        }
+
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+                coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+                new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId);
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                new FindCoordinatorRequest.Builder(data),
+                new FindCoordinatorRequestHandler(),
+                null);
+    }
+
+    /**
+     * Mark the current coordinator null.
+     * @param cause why the coordinator is marked unknown.
+     * @param currentTimeMs the current time in ms.
+     */
+    protected void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
+                            + "Rediscovery will be attempted.", 
this.coordinator, cause);
+            this.coordinator = null;
+            timeMarkedUnknownMs = currentTimeMs;
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+    }
+
+    private void onSuccessfulResponse(final 
FindCoordinatorResponseData.Coordinator coordinator) {
+        // use MAX_VALUE - node.id as the coordinator id to allow separate 
connections
+        // for the coordinator in the underlying network client layer
+        int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId();
+
+        this.coordinator = new Node(
+                coordinatorConnectionId,
+                coordinator.host(),
+                coordinator.port());
+        log.info("Discovered group coordinator {}", coordinator);
+        coordinatorRequestState.reset();
+    }
+
+    private void onFailedCoordinatorResponse(
+            final Exception exception,
+            final long currentTimeMs) {
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMs);

Review Comment:
   Could we improve this message? Perhaps "FindCoordinator failed with 
exception"?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to 
handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This method will try to 
send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent 
requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which 
will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);

Review Comment:
   Can't we retry internally until the request expires? What is the advantage 
of propagating the failure back to the caller at this point?
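
   A minimal sketch of what retrying internally could look like, using only 
the JDK. The class and field names here (`RetryUntilExpirySketch`, `Pending`) 
are hypothetical stand-ins and not the PR's classes: a request stays queued 
while no node is available, and the caller only hears about a failure once 
the request's own deadline has passed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the unsent-request queue; not the PR's class. */
final class RetryUntilExpirySketch {
    /** A queued request with an absolute deadline (assumed shape). */
    static final class Pending {
        final String name;
        final long deadlineMs;

        Pending(String name, long deadlineMs) {
            this.name = name;
            this.deadlineMs = deadlineMs;
        }
    }

    private final Queue<Pending> unsent = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();
    final List<String> timedOut = new ArrayList<>();

    void enqueue(String name, long nowMs, long timeoutMs) {
        unsent.add(new Pending(name, nowMs + timeoutMs));
    }

    /** One pass of the event loop: expire, send, or silently keep for the next pass. */
    void trySend(long nowMs, boolean nodeAvailable) {
        Iterator<Pending> it = unsent.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (nowMs >= p.deadlineMs) {
                timedOut.add(p.name); // the only failure surfaced to the caller
                it.remove();
            } else if (nodeAvailable) {
                sent.add(p.name);
                it.remove();
            }
            // Otherwise no node is available yet: keep the request and retry later.
        }
    }

    int pendingCount() {
        return unsent.size();
    }
}
```

   With this shape the "no node available" case never reaches the callback; 
it only delays the send until a node shows up or the timer runs out.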



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This methods will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                        "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);

Review Comment:
   Instead of rebuilding the queue after each poll(), can we use an iterator 
with `Iterator.remove()` to remove the request after it has been successfully 
sent?
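
   A rough sketch of the iterator-based loop, assuming a plain `ArrayDeque` 
like the one the constructor already creates. `UnsentQueueSketch` and the 
`canSend` predicate are hypothetical names for illustration; the real send 
check would be whatever `doSend` decides.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

/** Hypothetical stand-in for the PR's send loop; not the actual Kafka class. */
final class UnsentQueueSketch {
    private final Queue<String> unsent = new ArrayDeque<>();

    void add(String request) {
        unsent.add(request);
    }

    /**
     * Tries each queued request once, in order. Requests that are sent are
     * removed in place; requests that are not ready stay where they are.
     */
    List<String> trySend(Predicate<String> canSend) {
        List<String> sent = new ArrayList<>();
        Iterator<String> it = unsent.iterator();
        while (it.hasNext()) {
            String request = it.next();
            if (canSend.test(request)) {
                sent.add(request);
                it.remove(); // drop only what was actually sent
            }
        }
        return sent;
    }

    List<String> pending() {
        return new ArrayList<>(unsent);
    }
}
```

   This keeps the original ordering of the unready requests and avoids 
allocating a second queue on every `poll()`.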



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This methods will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {

Review Comment:
   nit: since we are computing the current time in `DefaultBackgroundThread`, 
perhaps we could just pass it through to `poll`?
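
   Roughly what passing the time through might look like. This is a sketch 
with hypothetical names (`DelegateSketch`, `BackgroundLoopSketch`), not the 
PR's classes: the loop reads the clock once per iteration and every step 
inside `poll` sees the same timestamp.

```java
import java.util.function.LongSupplier;

/** Hypothetical delegate whose poll takes the caller's timestamp. */
final class DelegateSketch {
    long sendTimeMs = -1L;
    long pollTimeMs = -1L;

    void poll(long timeoutMs, long currentTimeMs) {
        sendTimeMs = currentTimeMs; // time used when trying to send
        pollTimeMs = currentTimeMs; // time handed to the underlying client poll
    }
}

/** Hypothetical background loop that owns the single clock read. */
final class BackgroundLoopSketch {
    private final LongSupplier clock;
    private final DelegateSketch delegate;

    BackgroundLoopSketch(LongSupplier clock, DelegateSketch delegate) {
        this.clock = clock;
        this.delegate = delegate;
    }

    void runOnce(long timeoutMs) {
        long currentTimeMs = clock.getAsLong(); // one read per iteration
        delegate.poll(timeoutMs, currentTimeMs);
    }
}
```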



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;

Review Comment:
   Not sure if you missed my previous comment, but we don't need this. We want 
`poll` to just return after wakeup, not raise the `WakeupException`.
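
   To illustrate the behavior I have in mind, here is a small JDK-only 
sketch. It assumes the underlying client's wakeup behaves like a plain 
`java.nio.channels.Selector` wakeup; `WakeupSketch` is a hypothetical name. 
A pending wakeup makes the next select return immediately with no ready 
keys, and no exception is involved, so no extra flag is needed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

/** Hypothetical demo of "wakeup makes poll return early"; not Kafka code. */
final class WakeupSketch {
    /** Returns the number of ready keys after a wakeup was requested. */
    static int pollAfterWakeup(long timeoutMs) {
        try (Selector selector = Selector.open()) {
            selector.wakeup();                 // request a wakeup before blocking
            return selector.select(timeoutMs); // returns right away, nothing thrown
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```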



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.activeNodes = new HashSet<>();
+    }
+
+    /**
+     * Returns the responses of the sent requests. This methods will try to send the unsent requests, poll for responses,
+     * and check the disconnected nodes.
+     * @param timeoutMs
+     * @return
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an
+     * assigned node, it will find the leastLoadedOne.
+     * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection.
+     */
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                        "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);
+        }
+    }
+
+    // Visible for testing

Review Comment:
   Similar comment. Best not to expose the innards like this. We can test 
behavior through `poll`.




