[
https://issues.apache.org/jira/browse/KAFKA-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599455#comment-16599455
]
ASF GitHub Bot commented on KAFKA-7369:
---------------------------------------
hachikuji closed pull request #5595: KAFKA-7369; Handle retriable errors in
AdminClient list groups API
URL: https://github.com/apache/kafka/pull/5595
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8ddb0c08627..904cd0601e5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2567,8 +2567,11 @@ private void
maybeAddConsumerGroup(ListGroupsResponse.Group group) {
void handleResponse(AbstractResponse abstractResponse)
{
final ListGroupsResponse response =
(ListGroupsResponse) abstractResponse;
synchronized (results) {
- if (response.error() != Errors.NONE) {
- results.addError(response.error().exception(), node);
+ Errors error = response.error();
+ if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
+ throw error.exception();
+ } else if (error != Errors.NONE) {
+ results.addError(error.exception(), node);
} else {
for (ListGroupsResponse.Group group :
response.groups()) {
maybeAddConsumerGroup(group);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index b108803590f..af6f7212e8c 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,6 +62,7 @@
/**
* Possible error codes:
*
+ * COORDINATOR_LOAD_IN_PROGRESS (14)
* COORDINATOR_NOT_AVAILABLE (15)
* AUTHORIZATION_FAILED (29)
*/
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0245cbd3695..c0dc542b159 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -36,7 +36,6 @@
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
@@ -46,6 +45,7 @@
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
+import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
@@ -863,9 +863,11 @@ public void testListConsumerGroups() throws Exception {
Node node0 = new Node(0, "localhost", 8121);
Node node1 = new Node(1, "localhost", 8122);
Node node2 = new Node(2, "localhost", 8123);
+ Node node3 = new Node(3, "localhost", 8124);
nodes.put(0, node0);
nodes.put(1, node1);
nodes.put(2, node2);
+ nodes.put(3, node3);
final Cluster cluster = new Cluster(
"mockClusterId",
@@ -902,13 +904,19 @@ public void testListConsumerGroups() throws Exception {
)),
node0);
+ // handle retriable errors
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
Errors.COORDINATOR_NOT_AVAILABLE,
Collections.emptyList()
),
node1);
-
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.COORDINATOR_LOAD_IN_PROGRESS,
+ Collections.emptyList()
+ ),
+ node1);
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
Errors.NONE,
@@ -916,15 +924,37 @@ public void testListConsumerGroups() throws Exception {
new ListGroupsResponse.Group("group-2",
ConsumerProtocol.PROTOCOL_TYPE),
new
ListGroupsResponse.Group("group-connect-2", "connector")
)),
+ node1);
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.NONE,
+ asList(
+ new ListGroupsResponse.Group("group-3",
ConsumerProtocol.PROTOCOL_TYPE),
+ new
ListGroupsResponse.Group("group-connect-3", "connector")
+ )),
node2);
+ // fatal error
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.UNKNOWN_SERVER_ERROR,
+ Collections.emptyList()),
+ node3);
+
+
final ListConsumerGroupsResult result =
env.adminClient().listConsumerGroups();
- TestUtils.assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+ TestUtils.assertFutureError(result.all(), UnknownServerException.class);
+
Collection<ConsumerGroupListing> listings = result.valid().get();
- assertEquals(2, listings.size());
+ assertEquals(3, listings.size());
+
+ Set<String> groupIds = new HashSet<>();
for (ConsumerGroupListing listing : listings) {
- assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
+ groupIds.add(listing.groupId());
}
+
+ assertEquals(Utils.mkSet("group-1", "group-2", "group-3"), groupIds);
assertEquals(1, result.errors().get().size());
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Retry when possible in AdminClient.listConsumerGroups
> -----------------------------------------------------
>
> Key: KAFKA-7369
> URL: https://issues.apache.org/jira/browse/KAFKA-7369
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> Currently we do not retry ListGroups requests when they fail due to retriable
> errors. For example, this is causing some instability in
> `kafka.admin.ListConsumerGroupTest.testListConsumerGroups`.
> {code}
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: Error
> listing groups on localhost:43001 (id: 0 rack: null)
> at
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
> at
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:132)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)