dajac commented on a change in pull request #11021:
URL: https://github.com/apache/kafka/pull/11021#discussion_r669379080
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -86,47 +87,54 @@ public String apiName() {
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
- DeleteGroupsResponse response = (DeleteGroupsResponse)
abstractResponse;
- Map<CoordinatorKey, Void> completed = new HashMap<>();
- Map<CoordinatorKey, Throwable> failed = new HashMap<>();
- List<CoordinatorKey> unmapped = new ArrayList<>();
+ final DeleteGroupsResponse response = (DeleteGroupsResponse)
abstractResponse;
+ final Map<CoordinatorKey, Void> completed = new HashMap<>();
+ final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
for (DeletableGroupResult deletedGroup : response.data().results()) {
CoordinatorKey groupIdKey =
CoordinatorKey.byGroupId(deletedGroup.groupId());
Errors error = Errors.forCode(deletedGroup.errorCode());
if (error != Errors.NONE) {
- handleError(groupIdKey, error, failed, unmapped);
+ handleError(groupIdKey, error, failed, groupsToUnmap);
continue;
}
completed.put(groupIdKey, null);
}
- return new ApiResult<>(completed, failed, unmapped);
+
+ return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
}
private void handleError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
- List<CoordinatorKey> unmapped
+ Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
- log.error("Received authorization failure for group {} in
`DeleteConsumerGroups` response", groupId,
- error.exception());
+ case INVALID_GROUP_ID:
+ case NON_EMPTY_GROUP:
+ case GROUP_ID_NOT_FOUND:
+ log.debug("`DeleteConsumerGroups` request for group id {}
failed due to error {}", groupId, error);
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
- case COORDINATOR_NOT_AVAILABLE:
+ // If the coordinator is in the middle of loading, then we
just need to retry
+ log.debug("`DeleteConsumerGroups` request for group {} failed
because the coordinator " +
+ "is still in the process of loading state. Will retry",
groupId);
break;
+ case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
- log.debug("DeleteConsumerGroups request for group {} returned
error {}. Will retry",
- groupId, error);
- unmapped.add(groupId);
+ // If the coordinator is unavailable or there was a
coordinator change, then we unmap
+ // the key so that we retry the `FindCoordinator` request
+ log.debug("`DeleteConsumerGroups` request for group {}
returned error {}. " +
Review comment:
nit: Should this message say `group id` instead of `group`, to match the wording used in the other log messages?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -86,47 +87,54 @@ public String apiName() {
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
- DeleteGroupsResponse response = (DeleteGroupsResponse)
abstractResponse;
- Map<CoordinatorKey, Void> completed = new HashMap<>();
- Map<CoordinatorKey, Throwable> failed = new HashMap<>();
- List<CoordinatorKey> unmapped = new ArrayList<>();
+ final DeleteGroupsResponse response = (DeleteGroupsResponse)
abstractResponse;
+ final Map<CoordinatorKey, Void> completed = new HashMap<>();
+ final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
for (DeletableGroupResult deletedGroup : response.data().results()) {
CoordinatorKey groupIdKey =
CoordinatorKey.byGroupId(deletedGroup.groupId());
Errors error = Errors.forCode(deletedGroup.errorCode());
if (error != Errors.NONE) {
- handleError(groupIdKey, error, failed, unmapped);
+ handleError(groupIdKey, error, failed, groupsToUnmap);
continue;
}
completed.put(groupIdKey, null);
}
- return new ApiResult<>(completed, failed, unmapped);
+
+ return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
}
private void handleError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
- List<CoordinatorKey> unmapped
+ Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
- log.error("Received authorization failure for group {} in
`DeleteConsumerGroups` response", groupId,
- error.exception());
+ case INVALID_GROUP_ID:
+ case NON_EMPTY_GROUP:
+ case GROUP_ID_NOT_FOUND:
+ log.debug("`DeleteConsumerGroups` request for group id {}
failed due to error {}", groupId, error);
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
- case COORDINATOR_NOT_AVAILABLE:
+ // If the coordinator is in the middle of loading, then we
just need to retry
+ log.debug("`DeleteConsumerGroups` request for group {} failed
because the coordinator " +
Review comment:
nit: Should this message say `group id` instead of `group`, to match the wording used in the other log messages?
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -86,47 +87,54 @@ public String apiName() {
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
- DeleteGroupsResponse response = (DeleteGroupsResponse)
abstractResponse;
- Map<CoordinatorKey, Void> completed = new HashMap<>();
- Map<CoordinatorKey, Throwable> failed = new HashMap<>();
- List<CoordinatorKey> unmapped = new ArrayList<>();
+ final DeleteGroupsResponse response = (DeleteGroupsResponse)
abstractResponse;
+ final Map<CoordinatorKey, Void> completed = new HashMap<>();
+ final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
for (DeletableGroupResult deletedGroup : response.data().results()) {
CoordinatorKey groupIdKey =
CoordinatorKey.byGroupId(deletedGroup.groupId());
Errors error = Errors.forCode(deletedGroup.errorCode());
if (error != Errors.NONE) {
- handleError(groupIdKey, error, failed, unmapped);
+ handleError(groupIdKey, error, failed, groupsToUnmap);
continue;
}
completed.put(groupIdKey, null);
}
- return new ApiResult<>(completed, failed, unmapped);
+
+ return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
}
private void handleError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
- List<CoordinatorKey> unmapped
+ Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
- log.error("Received authorization failure for group {} in
`DeleteConsumerGroups` response", groupId,
- error.exception());
+ case INVALID_GROUP_ID:
+ case NON_EMPTY_GROUP:
+ case GROUP_ID_NOT_FOUND:
+ log.debug("`DeleteConsumerGroups` request for group id {}
failed due to error {}", groupId, error);
Review comment:
nit: We should log `groupId.idValue` rather than the `CoordinatorKey` object itself, both here and in the other log statements.
##########
File path:
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3179,41 +3179,47 @@ public void testDeleteConsumerGroupsWithOlderBroker()
throws Exception {
env.kafkaClient().prepareResponse(
prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,
Node.noNode()));
- final DeleteConsumerGroupsResult errorResult =
env.adminClient().deleteConsumerGroups(groupIds);
+ DeleteConsumerGroupsResult errorResult =
env.adminClient().deleteConsumerGroups(groupIds);
TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"),
GroupAuthorizationException.class);
- //Retriable errors should be retried
+ // Retriable errors should be retried
env.kafkaClient().prepareResponse(
prepareOldFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
- final DeletableGroupResultCollection errorResponse1 = new
DeletableGroupResultCollection();
- errorResponse1.add(new DeletableGroupResult()
- .setGroupId("groupId")
-
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
- );
- env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
- new DeleteGroupsResponseData()
- .setResults(errorResponse1)));
Review comment:
Why are we moving this prepared `COORDINATOR_NOT_AVAILABLE` `DeleteGroupsResponse` to later in the test?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org