yashmayya commented on code in PR #14718: URL: https://github.com/apache/kafka/pull/14718#discussion_r1387771267
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java: ########## @@ -75,49 +76,51 @@ public void testValidateFormatWithValidFormat() { OffsetUtils.validateFormat(offsetData); } + @Test + public void testProcessPartitionKeyWithUnknownSerialization() { + assertProcessPartitionKeyLogMessage( + new byte[]{0}, + "Ignoring offset partition key with unknown serialization"); + assertProcessPartitionKeyLogMessage( + "i-am-not-json".getBytes(StandardCharsets.UTF_8), + "Ignoring offset partition key with unknown serialization"); + } + @Test public void testProcessPartitionKeyNotList() { - try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) { - Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>(); - OffsetUtils.processPartitionKey(serializePartitionKey(new HashMap<>()), new byte[0], CONVERTER, connectorPartitions); - // Expect no partition to be added to the map since the partition key is of an invalid format - assertEquals(0, connectorPartitions.size()); - assertEquals(1, logCaptureAppender.getMessages().size()); - assertThat(logCaptureAppender.getMessages().get(0), - containsString("Ignoring offset partition key with an unexpected format")); - } + assertProcessPartitionKeyLogMessage( + new byte[]{}, + "Ignoring offset partition key with an unexpected format"); + assertProcessPartitionKeyLogMessage( + serializePartitionKey(new HashMap<>()), + "Ignoring offset partition key with an unexpected format"); } @Test public void testProcessPartitionKeyListWithOneElement() { - try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) { - Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>(); - OffsetUtils.processPartitionKey(serializePartitionKey(Collections.singletonList("")), new byte[0], CONVERTER, connectorPartitions); - // Expect no partition to be added to the map since the partition key is of an 
invalid format - assertEquals(0, connectorPartitions.size()); - assertEquals(1, logCaptureAppender.getMessages().size()); - assertThat(logCaptureAppender.getMessages().get(0), - containsString("Ignoring offset partition key with an unexpected number of elements")); - } + assertProcessPartitionKeyLogMessage( + serializePartitionKey(Collections.singletonList("")), + "Ignoring offset partition key with an unexpected number of elements"); } @Test public void testProcessPartitionKeyListWithElementsOfWrongType() { + assertProcessPartitionKeyLogMessage( + serializePartitionKey(Arrays.asList(1, new HashMap<>())), + "Ignoring offset partition key with an unexpected format for the first element in the partition key list"); + assertProcessPartitionKeyLogMessage( + serializePartitionKey(Arrays.asList("connector-name", new ArrayList<>())), + "Ignoring offset partition key with an unexpected format for the second element in the partition key list"); + } + + public void assertProcessPartitionKeyLogMessage(byte[] key, String message) { Review Comment: This helper isn't just asserting the warning log message; it also asserts that nothing was added to `connectorPartitions` (i.e., the partition key is expected to be invalid). Could we name this method something like `assertInvalidPartitionKey` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org