hachikuji commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1047701086
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -0,0 +1,224 @@

+/**
+ * This is responsible for deciding when to send the next {@link FindCoordinatorRequest} based on the following criteria:
+ *
+ * Whether there is an existing coordinator.
+ * Whether there is an inflight request.
+ * Whether the backoff timer has expired.
+ *
+ * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
+ * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
+ *
+ * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which
+ * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
+ * marked {@code null} upon receiving a failure.
+ */
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorHandler;

Review Comment:
Perhaps we could call this `nonRetriableErrorHandler` or something like that to make the usage a little clearer?
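For instance, the field declaration would then read (a sketch of the suggested rename, not the final API):

```java
// Only non-retriable (fatal) failures are propagated through this handler;
// retriable failures are handled internally by backing off and retrying.
private final ErrorEventHandler nonRetriableErrorHandler;
```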
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########

+    /**
+     * Returns the current coordinator node. It can be {@code null}.

Review Comment:
I wonder if it would be safer to return `Optional`.
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########

+    private void onFailedCoordinatorResponse(
+        final Exception exception,
+        final long currentTimeMs) {
+        coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+        markCoordinatorUnknown("coordinator unavailable", currentTimeMs);
+
+        if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {

Review Comment:
nit: could we move the retriable case to the top here so that the fatal cases are not split?
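That is, reordered so the two fatal branches sit together (using only the existing code):

```java
private void onFailedCoordinatorResponse(
    final Exception exception,
    final long currentTimeMs) {
    coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
    markCoordinatorUnknown("coordinator unavailable", currentTimeMs);

    // Retriable case first: back off and let the next poll retry.
    if (exception instanceof RetriableException) {
        log.debug("FindCoordinator request failed due to retriable exception", exception);
        return;
    }

    // Fatal cases, now grouped together.
    if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
        log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
        errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId));
        return;
    }

    log.warn("FindCoordinator request failed due to fatal exception", exception);
    errorHandler.handle(exception);
}
```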
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########

+    /**
+     * Handles the response and exception upon completing the {@link FindCoordinatorRequest}. This is invoked in the
+     * callback {@link FindCoordinatorRequestHandler}. If the response was successful, the coordinator node will be
+     * updated. If the response failed due to errors, the current coordinator will be marked unknown.
+     * @param currentTimeMs current time in ms.
+     * @param response the response, can be null.

Review Comment:
Perhaps we could clarify this? It looks like the code expects the response to be non-null if the exception is null?
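One way to spell that contract out, both in the javadoc and defensively in the code (a sketch):

```java
/**
 * Handles the result of a completed {@link FindCoordinatorRequest}. Exactly one of
 * {@code response} and {@code e} is expected to be non-null.
 *
 * @param currentTimeMs current time in ms.
 * @param response the response; must be non-null when {@code e} is null.
 * @param e the exception; null if and only if the request completed with a response.
 */
public void onResponse(final long currentTimeMs, final FindCoordinatorResponse response, final Exception e) {
    if (e != null) {
        onFailedCoordinatorResponse(e, currentTimeMs);
        return;
    }
    // make the implicit non-null assumption explicit
    Objects.requireNonNull(response, "response must be non-null when no exception was raised");
    // ... handle the successful or errored response as before
}
```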
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,101 +132,89 @@

+    /**
+     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
+     * 1. Try to poll an event from the queue and process it using the corresponding {@link ApplicationEventProcessor}.
+     * 2. Try to find the coordinator if needed.
+     * 3. Poll the network client for outstanding requests.
+     */
     void runOnce() {

+        final long currentTimeMs = time.milliseconds();
+        // TODO: This is just a placeholder value.
+        long pollWaitTimeMs = 100;

Review Comment:
In `ConsumerNetworkClient`, we hard-code the following:
```java
private static final int MAX_POLL_TIMEOUT_MS = 5000;
```
Perhaps we can do the same here and drop the TODO?

########## clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java:
##########
@@ -50,6 +52,22 @@

+    public Optional<Coordinator> getCoordinatorByKey(String key) {

Review Comment:
nit: usually we drop `get` from method names
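i.e., the accessor would become:

```java
// Drop the `get` prefix, matching accessor naming elsewhere in the codebase.
public Optional<Coordinator> coordinatorByKey(String key) {
```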
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########

+        // TODO: Add a condition here, like shouldFindCoordinator in the future. Since we don't always need to find
+        // the coordinator.
+        if (coordinatorManager.isPresent()) {
+            pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
+        }

Review Comment:
Can we drop this? This is already built into the call to `CoordinatorManager.poll`.
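Combined with the `MAX_POLL_TIMEOUT_MS` suggestion above, this block could shrink to something like (a sketch):

```java
final long currentTimeMs = time.milliseconds();
long pollWaitTimeMs = MAX_POLL_TIMEOUT_MS;

// No shouldFindCoordinator guard is needed here: when a coordinator is
// already known, CoordinatorRequestManager.poll returns an empty request
// list with a Long.MAX_VALUE wait time, so the call is effectively a no-op.
if (coordinatorManager.isPresent()) {
    pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs)));
}
```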
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########

+    // Visible for testing
+    CoordinatorRequestManager(final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final String groupId,
+                              final long rebalanceTimeoutMs,

Review Comment:
The dependence on the rebalance timeout is a little strange if the user is using simple partition assignment. I wonder if we could hard-code a value instead. Perhaps we can log a message every one minute (say) that the coordinator is unknown?
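One possible shape, assuming a hard-coded interval (the constant name and value here are illustrative):

```java
// Illustrative constant; replaces the rebalanceTimeoutMs dependency.
private static final long COORDINATOR_UNKNOWN_LOG_INTERVAL_MS = 60_000L;

protected void markCoordinatorUnknown(final String cause, final long currentTimeMs) {
    if (this.coordinator != null) {
        log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
                + "Rediscovery will be attempted.", this.coordinator, cause);
        this.coordinator = null;
        timeMarkedUnknownMs = currentTimeMs;
    } else {
        long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - timeMarkedUnknownMs);
        if (durationOfOngoingDisconnect > COORDINATOR_UNKNOWN_LOG_INTERVAL_MS) {
            // a real implementation would also track the last log time so the
            // message repeats roughly once per interval rather than every poll
            log.warn("Consumer has been disconnected from the group coordinator for {}ms",
                    durationOfOngoingDisconnect);
        }
    }
}
```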
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -20,10 +20,23 @@

 abstract public class ApplicationEvent {
+    public final Type type;
+
+    protected ApplicationEvent(Type type) {
+        this.type = type;
+    }
     /**
      * Process the application event. Return true upon successful execution,
      * false otherwise.
      * @return true if the event was successfully executed; false otherwise.
      */
-    public abstract boolean process();
+
+    @Override
+    public String toString() {
+        return type + " ApplicationEvent";
+    }
+    public enum Type {

Review Comment:
Perhaps we can leave this for a follow-up since we are not implementing the COMMIT type here.
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########

+        return new NetworkClientDelegate.PollResult(
+            coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+            new ArrayList<>());

Review Comment:
nit: we can use `Collections.emptyList()`?
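i.e.:

```java
return new NetworkClientDelegate.PollResult(
        coordinatorRequestState.remainingBackoffMs(currentTimeMs),
        Collections.emptyList());  // no per-poll ArrayList allocation
```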
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,287 @@

+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+    private final Set<Node> activeNodes;
+
+    /**
+     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for
+     * responses, and check the disconnected nodes.
+     * @param timeoutMs timeout for the underlying network poll.
+     * @return the received client responses.
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {
+        final long currentTimeMs = time.milliseconds();
+        trySend(currentTimeMs);
+        List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs);
+        checkDisconnects();
+        maybeTriggerWakeup();
+        return res;
+    }
+
+    /**
+     * Iterates through the unsentRequests queue and tries to send the unsent requests. If a request doesn't have an
+     * assigned node, the least loaded one is chosen. All target nodes are also recorded in {@code activeNodes},
+     * which is later used to check for disconnection.
+     */
+    void trySend(final long currentTimeMs) {

Review Comment:
Can this be private? Seems like we can test it indirectly through a call to `poll`?
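If so, a unit test could still cover the send path through the public surface — roughly like this sketch, where the fixture and helper names are hypothetical:

```java
@Test
public void testUnsentRequestIsSentOnPoll() {
    // hypothetical fixture that wires a MockClient into the delegate
    NetworkClientDelegate delegate = newNetworkClientDelegate(mockClient);
    delegate.add(findCoordinatorUnsentRequest());  // hypothetical helper
    delegate.poll(0);  // poll() drives the now-private trySend internally
    assertEquals(1, mockClient.inFlightRequestCount());
}
```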
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -50,57 +49,81 @@ public class DefaultBackgroundThread extends KafkaThread {

     private final Logger log;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
-    private final ConsumerNetworkClient networkClient;
-    private final SubscriptionState subscriptions;
     private final ConsumerMetadata metadata;
-    private final Metrics metrics;
     private final ConsumerConfig config;
+    // empty if groupId is null
+    private final Optional<CoordinatorRequestManager> coordinatorManager;
+    private final ApplicationEventProcessor applicationEventProcessor;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final ErrorEventHandler errorEventHandler;
-    private String clientId;
-    private long retryBackoffMs;
-    private int heartbeatIntervalMs;
     private boolean running;
-    private Optional<ApplicationEvent> inflightEvent = Optional.empty();
-    private final AtomicReference<Optional<RuntimeException>> exception =
-        new AtomicReference<>(Optional.empty());

...

+            this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
+            String groupId = rebalanceConfig.groupId;
+            // TODO: Maybe consider a NOOP implementation

Review Comment:
Let's not leave TODOs in the code. We can file jiras instead.
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########

+    long handlePollResult(NetworkClientDelegate.PollResult res) {
+        Objects.requireNonNull(res);

Review Comment:
nit: seems like overkill. Was there any reason you wanted to be careful with the result?
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########

+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                    "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                    "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);
+        }
+    }
+
+    // Visible for testing
+    boolean doSend(final UnsentRequest r,
+                   final long currentTimeMs,
+                   final Queue<UnsentRequest> unsentAndUnreadyRequests) {
+        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+        if (node == null || nodeUnavailable(node)) {
+            return false;
+        }
+        ClientRequest request = makeClientRequest(r, node);
+        if (client.isReady(node, currentTimeMs)) {
+            activeNodes.add(node);
+            client.send(request, currentTimeMs);
+        } else {
+            // Enqueue the request again if the node isn't ready yet. The request will be handled in the next
+            // iteration of the event loop.
+            log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
+            unsentAndUnreadyRequests.add(r);
+        }
+        return true;
+    }
+
+    private void checkDisconnects() {
+        // Check the connection status of all active nodes and disconnect the ones whose connection has failed.
+        Iterator<Node> iter = activeNodes.iterator();
+        while (iter.hasNext()) {
+            Node node = iter.next();
+            iter.remove();
+            if (client.connectionFailed(node)) {
+                client.disconnect(node.idString());
+            }
+        }
+    }
+
+    private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node) {
+        return client.newClientRequest(
+            node.idString(),
+            unsent.abstractBuilder,
+            time.milliseconds(),

Review Comment:
nit: we can pass through `currentTimeMs` from the poll call
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; + +/** + * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations. + */ +public class NetworkClientDelegate implements AutoCloseable { + private final KafkaClient client; + private final Time time; + private final Logger log; + private final int requestTimeoutMs; + private boolean wakeup = false; + private final Queue<UnsentRequest> unsentRequests; + private final Set<Node> activeNodes; + + public NetworkClientDelegate( + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { + this.time = time; + this.client = client; + this.log = logContext.logger(getClass()); + this.unsentRequests = new ArrayDeque<>(); + this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.activeNodes = new HashSet<>(); + } + + /** + * Returns the responses of the sent requests. This methods will try to send the unsent requests, poll for responses, + * and check the disconnected nodes. + * @param timeoutMs + * @return + */ + public List<ClientResponse> poll(final long timeoutMs) { + final long currentTimeMs = time.milliseconds(); + trySend(currentTimeMs); + List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs); + checkDisconnects(); + maybeTriggerWakeup(); + return res; + } + + /** + * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an + * assigned node, it will find the leastLoadedOne. + * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection. 
+ */ + void trySend(final long currentTimeMs) { + Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>(); + while (!unsentRequests.isEmpty()) { + UnsentRequest unsent = unsentRequests.poll(); + unsent.timer.update(currentTimeMs); + if (unsent.timer.isExpired()) { + unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( + "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); + continue; + } + + if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) { + log.debug("No broker available to send the request: {}", unsent); + unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception( + "No node available in the kafka cluster to send the request"))); + } + } + + if (!unsentAndUnreadyRequests.isEmpty()) { + // Handle the unready requests in the next event loop + unsentRequests.addAll(unsentAndUnreadyRequests); + } + } + + // Visible for testing + boolean doSend(final UnsentRequest r, + final long currentTimeMs, + final Queue<UnsentRequest> unsentAndUnreadyRequests) { + Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs)); + if (node == null || nodeUnavailable(node)) { + return false; + } + ClientRequest request = makeClientRequest(r, node); + if (client.isReady(node, currentTimeMs)) { + activeNodes.add(node); + client.send(request, currentTimeMs); + } else { + // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration + // of the event loop + log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r); + unsentAndUnreadyRequests.add(r); + } + return true; + } + + private void checkDisconnects() { + // Check the connection status by all the nodes that are active. Disconnect the disconnected node if it is + // unable to be connected. + Iterator<Node> iter = activeNodes.iterator(); + while (iter.hasNext()) { + Node node = iter.next(); + iter.remove(); + if (client.connectionFailed(node)) { + client.disconnect(node.idString()); + } + } + } + + private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node) { + return client.newClientRequest( + node.idString(), + unsent.abstractBuilder, + time.milliseconds(), + true, + (int) unsent.timer.remainingMs(), + unsent.callback.orElse(new DefaultRequestFutureCompletionHandler())); + } + + public void maybeTriggerWakeup() { + if (this.wakeup) { + this.wakeup = false; + throw new WakeupException(); + } + } + + public void wakeup() { + this.wakeup = true; + this.client.wakeup(); + } + + public Node leastLoadedNode() { + return this.client.leastLoadedNode(time.milliseconds()); + } + + public void add(final UnsentRequest r) { + r.setTimer(this.time, this.requestTimeoutMs); + unsentRequests.add(r); + } + + public void ready(final Node node) { Review Comment: nit: we should resist exposing all of these if we can. I think we want a really basic api. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; + +/** + * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations. + */ +public class NetworkClientDelegate implements AutoCloseable { + private final KafkaClient client; + private final Time time; + private final Logger log; + private final int requestTimeoutMs; + private boolean wakeup = false; + private final Queue<UnsentRequest> unsentRequests; + private final Set<Node> activeNodes; + + public NetworkClientDelegate( + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { + this.time = time; + this.client = client; + this.log = logContext.logger(getClass()); + this.unsentRequests = new ArrayDeque<>(); + this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.activeNodes = new HashSet<>(); + } + + /** + * Returns the responses of the sent requests. This methods will try to send the unsent requests, poll for responses, + * and check the disconnected nodes. + * @param timeoutMs + * @return + */ + public List<ClientResponse> poll(final long timeoutMs) { + final long currentTimeMs = time.milliseconds(); + trySend(currentTimeMs); + List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs); + checkDisconnects(); + maybeTriggerWakeup(); + return res; + } + + /** + * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an + * assigned node, it will find the leastLoadedOne. + * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection. 
+ */ + void trySend(final long currentTimeMs) { + Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>(); + while (!unsentRequests.isEmpty()) { + UnsentRequest unsent = unsentRequests.poll(); + unsent.timer.update(currentTimeMs); + if (unsent.timer.isExpired()) { + unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( + "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); + continue; + } + + if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) { + log.debug("No broker available to send the request: {}", unsent); + unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception( + "No node available in the kafka cluster to send the request"))); + } + } + + if (!unsentAndUnreadyRequests.isEmpty()) { + // Handle the unready requests in the next event loop + unsentRequests.addAll(unsentAndUnreadyRequests); + } + } + + // Visible for testing + boolean doSend(final UnsentRequest r, + final long currentTimeMs, + final Queue<UnsentRequest> unsentAndUnreadyRequests) { + Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs)); + if (node == null || nodeUnavailable(node)) { + return false; + } + ClientRequest request = makeClientRequest(r, node); + if (client.isReady(node, currentTimeMs)) { + activeNodes.add(node); + client.send(request, currentTimeMs); + } else { + // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration + // of the event loop + log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r); + unsentAndUnreadyRequests.add(r); + } + return true; + } + + private void checkDisconnects() { + // Check the connection status by all the nodes that are active. Disconnect the disconnected node if it is + // unable to be connected. + Iterator<Node> iter = activeNodes.iterator(); + while (iter.hasNext()) { + Node node = iter.next(); + iter.remove(); + if (client.connectionFailed(node)) { + client.disconnect(node.idString()); Review Comment: Why do we do this? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; + +/** + * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations. + */ +public class NetworkClientDelegate implements AutoCloseable { + private final KafkaClient client; + private final Time time; + private final Logger log; + private final int requestTimeoutMs; + private boolean wakeup = false; + private final Queue<UnsentRequest> unsentRequests; + private final Set<Node> activeNodes; + + public NetworkClientDelegate( + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { + this.time = time; + this.client = client; + this.log = logContext.logger(getClass()); + this.unsentRequests = new ArrayDeque<>(); + this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.activeNodes = new HashSet<>(); + } + + /** + * Returns the responses of the sent requests. This methods will try to send the unsent requests, poll for responses, + * and check the disconnected nodes. + * @param timeoutMs + * @return + */ + public List<ClientResponse> poll(final long timeoutMs) { + final long currentTimeMs = time.milliseconds(); + trySend(currentTimeMs); + List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs); + checkDisconnects(); + maybeTriggerWakeup(); + return res; + } + + /** + * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an + * assigned node, it will find the leastLoadedOne. + * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection. 
+ */ + void trySend(final long currentTimeMs) { + Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>(); + while (!unsentRequests.isEmpty()) { + UnsentRequest unsent = unsentRequests.poll(); + unsent.timer.update(currentTimeMs); + if (unsent.timer.isExpired()) { + unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( + "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); + continue; + } + + if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) { + log.debug("No broker available to send the request: {}", unsent); + unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception( + "No node available in the kafka cluster to send the request"))); + } + } + + if (!unsentAndUnreadyRequests.isEmpty()) { + // Handle the unready requests in the next event loop + unsentRequests.addAll(unsentAndUnreadyRequests); + } + } + + // Visible for testing + boolean doSend(final UnsentRequest r, + final long currentTimeMs, + final Queue<UnsentRequest> unsentAndUnreadyRequests) { + Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs)); + if (node == null || nodeUnavailable(node)) { + return false; + } + ClientRequest request = makeClientRequest(r, node); + if (client.isReady(node, currentTimeMs)) { + activeNodes.add(node); + client.send(request, currentTimeMs); + } else { + // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration + // of the event loop + log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r); + unsentAndUnreadyRequests.add(r); + } + return true; + } + + private void checkDisconnects() { + // Check the connection status by all the nodes that are active. Disconnect the disconnected node if it is + // unable to be connected. + Iterator<Node> iter = activeNodes.iterator(); + while (iter.hasNext()) { + Node node = iter.next(); + iter.remove(); + if (client.connectionFailed(node)) { + client.disconnect(node.idString()); + } + } + } + + private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node) { + return client.newClientRequest( + node.idString(), + unsent.abstractBuilder, + time.milliseconds(), + true, + (int) unsent.timer.remainingMs(), + unsent.callback.orElse(new DefaultRequestFutureCompletionHandler())); + } + + public void maybeTriggerWakeup() { + if (this.wakeup) { + this.wakeup = false; + throw new WakeupException(); + } + } + + public void wakeup() { + this.wakeup = true; + this.client.wakeup(); + } + + public Node leastLoadedNode() { + return this.client.leastLoadedNode(time.milliseconds()); + } + + public void add(final UnsentRequest r) { + r.setTimer(this.time, this.requestTimeoutMs); + unsentRequests.add(r); + } + + public void ready(final Node node) { + client.ready(node, time.milliseconds()); + } + + /** + * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in + * reconnect backoff window following the disconnect). + */ + public boolean nodeUnavailable(final Node node) { + return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0; + } + + public void close() throws IOException { + this.client.close(); + } + + public void addAll(final List<UnsentRequest> unsentRequests) { + unsentRequests.forEach(this::add); Review Comment: nit: `addAll`?
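I may be misreading my own nit here, but if the concern is that the parameter shadows the `unsentRequests` field, renaming it makes it obvious which collection `forEach(this::add)` walks, while still routing every request through `add` so its timer gets set. A sketch (`requests` is an illustrative name, not in the patch):

```java
// Sketch: rename the parameter so it no longer shadows the unsentRequests
// field; behavior is unchanged.
public void addAll(final List<UnsentRequest> requests) {
    requests.forEach(this::add);
}
```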
########## clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java: ########## @@ -50,6 +52,22 @@ public FindCoordinatorResponse(FindCoordinatorResponseData data) { this.data = data; } + public Optional<Coordinator> getCoordinatorByKey(String key) { + Objects.requireNonNull(key); + if (this.data.coordinators().isEmpty()) { + // version <= 3 Review Comment: Can we write a test case which covers all versions? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ########## @@ -109,101 +132,89 @@ public void run() { try { runOnce(); } catch (final WakeupException e) { - log.debug( - "Exception thrown, background thread won't terminate", - e - ); - // swallow the wakeup exception to prevent killing the - // background thread. + log.debug("WakeupException caught, background thread won't be interrupted"); + // swallow the wakeup exception to prevent killing the background thread. } } } catch (final Throwable t) { - log.error( - "The background thread failed due to unexpected error", - t - ); - if (t instanceof RuntimeException) - this.exception.set(Optional.of((RuntimeException) t)); - else - this.exception.set(Optional.of(new RuntimeException(t))); + log.error("The background thread failed due to unexpected error", t); + throw new RuntimeException(t); } finally { close(); log.debug("{} closed", getClass()); } } /** - * Process event from a single poll + * Poll and process an {@link ApplicationEvent}. It performs the following tasks: + * 1. Try to poll an event from the queue and process it using the corresponding {@link ApplicationEventProcessor}. + * 2. Try to find the coordinator if needed. + * 3. Poll the networkClient for outstanding requests. */ void runOnce() { - this.inflightEvent = maybePollEvent(); - if (this.inflightEvent.isPresent()) { - log.debug("processing application event: {}", this.inflightEvent); + Optional<ApplicationEvent> event = maybePollEvent(); + + if (event.isPresent()) { + log.debug("processing application event: {}", event); + consumeApplicationEvent(event.get()); } - if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) { - // clear inflight event upon successful consumption - this.inflightEvent = Optional.empty(); + + final long currentTimeMs = time.milliseconds(); + // TODO: This is just a placeholder value. + long pollWaitTimeMs = 100; + + // TODO: Add a condition here, like shouldFindCoordinator in the future. Since we don't always need to find + // the coordinator. + if (coordinatorManager.isPresent()) { + pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs))); Review Comment: nit: can we move the call to `handlePollResult` to a separate line? It makes the code easier to follow.
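Something like this, using only what is already in the hunk:

```java
// Pull the manager's poll() out of the Math.min so each step reads on its own line.
if (coordinatorManager.isPresent()) {
    NetworkClientDelegate.PollResult pollResult = coordinatorManager.get().poll(currentTimeMs);
    pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(pollResult));
}
```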
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; + +/** + * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations. + */ +public class NetworkClientDelegate implements AutoCloseable { + private final KafkaClient client; + private final Time time; + private final Logger log; + private final int requestTimeoutMs; + private boolean wakeup = false; + private final Queue<UnsentRequest> unsentRequests; + private final Set<Node> activeNodes; + + public NetworkClientDelegate( + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { + this.time = time; + this.client = client; + this.log = logContext.logger(getClass()); + this.unsentRequests = new ArrayDeque<>(); + this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.activeNodes = new HashSet<>(); + } + + /** + * Returns the responses of the sent requests. This methods will try to send the unsent requests, poll for responses, + * and check the disconnected nodes. + * @param timeoutMs + * @return + */ + public List<ClientResponse> poll(final long timeoutMs) { + final long currentTimeMs = time.milliseconds(); + trySend(currentTimeMs); + List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs); + checkDisconnects(); + maybeTriggerWakeup(); + return res; + } + + /** + * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an + * assigned node, it will find the leastLoadedOne. + * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection.
+ */ + void trySend(final long currentTimeMs) { + Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>(); + while (!unsentRequests.isEmpty()) { + UnsentRequest unsent = unsentRequests.poll(); + unsent.timer.update(currentTimeMs); + if (unsent.timer.isExpired()) { + unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( + "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); + continue; + } + + if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) { + log.debug("No broker available to send the request: {}", unsent); + unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception( + "No node available in the kafka cluster to send the request"))); + } + } + + if (!unsentAndUnreadyRequests.isEmpty()) { + // Handle the unready requests in the next event loop + unsentRequests.addAll(unsentAndUnreadyRequests); + } + } + + // Visible for testing + boolean doSend(final UnsentRequest r, + final long currentTimeMs, + final Queue<UnsentRequest> unsentAndUnreadyRequests) { + Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs)); + if (node == null || nodeUnavailable(node)) { + return false; + } + ClientRequest request = makeClientRequest(r, node); + if (client.isReady(node, currentTimeMs)) { + activeNodes.add(node); + client.send(request, currentTimeMs); + } else { + // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration + // of the event loop + log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r); + unsentAndUnreadyRequests.add(r); + } + return true; + } + + private void checkDisconnects() { + // Check the connection status by all the nodes that are active. Disconnect the disconnected node if it is + // unable to be connected. + Iterator<Node> iter = activeNodes.iterator(); + while (iter.hasNext()) { + Node node = iter.next(); + iter.remove(); + if (client.connectionFailed(node)) { + client.disconnect(node.idString()); + } + } + } + + private ClientRequest makeClientRequest(final UnsentRequest unsent, final Node node) { + return client.newClientRequest( + node.idString(), + unsent.abstractBuilder, + time.milliseconds(), + true, + (int) unsent.timer.remainingMs(), + unsent.callback.orElse(new DefaultRequestFutureCompletionHandler())); + } + + public void maybeTriggerWakeup() { + if (this.wakeup) { + this.wakeup = false; + throw new WakeupException(); + } + } + + public void wakeup() { + this.wakeup = true; + this.client.wakeup(); + } + + public Node leastLoadedNode() { + return this.client.leastLoadedNode(time.milliseconds()); + } + + public void add(final UnsentRequest r) { + r.setTimer(this.time, this.requestTimeoutMs); + unsentRequests.add(r); + } + + public void ready(final Node node) { + client.ready(node, time.milliseconds()); + } + + /** + * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in + * reconnect backoff window following the disconnect). + */ + public boolean nodeUnavailable(final Node node) { + return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0; + } + + public void close() throws IOException { + this.client.close(); + } + + public void addAll(final List<UnsentRequest> unsentRequests) { + unsentRequests.forEach(this::add); + } + + public static class PollResult { + public final long timeMsTillNextPoll; Review Comment: nit: how about `timeUntilNextPollMs`?
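That keeps the `...Ms` unit suffix consistent with the rest of the clients module. Roughly (the list field's name below is an assumption on my part; the `PollResult` constructor calls in `CoordinatorRequestManager` show the result carries a wait time plus a list of unsent requests):

```java
// Sketch: unit-suffixed wait-time field; the unsentRequests field name is
// assumed from the two-argument constructor calls visible in this PR.
public static class PollResult {
    public final long timeUntilNextPollMs;
    public final List<UnsentRequest> unsentRequests;

    public PollResult(final long timeUntilNextPollMs, final List<UnsentRequest> unsentRequests) {
        this.timeUntilNextPollMs = timeUntilNextPollMs;
        this.unsentRequests = unsentRequests;
    }
}
```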
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; + +/** + * This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria: + * + * Whether there is an existing coordinator. + * Whether there is an inflight request. + * Whether the backoff timer has expired. + * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer + * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}. + * + * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which + * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be + * marked {@code null} upon receiving a failure. 
+ */ +public class CoordinatorRequestManager implements RequestManager { + + private final Logger log; + private final ErrorEventHandler errorHandler; + private final long rebalanceTimeoutMs; + private final String groupId; + + private final RequestState coordinatorRequestState; + private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while + private Node coordinator; + + public CoordinatorRequestManager(final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs) { + Objects.requireNonNull(groupId); + this.log = logContext.logger(this.getClass()); + this.errorHandler = errorHandler; + this.groupId = groupId; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.coordinatorRequestState = new RequestState(config); + } + + // Visible for testing + CoordinatorRequestManager(final LogContext logContext, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs, + final RequestState coordinatorRequestState) { + Objects.requireNonNull(groupId); + this.log = logContext.logger(this.getClass()); + this.errorHandler = errorHandler; + this.groupId = groupId; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.coordinatorRequestState = coordinatorRequestState; + } + + /** + * Poll for the FindCoordinator request. + * If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list. + * If we are still backing off from a previous attempt, this method will return a PollResult with the remaining backoff time and an empty list. + * Otherwise, this returns will return a PollResult with a singleton list of UnsentRequest and Long.MAX_VALUE backoff time. + * Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not. + * + * @param currentTimeMs current time in ms. + * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}. + */ + @Override + public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + if (this.coordinator != null) { + return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); + } + + if (coordinatorRequestState.canSendRequest(currentTimeMs)) { + NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs); + return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.singletonList(request)); + } + + return new NetworkClientDelegate.PollResult( + coordinatorRequestState.remainingBackoffMs(currentTimeMs), + new ArrayList<>()); + } + + private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) { + coordinatorRequestState.updateLastSend(currentTimeMs); + FindCoordinatorRequestData data = new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id()) + .setKey(this.groupId); + return NetworkClientDelegate.UnsentRequest.makeUnsentRequest( + new FindCoordinatorRequest.Builder(data), + new FindCoordinatorRequestHandler(), + null); + } + + /** + * Mark the current coordinator null. + * @param cause why the coordinator is marked unknown. + * @param currentTimeMs the current time in ms. 
+ */ + protected void markCoordinatorUnknown(final String cause, final long currentTimeMs) { + if (this.coordinator != null) { + log.info("Group coordinator {} is unavailable or invalid due to cause: {}. " + + "Rediscovery will be attempted.", this.coordinator, cause); + this.coordinator = null; + timeMarkedUnknownMs = currentTimeMs; + } else { + long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - timeMarkedUnknownMs); + if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs) + log.debug("Consumer has been disconnected from the group coordinator for {}ms", + durationOfOngoingDisconnect); + } + } + + private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinator) { + // use MAX_VALUE - node.id as the coordinator id to allow separate connections + // for the coordinator in the underlying network client layer + int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId(); + + this.coordinator = new Node( + coordinatorConnectionId, + coordinator.host(), + coordinator.port()); + log.info("Discovered group coordinator {}", coordinator); + coordinatorRequestState.reset(); + } + + private void onFailedCoordinatorResponse( + final Exception exception, + final long currentTimeMs) { + coordinatorRequestState.updateLastFailedAttempt(currentTimeMs); + markCoordinatorUnknown("coordinator unavailable", currentTimeMs); + + if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { + log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage()); + errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId)); + return; + } + + if (exception instanceof RetriableException) { + log.debug("FindCoordinator request failed due to retriable exception", exception); + return; + } + + log.warn("FindCoordinator request failed due to fatal exception", exception); + errorHandler.handle(exception); + } + + /** + * Handles the response and exception upon completing the {@link FindCoordinatorRequest}. This is invoked in the callback + * {@link FindCoordinatorRequestHandler}. If the response was successful, a coordinator node will be updated. If the + * response failed due to errors, the current coordinator will be marked unknown. + * @param currentTimeMs current time ins ms. + * @param response the response, can be null. + * @param e the exception, can be null. + */ + public void onResponse(final long currentTimeMs, final FindCoordinatorResponse response, final Exception e) { + // handles Runtime exception + if (e != null) { + onFailedCoordinatorResponse(e, currentTimeMs); + return; + } + + Optional<FindCoordinatorResponseData.Coordinator> coordinator = response.getCoordinatorByKey(this.groupId); + if (!coordinator.isPresent()) { + String msg = String.format("Coordinator not found for groupId: %s", this.groupId); Review Comment: I think this message is a little too generic. How about something like this? > Response did not contain expected coordinator section for groupId: %s ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ########## @@ -109,101 +132,89 @@ public void run() { try { runOnce(); } catch (final WakeupException e) { - log.debug( - "Exception thrown, background thread won't terminate", - e - ); - // swallow the wakeup exception to prevent killing the - // background thread. + log.debug("WakeupException caught, background thread won't be interrupted"); + // swallow the wakeup exception to prevent killing the background thread. 
} } } catch (final Throwable t) { - log.error( - "The background thread failed due to unexpected error", - t - ); - if (t instanceof RuntimeException) - this.exception.set(Optional.of((RuntimeException) t)); - else - this.exception.set(Optional.of(new RuntimeException(t))); + log.error("The background thread failed due to unexpected error", t); + throw new RuntimeException(t); } finally { close(); log.debug("{} closed", getClass()); } } /** - * Process event from a single poll + * Poll and process an {@link ApplicationEvent}. It performs the following tasks: + * 1. Try to poll an event from the queue and process it using the corresponding {@link ApplicationEventProcessor}. + * 2. Try to find the coordinator if needed. + * 3. Poll the networkClient for outstanding requests. */ void runOnce() { - this.inflightEvent = maybePollEvent(); - if (this.inflightEvent.isPresent()) { - log.debug("processing application event: {}", this.inflightEvent); + Optional<ApplicationEvent> event = maybePollEvent(); + + if (event.isPresent()) { + log.debug("processing application event: {}", event); + consumeApplicationEvent(event.get()); } - if (this.inflightEvent.isPresent() && maybeConsumeInflightEvent(this.inflightEvent.get())) { - // clear inflight event upon successful consumption - this.inflightEvent = Optional.empty(); + + final long currentTimeMs = time.milliseconds(); + // TODO: This is just a placeholder value. + long pollWaitTimeMs = 100; + + // TODO: Add a condition here, like shouldFindCoordinator in the future. Since we don't always need to find + // the coordinator. + if (coordinatorManager.isPresent()) { + pollWaitTimeMs = Math.min(pollWaitTimeMs, handlePollResult(coordinatorManager.get().poll(currentTimeMs))); } // if there are pending events to process, poll then continue without // blocking. - if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) { - networkClient.poll(time.timer(0)); + if (!applicationEventQueue.isEmpty()) { + networkClientDelegate.poll(0); Review Comment: It may take a little while before we are able to send a request (e.g. the connection might have reached the max inflight requests allowed). We do not want to busy loop in this case. I'd suggest using `retryBackoffMs` in this case. We can structure the logic like this.
```java
if (!applicationEventQueue.isEmpty()) {
    pollWaitTimeMs = Math.min(retryBackoffMs, pollWaitTimeMs);
}
networkClientDelegate.poll(pollWaitTimeMs);
```
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; + +/** + * This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria: + * + * Whether there is an existing coordinator. + * Whether there is an inflight request. + * Whether the backoff timer has expired. + * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer + * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}. + * + * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which + * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be + * marked {@code null} upon receiving a failure. + */ +public class CoordinatorRequestManager implements RequestManager { + + private final Logger log; + private final ErrorEventHandler errorHandler; + private final long rebalanceTimeoutMs; + private final String groupId; + + private final RequestState coordinatorRequestState; + private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while + private Node coordinator; + + public CoordinatorRequestManager(final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs) { + Objects.requireNonNull(groupId); + this.log = logContext.logger(this.getClass()); + this.errorHandler = errorHandler; + this.groupId = groupId; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.coordinatorRequestState = new RequestState(config); + } + + // Visible for testing + CoordinatorRequestManager(final LogContext logContext, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs, + final RequestState coordinatorRequestState) { + Objects.requireNonNull(groupId); + this.log = logContext.logger(this.getClass()); + this.errorHandler = errorHandler; + this.groupId = groupId; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.coordinatorRequestState = coordinatorRequestState; + } + + /** + * Poll for the FindCoordinator request. + * If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list. + * If we are still backing off from a previous attempt, this method will return a PollResult with the remaining backoff time and an empty list. + * Otherwise, this returns will return a PollResult with a singleton list of UnsentRequest and Long.MAX_VALUE backoff time. 
+ * Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not. + * + * @param currentTimeMs current time in ms. + * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}. + */ + @Override + public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + if (this.coordinator != null) { + return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); + } + + if (coordinatorRequestState.canSendRequest(currentTimeMs)) { + NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs); + return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.singletonList(request)); + } + + return new NetworkClientDelegate.PollResult( + coordinatorRequestState.remainingBackoffMs(currentTimeMs), + new ArrayList<>()); + } + + private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) { + coordinatorRequestState.updateLastSend(currentTimeMs); + FindCoordinatorRequestData data = new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id()) + .setKey(this.groupId); + return NetworkClientDelegate.UnsentRequest.makeUnsentRequest( + new FindCoordinatorRequest.Builder(data), + new FindCoordinatorRequestHandler(), + null); + } + + /** + * Mark the current coordinator null. + * @param cause why the coordinator is marked unknown. + * @param currentTimeMs the current time in ms. + */ + protected void markCoordinatorUnknown(final String cause, final long currentTimeMs) { + if (this.coordinator != null) { + log.info("Group coordinator {} is unavailable or invalid due to cause: {}. " + + "Rediscovery will be attempted.", this.coordinator, cause); + this.coordinator = null; + timeMarkedUnknownMs = currentTimeMs; + } else { + long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - timeMarkedUnknownMs); + if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs) + log.debug("Consumer has been disconnected from the group coordinator for {}ms", + durationOfOngoingDisconnect); + } + } + + private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinator) { + // use MAX_VALUE - node.id as the coordinator id to allow separate connections + // for the coordinator in the underlying network client layer + int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId(); + + this.coordinator = new Node( + coordinatorConnectionId, + coordinator.host(), + coordinator.port()); + log.info("Discovered group coordinator {}", coordinator); + coordinatorRequestState.reset(); + } + + private void onFailedCoordinatorResponse( + final Exception exception, + final long currentTimeMs) { + coordinatorRequestState.updateLastFailedAttempt(currentTimeMs); + markCoordinatorUnknown("coordinator unavailable", currentTimeMs); Review Comment: Could we improve this message? Perhaps "FindCoordinator failed with exception"? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; + +/** + * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations. + */ +public class NetworkClientDelegate implements AutoCloseable { + private final KafkaClient client; + private final Time time; + private final Logger log; + private final int requestTimeoutMs; + private boolean wakeup = false; + private final Queue<UnsentRequest> unsentRequests; + private final Set<Node> activeNodes; + + public NetworkClientDelegate( + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { + this.time = time; + this.client = client; + this.log = logContext.logger(getClass()); + this.unsentRequests = new ArrayDeque<>(); + this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.activeNodes = new HashSet<>(); + } + + /** + * Returns the responses of the sent requests. This methods will try to send the unsent requests, poll for responses, + * and check the disconnected nodes. + * @param timeoutMs + * @return + */ + public List<ClientResponse> poll(final long timeoutMs) { + final long currentTimeMs = time.milliseconds(); + trySend(currentTimeMs); + List<ClientResponse> res = this.client.poll(timeoutMs, currentTimeMs); + checkDisconnects(); + maybeTriggerWakeup(); + return res; + } + + /** + * Iterates through the unsentRequests queue and tries to send the unsent requests. If the request doesn't have an + * assigned node, it will find the leastLoadedOne. + * Here it also stores all the nodes in the {@code activeNodes}, which will then be used to check for the disconnection. 
+ */ + void trySend(final long currentTimeMs) { + Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>(); + while (!unsentRequests.isEmpty()) { + UnsentRequest unsent = unsentRequests.poll(); + unsent.timer.update(currentTimeMs); + if (unsent.timer.isExpired()) { + unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( + "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); + continue; + } + + if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) { + log.debug("No broker available to send the request: {}", unsent); Review Comment: Can't we retry internally until the request expires? What is the advantage to propagating this back at this point? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; + +/** + * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ##########
@@ -0,0 +1,287 @@
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                    "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                    "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);

Review Comment:
Instead of rebuilding the queue after each poll(), can we use an iterator with `Iterator.remove()` to remove the request after it has been successfully sent?
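As a stand-alone illustration of the suggestion, assuming a plain `ArrayDeque` whose iterator supports `remove()` (the `String` queue and `trySendOne` are simplified stand-ins for `unsentRequests` and `doSend`):

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Drain the queue with an iterator: sent requests are removed in place, unready
// requests are simply left where they are for the next event loop.
public class IteratorDrainSketch {

    private final Queue<String> unsentRequests = new ArrayDeque<>();

    // Stand-in for doSend(): pretend only the coordinator request has a ready node.
    private boolean trySendOne(final String request) {
        return request.equals("find-coordinator");
    }

    void trySend() {
        Iterator<String> iterator = unsentRequests.iterator();
        while (iterator.hasNext()) {
            String unsent = iterator.next();
            if (trySendOne(unsent)) {
                // Sent: drop it in place rather than collecting survivors elsewhere.
                iterator.remove();
            }
            // Not ready: leave it queued; no second collection is needed.
        }
    }

    public static void main(final String[] args) {
        IteratorDrainSketch sketch = new IteratorDrainSketch();
        sketch.unsentRequests.add("find-coordinator");
        sketch.unsentRequests.add("offset-commit");
        sketch.trySend();
        System.out.println("Left queued: " + sketch.unsentRequests); // [offset-commit]
    }
}
```

Successfully sent requests disappear as the iterator advances, so the `unsentAndUnreadyRequests` rebuild and the trailing `addAll` both go away.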
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ##########
@@ -0,0 +1,287 @@
+    /**
+     * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for
+     * responses, and check for disconnected nodes.
+     */
+    public List<ClientResponse> poll(final long timeoutMs) {

Review Comment:
nit: since we are computing the current time in `DefaultBackgroundThread`, perhaps we could just pass it through to `poll`?
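A sketch of the suggested signature, with simplified stand-ins for `NetworkClientDelegate` and `ClientResponse`; the caller computes the timestamp once per loop iteration and threads it through:

```java
import java.util.Collections;
import java.util.List;

// The background thread already computes the current time once per iteration, so
// poll() can accept it as a parameter instead of reading the clock a second time.
public class PollTimestampSketch {

    static final class Delegate {
        List<String> poll(final long timeoutMs, final long currentTimeMs) {
            // trySend(currentTimeMs) and client.poll(timeoutMs, currentTimeMs) would
            // both reuse the caller-provided timestamp here.
            return Collections.emptyList();
        }
    }

    public static void main(final String[] args) {
        Delegate delegate = new Delegate();
        long currentTimeMs = System.currentTimeMillis(); // computed once in the run loop
        List<String> responses = delegate.poll(100L, currentTimeMs);
        System.out.println("responses: " + responses);
    }
}
```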
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ##########
@@ -0,0 +1,287 @@
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private final int requestTimeoutMs;
+    private boolean wakeup = false;

Review Comment:
Not sure if you missed my previous comment, but we don't need this. We want `poll` to just return after wakeup, not raise the `WakeupException`.
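A self-contained sketch of those wakeup semantics, with no flag and no `WakeupException`. The `BlockingQueue` here is only a stand-in for the real client's wakeup mechanism, which likewise makes a blocking poll return early:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Another thread calls wakeup(), the blocking poll returns early, and the event loop
// simply continues; nothing is thrown in either direction.
public class WakeupSketch {

    private final BlockingQueue<Object> wakeupSignal = new ArrayBlockingQueue<>(1);

    // Called from the application thread to interrupt a blocking poll.
    public void wakeup() {
        wakeupSignal.offer(new Object());
    }

    // Called from the background thread: blocks for up to timeoutMs, but returns
    // immediately once wakeup() has been invoked.
    public void poll(final long timeoutMs) throws InterruptedException {
        wakeupSignal.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(final String[] args) throws InterruptedException {
        WakeupSketch sketch = new WakeupSketch();
        sketch.wakeup();
        long start = System.currentTimeMillis();
        sketch.poll(5_000L); // returns right away instead of blocking for five seconds
        System.out.println("poll returned after " + (System.currentTimeMillis() - start) + " ms");
    }
}
```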
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ##########
@@ -0,0 +1,287 @@
+    void trySend(final long currentTimeMs) {
+        Queue<UnsentRequest> unsentAndUnreadyRequests = new LinkedList<>();
+        while (!unsentRequests.isEmpty()) {
+            UnsentRequest unsent = unsentRequests.poll();
+            unsent.timer.update(currentTimeMs);
+            if (unsent.timer.isExpired()) {
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                    "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")));
+                continue;
+            }
+
+            if (!doSend(unsent, currentTimeMs, unsentAndUnreadyRequests)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(Errors.NETWORK_EXCEPTION.exception(
+                    "No node available in the kafka cluster to send the request")));
+            }
+        }
+
+        if (!unsentAndUnreadyRequests.isEmpty()) {
+            // Handle the unready requests in the next event loop
+            unsentRequests.addAll(unsentAndUnreadyRequests);
+        }
+    }
+
+    // Visible for testing

Review Comment:
Similar comment. Best not to expose the innards like this. We can test behavior through `poll`.
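To illustrate, a sketch of a behavioral test that drives the send path through `poll()` alone; `RecordingClient` is a hypothetical stand-in for something like `MockClient`, recording whatever the delegate hands to the network layer:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// The send behavior is observable through the recording client, so trySend() can
// stay a private implementation detail.
public class PollBehaviorTestSketch {

    static final class RecordingClient {
        final Queue<String> sent = new ArrayDeque<>();

        void send(final String request) {
            sent.add(request);
        }
    }

    static final class Delegate {
        private final RecordingClient client;
        private final Queue<String> unsentRequests = new ArrayDeque<>();

        Delegate(final RecordingClient client) {
            this.client = client;
        }

        void add(final String request) {
            unsentRequests.add(request);
        }

        // poll() drains the unsent queue internally; the test never calls trySend().
        List<String> poll(final long timeoutMs, final long currentTimeMs) {
            while (!unsentRequests.isEmpty()) {
                client.send(unsentRequests.poll());
            }
            return Collections.emptyList();
        }
    }

    public static void main(final String[] args) {
        RecordingClient client = new RecordingClient();
        Delegate delegate = new Delegate(client);
        delegate.add("find-coordinator");
        delegate.poll(0L, System.currentTimeMillis());
        System.out.println("client saw: " + client.sent); // [find-coordinator]
    }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org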