lianetm commented on code in PR #17440: URL: https://github.com/apache/kafka/pull/17440#discussion_r1846829642
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,6 +190,8 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
             KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
+            metadataError.completeExceptionally(groupAuthorizationException);
+            metadataError = metadataError.newIncompleteFuture();

Review Comment:
   To simplify this, is a future really needed, or would just keeping the error be enough? The main difference from the metadata errors is that `CommitRequestManager.poll`, which is called regularly, has the pending requests in hand, so we could fail them there if there is a fatal error in the coordinator manager. Would that work?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -425,6 +427,13 @@ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitSync(fina
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>();
         OffsetCommitRequestState requestState = createOffsetCommitRequest(commitOffsets, deadlineMs);
         commitSyncWithRetries(requestState, result);
+        new ArrayList<>(metadataErrors).forEach(metadataError -> metadataError.whenComplete((__, error) -> {
+            if (error != null) {
+                result.completeExceptionally(error);
+                metadataErrors.remove(metadataError);
+            }
+        }));
+

Review Comment:
   This only handles the commit sync, but we have other requests that should also fail if there is a fatal error in the coordinator.
   Given that all requests this manager sends end up being added to `pendingRequests`, and we attempt to send them on poll, shouldn't we just check on poll whether there is no coordinator due to a fatal error, and fail all `pendingRequests` in that case? That would ensure we behave consistently for all requests.

   ```
   public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
       if (!coordinatorRequestManager.coordinator().isPresent()) {
           if (coordinatorRequestManager.fatalError().isPresent()) {
               // Fatal error looking up coordinator => Fail all pendingRequests
           }
           return EMPTY;
       }
       ...
   ```

   I created a separate Jira, https://issues.apache.org/jira/browse/KAFKA-18034, for this, given that it's a different gap and fix. I would suggest we address it in a separate, simpler PR along these lines. What do you think?
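For illustration only, here is a minimal, self-contained sketch of the poll-time check suggested above. It is not the Kafka implementation: `CoordinatorLookupSketch`, `CommitManagerSketch`, `onFatalError`, `onCoordinatorFound`, and `addRequest` are hypothetical stand-ins, the coordinator is modeled as a plain `String`, and `poll()` returns a count rather than a `NetworkClientDelegate.PollResult`. Only `coordinator()` and `fatalError()` mirror the names used in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the coordinator manager: it keeps the last
// fatal error as plain state instead of exposing it through a future.
class CoordinatorLookupSketch {
    private Optional<String> coordinator = Optional.empty();
    private Optional<Throwable> fatalError = Optional.empty();

    Optional<String> coordinator() { return coordinator; }
    Optional<Throwable> fatalError() { return fatalError; }

    // Assumed hook: called when the coordinator lookup fails fatally.
    void onFatalError(Throwable error) {
        coordinator = Optional.empty();
        fatalError = Optional.of(error);
    }

    // Assumed hook: a successful lookup clears any previous fatal error.
    void onCoordinatorFound(String node) {
        coordinator = Optional.of(node);
        fatalError = Optional.empty();
    }
}

// Hypothetical stand-in for the commit manager: every request it creates
// is tracked in pendingRequests until it is sent or failed.
class CommitManagerSketch {
    private final CoordinatorLookupSketch coordinatorLookup;
    private final List<CompletableFuture<Void>> pendingRequests = new ArrayList<>();

    CommitManagerSketch(CoordinatorLookupSketch coordinatorLookup) {
        this.coordinatorLookup = coordinatorLookup;
    }

    CompletableFuture<Void> addRequest() {
        CompletableFuture<Void> request = new CompletableFuture<>();
        pendingRequests.add(request);
        return request;
    }

    // Returns the number of requests "sent" in this poll.
    int poll() {
        if (!coordinatorLookup.coordinator().isPresent()) {
            // Fatal error looking up coordinator => fail all pending requests.
            coordinatorLookup.fatalError().ifPresent(error -> {
                pendingRequests.forEach(request -> request.completeExceptionally(error));
                pendingRequests.clear();
            });
            // Without a fatal error, requests stay pending for a later poll.
            return 0;
        }
        int sent = pendingRequests.size();
        // Sending is simulated here by completing each request successfully.
        pendingRequests.forEach(request -> request.complete(null));
        pendingRequests.clear();
        return sent;
    }
}
```

In this sketch every request type goes through the same `pendingRequests` list, so a fatal lookup error fails all of them in one place, while a coordinator that is merely unknown leaves them queued for the next poll.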