mimaison commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1085565198


##########
checkstyle/import-control.xml:
##########
@@ -347,7 +347,7 @@
 
   <subpackage name="server">
     <allow pkg="org.apache.kafka.common" />
-    <allow pkg="joptsimple" />

Review Comment:
   We seem to have the trailing space everywhere else, so maybe keep it here too.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -801,21 +801,15 @@ object ConsumerGroupCommand extends Logging {
         partitionsToReset.map { topicPartition =>
           logStartOffsets.get(topicPartition) match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, 
new OffsetAndMetadata(offset))
-            case _ => {
-              CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting 
starting offset of topic partition: $topicPartition")
-              Exit.exit(1)
-            }
+            case _ => ToolsUtils.printUsageAndDie(opts.parser, s"Error getting 
starting offset of topic partition: $topicPartition")

Review Comment:
   Can you remove the other changes that are not related to JmxTool?



##########
tools/src/main/java/org/apache/kafka/tools/JmxCommand.java:
##########
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior 
to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxCommand {
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        JmxCommandOptions options = new JmxCommandOptions(args);
+        CommandLineUtils.printHelpAndExitIfNeeded(options, "Dump JMX values to 
standard output.");
+
+        Optional<String[]> attributesInclude = options.attributesInclude();
+        Optional<DateFormat> dateFormat = options.dateFormat();
+        String reportFormat = options.parseFormat();
+        boolean keepGoing = true;
+
+        MBeanServerConnection conn = connectToBeanServer(options);
+        List<ObjectName> queries = options.queries();
+        boolean hasPatternQueries = 
queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+        Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, 
hasPatternQueries);
+        Map<ObjectName, Integer> numExpectedAttributes =
+                findNumExpectedAttributes(conn, attributesInclude, 
hasPatternQueries, queries, found);
+
+        List<String> keys = new ArrayList<>();
+        keys.add("time");
+        keys.addAll(new TreeSet<>(queryAttributes(conn, found, 
attributesInclude).keySet()));
+        maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+        while (keepGoing) {
+            long start = System.currentTimeMillis();
+            Map<String, Object> attributes = queryAttributes(conn, found, 
attributesInclude);
+            attributes.put("time", dateFormat.isPresent() ? 
dateFormat.get().format(new Date()) : 
String.valueOf(System.currentTimeMillis()));
+            maybePrintDataRows(reportFormat, numExpectedAttributes, keys, 
attributes);
+            if (options.isOneTime()) {
+                keepGoing = false;
+            } else {
+                TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - 
(System.currentTimeMillis() - start)));
+            }
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return 
stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> 
numExpectedAttributes) {
+        return 
numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return 
Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxCommandOptions 
options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", 
options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new 
SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = 
JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. 
Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < 
connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX 
url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxCommandOptions 
options,
+                                                          
MBeanServerConnection conn,
+                                                          List<ObjectName> 
queries,
+                                                          boolean 
hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, 
s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, 
retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < 
waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), 
",");
+            throw new TerseException(String.format("Could not find all 
requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> 
findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      
Optional<String[]> attributesInclude,
+                                                                      boolean 
hasPatternQueries,
+                                                                      
List<ObjectName> queries,
+                                                                      
Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, 
attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = 
conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new 
ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if 
(Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, 
attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for 
the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection 
conn,
+                                                       Set<ObjectName> 
objectNames,
+                                                       Optional<String[]> 
attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> 
a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if 
(Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", 
objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), 
attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> 
keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == 
sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> 
String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, 
Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, 
Object> attributes) {
+        if (attributes.keySet().size() == sumValues(numExpectedAttributes) + 
1) {

Review Comment:
   We should be able to use `attributes.size()` instead of 
`attributes.keySet().size()`.



##########
tools/src/main/java/org/apache/kafka/tools/JmxCommand.java:
##########
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * <p>
+ * This tool only works reliably if the JmxServer is fully initialized prior 
to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxCommand {
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        JmxCommandOptions options = new JmxCommandOptions(args);
+        CommandLineUtils.printHelpAndExitIfNeeded(options, "Dump JMX values to 
standard output.");
+
+        Optional<String[]> attributesInclude = options.attributesInclude();
+        Optional<DateFormat> dateFormat = options.dateFormat();
+        String reportFormat = options.parseFormat();
+        boolean keepGoing = true;
+
+        MBeanServerConnection conn = connectToBeanServer(options);
+        List<ObjectName> queries = options.queries();
+        boolean hasPatternQueries = 
queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+        Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, 
hasPatternQueries);
+        Map<ObjectName, Integer> numExpectedAttributes =
+                findNumExpectedAttributes(conn, attributesInclude, 
hasPatternQueries, queries, found);
+
+        List<String> keys = new ArrayList<>();
+        keys.add("time");
+        keys.addAll(new TreeSet<>(queryAttributes(conn, found, 
attributesInclude).keySet()));
+        maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
+
+        while (keepGoing) {
+            long start = System.currentTimeMillis();
+            Map<String, Object> attributes = queryAttributes(conn, found, 
attributesInclude);
+            attributes.put("time", dateFormat.isPresent() ? 
dateFormat.get().format(new Date()) : 
String.valueOf(System.currentTimeMillis()));
+            maybePrintDataRows(reportFormat, numExpectedAttributes, keys, 
attributes);
+            if (options.isOneTime()) {
+                keepGoing = false;
+            } else {
+                TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - 
(System.currentTimeMillis() - start)));
+            }
+        }
+    }
+
+    private static String mkString(Stream<Object> stream, String delimeter) {
+        return 
stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
+    }
+
+    private static int sumValues(Map<ObjectName, Integer> 
numExpectedAttributes) {
+        return 
numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
+    private static String[] attributesNames(MBeanInfo mBeanInfo) {
+        return 
Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
+    }
+
+    private static MBeanServerConnection connectToBeanServer(JmxCommandOptions 
options) throws Exception {
+        JMXConnector connector;
+        MBeanServerConnection serverConn = null;
+        boolean connected = false;
+        long connectTimeoutMs = 10_000;
+        long connectTestStarted = System.currentTimeMillis();
+        do {
+            try {
+                // printing to stderr because system tests parse the output
+                System.err.printf("Trying to connect to JMX url: %s%n", 
options.jmxServiceURL());
+                Map<String, Object> env = new HashMap<>();
+                // ssl enable
+                if (options.hasJmxSslEnableOpt()) {
+                    env.put("com.sun.jndi.rmi.factory.socket", new 
SslRMIClientSocketFactory());
+                }
+                // password authentication enable
+                if (options.hasJmxAuthPropOpt()) {
+                    env.put(JMXConnector.CREDENTIALS, options.credentials());
+                }
+                connector = 
JMXConnectorFactory.connect(options.jmxServiceURL(), env);
+                serverConn = connector.getMBeanServerConnection();
+                connected = true;
+            } catch (Exception e) {
+                System.err.printf("Could not connect to JMX url: %s. 
Exception: %s.%n",
+                        options.jmxServiceURL(), e.getMessage());
+                e.printStackTrace();
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        } while (System.currentTimeMillis() - connectTestStarted < 
connectTimeoutMs && !connected);
+
+        if (!connected) {
+            throw new TerseException(String.format("Could not connect to JMX 
url %s after %d ms.",
+                    options.jmxServiceURL(), connectTimeoutMs));
+        }
+        return serverConn;
+    }
+
+    private static Set<ObjectName> findObjectsIfNoPattern(JmxCommandOptions 
options,
+                                                          
MBeanServerConnection conn,
+                                                          List<ObjectName> 
queries,
+                                                          boolean 
hasPatternQueries) throws Exception {
+        long waitTimeoutMs = 10_000;
+        Set<ObjectName> result = new HashSet<>();
+        Set<ObjectName> querySet = new HashSet<>(queries);
+        BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, 
s2) -> s1.containsAll(s2);
+        if (!hasPatternQueries) {
+            long start = System.currentTimeMillis();
+            do {
+                if (!result.isEmpty()) {
+                    System.err.println("Could not find all object names, 
retrying");
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+                result.addAll(queryObjects(conn, queries));
+            } while (options.hasWait() && System.currentTimeMillis() - start < 
waitTimeoutMs && !foundAllObjects.test(querySet, result));
+        }
+
+        if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
+            querySet.removeAll(result);
+            String missing = mkString(querySet.stream().map(Object::toString), 
",");
+            throw new TerseException(String.format("Could not find all 
requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
+        }
+        return result;
+    }
+
+    private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
+                                                List<ObjectName> queries) {
+        Set<ObjectName> result = new HashSet<>();
+        queries.forEach(name -> {
+            try {
+                result.addAll(conn.queryNames(name, null));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        return result;
+    }
+
+    private static Map<ObjectName, Integer> 
findNumExpectedAttributes(MBeanServerConnection conn,
+                                                                      
Optional<String[]> attributesInclude,
+                                                                      boolean 
hasPatternQueries,
+                                                                      
List<ObjectName> queries,
+                                                                      
Set<ObjectName> found) throws Exception {
+        Map<ObjectName, Integer> result = new HashMap<>();
+        if (!attributesInclude.isPresent()) {
+            found.forEach(objectName -> {
+                try {
+                    MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                    result.put(objectName, conn.getAttributes(objectName, 
attributesNames(mBeanInfo)).size());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } else {
+            if (!hasPatternQueries) {
+                found.forEach(objectName -> {
+                    try {
+                        MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
+                        AttributeList attributes = 
conn.getAttributes(objectName, attributesNames(mBeanInfo));
+                        List<ObjectName> expectedAttributes = new 
ArrayList<>();
+                        attributes.asList().forEach(attribute -> {
+                            if 
(Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                                expectedAttributes.add(objectName);
+                            }
+                        });
+                        if (expectedAttributes.size() > 0) {
+                            result.put(objectName, expectedAttributes.size());
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            } else {
+                queries.forEach(objectName -> result.put(objectName, 
attributesInclude.get().length));
+            }
+        }
+
+        if (result.isEmpty()) {
+            throw new TerseException(String.format("No matched attributes for 
the queried objects %s.", queries));
+        }
+        return result;
+    }
+
+    private static Map<String, Object> queryAttributes(MBeanServerConnection 
conn,
+                                                       Set<ObjectName> 
objectNames,
+                                                       Optional<String[]> 
attributesInclude) throws Exception {
+        Map<String, Object> result = new HashMap<>();
+        for (ObjectName objectName : objectNames) {
+            MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
+            AttributeList attributes = conn.getAttributes(objectName,
+                    Arrays.stream(beanInfo.getAttributes()).map(a -> 
a.getName()).toArray(String[]::new));
+            for (Attribute attribute : attributes.asList()) {
+                if (attributesInclude.isPresent()) {
+                    if 
(Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                        result.put(String.format("%s:%s", 
objectName.toString(), attribute.getName()),
+                                attribute.getValue());
+                    }
+                } else {
+                    result.put(String.format("%s:%s", objectName.toString(), 
attribute.getName()),
+                            attribute.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
+    private static void maybePrintCsvHeader(String reportFormat, List<String> 
keys, Map<ObjectName, Integer> numExpectedAttributes) {
+        if (reportFormat.equals("original") && keys.size() == 
sumValues(numExpectedAttributes) + 1) {
+            System.out.println(mkString(keys.stream().map(key -> 
String.format("\"%s\"", key)), ","));
+        }
+    }
+
+    private static void maybePrintDataRows(String reportFormat, 
Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, 
Object> attributes) {
+        if (attributes.keySet().size() == sumValues(numExpectedAttributes) + 
1) {
+            switch (reportFormat) {
+                case "properties":
+                    keys.forEach(key -> 
System.out.println(String.format("%s=%s", key, attributes.get(key))));
+                    break;
+                case "csv":
+                    keys.forEach(key -> 
System.out.println(String.format("%s,\"%s\"", key, attributes.get(key))));
+                    break;
+                case "tsv":
+                    keys.forEach(key -> 
System.out.println(String.format("%s\t%s", key, attributes.get(key))));
+                    break;
+                default:
+                    
System.out.println(mkString(keys.stream().map(attributes::get), ","));
+                    break;
+            }
+        }
+    }
+
+    private static class JmxCommandOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> objectNameOpt;
+        private final OptionSpec<String> attributesOpt;
+        private final OptionSpec<Integer> reportingIntervalOpt;
+        private final OptionSpec<Boolean> oneTimeOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<String> jmxServiceUrlOpt;
+        private final OptionSpec<String> reportFormatOpt;
+        private final OptionSpec<String> jmxAuthPropOpt;
+        private final OptionSpec<Boolean> jmxSslEnableOpt;
+        private final OptionSpec<Void> waitOpt;
+
+        public JmxCommandOptions(String[] args) {
+            super(args);
+            objectNameOpt = parser.accepts("object-name", "A JMX object name 
to use as a query. This can contain wild cards, and this option " +
+                            "can be given multiple times to specify more than 
one query. If no objects are specified " +
+                            "all objects will be queried.")
+                    .withOptionalArg()

Review Comment:
   The initial tool had `withRequiredArg`. Why are we changing it to `withOptionalArg`?
   
   The same applies to a few more arguments below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to