lianetm commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1907490954


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java:
##########
@@ -191,28 +187,15 @@ public void testBackoffAfterRetriableFailure() {
     }
 
     @Test
-    public void testPropagateAndBackoffAfterFatalError() {
+    public void testBackoffAfterFatalError() {
         CoordinatorRequestManager coordinatorManager = 
setupCoordinatorManager(GROUP_ID);
         expectFindCoordinatorRequest(coordinatorManager, 
Errors.GROUP_AUTHORIZATION_FAILED);
 
-        verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
-            if (!(backgroundEvent instanceof ErrorEvent))
-                return false;
-
-            RuntimeException exception = ((ErrorEvent) 
backgroundEvent).error();
-
-            if (!(exception instanceof GroupAuthorizationException))
-                return false;
-
-            GroupAuthorizationException groupAuthException = 
(GroupAuthorizationException) exception;
-            return groupAuthException.groupId().equals(GROUP_ID);
-        }));
-
         time.sleep(RETRY_BACKOFF_MS - 1);
-        assertEquals(Collections.emptyList(), 
coordinatorManager.poll(time.milliseconds()).unsentRequests);

Review Comment:
   I would expect we should keep this line to check that no request is 
generated if the backoff hasn't expired. 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java:
##########
@@ -191,28 +187,15 @@ public void testBackoffAfterRetriableFailure() {
     }
 
     @Test
-    public void testPropagateAndBackoffAfterFatalError() {
+    public void testBackoffAfterFatalError() {
         CoordinatorRequestManager coordinatorManager = 
setupCoordinatorManager(GROUP_ID);
         expectFindCoordinatorRequest(coordinatorManager, 
Errors.GROUP_AUTHORIZATION_FAILED);
 
-        verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
-            if (!(backgroundEvent instanceof ErrorEvent))
-                return false;
-
-            RuntimeException exception = ((ErrorEvent) 
backgroundEvent).error();
-
-            if (!(exception instanceof GroupAuthorizationException))
-                return false;
-
-            GroupAuthorizationException groupAuthException = 
(GroupAuthorizationException) exception;
-            return groupAuthException.groupId().equals(GROUP_ID);
-        }));
-
         time.sleep(RETRY_BACKOFF_MS - 1);
-        assertEquals(Collections.emptyList(), 
coordinatorManager.poll(time.milliseconds()).unsentRequests);
 
         time.sleep(1);
         assertEquals(1, 
coordinatorManager.poll(time.milliseconds()).unsentRequests.size());
+        assertEquals(0, 
coordinatorManager.poll(time.milliseconds()).unsentRequests.size());

Review Comment:
   if we restore the line from the comment above seems we don't need to add 
this, right?  



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -244,4 +246,12 @@ private void onResponse(
     public Optional<Node> coordinator() {
         return Optional.ofNullable(this.coordinator);
     }
+    
+    public void clearFatalError() {

Review Comment:
   should this be private? we only call it from this same component now



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -186,6 +187,7 @@ private void onSuccessfulResponse(
                 coordinator.port());
         log.info("Discovered group coordinator {}", coordinator);
         coordinatorRequestState.onSuccessfulAttempt(currentTimeMs);
+        clearFatalError();

Review Comment:
   thinking more about this call here, wonder if it may not be enough and 
should be higher up in the call stack of the response. Ex. in the case of fatal 
error and then a retriable error, we would not clear the fatal error right? If 
we want to ensure we always clear/reset the fatal error, maybe we should move 
it to the first thing to do when we receive a response
   
https://github.com/apache/kafka/blob/624dd458099fa93b3fa1e1715b58bbc6d8689857/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java#L116
   
   From there on, it will be set again if there is again a fatal error, or left 
clear if there is not (successful response, non-fatal error). Makes sense? 
   
   Also, could we add a unit test to cover this, checking how the 
coordinatorReqMgr clears/reset the fatal error when receiving a successful 
response, retriable error, or different fatal error? 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -1498,6 +1498,23 @@ public void testSignalClose() {
         OffsetCommitRequestData data = (OffsetCommitRequestData) 
res.unsentRequests.get(0).requestBuilder().build().data();
         assertEquals("topic", data.topics().get(0).name());
     }
+    
+    @Test
+    public void testPollWithFatalErrorShouldFailingAllUnsentRequest() {

Review Comment:
   ```suggestion
       public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1246,6 +1248,16 @@ private List<NetworkClientDelegate.UnsentRequest> 
drainPendingCommits() {
             clearAll();
             return res;
         }
+
+        private void maybeFailCoordinatorFatalError() {
+            Optional<Throwable> fatalError = 
coordinatorRequestManager.fatalError();
+            if (fatalError.isPresent()) {

Review Comment:
   ```suggestion
               coordinatorRequestManager.fatalError().ifPresent(throwable -> {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to