philipnee commented on code in PR #12862: URL: https://github.com/apache/kafka/pull/12862#discussion_r1047782610
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; +import java.util.Optional; + +/** + * This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria: + * + * Whether there is an existing coordinator. 
+ * Whether there is an inflight request. + * Whether the backoff timer has expired. + * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer + * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}. + * + * The {@link FindCoordinatorRequest} will be handled by the {@link FindCoordinatorRequestHandler} callback, which + * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be + * marked {@code null} upon receiving a failure. + */ +public class CoordinatorRequestManager implements RequestManager { + + private final Logger log; + private final ErrorEventHandler errorHandler; + private final long rebalanceTimeoutMs; + private final String groupId; + + private final RequestState coordinatorRequestState; + private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while + private Node coordinator; + + public CoordinatorRequestManager(final LogContext logContext, + final ConsumerConfig config, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs) { + Objects.requireNonNull(groupId); + this.log = logContext.logger(this.getClass()); + this.errorHandler = errorHandler; + this.groupId = groupId; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.coordinatorRequestState = new RequestState(config); + } + + // Visible for testing + CoordinatorRequestManager(final LogContext logContext, + final ErrorEventHandler errorHandler, + final String groupId, + final long rebalanceTimeoutMs, + final RequestState coordinatorRequestState) { + Objects.requireNonNull(groupId); + this.log = logContext.logger(this.getClass()); + this.errorHandler = errorHandler; + this.groupId = groupId; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.coordinatorRequestState = coordinatorRequestState; + } + + /** + * Poll for the 
FindCoordinator request. + * If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list. + * If we are still backing off from a previous attempt, this method will return a PollResult with the remaining backoff time and an empty list. + * Otherwise, this method will return a PollResult with a singleton list of UnsentRequest and Long.MAX_VALUE backoff time. + * Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not. + * + * @param currentTimeMs current time in ms. + * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}. + */ + @Override + public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + if (this.coordinator != null) { + return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); + } + + if (coordinatorRequestState.canSendRequest(currentTimeMs)) { + NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs); + return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.singletonList(request)); + } + + return new NetworkClientDelegate.PollResult( + coordinatorRequestState.remainingBackoffMs(currentTimeMs), + new ArrayList<>()); + } + + private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) { + coordinatorRequestState.updateLastSend(currentTimeMs); + FindCoordinatorRequestData data = new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id()) + .setKey(this.groupId); + return NetworkClientDelegate.UnsentRequest.makeUnsentRequest( + new FindCoordinatorRequest.Builder(data), + new FindCoordinatorRequestHandler(), + null); + } + + /** + * Mark the current coordinator null. + * @param cause why the coordinator is marked unknown. + * @param currentTimeMs the current time in ms. 
+ */ + protected void markCoordinatorUnknown(final String cause, final long currentTimeMs) { + if (this.coordinator != null) { + log.info("Group coordinator {} is unavailable or invalid due to cause: {}. " + + "Rediscovery will be attempted.", this.coordinator, cause); + this.coordinator = null; + timeMarkedUnknownMs = currentTimeMs; + } else { + long durationOfOngoingDisconnect = Math.max(0, currentTimeMs - timeMarkedUnknownMs); + if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs) + log.debug("Consumer has been disconnected from the group coordinator for {}ms", + durationOfOngoingDisconnect); + } + } + + private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinator) { + // use MAX_VALUE - node.id as the coordinator id to allow separate connections + // for the coordinator in the underlying network client layer + int coordinatorConnectionId = Integer.MAX_VALUE - coordinator.nodeId(); + + this.coordinator = new Node( + coordinatorConnectionId, + coordinator.host(), + coordinator.port()); + log.info("Discovered group coordinator {}", coordinator); + coordinatorRequestState.reset(); + } + + private void onFailedCoordinatorResponse( + final Exception exception, + final long currentTimeMs) { + coordinatorRequestState.updateLastFailedAttempt(currentTimeMs); + markCoordinatorUnknown("coordinator unavailable", currentTimeMs); + + if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { Review Comment: Thanks, somehow I thought this is a retriable error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
