Rancho-7 commented on code in PR #20301: URL: https://github.com/apache/kafka/pull/20301#discussion_r2298564631
########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -122,33 +137,87 @@ static void execute(String... args) throws Exception { latencies[i] = elapsed / 1000 / 1000; } - printResults(numMessages, totalTime, latencies); + printResults(numRecords, totalTime, latencies); consumer.commitSync(); } } // Visible for testing - static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) { + static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] sentRecordValue, ConsumerRecords<byte[], byte[]> records, byte[] sentRecordKey, Iterable<Header> sentHeaders) { if (records.isEmpty()) { - consumer.commitSync(); - throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])"); + commitAndThrow(consumer, "poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "ms])"); } - //Check result matches the original record - String sent = new String(message, StandardCharsets.UTF_8); - String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8); + ConsumerRecord<byte[], byte[]> record = records.iterator().next(); + String sent = new String(sentRecordValue, StandardCharsets.UTF_8); + String read = new String(record.value(), StandardCharsets.UTF_8); if (!read.equals(sent)) { - consumer.commitSync(); - throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]"); + commitAndThrow(consumer, "The message value read [" + read + "] did not match the message value sent [" + sent + "]"); + } + + if (sentRecordKey != null) { + if (record.key() == null) { + commitAndThrow(consumer, "Expected message key but received null"); + } + String sentKey = new String(sentRecordKey, StandardCharsets.UTF_8); + String readKey = new String(record.key(), StandardCharsets.UTF_8); + if (!readKey.equals(sentKey)) { + commitAndThrow(consumer, "The message key read [" + readKey + "] did not match the message key sent [" + sentKey + "]"); + } + } else if (record.key() != null) { + commitAndThrow(consumer, "Expected null message key but received [" + new String(record.key(), StandardCharsets.UTF_8) + "]"); } + validateHeaders(consumer, sentHeaders, record); + //Check we only got the one message if (records.count() != 1) { int count = records.count(); - consumer.commitSync(); - throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]"); + commitAndThrow(consumer, "Only one result was expected during this test. We found [" + count + "]"); + } + } + + private static void commitAndThrow(KafkaConsumer<byte[], byte[]> consumer, String message) { + consumer.commitSync(); + throw new RuntimeException(message); + } + + private static void validateHeaders(KafkaConsumer<byte[], byte[]> consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) { + if (sentHeaders != null && sentHeaders.iterator().hasNext()) { + if (!record.headers().iterator().hasNext()) { + commitAndThrow(consumer, "Expected message headers but received none"); + } + + Iterator<Header> sentIterator = sentHeaders.iterator(); + Iterator<Header> receivedIterator = record.headers().iterator(); + + while (sentIterator.hasNext() && receivedIterator.hasNext()) { + Header sentHeader = sentIterator.next(); + Header receivedHeader = receivedIterator.next(); + if (!receivedHeader.key().equals(sentHeader.key()) || !Arrays.equals(receivedHeader.value(), sentHeader.value())) { + String receivedValueStr = receivedHeader.value() == null ? "null" : Arrays.toString(receivedHeader.value()); + String sentValueStr = sentHeader.value() == null ? "null" : Arrays.toString(sentHeader.value()); + commitAndThrow(consumer, "The message header read [" + receivedHeader.key() + ":" + receivedValueStr + + "] did not match the message header sent [" + sentHeader.key() + ":" + sentValueStr + "]"); + } + } + + if (sentIterator.hasNext() || receivedIterator.hasNext()) { + commitAndThrow(consumer, "Header count mismatch between sent and received messages"); + } + } + } + + private static List<Header> generateHeadersWithSeparateSizes(Random random, int numHeaders, int keySize, int valueSize) { + List<Header> headers = new ArrayList<>(); + + for (int i = 0; i < numHeaders; i++) { + String headerKey = new String(randomBytesOfLen(random, keySize), StandardCharsets.UTF_8); Review Comment: Not yet. The `randomBytesOfLen` function was used in the earlier version, and I found it could be reused, so I kept it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org