kirktrue commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1904717131
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,16 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
     }
 
+    private void maybeFailPendingRequestsOnCoordinatorFatalError() {
+        Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+        if (fatalError.isPresent()) {
+            log.warn("Failing all unset commit requests and offset fetches because of coordinator fatal error. ", fatalError.get());

Review Comment:
```suggestion
            log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", fatalError.get());
```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,14 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
     }
 
+    private void maybeFailPendingRequestsOnCoordinatorFatalError() {
+        Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+        if (fatalError.isPresent()) {
+            pendingRequests.unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(fatalError.get()));
+            pendingRequests.unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(fatalError.get()));

Review Comment:
> don't we need to remove the pending requests that we're failing here? `pendingRequests` are usually cleared on poll [clearAll]

Isn't that what `clearAll()` on line 207 does?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -263,6 +264,11 @@ public void resetPollTimer(final long pollMs) {
         pollTimer.reset(maxPollIntervalMs);
     }
 
+    private void maybePropagateCoordinatorFatalErrorEvent() {
+        coordinatorRequestManager.getAndClearFatalError()
+            .ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError)));
+    }
+

Review Comment:
My apologies—it's still not clear from looking at the code why the `AbstractHeartbeatRequestManager` clears the fatal error but `CommitRequestManager` doesn't. Can you provide some context for the different ways of interacting with the error? Thanks!

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,16 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
     }
 
+    private void maybeFailPendingRequestsOnCoordinatorFatalError() {

Review Comment:
What about moving the entire `maybeFailPendingRequestsOnCoordinatorFatalError()` method into the inner `PendingRequests` class directly? That way the outer class doesn't have to worry about the details of how to fail them, only that it has to call `pendingRequests.maybeFailOnCoordinatorFatalError()`?
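For reference, a minimal sketch of the refactor suggested in the comment above. Only the method name `maybeFailOnCoordinatorFatalError` and the unsent-commit/unsent-fetch lists come from the discussion; the `UnsentRequest` stand-in and everything else here are illustrative assumptions rather than the real `CommitRequestManager` internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the real OffsetCommitRequestState/OffsetFetchRequestState types,
// which expose a future that callers wait on.
class UnsentRequest {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

// Sketch of the inner PendingRequests class owning the failure logic itself.
class PendingRequests {
    final List<UnsentRequest> unsentOffsetCommits = new ArrayList<>();
    final List<UnsentRequest> unsentOffsetFetches = new ArrayList<>();

    // Fail every queued commit and fetch with the coordinator's fatal error, if one is present.
    void maybeFailOnCoordinatorFatalError(Optional<Throwable> fatalError) {
        fatalError.ifPresent(error -> {
            unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error));
            unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error));
        });
    }
}
```

With that in place, the outer class would only need something like `pendingRequests.maybeFailOnCoordinatorFatalError(coordinatorRequestManager.fatalError())` from `poll()`.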
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
             KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", exception);
-        backgroundEventHandler.add(new ErrorEvent(exception));
+        fatalError = Optional.of(exception);

Review Comment:
Clearing the fatal error on a successful response from the coordinator seems _intuitively_ correct, but I could easily be wrong.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -53,24 +51,27 @@ public class CoordinatorRequestManager implements RequestManager {
     private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
 
     private final Logger log;
-    private final BackgroundEventHandler backgroundEventHandler;
     private final String groupId;
     private final RequestState coordinatorRequestState;
     private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
     private long totalDisconnectedMin = 0;
     private Node coordinator;
+    // Hold the last fatal error received. It is exposed so that managers requiring a coordinator can access it and take

Review Comment:
Super nit picky, but can we change the term _last_ to either _latest_ or _most recent_?
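Regarding the comment above about clearing the fatal error on a successful response, a minimal sketch of that idea. The class name and the `onSuccessfulResponse` hook are assumptions; `onFailedResponse`, `fatalError()` and `getAndClearFatalError()` mirror names that appear in the diffs.

```java
import java.util.Optional;

// Hypothetical, simplified view of CoordinatorRequestManager's fatal-error bookkeeping.
class CoordinatorFatalErrorTracker {
    private Optional<Throwable> fatalError = Optional.empty();

    // A fatal FindCoordinator failure records the error (mirrors onFailedResponse in the diff).
    void onFailedResponse(Throwable exception) {
        fatalError = Optional.of(exception);
    }

    // The idea from the comment above: a successful FindCoordinator response makes any
    // previously recorded fatal error stale, so clear it. (Method name is an assumption.)
    void onSuccessfulResponse() {
        fatalError = Optional.empty();
    }

    // Read-only accessor, as used by CommitRequestManager in this PR.
    Optional<Throwable> fatalError() {
        return fatalError;
    }

    // Read-and-clear accessor, as used by AbstractHeartbeatRequestManager in this PR.
    Optional<Throwable> getAndClearFatalError() {
        Optional<Throwable> error = fatalError;
        fatalError = Optional.empty();
        return error;
    }
}
```

The two accessors at the bottom also illustrate the difference asked about in the heartbeat-manager comment: `getAndClearFatalError()` consumes the error, while `fatalError()` leaves it in place for the next caller.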