kirktrue commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1904717131


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,16 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
     }
 
+    private void maybeFailPendingRequestsOnCoordinatorFatalError() {
+        Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+        if (fatalError.isPresent()) {
+            log.warn("Failing all unset commit requests and offset fetches because of coordinator fatal error. ", fatalError.get());

Review Comment:
   ```suggestion
               log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", fatalError.get());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,14 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
     }
 
+    private void maybeFailPendingRequestsOnCoordinatorFatalError() {
+        Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+        if (fatalError.isPresent()) {
+            pendingRequests.unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(fatalError.get()));
+            pendingRequests.unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(fatalError.get()));

Review Comment:
   > don't we need to remove the pending requests that we're failing here? `pendingRequests` are usually cleared on poll [clearAll]
   
   Isn't that what `clearAll()` on line 207 does?
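
   To make the interaction concrete, here is a minimal sketch (plain `CompletableFuture`s and assumed names, not Kafka's actual request types) of why completing the futures exceptionally and clearing the pending lists are two separate steps:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical stand-in for the inner PendingRequests class: completing a
   // future exceptionally notifies its waiters, but does not remove the
   // request from the pending list; that still requires a clearAll()-style call.
   class PendingCommits {
       final List<CompletableFuture<Void>> unsent = new ArrayList<>();

       void failAll(Throwable fatalError) {
           // Each waiter observes the error...
           unsent.forEach(f -> f.completeExceptionally(fatalError));
           // ...but the list still holds the (now-completed) requests.
       }

       void clearAll() {
           unsent.clear();
       }
   }
   ```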



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -263,6 +264,11 @@ public void resetPollTimer(final long pollMs) {
         pollTimer.reset(maxPollIntervalMs);
     }
 
+    private void maybePropagateCoordinatorFatalErrorEvent() {
+        coordinatorRequestManager.getAndClearFatalError()
+                .ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError)));
+    }
+

Review Comment:
   My apologies—it's still not clear from looking at the code why the 
`AbstractHeartbeatRequestManager` clears the fatal error but 
`CommitRequestManager` doesn't. Can you provide some context for the different 
ways of interacting with the error? Thanks!
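
   For context, the two access styles being compared might look like this (a hypothetical holder class; only the `fatalError`/`getAndClearFatalError` names come from the diff, everything else is assumed):

   ```java
   import java.util.Optional;

   // Hypothetical holder illustrating the two access patterns in the diff:
   // fatalError() peeks at the stored error without consuming it, while
   // getAndClearFatalError() hands the error out exactly once.
   class CoordinatorErrorHolder {
       private Optional<Throwable> fatalError = Optional.empty();

       void onFatalError(Throwable t) {
           fatalError = Optional.of(t);
       }

       // Peek: a caller can observe the error repeatedly.
       Optional<Throwable> fatalError() {
           return fatalError;
       }

       // Consume: a caller takes the error and clears it, so the same
       // error is not propagated twice.
       Optional<Throwable> getAndClearFatalError() {
           Optional<Throwable> error = fatalError;
           fatalError = Optional.empty();
           return error;
       }
   }
   ```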



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -196,6 +198,16 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
     }
 
+    private void maybeFailPendingRequestsOnCoordinatorFatalError() {

Review Comment:
   What about moving the entire 
`maybeFailPendingRequestsOnCoordinatorFatalError()` method into the inner 
`PendingRequests` class directly? That way the outer class doesn't have to 
worry about the details of how to fail them, only that it has to call 
`pendingRequests.maybeFailOnCoordinatorFatalError()`?
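
   A rough sketch of what that refactor could look like (names follow the snippets above, but the shapes of the classes are assumed for illustration):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Supplier;

   // Hypothetical sketch of the suggested refactor: the inner PendingRequests
   // class owns the failure logic, and the outer manager only asks it to
   // maybe fail on a coordinator fatal error.
   class CommitManagerSketch {
       private final Supplier<Optional<Throwable>> coordinatorFatalError;
       final PendingRequests pendingRequests = new PendingRequests();

       CommitManagerSketch(Supplier<Optional<Throwable>> coordinatorFatalError) {
           this.coordinatorFatalError = coordinatorFatalError;
       }

       void poll() {
           // The outer class no longer knows *how* requests are failed.
           pendingRequests.maybeFailOnCoordinatorFatalError(coordinatorFatalError.get());
       }

       static class PendingRequests {
           final List<CompletableFuture<Void>> unsentOffsetCommits = new ArrayList<>();
           final List<CompletableFuture<Void>> unsentOffsetFetches = new ArrayList<>();

           void maybeFailOnCoordinatorFatalError(Optional<Throwable> fatalError) {
               fatalError.ifPresent(error -> {
                   unsentOffsetCommits.forEach(f -> f.completeExceptionally(error));
                   unsentOffsetFetches.forEach(f -> f.completeExceptionally(error));
               });
           }
       }
   }
   ```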



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
             KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", exception);
-        backgroundEventHandler.add(new ErrorEvent(exception));
+        fatalError = Optional.of(exception);

Review Comment:
   Clearing the fatal error on a successful response from the coordinator seems 
_intuitively_ correct, but I could easily be wrong.
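
   In sketch form (names assumed, not Kafka's actual code), the lifecycle under discussion would be: record the fatal error on a failed FindCoordinator response, and clear it once a response succeeds, so a stale error is not re-propagated:

   ```java
   import java.util.Optional;

   // Hypothetical state holder for the clear-on-success behavior discussed
   // above; the method names mirror the diff but the class is illustrative.
   class FindCoordinatorErrorState {
       private Optional<Throwable> fatalError = Optional.empty();

       void onFailedResponse(Throwable exception) {
           fatalError = Optional.of(exception);
       }

       void onSuccessfulResponse() {
           // If the coordinator is reachable again, the stored error is stale.
           fatalError = Optional.empty();
       }

       Optional<Throwable> fatalError() {
           return fatalError;
       }
   }
   ```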



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -53,24 +51,27 @@
 public class CoordinatorRequestManager implements RequestManager {
     private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
     private final Logger log;
-    private final BackgroundEventHandler backgroundEventHandler;
     private final String groupId;
 
     private final RequestState coordinatorRequestState;
     private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
     private long totalDisconnectedMin = 0;
     private Node coordinator;
+    // Hold the last fatal error received. It is exposed so that managers requiring a coordinator can access it and take 

Review Comment:
   Super nit picky, but can we change the term _last_ to either _latest_ or 
_most recent_?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
