Rancho-7 commented on code in PR #20301:
URL: https://github.com/apache/kafka/pull/20301#discussion_r2280908402


##########
tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java:
##########
@@ -29,50 +33,248 @@
 
 import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 public class EndToEndLatencyTest {
 
+    private static final byte[] RECORD_VALUE = 
"record-sent".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RECORD_VALUE_DIFFERENT = 
"record-received".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RECORD_KEY = 
"key-sent".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RECORD_KEY_DIFFERENT = 
"key-received".getBytes(StandardCharsets.UTF_8);
+    private static final String HEADER_KEY = "header-key-sent";
+    private static final String HEADER_KEY_DIFFERENT = "header-key-received";
+    private static final byte[] HEADER_VALUE = 
"header-value-sent".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] HEADER_VALUE_DIFFERENT = 
"header-value-received".getBytes(StandardCharsets.UTF_8);
+
+    // legacy format test arguments
+    private static final String[] LEGACY_INVALID_ARGS_UNEXPECTED = {
+        "localhost:9092", "test", "10000", "1", "200", "propsfile.properties", 
"random"
+    };
+
+    private static class ArgsBuilder {
+        private final Map<String, String> params = new LinkedHashMap<>();
+        
+        private ArgsBuilder() {
+            params.put("--bootstrap-server", "localhost:9092");
+            params.put("--topic", "test-topic");
+            params.put("--num-records", "100");
+            params.put("--producer-acks", "1");
+            params.put("--record-size", "200");
+        }
+        
+        public static ArgsBuilder defaults() {
+            return new ArgsBuilder();
+        }
+        
+        public ArgsBuilder with(String param, String value) {
+            params.put(param, value);
+            return this;
+        }
+        
+        public String[] build() {
+            return params.entrySet().stream()
+                    .flatMap(entry -> Stream.of(entry.getKey(), 
entry.getValue()))
+                    .toArray(String[]::new);
+        }
+
+        public ArgsBuilder withNegative(String param) {
+            return with(param, "-1");
+        }
+        
+        public ArgsBuilder withZero(String param) {
+            return with(param, "0");
+        }
+    }
+
     @Mock
     KafkaConsumer<byte[], byte[]> consumer;
 
     @Mock
     ConsumerRecords<byte[], byte[]> records;
 
     @Test
-    public void shouldFailWhenSuppliedUnexpectedArgs() {
-        String[] args = new String[] {"localhost:9092", "test", "10000", "1", 
"200", "propsfile.properties", "random"};
-        assertThrows(TerseException.class, () -> 
EndToEndLatency.execute(args));
+    public void testInvalidArgs() {
+        testUnexpectedArgsWithLegacyFormat();
+        testInvalidProducerAcks();
+        testInvalidNumRecords();
+        testInvalidRecordSize();
+        testInvalidRecordKey();
+        testInvalidNumHeaders();
+        testInvalidRecordHeaderKey();
+        testInvalidRecordHeaderValue();
+    }
+
+    private void testUnexpectedArgsWithLegacyFormat() {
+        assertThrows(TerseException.class, () -> 
EndToEndLatency.execute(LEGACY_INVALID_ARGS_UNEXPECTED));
+    }
+
+    private void testInvalidNumRecords() {
+        String expectedMsg = "Value for --num-records must be a positive 
integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            ArgsBuilder.defaults().withNegative("--num-records").build(), 
expectedMsg);
+    }
+
+    private void testInvalidRecordSize() {
+        String expectedMsg = "Value for --record-size must be a non-negative 
integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            ArgsBuilder.defaults().withNegative("--record-size").build(), 
expectedMsg);
+    }
+
+    private void testInvalidRecordKey() {
+        String expectedMsg = "Value for --record-key-size must be a 
non-negative integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            ArgsBuilder.defaults().withNegative("--record-key-size").build(), 
expectedMsg);
+    }
+
+    private void testInvalidNumHeaders() {
+        String expectedMsg = "Value for --num-headers must be a non-negative 
integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+                ArgsBuilder.defaults().withNegative("--num-headers").build(), 
expectedMsg);
+    }
+
+    private void testInvalidRecordHeaderKey() {
+        String expectedMsg = "Value for --record-header-key-size must be a 
non-negative integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            
ArgsBuilder.defaults().withNegative("--record-header-key-size").build(), 
expectedMsg);
+    }
+
+    private void testInvalidRecordHeaderValue() {
+        String expectedMsg = "Value for --record-header-size must be a 
non-negative integer.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+            
ArgsBuilder.defaults().withNegative("--record-header-size").build(), 
expectedMsg);
+    }
+
+    private void testInvalidProducerAcks() {
+        String expectedMsg = "Invalid value for --producer-acks. Latency 
testing requires synchronous acknowledgement. Please use '1' or 'all'.";
+        assertInitializeInvalidOptionsExitCodeAndMsg(
+                ArgsBuilder.defaults().withZero("--producer-acks").build(), 
expectedMsg);
+    }
+
+    private void assertInitializeInvalidOptionsExitCodeAndMsg(String[] args, 
String expectedMsg) {
+        Exit.setExitProcedure((exitCode, message) -> {
+            assertEquals(1, exitCode);
+            assertTrue(message.contains(expectedMsg));
+            throw new RuntimeException();
+        });
+        try {
+            assertThrows(RuntimeException.class, () -> 
EndToEndLatency.execute(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
     }
 
     @Test
-    public void shouldFailWhenProducerAcksAreNotSynchronised() {
-        String[] args = new String[] {"localhost:9092", "test", "10000", "0", 
"200"};
-        assertThrows(IllegalArgumentException.class, () -> 
EndToEndLatency.execute(args));
+    public void testConvertLegacyArgs() throws Exception {

Review Comment:
   @m1a2st Thanks for catching this, updated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to