dajac commented on a change in pull request #10973:
URL: https://github.com/apache/kafka/pull/10973#discussion_r667347765



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -136,21 +142,28 @@ private void handleError(
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId,
-                        error.exception());
+                log.error("Received authorization failure for group {} in `{}` response", groupId,
+                        apiName(), error.exception());
                 failed.put(groupId, error.exception());
                 break;
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`{}` request for group {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", apiName(), groupId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error);
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`{}` request for group {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry.", apiName(), groupId, error);
                 unmapped.add(groupId);
                 break;
             default:
-                log.error("Received unexpected error for group {} in `OffsetCommit` response",
-                        groupId, error.exception());
-                failed.put(groupId, error.exception(
-                        "Received unexpected error for group " + groupId + " in `OffsetCommit` response"));
+                final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
+                    groupId, apiName());
+                log.error(unexpectedErrorMsg, error.exception());
+                failed.put(groupId, error.exception(unexpectedErrorMsg));
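
The group-level retry policy in the new `handleError` can be modelled in isolation. This is a minimal, hypothetical sketch, not the Kafka code: `GroupError` stands in for `Errors`, and a plain map and set stand in for the handler's `failed` and `unmapped` collections. A group hit by `COORDINATOR_LOAD_IN_PROGRESS` lands in neither collection, so it stays mapped to its coordinator and the request is simply retried.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the group-level error handling in the diff.
class CoordinatorErrorSketch {
    // Stand-in for the relevant values of org.apache.kafka.common.protocol.Errors.
    enum GroupError {
        GROUP_AUTHORIZATION_FAILED,
        COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR,
        UNKNOWN_SERVER_ERROR
    }

    // Groups that failed permanently, with their error message.
    final Map<String, String> failed = new HashMap<>();
    // Groups whose coordinator must be looked up again with FindCoordinator.
    final Set<String> unmapped = new HashSet<>();

    void handleError(String groupId, GroupError error, String apiName) {
        switch (error) {
            case GROUP_AUTHORIZATION_FAILED:
                failed.put(groupId, "Authorization failure for group " + groupId
                    + " in `" + apiName + "` response");
                break;
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Coordinator is still loading: leave the group mapped so the
                // same request is retried against the same coordinator.
                break;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // Coordinator is down or has moved: unmap so FindCoordinator runs again.
                unmapped.add(groupId);
                break;
            default:
                failed.put(groupId, String.format(
                    "Received unexpected error for group %s in `%s` response", groupId, apiName));
        }
    }

    public static void main(String[] args) {
        CoordinatorErrorSketch handler = new CoordinatorErrorSketch();
        handler.handleError("g1", GroupError.COORDINATOR_LOAD_IN_PROGRESS, "OffsetCommit");
        handler.handleError("g2", GroupError.NOT_COORDINATOR, "OffsetCommit");
        handler.handleError("g3", GroupError.UNKNOWN_SERVER_ERROR, "OffsetCommit");
        System.out.println("failed=" + handler.failed + " unmapped=" + handler.unmapped);
    }
}
```

Note that this model only covers errors reported for the group as a whole; it says nothing about per-partition errors inside an `OffsetCommit` response.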

Review comment:
      @showuon I actually believe that we have regressed here: the Admin API returns wrong results. I just tried it with a small unit test:
   
   ```java
       // Assumes this lives in KafkaAdminClientTest, which provides the
       // mockCluster() and prepareFindCoordinatorResponse() helpers used below.
       @Test
       public void testOffsetCommitErrors() throws Exception {
           final Cluster cluster = mockCluster(3, 0);
           final Time time = new MockTime();
   
           // Retries are disabled via RETRIES_CONFIG = 0.
           try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
               AdminClientConfig.RETRIES_CONFIG, "0")) {
               env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
   
               final TopicPartition foo0 = new TopicPartition("foo", 0);
               final TopicPartition foo1 = new TopicPartition("foo", 1);
   
               // The controller node is returned as the group coordinator.
               env.kafkaClient().prepareResponse(
                   prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
   
               // foo-0 succeeds; foo-1 fails with a partition-level error.
               Map<TopicPartition, Errors> responseData = new HashMap<>();
               responseData.put(foo0, Errors.NONE);
               responseData.put(foo1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
               env.kafkaClient().prepareResponse(new OffsetCommitResponse(0, responseData));
   
               Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
               offsets.put(foo0, new OffsetAndMetadata(123L));
               offsets.put(foo1, new OffsetAndMetadata(456L));
               final AlterConsumerGroupOffsetsResult result = env.adminClient()
                   .alterConsumerGroupOffsets("group", offsets);
   
               // Only foo-1, and therefore all(), should fail.
               assertNull(result.partitionResult(foo0).get());
               TestUtils.assertFutureError(result.partitionResult(foo1), UnknownTopicOrPartitionException.class);
   
               TestUtils.assertFutureError(result.all(), UnknownTopicOrPartitionException.class);
           }
       }
   ```
   
   It works with 2.8 but fails with trunk. In trunk, `foo0` also fails with `UnknownTopicOrPartitionException`, even though the response reports `Errors.NONE` for it. This is because we don't handle partition-level errors correctly. Could you double-check whether I am correct here? If you confirm, I will raise a blocker for 3.0.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.