yashmayya commented on code in PR #14718:
URL: https://github.com/apache/kafka/pull/14718#discussion_r1387771267


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java:
##########
@@ -75,49 +76,51 @@ public void testValidateFormatWithValidFormat() {
         OffsetUtils.validateFormat(offsetData);
     }
 
+    @Test
+    public void testProcessPartitionKeyWithUnknownSerialization() {
+        assertProcessPartitionKeyLogMessage(
+                new byte[]{0},
+                "Ignoring offset partition key with unknown serialization");
+        assertProcessPartitionKeyLogMessage(
+                "i-am-not-json".getBytes(StandardCharsets.UTF_8),
+                "Ignoring offset partition key with unknown serialization");
+    }
+
     @Test
     public void testProcessPartitionKeyNotList() {
-        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
-            Map<String, Set<Map<String, Object>>> connectorPartitions = new 
HashMap<>();
-            OffsetUtils.processPartitionKey(serializePartitionKey(new 
HashMap<>()), new byte[0], CONVERTER, connectorPartitions);
-            // Expect no partition to be added to the map since the partition 
key is of an invalid format
-            assertEquals(0, connectorPartitions.size());
-            assertEquals(1, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(0),
-                    containsString("Ignoring offset partition key with an 
unexpected format"));
-        }
+        assertProcessPartitionKeyLogMessage(
+                new byte[]{},
+                "Ignoring offset partition key with an unexpected format");
+        assertProcessPartitionKeyLogMessage(
+                serializePartitionKey(new HashMap<>()),
+                "Ignoring offset partition key with an unexpected format");
     }
 
     @Test
     public void testProcessPartitionKeyListWithOneElement() {
-        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
-            Map<String, Set<Map<String, Object>>> connectorPartitions = new 
HashMap<>();
-            
OffsetUtils.processPartitionKey(serializePartitionKey(Collections.singletonList("")),
 new byte[0], CONVERTER, connectorPartitions);
-            // Expect no partition to be added to the map since the partition 
key is of an invalid format
-            assertEquals(0, connectorPartitions.size());
-            assertEquals(1, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(0),
-                    containsString("Ignoring offset partition key with an 
unexpected number of elements"));
-        }
+        assertProcessPartitionKeyLogMessage(
+                serializePartitionKey(Collections.singletonList("")),
+                "Ignoring offset partition key with an unexpected number of 
elements");
     }
 
     @Test
     public void testProcessPartitionKeyListWithElementsOfWrongType() {
+        assertProcessPartitionKeyLogMessage(
+                serializePartitionKey(Arrays.asList(1, new HashMap<>())),
+                "Ignoring offset partition key with an unexpected format for 
the first element in the partition key list");
+        assertProcessPartitionKeyLogMessage(
+                serializePartitionKey(Arrays.asList("connector-name", new 
ArrayList<>())),
+                "Ignoring offset partition key with an unexpected format for 
the second element in the partition key list");
+    }
+
+    public void assertProcessPartitionKeyLogMessage(byte[] key, String 
message) {

Review Comment:
   This isn't just asserting the warning log message, it's also asserting that 
nothing was added to the `connectorPartitions` (the partition key is expected 
to be invalid). Could we name this method something like 
`assertInvalidPartitionKey` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to