dajac commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r663770706



##########
File path: 
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void 
testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);

Review comment:
       nit: We could instantiate a new `ListTransactionsOptions` here instead 
of using `null`. This would remove the `null` check below.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, 
PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the 
scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception {
+            Optional<Integer> brokerId = 
Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = 
Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either 
--topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = 
Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic 
to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = 
collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = 
collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId 
= groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = 
describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = 
filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new 
TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, 
openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding 
to the
+                    // producerId of an open transaction, then the transaction 
is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction 
state
+                    TransactionDescription description = 
descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : 
openTransactions) {
+                            if (description.producerEpoch() > 
openTransaction.producerState.producerEpoch()
+                                || 
!description.topicPartitions().contains(openTransaction.topicPartition)) {

Review comment:
       Should we also add a small comment here to be complete? Otherwise, we 
could add a javadoc which explains all the conditions.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, 
PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the 
scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception {
+            Optional<Integer> brokerId = 
Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = 
Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either 
--topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = 
Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic 
to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = 
collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = 
collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId 
= groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = 
describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = 
filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new 
TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, 
openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding 
to the
+                    // producerId of an open transaction, then the transaction 
is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction 
state

Review comment:
       nit: Add `.` at the end of the sentence.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, 
PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the 
scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception {
+            Optional<Integer> brokerId = 
Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = 
Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either 
--topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = 
Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic 
to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = 
collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = 
collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId 
= groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = 
describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = 
filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new 
TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, 
openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding 
to the
+                    // producerId of an open transaction, then the transaction 
is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction 
state
+                    TransactionDescription description = 
descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : 
openTransactions) {
+                            if (description.producerEpoch() > 
openTransaction.producerState.producerEpoch()
+                                || 
!description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = 
TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    
String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    
String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                return admin.describeTransactions(new 
HashSet<>(transactionalIds)).all().get();
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + 
transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> res = new HashMap<>();
+            for (OpenTransaction transaction : openTransactions) {
+                List<OpenTransaction> states = res.computeIfAbsent(
+                    transaction.producerState.producerId(),
+                    __ -> new ArrayList<>()
+                );
+                states.add(transaction);
+            }
+            return res;
+        }
+
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+                findTopicPartitions(
+                    admin,
+                    brokerId,
+                    batch,
+                    topicPartitions
+                );
+            });
+            return topicPartitions;
+        }
+
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = 
admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || 
hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, 
partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " 
topics", e.getCause());
+            }
+        }
+
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> 
node.id() == brokerId);
+        }
+
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            // We have to check all partitions on the broker. In order to avoid
+            // overwhelming it with a giant request, we break the requests into
+            // smaller batches.
+
+            List<OpenTransaction> candidateTransactions = new ArrayList<>();
+
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+                collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    candidateTransactions
+                );
+            });
+
+            return candidateTransactions;
+        }
+
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T t) throws Exception;
+        }
+
+        private static class OpenTransaction {
+            private final TopicPartition topicPartition;
+            private final ProducerState producerState;
+
+            private OpenTransaction(
+                TopicPartition topicPartition,
+                ProducerState producerState
+            ) {
+                this.topicPartition = topicPartition;
+                this.producerState = producerState;
+            }
+        }
+
+        private void collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions,
+            List<OpenTransaction> candidateTransactions
+        ) throws Exception {
+            try {
+                DescribeProducersOptions describeOptions = new 
DescribeProducersOptions();
+                brokerId.ifPresent(describeOptions::brokerId);
+
+                Map<TopicPartition, 
DescribeProducersResult.PartitionProducerState> producersByPartition =
+                    admin.describeProducers(topicPartitions, 
describeOptions).all().get();
+
+                long currentTimeMs = time.milliseconds();
+
+                producersByPartition.forEach((topicPartition, producersStates) 
-> {
+                    producersStates.activeProducers().forEach(activeProducer 
-> {
+                        if 
(activeProducer.currentTransactionStartOffset().isPresent()) {
+                            long transactionDurationMs = currentTimeMs - 
activeProducer.lastTimestamp();
+                            if (transactionDurationMs > 
maxTransactionTimeoutMs) {
+                                candidateTransactions.add(new OpenTransaction(
+                                    topicPartition,
+                                    activeProducer
+                                ));
+                            }
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe producers for " + 
topicPartitions.size() +
+                    " partitions on broker " + brokerId, e.getCause());
+            }
+        }
+
+        private Map<Long, String> lookupTransactionalIds(
+            Admin admin,
+            Set<Long> producerIds
+        ) throws Exception {
+            try {
+                ListTransactionsOptions listTransactionsOptions = new 
ListTransactionsOptions()
+                    .filterProducerIds(producerIds);
+
+                Collection<TransactionListing> transactionListings =
+                    
admin.listTransactions(listTransactionsOptions).all().get();
+
+                Map<Long, String> transactionalIdMap = new HashMap<>();
+
+                transactionListings.forEach(listing -> {
+                    if (!producerIds.contains(listing.producerId())) {
+                        log.debug("Received transaction listing {} which has a 
producerId " +
+                            "which was not requested", listing);
+                    } else {
+                        transactionalIdMap.put(
+                            listing.producerId(),
+                            listing.transactionalId()
+                        );
+                    }
+                });
+
+                return transactionalIdMap;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list transactions for " + 
producerIds.size() +
+                    " producers", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+
+        private <T> void consumeInBatches(
+            List<T> list,
+            int batchSize,
+            ThrowableConsumer<List<T>> consumer
+        ) throws Exception {
+            int batchStartIndex = 0;
+            int limitIndex = list.size();
+
+            while (batchStartIndex < limitIndex) {
+                int batchEndIndex = Math.min(
+                    limitIndex,
+                    batchStartIndex + batchSize
+                );
+
+                List<T> batch = list.subList(batchStartIndex, batchEndIndex);
+                consumer.accept(batch);

Review comment:
       nit: As we don't reuse `batch`, we could directly pass 
`list.subList(batchStartIndex, batchEndIndex)` to `accept`.

##########
File path: 
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void 
testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = 
Mockito.mock(ListTransactionsResult.class);
+
+        if (options == null) {
+            Mockito.when(admin.listTransactions()).thenReturn(listResult);
+        } else {
+            
Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+        }
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        
Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        
Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new 
PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, 
partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = 
Mockito.mock(DescribeTransactionsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        
Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new 
ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws 
Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws 
Exception {
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Arrays.asList(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {

Review comment:
       Should we also add a test with no mapped `TransactionDescription`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to