chia7712 commented on code in PR #20301:
URL: https://github.com/apache/kafka/pull/20301#discussion_r2297895392


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +290,173 @@ private static KafkaProducer<byte[], byte[]> createKafkaProducer(Optional<String
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         return new KafkaProducer<>(producerProps);
     }
+
+    /**
+     * Converts legacy positional arguments to named arguments for backward compatibility.
+     *
+     * @param args the command line arguments to convert
+     * @return converted named arguments
+     * @throws Exception if the legacy arguments are invalid
+     * @deprecated Positional argument usage is deprecated and will be removed in Apache Kafka 5.0.
+     *             Use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
+    static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+        if (args.length == 0) {
+            return args;
+        }
+
+        boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> 
+            arg.equals("--bootstrap-server") || 
+            arg.equals("--topic") || 
+            arg.equals("--num-records") || 
+            arg.equals("--producer-acks") || 
+            arg.equals("--record-size"));
+        if (hasRequiredNamedArgs) {
+            return args;
+        }
+
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("Invalid number of arguments. Expected 5 or 6 positional arguments, but got " + args.length + ". " +
+                    "Usage: bootstrap-server topic num-records producer-acks record-size [optional] command-config");
+        }
+
+        return convertLegacyArgs(args);
+    }
+
+    private static String[] convertLegacyArgs(String[] legacyArgs) {
+        List<String> newArgs = new ArrayList<>();
+
+        // broker_list -> --bootstrap-server
+        newArgs.add("--bootstrap-server");
+        newArgs.add(legacyArgs[0]);
+
+        // topic -> --topic
+        newArgs.add("--topic");
+        newArgs.add(legacyArgs[1]);
+
+        // num_messages -> --num-records
+        newArgs.add("--num-records");
+        newArgs.add(legacyArgs[2]);
+
+        // producer_acks -> --producer-acks
+        newArgs.add("--producer-acks");
+        newArgs.add(legacyArgs[3]);
+
+        // message_size_bytes -> --record-size
+        newArgs.add("--record-size");
+        newArgs.add(legacyArgs[4]);
+
+        // properties_file -> --command-config
+        if (legacyArgs.length == 6 && !legacyArgs[5].trim().isEmpty()) {

Review Comment:
   Excuse me, why do we add the `!legacyArgs[5].trim().isEmpty()` condition?
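
   For illustration, a hedged sketch of what the guard changes (the argument values are made up, and the call assumes a caller in the `org.apache.kafka.tools` package since the method is package-private):

   ```java
   // Hypothetical legacy invocation with a blank sixth positional argument.
   String[] legacy = {"localhost:9092", "test", "100", "1", "512", ""};
   String[] named = EndToEndLatency.convertLegacyArgsIfNeeded(legacy);
   // With the trim().isEmpty() guard, no --command-config pair is emitted for the
   // blank entry; without it, the converter would emit --command-config "" and the
   // tool would presumably fail later while loading an empty properties-file path.
   System.out.println(String.join(" ", named));
   ```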



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +290,173 @@ private static KafkaProducer<byte[], byte[]> createKafkaProducer(Optional<String
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         return new KafkaProducer<>(producerProps);
     }
+
+    /**
+     * Converts legacy positional arguments to named arguments for backward compatibility.
+     *
+     * @param args the command line arguments to convert
+     * @return converted named arguments
+     * @throws Exception if the legacy arguments are invalid
+     * @deprecated Positional argument usage is deprecated and will be removed in Apache Kafka 5.0.
+     *             Use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
+    static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+        if (args.length == 0) {
+            return args;
+        }
+
+        boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> 
+            arg.equals("--bootstrap-server") || 
+            arg.equals("--topic") || 
+            arg.equals("--num-records") || 
+            arg.equals("--producer-acks") || 
+            arg.equals("--record-size"));
+        if (hasRequiredNamedArgs) {
+            return args;
+        }
+
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("Invalid number of arguments. Expected 5 or 6 positional arguments, but got " + args.length + ". " +
+                    "Usage: bootstrap-server topic num-records producer-acks record-size [optional] command-config");
+        }
+
+        return convertLegacyArgs(args);
+    }
+
+    private static String[] convertLegacyArgs(String[] legacyArgs) {
+        List<String> newArgs = new ArrayList<>();
+
+        // broker_list -> --bootstrap-server
+        newArgs.add("--bootstrap-server");
+        newArgs.add(legacyArgs[0]);
+
+        // topic -> --topic
+        newArgs.add("--topic");
+        newArgs.add(legacyArgs[1]);
+
+        // num_messages -> --num-records
+        newArgs.add("--num-records");
+        newArgs.add(legacyArgs[2]);
+
+        // producer_acks -> --producer-acks
+        newArgs.add("--producer-acks");
+        newArgs.add(legacyArgs[3]);
+
+        // message_size_bytes -> --record-size
+        newArgs.add("--record-size");
+        newArgs.add(legacyArgs[4]);
+
+        // properties_file -> --command-config
+        if (legacyArgs.length == 6 && !legacyArgs[5].trim().isEmpty()) {
+            newArgs.add("--command-config");
+            newArgs.add(legacyArgs[5]);
+        }
+        System.out.println("WARNING: Positional argument usage is deprecated and will be removed in Apache Kafka 5.0. " +
+                "Please use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config");
+        return newArgs.toArray(new String[0]);
+    }
+
+    public static final class EndToEndLatencyCommandOptions extends CommandDefaultOptions {
+        final OptionSpec<String> bootstrapServerOpt;
+        final OptionSpec<String> topicOpt;
+        final OptionSpec<Integer> numRecordsOpt;
+        final OptionSpec<String> acksOpt;
+        final OptionSpec<Integer> recordSizeOpt;
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<Integer> recordKeyOpt;
+        final OptionSpec<Integer> recordHeaderValueSizeOpt;
+        final OptionSpec<Integer> recordHeaderKeySizeOpt;
+        final OptionSpec<Integer> numHeadersOpt;
+
+        public EndToEndLatencyCommandOptions(String[] args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("bootstrap-server")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for the test.")
+                    .withRequiredArg()
+                    .describedAs("topic-name")
+                    .ofType(String.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of messages to send.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class);
+            acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer acknowledgements. Must be '1' or 'all'.")
+                    .withRequiredArg()
+                    .describedAs("producer-acks")
+                    .ofType(String.class);
+            recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size of each message payload in bytes.")
+                    .withRequiredArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class);
+            recordKeyOpt = parser.accepts("record-key-size", "Optional: The size of the message key in bytes. If not set, messages are sent without a key.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", "Optional: The size of the message header key in bytes. Used together with record-header-size.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);

Review Comment:
   This is not discussed in the KIP. By default, the number of headers is one, so should we create a non-empty header by default?
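
   For reference, my reading of the defaults (an assumption on my part, not stated in the KIP): `num-headers` defaults to 1 while both header size options default to 0, so by default every record would carry a single empty header, roughly:

   ```java
   import org.apache.kafka.common.header.Header;
   import org.apache.kafka.common.header.internals.RecordHeader;

   // Sketch of what the defaults appear to imply: one header whose key and
   // value are both empty (class name is illustrative only).
   public class DefaultHeaderSketch {
       public static void main(String[] args) {
           Header implied = new RecordHeader("", new byte[0]);
           System.out.println("key='" + implied.key() + "', valueLen=" + implied.value().length);
       }
   }
   ```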



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +290,173 @@ private static KafkaProducer<byte[], byte[]> createKafkaProducer(Optional<String
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         return new KafkaProducer<>(producerProps);
     }
+
+    /**
+     * Converts legacy positional arguments to named arguments for backward compatibility.
+     *
+     * @param args the command line arguments to convert
+     * @return converted named arguments
+     * @throws Exception if the legacy arguments are invalid
+     * @deprecated Positional argument usage is deprecated and will be removed in Apache Kafka 5.0.
+     *             Use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
+    static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+        if (args.length == 0) {
+            return args;
+        }
+
+        boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> 
+            arg.equals("--bootstrap-server") || 
+            arg.equals("--topic") || 
+            arg.equals("--num-records") || 
+            arg.equals("--producer-acks") || 
+            arg.equals("--record-size"));
+        if (hasRequiredNamedArgs) {
+            return args;
+        }
+
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("Invalid number of arguments. Expected 5 or 6 positional arguments, but got " + args.length + ". " +
+                    "Usage: bootstrap-server topic num-records producer-acks record-size [optional] command-config");
+        }
+
+        return convertLegacyArgs(args);
+    }
+
+    private static String[] convertLegacyArgs(String[] legacyArgs) {
+        List<String> newArgs = new ArrayList<>();
+
+        // broker_list -> --bootstrap-server
+        newArgs.add("--bootstrap-server");
+        newArgs.add(legacyArgs[0]);
+
+        // topic -> --topic
+        newArgs.add("--topic");
+        newArgs.add(legacyArgs[1]);
+
+        // num_messages -> --num-records
+        newArgs.add("--num-records");
+        newArgs.add(legacyArgs[2]);
+
+        // producer_acks -> --producer-acks
+        newArgs.add("--producer-acks");
+        newArgs.add(legacyArgs[3]);
+
+        // message_size_bytes -> --record-size
+        newArgs.add("--record-size");
+        newArgs.add(legacyArgs[4]);
+
+        // properties_file -> --command-config
+        if (legacyArgs.length == 6 && !legacyArgs[5].trim().isEmpty()) {
+            newArgs.add("--command-config");
+            newArgs.add(legacyArgs[5]);
+        }
+        System.out.println("WARNING: Positional argument usage is deprecated and will be removed in Apache Kafka 5.0. " +
+                "Please use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config");
+        return newArgs.toArray(new String[0]);
+    }
+
+    public static final class EndToEndLatencyCommandOptions extends CommandDefaultOptions {
+        final OptionSpec<String> bootstrapServerOpt;
+        final OptionSpec<String> topicOpt;
+        final OptionSpec<Integer> numRecordsOpt;
+        final OptionSpec<String> acksOpt;
+        final OptionSpec<Integer> recordSizeOpt;
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<Integer> recordKeyOpt;
+        final OptionSpec<Integer> recordHeaderValueSizeOpt;
+        final OptionSpec<Integer> recordHeaderKeySizeOpt;
+        final OptionSpec<Integer> numHeadersOpt;
+
+        public EndToEndLatencyCommandOptions(String[] args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("bootstrap-server")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for the test.")
+                    .withRequiredArg()
+                    .describedAs("topic-name")
+                    .ofType(String.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of messages to send.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class);
+            acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer acknowledgements. Must be '1' or 'all'.")
+                    .withRequiredArg()
+                    .describedAs("producer-acks")
+                    .ofType(String.class);
+            recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size of each message payload in bytes.")
+                    .withRequiredArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class);
+            recordKeyOpt = parser.accepts("record-key-size", "Optional: The size of the message key in bytes. If not set, messages are sent without a key.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", "Optional: The size of the message header key in bytes. Used together with record-header-size.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderValueSizeOpt = parser.accepts("record-header-size", "Optional: The size of message header value in bytes. Use -1 for null header value.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            numHeadersOpt = parser.accepts("num-headers", "Optional: The number of headers to include in each message.")
+                    .withOptionalArg()
+                    .describedAs("count")
+                    .ofType(Integer.class)
+                    .defaultsTo(1);

Review Comment:
   Could you please also add those default values to the KIP?



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +290,173 @@ private static KafkaProducer<byte[], byte[]> createKafkaProducer(Optional<String
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         return new KafkaProducer<>(producerProps);
     }
+
+    /**
+     * Converts legacy positional arguments to named arguments for backward compatibility.
+     *
+     * @param args the command line arguments to convert
+     * @return converted named arguments
+     * @throws Exception if the legacy arguments are invalid
+     * @deprecated Positional argument usage is deprecated and will be removed in Apache Kafka 5.0.
+     *             Use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config
+     */
+    @Deprecated(since = "4.2", forRemoval = true)
+    static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+        if (args.length == 0) {
+            return args;
+        }
+
+        boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg -> 
+            arg.equals("--bootstrap-server") || 
+            arg.equals("--topic") || 
+            arg.equals("--num-records") || 
+            arg.equals("--producer-acks") || 
+            arg.equals("--record-size"));
+        if (hasRequiredNamedArgs) {
+            return args;
+        }
+
+        if (args.length != 5 && args.length != 6) {
+            throw new TerseException("Invalid number of arguments. Expected 5 or 6 positional arguments, but got " + args.length + ". " +
+                    "Usage: bootstrap-server topic num-records producer-acks record-size [optional] command-config");
+        }
+
+        return convertLegacyArgs(args);
+    }
+
+    private static String[] convertLegacyArgs(String[] legacyArgs) {
+        List<String> newArgs = new ArrayList<>();
+
+        // broker_list -> --bootstrap-server
+        newArgs.add("--bootstrap-server");
+        newArgs.add(legacyArgs[0]);
+
+        // topic -> --topic
+        newArgs.add("--topic");
+        newArgs.add(legacyArgs[1]);
+
+        // num_messages -> --num-records
+        newArgs.add("--num-records");
+        newArgs.add(legacyArgs[2]);
+
+        // producer_acks -> --producer-acks
+        newArgs.add("--producer-acks");
+        newArgs.add(legacyArgs[3]);
+
+        // message_size_bytes -> --record-size
+        newArgs.add("--record-size");
+        newArgs.add(legacyArgs[4]);
+
+        // properties_file -> --command-config
+        if (legacyArgs.length == 6 && !legacyArgs[5].trim().isEmpty()) {
+            newArgs.add("--command-config");
+            newArgs.add(legacyArgs[5]);
+        }
+        System.out.println("WARNING: Positional argument usage is deprecated and will be removed in Apache Kafka 5.0. " +
+                "Please use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config");
+        return newArgs.toArray(new String[0]);
+    }
+
+    public static final class EndToEndLatencyCommandOptions extends CommandDefaultOptions {
+        final OptionSpec<String> bootstrapServerOpt;
+        final OptionSpec<String> topicOpt;
+        final OptionSpec<Integer> numRecordsOpt;
+        final OptionSpec<String> acksOpt;
+        final OptionSpec<Integer> recordSizeOpt;
+        final OptionSpec<String> commandConfigOpt;
+        final OptionSpec<Integer> recordKeyOpt;
+        final OptionSpec<Integer> recordHeaderValueSizeOpt;
+        final OptionSpec<Integer> recordHeaderKeySizeOpt;
+        final OptionSpec<Integer> numHeadersOpt;
+
+        public EndToEndLatencyCommandOptions(String[] args) {
+            super(args);
+
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("bootstrap-server")
+                    .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for the test.")
+                    .withRequiredArg()
+                    .describedAs("topic-name")
+                    .ofType(String.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of messages to send.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Integer.class);
+            acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer acknowledgements. Must be '1' or 'all'.")
+                    .withRequiredArg()
+                    .describedAs("producer-acks")
+                    .ofType(String.class);
+            recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size of each message payload in bytes.")
+                    .withRequiredArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class);
+            recordKeyOpt = parser.accepts("record-key-size", "Optional: The size of the message key in bytes. If not set, messages are sent without a key.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", "Optional: The size of the message header key in bytes. Used together with record-header-size.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            recordHeaderValueSizeOpt = parser.accepts("record-header-size", "Optional: The size of message header value in bytes. Use -1 for null header value.")
+                    .withOptionalArg()
+                    .describedAs("bytes")
+                    .ofType(Integer.class)
+                    .defaultsTo(0);
+            numHeadersOpt = parser.accepts("num-headers", "Optional: The number of headers to include in each message.")
+                    .withOptionalArg()
+                    .describedAs("count")
+                    .ofType(Integer.class)
+                    .defaultsTo(1);

Review Comment:
   ditto



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -122,33 +137,87 @@ static void execute(String... args) throws Exception {
                 latencies[i] = elapsed / 1000 / 1000;
             }
 
-            printResults(numMessages, totalTime, latencies);
+            printResults(numRecords, totalTime, latencies);
             consumer.commitSync();
         }
     }
 
     // Visible for testing
-    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] sentRecordValue, ConsumerRecords<byte[], byte[]> records, byte[] sentRecordKey, Iterable<Header> sentHeaders) {
         if (records.isEmpty()) {
-            consumer.commitSync();
-            throw new RuntimeException("poll() timed out before finding a 
result (timeout:[" + POLL_TIMEOUT_MS + "])");
+            commitAndThrow(consumer, "poll() timed out before finding a result 
(timeout:[" + POLL_TIMEOUT_MS + "ms])");
         }
 
-        //Check result matches the original record
-        String sent = new String(message, StandardCharsets.UTF_8);
-        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+        ConsumerRecord<byte[], byte[]> record = records.iterator().next();
+        String sent = new String(sentRecordValue, StandardCharsets.UTF_8);
+        String read = new String(record.value(), StandardCharsets.UTF_8);
 
         if (!read.equals(sent)) {
-            consumer.commitSync();
-            throw new RuntimeException("The message read [" + read + "] did 
not match the message sent [" + sent + "]");
+            commitAndThrow(consumer, "The message value read [" + read + "] 
did not match the message value sent [" + sent + "]");
+        }
+
+        if (sentRecordKey != null) {
+            if (record.key() == null) {
+                commitAndThrow(consumer, "Expected message key but received 
null");
+            }
+            String sentKey = new String(sentRecordKey, StandardCharsets.UTF_8);
+            String readKey = new String(record.key(), StandardCharsets.UTF_8);
+            if (!readKey.equals(sentKey)) {
+                commitAndThrow(consumer, "The message key read [" + readKey + 
"] did not match the message key sent [" + sentKey + "]");
+            }
+        } else if (record.key() != null) {
+            commitAndThrow(consumer, "Expected null message key but received 
[" + new String(record.key(), StandardCharsets.UTF_8) + "]");
         }
 
+        validateHeaders(consumer, sentHeaders, record);
+
         //Check we only got the one message
         if (records.count() != 1) {
             int count = records.count();
-            consumer.commitSync();
-            throw new RuntimeException("Only one result was expected during 
this test. We found [" + count + "]");
+            commitAndThrow(consumer, "Only one result was expected during this 
test. We found [" + count + "]");
+        }
+    }
+
+    private static void commitAndThrow(KafkaConsumer<byte[], byte[]> consumer, String message) {
+        consumer.commitSync();
+        throw new RuntimeException(message);
+    }
+
+    private static void validateHeaders(KafkaConsumer<byte[], byte[]> consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) {
+        if (sentHeaders != null && sentHeaders.iterator().hasNext()) {
+            if (!record.headers().iterator().hasNext()) {
+                commitAndThrow(consumer, "Expected message headers but 
received none");
+            }
+            
+            Iterator<Header> sentIterator = sentHeaders.iterator();
+            Iterator<Header> receivedIterator = record.headers().iterator();
+            
+            while (sentIterator.hasNext() && receivedIterator.hasNext()) {
+                Header sentHeader = sentIterator.next();
+                Header receivedHeader = receivedIterator.next();
+                if (!receivedHeader.key().equals(sentHeader.key()) || !Arrays.equals(receivedHeader.value(), sentHeader.value())) {
+                    String receivedValueStr = receivedHeader.value() == null ? "null" : Arrays.toString(receivedHeader.value());
+                    String sentValueStr = sentHeader.value() == null ? "null" : Arrays.toString(sentHeader.value());
+                    commitAndThrow(consumer, "The message header read [" + receivedHeader.key() + ":" + receivedValueStr +
+                            "] did not match the message header sent [" + sentHeader.key() + ":" + sentValueStr + "]");
+                }
+            }
+            
+            if (sentIterator.hasNext() || receivedIterator.hasNext()) {
+                commitAndThrow(consumer, "Header count mismatch between sent 
and received messages");
+            }
+        }
+    }
+
+    private static List<Header> generateHeadersWithSeparateSizes(Random random, int numHeaders, int keySize, int valueSize) {
+        List<Header> headers = new ArrayList<>();
+
+        for (int i = 0; i < numHeaders; i++) {
+            String headerKey = new String(randomBytesOfLen(random, keySize), StandardCharsets.UTF_8);
+            byte[] headerValue = valueSize == -1 ? null : randomBytesOfLen(random, valueSize);
+            headers.add(new RecordHeader(headerKey, headerValue));

Review Comment:
   `RecordHeader` is an internal class. Perhaps we could add a record class instead.
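
   For example, a minimal sketch of that suggestion, implementing the public `Header` interface with a Java record (the name `SimpleHeader` is illustrative):

   ```java
   import org.apache.kafka.common.header.Header;

   // A record's generated key() and value() accessors match the public Header
   // interface, so this avoids depending on the internal RecordHeader class.
   record SimpleHeader(String key, byte[] value) implements Header { }
   ```

   Usage would then become `headers.add(new SimpleHeader(headerKey, headerValue));`.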



##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -122,33 +137,87 @@ static void execute(String... args) throws Exception {
                 latencies[i] = elapsed / 1000 / 1000;
             }
 
-            printResults(numMessages, totalTime, latencies);
+            printResults(numRecords, totalTime, latencies);
             consumer.commitSync();
         }
     }
 
     // Visible for testing
-    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] sentRecordValue, ConsumerRecords<byte[], byte[]> records, byte[] sentRecordKey, Iterable<Header> sentHeaders) {
         if (records.isEmpty()) {
-            consumer.commitSync();
-            throw new RuntimeException("poll() timed out before finding a 
result (timeout:[" + POLL_TIMEOUT_MS + "])");
+            commitAndThrow(consumer, "poll() timed out before finding a result 
(timeout:[" + POLL_TIMEOUT_MS + "ms])");
         }
 
-        //Check result matches the original record
-        String sent = new String(message, StandardCharsets.UTF_8);
-        String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
+        ConsumerRecord<byte[], byte[]> record = records.iterator().next();
+        String sent = new String(sentRecordValue, StandardCharsets.UTF_8);
+        String read = new String(record.value(), StandardCharsets.UTF_8);
 
         if (!read.equals(sent)) {
-            consumer.commitSync();
-            throw new RuntimeException("The message read [" + read + "] did 
not match the message sent [" + sent + "]");
+            commitAndThrow(consumer, "The message value read [" + read + "] 
did not match the message value sent [" + sent + "]");
+        }
+
+        if (sentRecordKey != null) {
+            if (record.key() == null) {
+                commitAndThrow(consumer, "Expected message key but received 
null");
+            }
+            String sentKey = new String(sentRecordKey, StandardCharsets.UTF_8);
+            String readKey = new String(record.key(), StandardCharsets.UTF_8);
+            if (!readKey.equals(sentKey)) {
+                commitAndThrow(consumer, "The message key read [" + readKey + 
"] did not match the message key sent [" + sentKey + "]");
+            }
+        } else if (record.key() != null) {
+            commitAndThrow(consumer, "Expected null message key but received 
[" + new String(record.key(), StandardCharsets.UTF_8) + "]");
         }
 
+        validateHeaders(consumer, sentHeaders, record);
+
         //Check we only got the one message
         if (records.count() != 1) {
             int count = records.count();
-            consumer.commitSync();
-            throw new RuntimeException("Only one result was expected during 
this test. We found [" + count + "]");
+            commitAndThrow(consumer, "Only one result was expected during this 
test. We found [" + count + "]");
+        }
+    }
+
+    private static void commitAndThrow(KafkaConsumer<byte[], byte[]> consumer, String message) {
+        consumer.commitSync();
+        throw new RuntimeException(message);
+    }
+
+    private static void validateHeaders(KafkaConsumer<byte[], byte[]> consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) {
+        if (sentHeaders != null && sentHeaders.iterator().hasNext()) {
+            if (!record.headers().iterator().hasNext()) {
+                commitAndThrow(consumer, "Expected message headers but 
received none");
+            }
+            
+            Iterator<Header> sentIterator = sentHeaders.iterator();
+            Iterator<Header> receivedIterator = record.headers().iterator();
+            
+            while (sentIterator.hasNext() && receivedIterator.hasNext()) {
+                Header sentHeader = sentIterator.next();
+                Header receivedHeader = receivedIterator.next();
+                if (!receivedHeader.key().equals(sentHeader.key()) || !Arrays.equals(receivedHeader.value(), sentHeader.value())) {
+                    String receivedValueStr = receivedHeader.value() == null ? "null" : Arrays.toString(receivedHeader.value());
+                    String sentValueStr = sentHeader.value() == null ? "null" : Arrays.toString(sentHeader.value());
+                    commitAndThrow(consumer, "The message header read [" + receivedHeader.key() + ":" + receivedValueStr +
+                            "] did not match the message header sent [" + sentHeader.key() + ":" + sentValueStr + "]");
+                }
+            }
+            
+            if (sentIterator.hasNext() || receivedIterator.hasNext()) {
+                commitAndThrow(consumer, "Header count mismatch between sent 
and received messages");
+            }
+        }
+    }
+
+    private static List<Header> generateHeadersWithSeparateSizes(Random random, int numHeaders, int keySize, int valueSize) {
+        List<Header> headers = new ArrayList<>();
+
+        for (int i = 0; i < numHeaders; i++) {
+            String headerKey = new String(randomBytesOfLen(random, keySize), StandardCharsets.UTF_8);

Review Comment:
   Have you considered directly generating a random string?
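
   For example, a hedged sketch of generating a printable random string directly (the alphabet and method name are illustrative); this would also avoid decoding random bytes as UTF-8, where invalid or multi-byte sequences can make the resulting string differ from the requested size:

   ```java
   import java.util.Random;

   // Builds a random string from a fixed printable alphabet, so the character
   // count always equals the requested length.
   static String randomString(Random random, int length) {
       String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
       StringBuilder sb = new StringBuilder(length);
       for (int i = 0; i < length; i++) {
           sb.append(alphabet.charAt(random.nextInt(alphabet.length())));
       }
       return sb.toString();
   }
   ```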


