dajac commented on a change in pull request #10616:
URL: https://github.com/apache/kafka/pull/10616#discussion_r638764893



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used for use cases which require requests to be sent to all
+ * brokers in the cluster.
+ *
+ * This is a slightly degenerate case of a lookup strategy in the sense that
+ * the broker IDs are used as both the keys and values. Also, unlike
+ * {@link CoordinatorStrategy} and {@link PartitionLeaderStrategy}, we do not
+ * know the set of keys ahead of time: we require the initial lookup in order
+ * to discover what the broker IDs are. This is represented with a more complex
+ * type {@code Future<Map<Integer, Future<V>>} in the admin API result type.
+ * For example, see {@link 
org.apache.kafka.clients.admin.ListTransactionsResult}.
+ */
+public class AllBrokersStrategy implements 
AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> {
+    public static final BrokerKey ANY_BROKER = new 
BrokerKey(OptionalInt.empty());
+    public static final Set<BrokerKey> LOOKUP_KEYS = 
Collections.singleton(ANY_BROKER);
+    private static final ApiRequestScope SINGLE_REQUEST_SCOPE = new 
ApiRequestScope() {
+    };
+
+    private final Logger log;
+
+    public AllBrokersStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(AllBrokersStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(BrokerKey key) {
+        return SINGLE_REQUEST_SCOPE;
+    }
+
+    @Override
+    public MetadataRequest.Builder buildRequest(Set<BrokerKey> keys) {
+        validateLookupKeys(keys);
+        // Send empty `Metadata` request. We are only interested in the 
brokers from the response
+        return new MetadataRequest.Builder(new MetadataRequestData());
+    }
+
+    @Override
+    public LookupResult<BrokerKey> handleResponse(Set<BrokerKey> keys, 
AbstractResponse abstractResponse) {
+        validateLookupKeys(keys);
+
+        MetadataResponse response = (MetadataResponse) abstractResponse;
+        MetadataResponseData.MetadataResponseBrokerCollection brokers = 
response.data().brokers();
+
+        if (brokers.isEmpty()) {
+            log.debug("Metadata response contained no brokers. Will backoff 
and retry");
+            return LookupResult.empty();
+        } else {
+            log.debug("Discovered all brokers {} to send requests to", 
brokers);
+        }
+
+        Map<BrokerKey, Integer> brokerKeys = 
brokers.stream().collect(Collectors.toMap(
+            broker -> new BrokerKey(OptionalInt.of(broker.nodeId())),
+            MetadataResponseData.MetadataResponseBroker::nodeId
+        ));
+
+        return new LookupResult<>(
+            Collections.singletonList(ANY_BROKER),
+            Collections.emptyMap(),
+            brokerKeys
+        );
+    }
+
+    private void validateLookupKeys(Set<BrokerKey> keys) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);

Review comment:
       nit: I would add a `:` after `set` to stay consistent with the other 
error messages in this PR.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used for use cases which require requests to be sent to all
+ * brokers in the cluster.
+ *
+ * This is a slightly degenerate case of a lookup strategy in the sense that
+ * the broker IDs are used as both the keys and values. Also, unlike
+ * {@link CoordinatorStrategy} and {@link PartitionLeaderStrategy}, we do not
+ * know the set of keys ahead of time: we require the initial lookup in order
+ * to discover what the broker IDs are. This is represented with a more complex
+ * type {@code Future<Map<Integer, Future<V>>} in the admin API result type.
+ * For example, see {@link 
org.apache.kafka.clients.admin.ListTransactionsResult}.
+ */
+public class AllBrokersStrategy implements 
AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> {
+    public static final BrokerKey ANY_BROKER = new 
BrokerKey(OptionalInt.empty());
+    public static final Set<BrokerKey> LOOKUP_KEYS = 
Collections.singleton(ANY_BROKER);
+    private static final ApiRequestScope SINGLE_REQUEST_SCOPE = new 
ApiRequestScope() {
+    };
+
+    private final Logger log;
+
+    public AllBrokersStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(AllBrokersStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(BrokerKey key) {
+        return SINGLE_REQUEST_SCOPE;
+    }
+
+    @Override
+    public MetadataRequest.Builder buildRequest(Set<BrokerKey> keys) {
+        validateLookupKeys(keys);
+        // Send empty `Metadata` request. We are only interested in the 
brokers from the response
+        return new MetadataRequest.Builder(new MetadataRequestData());
+    }
+
+    @Override
+    public LookupResult<BrokerKey> handleResponse(Set<BrokerKey> keys, 
AbstractResponse abstractResponse) {
+        validateLookupKeys(keys);
+
+        MetadataResponse response = (MetadataResponse) abstractResponse;
+        MetadataResponseData.MetadataResponseBrokerCollection brokers = 
response.data().brokers();
+
+        if (brokers.isEmpty()) {
+            log.debug("Metadata response contained no brokers. Will backoff 
and retry");
+            return LookupResult.empty();
+        } else {
+            log.debug("Discovered all brokers {} to send requests to", 
brokers);
+        }
+
+        Map<BrokerKey, Integer> brokerKeys = 
brokers.stream().collect(Collectors.toMap(
+            broker -> new BrokerKey(OptionalInt.of(broker.nodeId())),
+            MetadataResponseData.MetadataResponseBroker::nodeId
+        ));
+
+        return new LookupResult<>(
+            Collections.singletonList(ANY_BROKER),
+            Collections.emptyMap(),
+            brokerKeys
+        );
+    }
+
+    private void validateLookupKeys(Set<BrokerKey> keys) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);
+        }
+        BrokerKey key = keys.iterator().next();
+        if (key != ANY_BROKER) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);

Review comment:
       nit: ditto.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used for use cases which require requests to be sent to all
+ * brokers in the cluster.
+ *
+ * This is a slightly degenerate case of a lookup strategy in the sense that
+ * the broker IDs are used as both the keys and values. Also, unlike
+ * {@link CoordinatorStrategy} and {@link PartitionLeaderStrategy}, we do not
+ * know the set of keys ahead of time: we require the initial lookup in order
+ * to discover what the broker IDs are. This is represented with a more complex
+ * type {@code Future<Map<Integer, Future<V>>} in the admin API result type.
+ * For example, see {@link 
org.apache.kafka.clients.admin.ListTransactionsResult}.
+ */
+public class AllBrokersStrategy implements 
AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> {
+    public static final BrokerKey ANY_BROKER = new 
BrokerKey(OptionalInt.empty());
+    public static final Set<BrokerKey> LOOKUP_KEYS = 
Collections.singleton(ANY_BROKER);
+    private static final ApiRequestScope SINGLE_REQUEST_SCOPE = new 
ApiRequestScope() {
+    };
+
+    private final Logger log;
+
+    public AllBrokersStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(AllBrokersStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(BrokerKey key) {
+        return SINGLE_REQUEST_SCOPE;
+    }
+
+    @Override
+    public MetadataRequest.Builder buildRequest(Set<BrokerKey> keys) {
+        validateLookupKeys(keys);
+        // Send empty `Metadata` request. We are only interested in the 
brokers from the response
+        return new MetadataRequest.Builder(new MetadataRequestData());
+    }
+
+    @Override
+    public LookupResult<BrokerKey> handleResponse(Set<BrokerKey> keys, 
AbstractResponse abstractResponse) {
+        validateLookupKeys(keys);
+
+        MetadataResponse response = (MetadataResponse) abstractResponse;
+        MetadataResponseData.MetadataResponseBrokerCollection brokers = 
response.data().brokers();
+
+        if (brokers.isEmpty()) {
+            log.debug("Metadata response contained no brokers. Will backoff 
and retry");
+            return LookupResult.empty();
+        } else {
+            log.debug("Discovered all brokers {} to send requests to", 
brokers);
+        }
+
+        Map<BrokerKey, Integer> brokerKeys = 
brokers.stream().collect(Collectors.toMap(
+            broker -> new BrokerKey(OptionalInt.of(broker.nodeId())),
+            MetadataResponseData.MetadataResponseBroker::nodeId
+        ));
+
+        return new LookupResult<>(
+            Collections.singletonList(ANY_BROKER),
+            Collections.emptyMap(),
+            brokerKeys
+        );
+    }
+
+    private void validateLookupKeys(Set<BrokerKey> keys) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);
+        }
+        BrokerKey key = keys.iterator().next();
+        if (key != ANY_BROKER) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);
+        }
+    }
+
+    public static class BrokerKey {
+        public final OptionalInt brokerId;
+
+        public BrokerKey(OptionalInt brokerId) {
+            this.brokerId = brokerId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            BrokerKey that = (BrokerKey) o;
+            return Objects.equals(brokerId, that.brokerId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(brokerId);
+        }
+
+        @Override
+        public String toString() {
+            return "BrokerKey(" +
+                "brokerId=" + brokerId +
+                ')';
+        }
+    }
+
+    public static class AllBrokersFuture<V> implements 
AdminApiFuture<BrokerKey, V> {
+        private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<V>>> future 
= new KafkaFutureImpl<>();
+        private final Map<Integer, KafkaFutureImpl<V>> brokerFutures = new 
HashMap<>();
+
+        @Override
+        public Set<BrokerKey> lookupKeys() {
+            return LOOKUP_KEYS;
+        }
+
+        @Override
+        public void completeLookup(Map<BrokerKey, Integer> brokerMapping) {
+            brokerMapping.forEach((brokerKey, brokerId) -> {
+                if (brokerId != brokerKey.brokerId.orElse(-1)) {
+                    throw new IllegalArgumentException("Invalid lookup mapping 
" + brokerKey + " -> " + brokerId);
+                }
+                brokerFutures.put(brokerId, new KafkaFutureImpl<>());
+            });
+            future.complete(brokerFutures);
+        }
+
+        @Override
+        public void completeLookupExceptionally(Map<BrokerKey, Throwable> 
lookupErrors) {
+            if (!LOOKUP_KEYS.equals(lookupErrors.keySet())) {
+                throw new IllegalArgumentException("Unexpected keys among 
lookup errors: " + lookupErrors);
+            }
+            future.completeExceptionally(lookupErrors.get(ANY_BROKER));
+        }
+
+        @Override
+        public void complete(Map<BrokerKey, V> values) {
+            values.forEach(this::complete);
+        }
+
+        private void complete(AllBrokersStrategy.BrokerKey key, V value) {
+            if (key == ANY_BROKER) {
+                throw new IllegalArgumentException("Invalid attempt to 
complete with lookup key sentinel");
+            } else {
+                futureOrThrow(key).complete(value);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Map<BrokerKey, Throwable> errors) {
+            errors.forEach(this::completeExceptionally);
+        }
+
+        private void completeExceptionally(AllBrokersStrategy.BrokerKey key, 
Throwable t) {
+            if (key == ANY_BROKER) {
+                future.completeExceptionally(t);
+            } else {
+                futureOrThrow(key).completeExceptionally(t);
+            }
+        }
+
+        public KafkaFutureImpl<Map<Integer, KafkaFutureImpl<V>>> all() {
+            return future;
+        }
+
+        private KafkaFutureImpl<V> futureOrThrow(BrokerKey key) {
+            if (!key.brokerId.isPresent()) {
+                throw new IllegalArgumentException("Attempt to complete with 
invalid key " + key);

Review comment:
       nit: ditto.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used for use cases which require requests to be sent to all
+ * brokers in the cluster.
+ *
+ * This is a slightly degenerate case of a lookup strategy in the sense that
+ * the broker IDs are used as both the keys and values. Also, unlike
+ * {@link CoordinatorStrategy} and {@link PartitionLeaderStrategy}, we do not
+ * know the set of keys ahead of time: we require the initial lookup in order
+ * to discover what the broker IDs are. This is represented with a more complex
+ * type {@code Future<Map<Integer, Future<V>>} in the admin API result type.
+ * For example, see {@link 
org.apache.kafka.clients.admin.ListTransactionsResult}.
+ */
+public class AllBrokersStrategy implements 
AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> {
+    public static final BrokerKey ANY_BROKER = new 
BrokerKey(OptionalInt.empty());
+    public static final Set<BrokerKey> LOOKUP_KEYS = 
Collections.singleton(ANY_BROKER);
+    private static final ApiRequestScope SINGLE_REQUEST_SCOPE = new 
ApiRequestScope() {
+    };
+
+    private final Logger log;
+
+    public AllBrokersStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(AllBrokersStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(BrokerKey key) {
+        return SINGLE_REQUEST_SCOPE;
+    }
+
+    @Override
+    public MetadataRequest.Builder buildRequest(Set<BrokerKey> keys) {
+        validateLookupKeys(keys);
+        // Send empty `Metadata` request. We are only interested in the 
brokers from the response
+        return new MetadataRequest.Builder(new MetadataRequestData());
+    }
+
+    @Override
+    public LookupResult<BrokerKey> handleResponse(Set<BrokerKey> keys, 
AbstractResponse abstractResponse) {
+        validateLookupKeys(keys);
+
+        MetadataResponse response = (MetadataResponse) abstractResponse;
+        MetadataResponseData.MetadataResponseBrokerCollection brokers = 
response.data().brokers();
+
+        if (brokers.isEmpty()) {
+            log.debug("Metadata response contained no brokers. Will backoff 
and retry");
+            return LookupResult.empty();
+        } else {
+            log.debug("Discovered all brokers {} to send requests to", 
brokers);
+        }
+
+        Map<BrokerKey, Integer> brokerKeys = 
brokers.stream().collect(Collectors.toMap(
+            broker -> new BrokerKey(OptionalInt.of(broker.nodeId())),
+            MetadataResponseData.MetadataResponseBroker::nodeId
+        ));
+
+        return new LookupResult<>(
+            Collections.singletonList(ANY_BROKER),
+            Collections.emptyMap(),
+            brokerKeys
+        );
+    }
+
+    private void validateLookupKeys(Set<BrokerKey> keys) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);
+        }
+        BrokerKey key = keys.iterator().next();
+        if (key != ANY_BROKER) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);
+        }
+    }
+
+    public static class BrokerKey {
+        public final OptionalInt brokerId;
+
+        public BrokerKey(OptionalInt brokerId) {
+            this.brokerId = brokerId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            BrokerKey that = (BrokerKey) o;
+            return Objects.equals(brokerId, that.brokerId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(brokerId);
+        }
+
+        @Override
+        public String toString() {
+            return "BrokerKey(" +
+                "brokerId=" + brokerId +
+                ')';
+        }
+    }
+
+    public static class AllBrokersFuture<V> implements 
AdminApiFuture<BrokerKey, V> {
+        private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<V>>> future 
= new KafkaFutureImpl<>();
+        private final Map<Integer, KafkaFutureImpl<V>> brokerFutures = new 
HashMap<>();
+
+        @Override
+        public Set<BrokerKey> lookupKeys() {
+            return LOOKUP_KEYS;
+        }
+
+        @Override
+        public void completeLookup(Map<BrokerKey, Integer> brokerMapping) {
+            brokerMapping.forEach((brokerKey, brokerId) -> {
+                if (brokerId != brokerKey.brokerId.orElse(-1)) {
+                    throw new IllegalArgumentException("Invalid lookup mapping 
" + brokerKey + " -> " + brokerId);
+                }
+                brokerFutures.put(brokerId, new KafkaFutureImpl<>());
+            });
+            future.complete(brokerFutures);
+        }
+
+        @Override
+        public void completeLookupExceptionally(Map<BrokerKey, Throwable> 
lookupErrors) {
+            if (!LOOKUP_KEYS.equals(lookupErrors.keySet())) {
+                throw new IllegalArgumentException("Unexpected keys among 
lookup errors: " + lookupErrors);
+            }
+            future.completeExceptionally(lookupErrors.get(ANY_BROKER));
+        }
+
+        @Override
+        public void complete(Map<BrokerKey, V> values) {
+            values.forEach(this::complete);
+        }
+
+        private void complete(AllBrokersStrategy.BrokerKey key, V value) {
+            if (key == ANY_BROKER) {
+                throw new IllegalArgumentException("Invalid attempt to 
complete with lookup key sentinel");
+            } else {
+                futureOrThrow(key).complete(value);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Map<BrokerKey, Throwable> errors) {
+            errors.forEach(this::completeExceptionally);
+        }
+
+        private void completeExceptionally(AllBrokersStrategy.BrokerKey key, 
Throwable t) {
+            if (key == ANY_BROKER) {
+                future.completeExceptionally(t);
+            } else {
+                futureOrThrow(key).completeExceptionally(t);
+            }
+        }
+
+        public KafkaFutureImpl<Map<Integer, KafkaFutureImpl<V>>> all() {
+            return future;
+        }
+
+        private KafkaFutureImpl<V> futureOrThrow(BrokerKey key) {
+            if (!key.brokerId.isPresent()) {
+                throw new IllegalArgumentException("Attempt to complete with 
invalid key " + key);
+            } else {
+                int brokerId = key.brokerId.getAsInt();
+                KafkaFutureImpl<V> future = brokerFutures.get(brokerId);
+                if (future == null) {
+                    throw new IllegalArgumentException("Attempt to complete 
with unknown broker id " + brokerId);

Review comment:
       nit: ditto.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of the {@link Admin#listTransactions()} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListTransactionsResult {
+    private final KafkaFutureImpl<Map<Integer, 
KafkaFutureImpl<Collection<TransactionListing>>>> future;
+
+    ListTransactionsResult(KafkaFutureImpl<Map<Integer, 
KafkaFutureImpl<Collection<TransactionListing>>>> future) {
+        this.future = future;
+    }
+
+    public KafkaFuture<Collection<TransactionListing>> all() {
+        return allByBrokerId().thenApply(map -> {
+            List<TransactionListing> allListings = new ArrayList<>();
+            for (Collection<TransactionListing> listings : map.values()) {
+                allListings.addAll(listings);
+            }
+            return allListings;
+        });
+    }
+
+    public KafkaFuture<Set<Integer>> brokerIds() {
+        return future.thenApply(map -> new HashSet<>(map.keySet()));
+    }

Review comment:
       This is the only remaining point of discussion. I don't have a strong 
preference for any of them so I leave it up to you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to