lianetm commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1907490954
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java: ##########
@@ -191,28 +187,15 @@ public void testBackoffAfterRetriableFailure() {
     }

     @Test
-    public void testPropagateAndBackoffAfterFatalError() {
+    public void testBackoffAfterFatalError() {
         CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
         expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
-        verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
-            if (!(backgroundEvent instanceof ErrorEvent))
-                return false;
-
-            RuntimeException exception = ((ErrorEvent) backgroundEvent).error();
-
-            if (!(exception instanceof GroupAuthorizationException))
-                return false;
-
-            GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception;
-            return groupAuthException.groupId().equals(GROUP_ID);
-        }));
-
         time.sleep(RETRY_BACKOFF_MS - 1);
-        assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);

Review Comment:
I would expect us to keep this line, to check that no request is generated while the backoff hasn't expired.
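For illustration only, a minimal self-contained sketch of the timing the restored assertion pins down: after a failed attempt, nothing is sent before the retry backoff has elapsed, exactly one request is sent once it has, and an immediate re-poll sends nothing while that request is in flight. `BackoffRequestManagerSketch`, `onFailedAttempt` and `poll` are hypothetical names; this is not Kafka's `CoordinatorRequestManager` or its request-state API.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of a request manager with a fixed retry backoff.
// It is NOT Kafka code; it only models the timing the test asserts on.
class BackoffRequestManagerSketch {
    private final long retryBackoffMs;
    private long lastFailedAttemptMs = -1; // -1 means "no failed attempt yet"
    private boolean requestInFlight = false;

    BackoffRequestManagerSketch(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    // Record a failed attempt (fatal or retriable error response).
    void onFailedAttempt(long nowMs) {
        lastFailedAttemptMs = nowMs;
        requestInFlight = false;
    }

    // Returns the requests to send at nowMs: none while backing off or
    // while a request is already in flight, otherwise exactly one.
    List<String> poll(long nowMs) {
        boolean backingOff = lastFailedAttemptMs >= 0
                && nowMs - lastFailedAttemptMs < retryBackoffMs;
        if (backingOff || requestInFlight) {
            return Collections.emptyList();
        }
        requestInFlight = true;
        return Collections.singletonList("FindCoordinatorRequest");
    }
}
```

Polling at `backoff - 1` after a failure at time 0 returns an empty list, polling at `backoff` returns one request, and polling again at the same instant returns an empty list. The first check is the one the removed `assertEquals(Collections.emptyList(), ...)` line covered.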
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java: ##########
@@ -191,28 +187,15 @@ public void testBackoffAfterRetriableFailure() {
     }

     @Test
-    public void testPropagateAndBackoffAfterFatalError() {
+    public void testBackoffAfterFatalError() {
         CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
         expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
-        verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
-            if (!(backgroundEvent instanceof ErrorEvent))
-                return false;
-
-            RuntimeException exception = ((ErrorEvent) backgroundEvent).error();
-
-            if (!(exception instanceof GroupAuthorizationException))
-                return false;
-
-            GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception;
-            return groupAuthException.groupId().equals(GROUP_ID);
-        }));
-
         time.sleep(RETRY_BACKOFF_MS - 1);
-        assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
         time.sleep(1);
         assertEquals(1, coordinatorManager.poll(time.milliseconds()).unsentRequests.size());
+        assertEquals(0, coordinatorManager.poll(time.milliseconds()).unsentRequests.size());

Review Comment:
If we restore the line from the comment above, it seems we don't need to add this, right?

########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ##########
@@ -244,4 +246,12 @@ private void onResponse(
     public Optional<Node> coordinator() {
         return Optional.ofNullable(this.coordinator);
     }
+
+    public void clearFatalError() {

Review Comment:
Should this be private?
We only call it from this same component now.

########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ##########
@@ -186,6 +187,7 @@ private void onSuccessfulResponse(
                 coordinator.port());
         log.info("Discovered group coordinator {}", coordinator);
         coordinatorRequestState.onSuccessfulAttempt(currentTimeMs);
+        clearFatalError();

Review Comment:
Thinking more about this call, I wonder whether it is enough here, or whether it should go higher up in the call stack of the response. For example, if we get a fatal error followed by a retriable error, we would not clear the fatal error, right? If we want to ensure we always clear/reset the fatal error, maybe we should make it the first thing we do when we receive a response:
https://github.com/apache/kafka/blob/624dd458099fa93b3fa1e1715b58bbc6d8689857/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java#L116
From there on, it will be set again if there is another fatal error, or left clear if there is not (successful response or non-fatal error). Makes sense?

Also, could we add a unit test to cover this, checking how the CoordinatorRequestManager clears/resets the fatal error when it receives a successful response, a retriable error, or a different fatal error?
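A minimal sketch of the "clear first, then maybe set" ordering proposed above, assuming a hypothetical `FatalErrorResetSketch` class and `Outcome` enum rather than Kafka's real response handling. The fatal error is cleared at the start of every response and set again only when that response is itself fatal; `clearFatalError()` is private, in line with the earlier suggestion.

```java
import java.util.Optional;

// Hypothetical sketch, NOT Kafka code: models only the ordering of
// "clear the fatal error, then set it again if this response is fatal".
class FatalErrorResetSketch {

    // Hypothetical classification of a coordinator lookup response.
    enum Outcome { SUCCESS, RETRIABLE_ERROR, FATAL_ERROR }

    private Throwable fatalError; // null when no fatal error is pending

    void onResponse(Outcome outcome, Throwable error) {
        // Step 1: always reset, whatever the outcome of this response.
        clearFatalError();
        // Step 2: set it again only if this response is fatal.
        if (outcome == Outcome.FATAL_ERROR) {
            fatalError = error;
        }
        // SUCCESS and RETRIABLE_ERROR leave the fatal error cleared.
    }

    Optional<Throwable> fatalError() {
        return Optional.ofNullable(fatalError);
    }

    // Private: only this component resets its own fatal error.
    private void clearFatalError() {
        fatalError = null;
    }
}
```

With this ordering, a fatal error followed by a retriable error leaves `fatalError()` empty, a second, different fatal error replaces the first, and a successful response clears it. These are the three cases a unit test for this change could check.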
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##########
@@ -1498,6 +1498,23 @@ public void testSignalClose() {
         OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data();
         assertEquals("topic", data.topics().get(0).name());
     }
+
+    @Test
+    public void testPollWithFatalErrorShouldFailingAllUnsentRequest() {

Review Comment:
```suggestion
    public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
```

########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##########
@@ -1246,6 +1248,16 @@ private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
         clearAll();
         return res;
     }
+
+    private void maybeFailCoordinatorFatalError() {
+        Optional<Throwable> fatalError = coordinatorRequestManager.fatalError();
+        if (fatalError.isPresent()) {

Review Comment:
```suggestion
        coordinatorRequestManager.fatalError().ifPresent(throwable -> {
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org