skaundinya15 commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r655133582



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {
+
+    private final CoordinatorKey groupId;
+    private final Map<TopicPartition, OffsetAndMetadata> offsets;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public AlterConsumerGroupOffsetsHandler(
+        String groupId,
+        Map<TopicPartition, OffsetAndMetadata> offsets,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.offsets = offsets;
+        this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "offsetCommit";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(
+            String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public OffsetCommitRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) {
+        List<OffsetCommitRequestTopic> topics = new ArrayList<>();
+        Map<String, List<OffsetCommitRequestPartition>> offsetData = new HashMap<>();
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            String topic = entry.getKey().topic();
+            OffsetAndMetadata oam = entry.getValue();
+            offsetData.compute(topic, (key, value) -> {
+                if (value == null) {
+                    value = new ArrayList<>();
+                }
+                OffsetCommitRequestPartition partition = new OffsetCommitRequestPartition()
+                        .setCommittedOffset(oam.offset())
+                        .setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
+                        .setCommittedMetadata(oam.metadata())
+                        .setPartitionIndex(entry.getKey().partition());
+                value.add(partition);
+                return value;
+            });
+        }
+        for (Map.Entry<String, List<OffsetCommitRequestPartition>> entry : offsetData.entrySet()) {
+            OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic()
+                    .setName(entry.getKey())
+                    .setPartitions(entry.getValue());
+            topics.add(topic);
+        }
+        OffsetCommitRequestData data = new OffsetCommitRequestData()
+            .setGroupId(groupId.idValue)
+            .setTopics(topics);
+        return new OffsetCommitRequest.Builder(data);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> groupIds,
+            AbstractResponse abstractResponse) {
+
+        final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
+        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        Map<TopicPartition, Errors> partitions = new HashMap<>();
+        for (OffsetCommitResponseTopic topic : response.data().topics()) {
+            for (OffsetCommitResponsePartition partition : topic.partitions()) {
+                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                Errors error = Errors.forCode(partition.errorCode());
+                if (error != Errors.NONE) {
+                    handleError(groupId, error, failed, unmapped);
+                } else {
+                    partitions.put(tp, error);
+                }
+            }
+        }
+        if (failed.isEmpty() && unmapped.isEmpty())
+            completed.put(groupId, partitions);
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private void handleError(CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, List<CoordinatorKey> unmapped) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+                log.error("Received authorization failure for group {} in 
`DeleteConsumerGroupOffsets` response", groupId,
+                        error.exception());
+                failed.put(groupId, error.exception());
+                break;
+            case COORDINATOR_LOAD_IN_PROGRESS:
+            case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry", groupId, error);
+                unmapped.add(groupId);
+                break;
+            default:
+                log.error("Received unexpected error for group {} in 
`DeleteConsumerGroupOffsets` response", 
+                        groupId, error.exception());
+                failed.put(groupId, error.exception(
+                        "Unexpected error during DeleteConsumerGroupOffsets 
lookup for " + groupId));

Review comment:
       @mimaison Makes sense. It would be good to file a JIRA for this so we can address it in a future PR and ensure consistent error handling across all the consumer-group-related handlers.
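
       For reference, a rough sketch of what a shared helper could look like, mirroring the per-handler `handleError` switches in this PR (the helper name and exact signature are hypothetical):

```java
// Hypothetical shared helper; the parameters mirror what each handler already
// has in scope, and the branches mirror the existing per-handler switches.
static void handleGroupRequestError(
    String apiName,
    CoordinatorKey groupId,
    Errors error,
    Logger log,
    Map<CoordinatorKey, Throwable> failed,
    List<CoordinatorKey> unmapped
) {
    switch (error) {
        case GROUP_AUTHORIZATION_FAILED:
            log.error("Received authorization failure for group {} in `{}` response",
                    groupId, apiName, error.exception());
            failed.put(groupId, error.exception());
            break;
        case COORDINATOR_LOAD_IN_PROGRESS:
        case COORDINATOR_NOT_AVAILABLE:
        case NOT_COORDINATOR:
            log.debug("`{}` request for group {} returned error {}. Will retry", apiName, groupId, error);
            unmapped.add(groupId);
            break;
        default:
            log.error("Received unexpected error for group {} in `{}` response",
                    groupId, apiName, error.exception());
            failed.put(groupId, error.exception());
    }
}
```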

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -17,84 +17,153 @@
 package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {
+
+    private static final ApiRequestScope GROUP_REQUEST_SCOPE = new ApiRequestScope() { };
+    private static final ApiRequestScope TXN_REQUEST_SCOPE = new ApiRequestScope() { };
+
     private final Logger log;
+    private final FindCoordinatorRequest.CoordinatorType type;
+    private boolean batch = true;
+    private Set<CoordinatorKey> unrepresentableKeys = Collections.emptySet();
 
     public CoordinatorStrategy(
+        FindCoordinatorRequest.CoordinatorType type,
         LogContext logContext
     ) {
+        this.type = type;
         this.log = logContext.logger(CoordinatorStrategy.class);
     }
 
     @Override
     public ApiRequestScope lookupScope(CoordinatorKey key) {
-        // The `FindCoordinator` API does not support batched lookups, so we use a
-        // separate lookup context for each coordinator key we need to lookup
-        return new LookupRequestScope(key);
+        if (batch) {
+            if (type == CoordinatorType.GROUP) {
+                return GROUP_REQUEST_SCOPE;
+            } else {
+                return TXN_REQUEST_SCOPE;
+            }
+        } else {
+            // If the `FindCoordinator` API does not support batched lookups, we use a
+            // separate lookup context for each coordinator key we need to lookup
+            return new LookupRequestScope(key);
+        }
     }
 
     @Override
     public FindCoordinatorRequest.Builder buildRequest(Set<CoordinatorKey> keys) {
-        CoordinatorKey key = requireSingleton(keys);
-        return new FindCoordinatorRequest.Builder(
-            new FindCoordinatorRequestData()
-                .setKey(key.idValue)
-                .setKeyType(key.type.id())
-        );
+        unrepresentableKeys = keys.stream().filter(k -> !isRepresentableKey(k.idValue)).collect(Collectors.toSet());
+        keys = keys.stream().filter(k -> isRepresentableKey(k.idValue)).collect(Collectors.toSet());
+        if (batch) {
+            keys = requireSameType(keys);
+            FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                    .setKeyType(type.id())
+                    .setCoordinatorKeys(keys.stream().map(k -> k.idValue).collect(Collectors.toList()));
+            return new FindCoordinatorRequest.Builder(data);
+        } else {
+            CoordinatorKey key = requireSingleton(keys);
+            return new FindCoordinatorRequest.Builder(
+                new FindCoordinatorRequestData()
+                    .setKey(key.idValue)
+                    .setKeyType(key.type.id())
+            );
+        }
     }
 
     @Override
     public LookupResult<CoordinatorKey> handleResponse(
         Set<CoordinatorKey> keys,
         AbstractResponse abstractResponse
     ) {
-        CoordinatorKey key = requireSingleton(keys);
+        Map<CoordinatorKey, Integer> mappedKeys = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failedKeys = new HashMap<>();
+
+        for (CoordinatorKey key : unrepresentableKeys) {
+            failedKeys.put(key, new InvalidGroupIdException("The given group id '" +
+                            key.idValue + "' cannot be represented in a request."));
+        }
         FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-        Errors error = response.error();
+        if (batch) {
+            for (Coordinator coordinator : response.data().coordinators()) {
+                handleError(Errors.forCode(coordinator.errorCode()),
+                            new CoordinatorKey(type, coordinator.key()),
+                            coordinator.nodeId(),
+                            mappedKeys,
+                            failedKeys);
+            }
+        } else {
+            CoordinatorKey key = requireSingleton(keys);
+            Errors error = response.error();
+            handleError(error, key, response.node().id(), mappedKeys, failedKeys);
+        }
+        return new LookupResult<>(failedKeys, mappedKeys);
+    }
+
+    public void disableBatch() {
+        batch = false;
+    }
+
+    private static CoordinatorKey requireSingleton(Set<CoordinatorKey> keys) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected lookup key set");
+        }
+        return keys.iterator().next();
+    }
 
+    private static Set<CoordinatorKey> requireSameType(Set<CoordinatorKey> keys) {
+        if (keys.stream().map(k -> k.type).collect(Collectors.toSet()).size() != 1) {
+            throw new IllegalArgumentException("Unexpected lookup key set");
+        }
+        return keys;
+    }
+
+    private static boolean isRepresentableKey(String groupId) {
+        return groupId != null;
+    }
+
+    private void handleError(Errors error, CoordinatorKey key, int nodeId, Map<CoordinatorKey, Integer> mappedKeys, Map<CoordinatorKey, Throwable> failedKeys) {
         switch (error) {
             case NONE:
-                return LookupResult.mapped(key, response.data().nodeId());
-
+                mappedKeys.put(key, nodeId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:

Review comment:
       @mimaison Gotcha, makes sense. Perhaps we can still log at `DEBUG` level so that if we ever need to debug, we have evidence of this in the logs.
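
       Something along these lines is what I have in mind, assuming the retriable branch currently just leaves the key unmapped (a sketch only; the retry behavior itself is unchanged):

```java
case COORDINATOR_NOT_AVAILABLE:
    // Still retriable and the key stays unmapped, but record why at DEBUG
    // so retries leave a trace in the logs.
    log.debug("FindCoordinator lookup for key {} returned error {}. Will retry", key, error);
    break;
```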

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class DeleteConsumerGroupsHandler implements AdminApiHandler<CoordinatorKey, Void> {
+
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public DeleteConsumerGroupsHandler(
+        Set<String> groupIds,
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(DeleteConsumerGroupsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "deleteConsumerGroups";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Void> newFuture(
+            Collection<String> groupIds
+    ) {
+        return AdminApiFuture.forKeys(buildKeySet(groupIds));
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
+        return groupIds.stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public DeleteGroupsRequest.Builder buildRequest(int brokerId, Set<CoordinatorKey> keys) {
+        List<String> groupIds = keys.stream().map(key -> {
+            if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
+                throw new IllegalArgumentException("Invalid transaction coordinator key " + key +
+                    " when building `DeleteGroups` request");

Review comment:
       @mimaison Sounds good, thanks!

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, Errors>> {

Review comment:
       @tombentley I think the `OffsetCommitRequest` only takes one group at a time; at least, that's what it seems like from the `Admin.java` interface: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java#L1145

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -17,84 +17,153 @@
 package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {
+
+    private static final ApiRequestScope GROUP_REQUEST_SCOPE = new ApiRequestScope() { };
+    private static final ApiRequestScope TXN_REQUEST_SCOPE = new ApiRequestScope() { };
+
     private final Logger log;
+    private final FindCoordinatorRequest.CoordinatorType type;
+    private boolean batch = true;
+    private Set<CoordinatorKey> unrepresentableKeys = Collections.emptySet();
 
     public CoordinatorStrategy(
+        FindCoordinatorRequest.CoordinatorType type,
         LogContext logContext
     ) {
+        this.type = type;
         this.log = logContext.logger(CoordinatorStrategy.class);
     }
 
     @Override
     public ApiRequestScope lookupScope(CoordinatorKey key) {
-        // The `FindCoordinator` API does not support batched lookups, so we use a
-        // separate lookup context for each coordinator key we need to lookup
-        return new LookupRequestScope(key);
+        if (batch) {
+            if (type == CoordinatorType.GROUP) {
+                return GROUP_REQUEST_SCOPE;
+            } else {
+                return TXN_REQUEST_SCOPE;
+            }
+        } else {
+            // If the `FindCoordinator` API does not support batched lookups, we use a
+            // separate lookup context for each coordinator key we need to lookup
+            return new LookupRequestScope(key);
+        }
     }
 
     @Override
     public FindCoordinatorRequest.Builder buildRequest(Set<CoordinatorKey> keys) {
-        CoordinatorKey key = requireSingleton(keys);
-        return new FindCoordinatorRequest.Builder(
-            new FindCoordinatorRequestData()
-                .setKey(key.idValue)
-                .setKeyType(key.type.id())
-        );
+        unrepresentableKeys = keys.stream().filter(k -> !isRepresentableKey(k.idValue)).collect(Collectors.toSet());
+        keys = keys.stream().filter(k -> isRepresentableKey(k.idValue)).collect(Collectors.toSet());
+        if (batch) {
+            keys = requireSameType(keys);
+            FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                    .setKeyType(type.id())
+                    .setCoordinatorKeys(keys.stream().map(k -> k.idValue).collect(Collectors.toList()));
+            return new FindCoordinatorRequest.Builder(data);
+        } else {
+            CoordinatorKey key = requireSingleton(keys);
+            return new FindCoordinatorRequest.Builder(
+                new FindCoordinatorRequestData()
+                    .setKey(key.idValue)
+                    .setKeyType(key.type.id())
+            );
+        }
     }
 
     @Override
     public LookupResult<CoordinatorKey> handleResponse(
         Set<CoordinatorKey> keys,
         AbstractResponse abstractResponse
     ) {
-        CoordinatorKey key = requireSingleton(keys);
+        Map<CoordinatorKey, Integer> mappedKeys = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failedKeys = new HashMap<>();
+
+        for (CoordinatorKey key : unrepresentableKeys) {
+            failedKeys.put(key, new InvalidGroupIdException("The given group id '" +
+                            key.idValue + "' cannot be represented in a request."));
+        }
         FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-        Errors error = response.error();
+        if (batch) {
+            for (Coordinator coordinator : response.data().coordinators()) {
+                handleError(Errors.forCode(coordinator.errorCode()),
+                            new CoordinatorKey(type, coordinator.key()),
+                            coordinator.nodeId(),
+                            mappedKeys,
+                            failedKeys);
+            }
+        } else {
+            CoordinatorKey key = requireSingleton(keys);
+            Errors error = response.error();
+            handleError(error, key, response.node().id(), mappedKeys, failedKeys);
+        }
+        return new LookupResult<>(failedKeys, mappedKeys);
+    }
+
+    public void disableBatch() {
+        batch = false;
+    }
+
+    private static CoordinatorKey requireSingleton(Set<CoordinatorKey> keys) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected lookup key set");
+        }
+        return keys.iterator().next();
+    }
 
+    private static Set<CoordinatorKey> requireSameType(Set<CoordinatorKey> keys) {
+        if (keys.stream().map(k -> k.type).collect(Collectors.toSet()).size() != 1) {
+            throw new IllegalArgumentException("Unexpected lookup key set");
+        }
+        return keys;
+    }
+
+    private static boolean isRepresentableKey(String groupId) {
+        return groupId != null;
+    }
+
+    private void handleError(Errors error, CoordinatorKey key, int nodeId, Map<CoordinatorKey, Integer> mappedKeys, Map<CoordinatorKey, Throwable> failedKeys) {
         switch (error) {
             case NONE:
-                return LookupResult.mapped(key, response.data().nodeId());
-
+                mappedKeys.put(key, nodeId);
+                break;
             case COORDINATOR_NOT_AVAILABLE:

Review comment:
       @mimaison Gotcha, makes sense. Can we still log at `DEBUG` level? That way, if we need to debug, we have evidence of this in the logs.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
+
+    private final CoordinatorKey groupId;
+    private final List<TopicPartition> partitions;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public ListConsumerGroupOffsetsHandler(
+        String groupId,
+        List<TopicPartition> partitions,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.partitions = partitions;
+        this.log = logContext.logger(ListConsumerGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(
+        String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public String apiName() {
+        return "offsetFetch";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    @Override
+    public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
+        // Set the flag to false as for admin client request,
+        // we don't need to wait for any pending offset state to clear.
+        return new OffsetFetchRequest.Builder(groupId.idValue, false, partitions, false);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleResponse(
+        Node coordinator,
+        Set<CoordinatorKey> groupIds,
+        AbstractResponse abstractResponse
+    ) {
+        final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
+        Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        if (response.error() != Errors.NONE) {
+            handleError(groupId, response.error(), failed, unmapped);
+        } else {
+            final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
+                response.responseData().entrySet()) {
+                final TopicPartition topicPartition = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+                final Errors error = partitionData.error;
+
+                if (error == Errors.NONE) {
+                    final long offset = partitionData.offset;
+                    final String metadata = partitionData.metadata;
+                    final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
+                    // Negative offset indicates that the group has no committed offset for this partition
+                    if (offset < 0) {
+                        groupOffsetsListing.put(topicPartition, null);
+                    } else {
+                        groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+                    }
+                } else {
+                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
+                }
+            }
+            completed.put(groupId, groupOffsetsListing);
+        }
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private void handleError(
+        CoordinatorKey groupId,
+        Errors error,
+        Map<CoordinatorKey,
+        Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+                log.error("Received authorization failure for group {} in `DeleteConsumerGroupOffsets` response", groupId,

Review comment:
       Seems like there's a copy/paste error here: I think we want `DeleteConsumerGroupOffsets` to be `ListConsumerGroupOffsets`. Perhaps, rather than writing the name out in each of these messages, we could just call `apiName()`?
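
       For example, something like this (a sketch; `apiName()` is already defined on each handler):

```java
case GROUP_AUTHORIZATION_FAILED:
    // Deriving the name from apiName() keeps the message in sync with the
    // handler it lives in, even when code is copied between handlers.
    log.error("Received authorization failure for group {} in `{}` response",
            groupId, apiName(), error.exception());
    failed.put(groupId, error.exception());
    break;
```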

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class RemoveMembersFromConsumerGroupHandler implements AdminApiHandler<CoordinatorKey, Map<MemberIdentity, Errors>> {
+
+    private final CoordinatorKey groupId;
+    private final List<MemberIdentity> members;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public RemoveMembersFromConsumerGroupHandler(
+        String groupId,
+        List<MemberIdentity> members,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.members = members;
+        this.log = logContext.logger(RemoveMembersFromConsumerGroupHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "leaveGroup";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity, Errors>> newFuture(
+        String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
+        return new LeaveGroupRequest.Builder(groupId.idValue, members);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<MemberIdentity, Errors>> handleResponse(
+        Node coordinator,
+        Set<CoordinatorKey> groupIds,
+        AbstractResponse abstractResponse
+    ) {
+        final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
+        Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        final Errors error = Errors.forCode(response.data().errorCode());
+        if (error != Errors.NONE) {
+            handleError(groupId, error, failed, unmapped);
+        } else {
+            final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
+            for (MemberResponse memberResponse : response.memberResponses()) {
+                memberErrors.put(new MemberIdentity()
+                                     .setMemberId(memberResponse.memberId())
+                                     .setGroupInstanceId(memberResponse.groupInstanceId()),
+                                 Errors.forCode(memberResponse.errorCode()));
+
+            }
+            completed.put(groupId, memberErrors);
+        }
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private void handleError(
+        CoordinatorKey groupId,
+        Errors error, Map<CoordinatorKey,
+        Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+                log.error("Received authorization failure for group {} in `DeleteConsumerGroupOffsets` response", groupId,

Review comment:
       Another place where I think we want this to be 
`RemoveMembersFromConsumerGroup` instead of `DeleteConsumerGroupOffsets`

##########
File path: clients/src/main/resources/common/message/FindCoordinatorRequest.json
##########
@@ -23,12 +23,16 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds CoordinatorKeys
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-    { "name": "Key", "type": "string", "versions": "0+",
+    { "name": "Key", "type": "string", "versions": "0-3",
       "about": "The coordinator key." },
     { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", 
"ignorable": false,

Review comment:
       Should we also change the `versions` for this to `0-3` rather than `1+`?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
##########
@@ -70,7 +75,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
             response.setThrottleTimeMs(throttleTimeMs);
         }
         Errors error = Errors.forException(e);
-        return FindCoordinatorResponse.prepareResponse(error, Node.noNode());
+        return FindCoordinatorResponse.prepareOldResponse(error, Node.noNode());

Review comment:
       Why do we call `prepareOldResponse` in `getErrorResponse`? Should we be doing a version check here?
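
       A rough sketch of what a version check might look like here; `prepareErrorResponse` taking the batched keys is hypothetical and would need a batched counterpart to `prepareOldResponse` (throttle-time handling elided):

```java
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
    Errors error = Errors.forException(e);
    if (version() >= 4) {
        // v4+ responses carry one Coordinator entry per requested key.
        return FindCoordinatorResponse.prepareErrorResponse(error, data().coordinatorKeys());
    }
    // Pre-v4 responses use the single top-level error/node fields.
    return FindCoordinatorResponse.prepareOldResponse(error, Node.noNode());
}
```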

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
##########
@@ -95,4 +96,21 @@ public static FindCoordinatorResponse prepareResponse(Errors error, Node node) {
             .setPort(node.port());
         return new FindCoordinatorResponse(data);
     }
+
+    public static FindCoordinatorResponse prepareResponse(Errors error, String key, Node node) {
+        FindCoordinatorResponseData data = new FindCoordinatorResponseData();
+        data.setCoordinators(Collections.singletonList(
+                new FindCoordinatorResponseData.Coordinator()
+                .setErrorCode(error.code())
+                .setErrorMessage(error.message())
+                .setKey(key)
+                .setHost(node.host())
+                .setPort(node.port())
+                .setNodeId(node.id())));
+        return new FindCoordinatorResponse(data);
+    }
+
+    public static boolean isBatch(RequestHeader header) {
+        return header.apiVersion() >= 4;
+    }

Review comment:
       This doesn't seem to be used anywhere; is it meant as a convenience for future use?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

