chia7712 commented on code in PR #20301:
URL: https://github.com/apache/kafka/pull/20301#discussion_r2311752554


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -77,22 +86,24 @@ static int mainNoExit(String... args) {
     }
 
     // Visible for testing
-    static void execute(String... args) throws Exception {
-        if (args.length != 5 && args.length != 6) {
-            throw new TerseException("USAGE: java " + 
EndToEndLatency.class.getName()
-                    + " broker_list topic num_messages producer_acks 
message_size_bytes [optional] properties_file");
-        }
+    static void execute(String[] args) throws Exception {
+        String[] processedArgs = convertLegacyArgsIfNeeded(args);
+        EndToEndLatencyCommandOptions opts = new 
EndToEndLatencyCommandOptions(processedArgs);
 
-        String brokers = args[0];
-        String topic = args[1];
-        int numMessages = Integer.parseInt(args[2]);
-        String acks = args[3];
-        int messageSizeBytes = Integer.parseInt(args[4]);
-        Optional<String> propertiesFile = (args.length > 5 && 
!Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
+        // required
+        String brokers = opts.options.valueOf(opts.bootstrapServerOpt);
+        String topic = opts.options.valueOf(opts.topicOpt);
+        int numRecords = opts.options.valueOf(opts.numRecordsOpt);
+        String acks = opts.options.valueOf(opts.acksOpt);
+        int recordValueSize = opts.options.valueOf(opts.recordSizeOpt);
 
-        if (!List.of("1", "all").contains(acks)) {
-            throw new IllegalArgumentException("Latency testing requires 
synchronous acknowledgement. Please use 1 or all");
-        }
+        // optional
+        Optional<String> propertiesFile = 
opts.options.has(opts.commandConfigOpt) ?

Review Comment:
   ```java
   Optional<String> propertiesFile = 
Optional.ofNullable(opts.options.valueOf(opts.commandConfigOpt));
   ```



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +299,173 @@ private static KafkaProducer<byte[], byte[]> 
createKafkaProducer(Optional<String
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
         return new KafkaProducer<>(producerProps);
     }
+
+    /**
+     * Converts legacy positional arguments to named arguments for backward 
compatibility.
+     *
+     * @param args the command line arguments to convert
+     * @return converted named arguments
+     * @throws Exception if the legacy arguments are invalid
+     * @deprecated Positional argument usage is deprecated and will be removed 
in Apache Kafka 5.0.
+     *             Use named arguments instead: --bootstrap-server, --topic, 
--num-records, --producer-acks, --record-size, --command-config
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
+    static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+        if (args.length == 0) {
+            return args;
+        }
+
+        boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> 
+            arg.equals("--bootstrap-server") || 
+            arg.equals("--topic") || 
+            arg.equals("--num-records") || 
+            arg.equals("--producer-acks") || 
+            arg.equals("--record-size"));
+        if (hasRequiredNamedArgs) {
+            return args;
+        }
+
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("Invalid number of arguments. Expected 5 
or 6 positional arguments, but got " + args.length + ". " +
+                    "Usage: bootstrap-server topic num-records producer-acks 
record-size [optional] command-config");
+        }
+
+        return convertLegacyArgs(args);
+    }
+
+    private static String[] convertLegacyArgs(String[] legacyArgs) {
+        List<String> newArgs = new ArrayList<>();
+
+        // broker_list -> --bootstrap-server
+        newArgs.add("--bootstrap-server");
+        newArgs.add(legacyArgs[0]);
+
+        // topic -> --topic
+        newArgs.add("--topic");
+        newArgs.add(legacyArgs[1]);
+
+        // num_messages -> --num-records
+        newArgs.add("--num-records");
+        newArgs.add(legacyArgs[2]);
+
+        // producer_acks -> --producer-acks
+        newArgs.add("--producer-acks");
+        newArgs.add(legacyArgs[3]);
+
+        // message_size_bytes -> --record-size
+        newArgs.add("--record-size");
+        newArgs.add(legacyArgs[4]);
+
+        // properties_file -> --command-config
+        if (legacyArgs.length == 6) {
+            newArgs.add("--command-config");
+            newArgs.add(legacyArgs[5]);
+        }
+        System.out.println("WARNING: Positional argument usage is deprecated 
and will be removed in Apache Kafka 5.0. " +
+                "Please use named arguments instead: --bootstrap-server, 
--topic, --num-records, --producer-acks, --record-size, --command-config");
+        return newArgs.toArray(new String[0]);
+    }
+
+    public static final class EndToEndLatencyCommandOptions extends 
CommandDefaultOptions {
+        final OptionSpec<String> bootstrapServerOpt;
+        final OptionSpec<String> topicOpt;
+        final OptionSpec<Integer> numRecordsOpt;
+        final OptionSpec<String> acksOpt;
+        final OptionSpec<Integer> recordSizeOpt;
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<Integer> recordKeySizeOpt;
+        final OptionSpec<Integer> recordHeaderValueSizeOpt;
+        final OptionSpec<Integer> recordHeaderKeySizeOpt;
+        final OptionSpec<Integer> numHeadersOpt;
+
+        public EndToEndLatencyCommandOptions(String[] args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: 
The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("bootstrap-server")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for 
the test.")
+                    .withRequiredArg()
+                    .describedAs("topic-name")
+                    .ofType(String.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The 
number of messages to send.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class);
+            acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer 
acknowledgements. Must be '1' or 'all'.")
+                    .withRequiredArg()
+                    .describedAs("producer-acks")
+                    .ofType(String.class);
+            recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size 
of each message payload in bytes.")
+                    .withRequiredArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class);
+            recordKeySizeOpt = parser.accepts("record-key-size", "Optional: 
The size of the message key in bytes. If not set, messages are sent without a 
key.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", 
"Optional: The size of the message header key in bytes. Used together with 
record-header-size.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderValueSizeOpt = parser.accepts("record-header-size", 
"Optional: The size of message header value in bytes. Use -1 for null header 
value.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            numHeadersOpt = parser.accepts("num-headers", "Optional: The 
number of headers to include in each message.")
+                    .withOptionalArg()
+                    .describedAs("count")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            commandConfigOpt = parser.accepts("command-config", "Optional: A 
property file for Kafka producer/consumer/admin client configuration.")
+                    .withOptionalArg()
+                    .describedAs("config-file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+            checkArgs();
+        }
+
+        void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool measures 
end-to-end latency in Kafka by sending messages and timing their reception.");
+
+            // check required arguments
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt, topicOpt, numRecordsOpt, acksOpt, recordSizeOpt);
+
+            // validate 'producer-acks'
+            String acksValue = options.valueOf(acksOpt);
+            if (!List.of("1", "all").contains(acksValue)) {
+                CommandLineUtils.printUsageAndExit(parser, "Invalid value for 
--producer-acks. Latency testing requires synchronous acknowledgement. Please 
use '1' or 'all'.");
+            }
+
+            // validate for num-records and record-size
+            if (options.valueOf(numRecordsOpt) <= 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--num-records must be a positive integer.");
+            }
+            if (options.valueOf(recordSizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-size must be a non-negative integer.");
+            }
+
+            if (options.has(recordKeySizeOpt) && 
options.valueOf(recordKeySizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-key-size must be a non-negative integer.");
+            }
+            if (options.has(recordHeaderKeySizeOpt) && 
options.valueOf(recordHeaderKeySizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-header-key-size must be a non-negative integer.");
+            }
+            if (options.has(recordHeaderValueSizeOpt) && 
options.valueOf(recordHeaderValueSizeOpt) < -1) {

Review Comment:
   Is `options.has(recordHeaderValueSizeOpt)` necessary, since 
`recordHeaderValueSizeOpt` has a default value?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +299,173 @@ private static KafkaProducer<byte[], byte[]> 
createKafkaProducer(Optional<String
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
         return new KafkaProducer<>(producerProps);
     }
+
+    /**
+     * Converts legacy positional arguments to named arguments for backward 
compatibility.
+     *
+     * @param args the command line arguments to convert
+     * @return converted named arguments
+     * @throws Exception if the legacy arguments are invalid
+     * @deprecated Positional argument usage is deprecated and will be removed 
in Apache Kafka 5.0.
+     *             Use named arguments instead: --bootstrap-server, --topic, 
--num-records, --producer-acks, --record-size, --command-config
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
+    static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+        if (args.length == 0) {
+            return args;
+        }
+
+        boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> 
+            arg.equals("--bootstrap-server") || 
+            arg.equals("--topic") || 
+            arg.equals("--num-records") || 
+            arg.equals("--producer-acks") || 
+            arg.equals("--record-size"));
+        if (hasRequiredNamedArgs) {
+            return args;
+        }
+
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("Invalid number of arguments. Expected 5 
or 6 positional arguments, but got " + args.length + ". " +
+                    "Usage: bootstrap-server topic num-records producer-acks 
record-size [optional] command-config");
+        }
+
+        return convertLegacyArgs(args);
+    }
+
+    private static String[] convertLegacyArgs(String[] legacyArgs) {
+        List<String> newArgs = new ArrayList<>();
+
+        // broker_list -> --bootstrap-server
+        newArgs.add("--bootstrap-server");
+        newArgs.add(legacyArgs[0]);
+
+        // topic -> --topic
+        newArgs.add("--topic");
+        newArgs.add(legacyArgs[1]);
+
+        // num_messages -> --num-records
+        newArgs.add("--num-records");
+        newArgs.add(legacyArgs[2]);
+
+        // producer_acks -> --producer-acks
+        newArgs.add("--producer-acks");
+        newArgs.add(legacyArgs[3]);
+
+        // message_size_bytes -> --record-size
+        newArgs.add("--record-size");
+        newArgs.add(legacyArgs[4]);
+
+        // properties_file -> --command-config
+        if (legacyArgs.length == 6) {
+            newArgs.add("--command-config");
+            newArgs.add(legacyArgs[5]);
+        }
+        System.out.println("WARNING: Positional argument usage is deprecated 
and will be removed in Apache Kafka 5.0. " +
+                "Please use named arguments instead: --bootstrap-server, 
--topic, --num-records, --producer-acks, --record-size, --command-config");
+        return newArgs.toArray(new String[0]);
+    }
+
+    public static final class EndToEndLatencyCommandOptions extends 
CommandDefaultOptions {
+        final OptionSpec<String> bootstrapServerOpt;
+        final OptionSpec<String> topicOpt;
+        final OptionSpec<Integer> numRecordsOpt;
+        final OptionSpec<String> acksOpt;
+        final OptionSpec<Integer> recordSizeOpt;
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<Integer> recordKeySizeOpt;
+        final OptionSpec<Integer> recordHeaderValueSizeOpt;
+        final OptionSpec<Integer> recordHeaderKeySizeOpt;
+        final OptionSpec<Integer> numHeadersOpt;
+
+        public EndToEndLatencyCommandOptions(String[] args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: 
The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("bootstrap-server")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for 
the test.")
+                    .withRequiredArg()
+                    .describedAs("topic-name")
+                    .ofType(String.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The 
number of messages to send.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class);
+            acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer 
acknowledgements. Must be '1' or 'all'.")
+                    .withRequiredArg()
+                    .describedAs("producer-acks")
+                    .ofType(String.class);
+            recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size 
of each message payload in bytes.")
+                    .withRequiredArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class);
+            recordKeySizeOpt = parser.accepts("record-key-size", "Optional: 
The size of the message key in bytes. If not set, messages are sent without a 
key.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", 
"Optional: The size of the message header key in bytes. Used together with 
record-header-size.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderValueSizeOpt = parser.accepts("record-header-size", 
"Optional: The size of message header value in bytes. Use -1 for null header 
value.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            numHeadersOpt = parser.accepts("num-headers", "Optional: The 
number of headers to include in each message.")
+                    .withOptionalArg()
+                    .describedAs("count")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            commandConfigOpt = parser.accepts("command-config", "Optional: A 
property file for Kafka producer/consumer/admin client configuration.")
+                    .withOptionalArg()
+                    .describedAs("config-file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+            checkArgs();
+        }
+
+        void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool measures 
end-to-end latency in Kafka by sending messages and timing their reception.");
+
+            // check required arguments
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt, topicOpt, numRecordsOpt, acksOpt, recordSizeOpt);
+
+            // validate 'producer-acks'
+            String acksValue = options.valueOf(acksOpt);
+            if (!List.of("1", "all").contains(acksValue)) {
+                CommandLineUtils.printUsageAndExit(parser, "Invalid value for 
--producer-acks. Latency testing requires synchronous acknowledgement. Please 
use '1' or 'all'.");
+            }
+
+            // validate for num-records and record-size
+            if (options.valueOf(numRecordsOpt) <= 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--num-records must be a positive integer.");
+            }
+            if (options.valueOf(recordSizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-size must be a non-negative integer.");
+            }
+
+            if (options.has(recordKeySizeOpt) && 
options.valueOf(recordKeySizeOpt) < 0) {

Review Comment:
   Ditto for `options.has(recordKeySizeOpt)`: `recordKeySizeOpt` also has a default value, so is this check necessary?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +299,173 @@ private static KafkaProducer<byte[], byte[]> 
createKafkaProducer(Optional<String
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
         return new KafkaProducer<>(producerProps);
     }
+
+    /**
+     * Converts legacy positional arguments to named arguments for backward 
compatibility.
+     *
+     * @param args the command line arguments to convert
+     * @return converted named arguments
+     * @throws Exception if the legacy arguments are invalid
+     * @deprecated Positional argument usage is deprecated and will be removed 
in Apache Kafka 5.0.
+     *             Use named arguments instead: --bootstrap-server, --topic, 
--num-records, --producer-acks, --record-size, --command-config
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
+    static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+        if (args.length == 0) {
+            return args;
+        }
+
+        boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> 
+            arg.equals("--bootstrap-server") || 
+            arg.equals("--topic") || 
+            arg.equals("--num-records") || 
+            arg.equals("--producer-acks") || 
+            arg.equals("--record-size"));
+        if (hasRequiredNamedArgs) {
+            return args;
+        }
+
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("Invalid number of arguments. Expected 5 
or 6 positional arguments, but got " + args.length + ". " +
+                    "Usage: bootstrap-server topic num-records producer-acks 
record-size [optional] command-config");
+        }
+
+        return convertLegacyArgs(args);
+    }
+
+    private static String[] convertLegacyArgs(String[] legacyArgs) {
+        List<String> newArgs = new ArrayList<>();
+
+        // broker_list -> --bootstrap-server
+        newArgs.add("--bootstrap-server");
+        newArgs.add(legacyArgs[0]);
+
+        // topic -> --topic
+        newArgs.add("--topic");
+        newArgs.add(legacyArgs[1]);
+
+        // num_messages -> --num-records
+        newArgs.add("--num-records");
+        newArgs.add(legacyArgs[2]);
+
+        // producer_acks -> --producer-acks
+        newArgs.add("--producer-acks");
+        newArgs.add(legacyArgs[3]);
+
+        // message_size_bytes -> --record-size
+        newArgs.add("--record-size");
+        newArgs.add(legacyArgs[4]);
+
+        // properties_file -> --command-config
+        if (legacyArgs.length == 6) {
+            newArgs.add("--command-config");
+            newArgs.add(legacyArgs[5]);
+        }
+        System.out.println("WARNING: Positional argument usage is deprecated 
and will be removed in Apache Kafka 5.0. " +
+                "Please use named arguments instead: --bootstrap-server, 
--topic, --num-records, --producer-acks, --record-size, --command-config");
+        return newArgs.toArray(new String[0]);
+    }
+
+    public static final class EndToEndLatencyCommandOptions extends 
CommandDefaultOptions {
+        final OptionSpec<String> bootstrapServerOpt;
+        final OptionSpec<String> topicOpt;
+        final OptionSpec<Integer> numRecordsOpt;
+        final OptionSpec<String> acksOpt;
+        final OptionSpec<Integer> recordSizeOpt;
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<Integer> recordKeySizeOpt;
+        final OptionSpec<Integer> recordHeaderValueSizeOpt;
+        final OptionSpec<Integer> recordHeaderKeySizeOpt;
+        final OptionSpec<Integer> numHeadersOpt;
+
+        public EndToEndLatencyCommandOptions(String[] args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: 
The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("bootstrap-server")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for 
the test.")
+                    .withRequiredArg()
+                    .describedAs("topic-name")
+                    .ofType(String.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The 
number of messages to send.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class);
+            acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer 
acknowledgements. Must be '1' or 'all'.")
+                    .withRequiredArg()
+                    .describedAs("producer-acks")
+                    .ofType(String.class);
+            recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size 
of each message payload in bytes.")
+                    .withRequiredArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class);
+            recordKeySizeOpt = parser.accepts("record-key-size", "Optional: 
The size of the message key in bytes. If not set, messages are sent without a 
key.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", 
"Optional: The size of the message header key in bytes. Used together with 
record-header-size.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderValueSizeOpt = parser.accepts("record-header-size", 
"Optional: The size of message header value in bytes. Use -1 for null header 
value.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            numHeadersOpt = parser.accepts("num-headers", "Optional: The 
number of headers to include in each message.")
+                    .withOptionalArg()
+                    .describedAs("count")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            commandConfigOpt = parser.accepts("command-config", "Optional: A 
property file for Kafka producer/consumer/admin client configuration.")
+                    .withOptionalArg()
+                    .describedAs("config-file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+            checkArgs();
+        }
+
+        void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool measures 
end-to-end latency in Kafka by sending messages and timing their reception.");
+
+            // check required arguments
+            CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt, topicOpt, numRecordsOpt, acksOpt, recordSizeOpt);
+
+            // validate 'producer-acks'
+            String acksValue = options.valueOf(acksOpt);
+            if (!List.of("1", "all").contains(acksValue)) {
+                CommandLineUtils.printUsageAndExit(parser, "Invalid value for 
--producer-acks. Latency testing requires synchronous acknowledgement. Please 
use '1' or 'all'.");
+            }
+
+            // validate for num-records and record-size
+            if (options.valueOf(numRecordsOpt) <= 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--num-records must be a positive integer.");
+            }
+            if (options.valueOf(recordSizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-size must be a non-negative integer.");
+            }
+
+            if (options.has(recordKeySizeOpt) && 
options.valueOf(recordKeySizeOpt) < 0) {
+                CommandLineUtils.printUsageAndExit(parser, "Value for 
--record-key-size must be a non-negative integer.");
+            }
+            if (options.has(recordHeaderKeySizeOpt) && 
options.valueOf(recordHeaderKeySizeOpt) < 0) {

Review Comment:
   Ditto for `options.has(recordHeaderKeySizeOpt)`: `recordHeaderKeySizeOpt` also has a default value, so is this check necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to