lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1846829642


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,6 +190,8 @@ private void onFailedResponse(final long currentTimeMs, 
final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization 
error {}", exception.getMessage());
             KafkaException groupAuthorizationException = 
GroupAuthorizationException.forGroupId(this.groupId);
+            metadataError.completeExceptionally(groupAuthorizationException);
+            metadataError = metadataError.newIncompleteFuture();

Review Comment:
   Just with the goal of simplifying here, is a future really needed or just 
keeping the error could do? The main difference with the metadata errors is 
that the `CommitRequestManager.poll` called regularly will have the pending 
requests in hand, so we could fail them if there is a fatal error in the 
coordinator manager. Would that work?  



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -425,6 +427,13 @@ public CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> commitSync(fina
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new 
CompletableFuture<>();
         OffsetCommitRequestState requestState = 
createOffsetCommitRequest(commitOffsets, deadlineMs);
         commitSyncWithRetries(requestState, result);
+        new ArrayList<>(metadataErrors).forEach(metadataError -> 
metadataError.whenComplete((__, error) -> {
+            if (error != null) {
+                result.completeExceptionally(error);
+                metadataErrors.remove(metadataError);
+            }
+        }));
+        

Review Comment:
   this is only handling the commit sync but we do have other requests that 
should also fail if there is a fatal error in the coordinator. Given that all 
requests that this manager sends end up being added to the `pendingRequests`, 
and we attempt to send them on poll, shouldn't we just check on poll of there 
is no coordinator due to a fatal error and fail all pendingRequests? It would 
ensure we behave consistently for all requests.
   
   ```
       public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
           if (!coordinatorRequestManager.coordinator().isPresent()) {
               if (coordinatorRequestManager.fatalError().isPresent()) {
                   // Fatal error looking up coordinator => Fail all 
pendingRequests
               }
               return EMPTY;
           }
           ...
   ```
   I created this separate Jira 
https://issues.apache.org/jira/browse/KAFKA-18034 for this, given that it's a 
different gap and fix. I would suggest we address that in a separate/simpler PR 
with these thoughts, what do you think? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to