clolov commented on code in PR #12418:
URL: https://github.com/apache/kafka/pull/12418#discussion_r937576532


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -103,36 +103,57 @@ public class KafkaOffsetBackingStoreTest {
     private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
     private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
 
-    @Mock
-    KafkaBasedLog<byte[], byte[]> storeLog;
-    private KafkaOffsetBackingStore store;
-
-    private Capture<String> capturedTopic = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
-    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
-    private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+    @SuppressWarnings("unchecked")

Review Comment:
   Hopefully addressed in the upcoming commit.
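   
   For context, the warning here comes from constructing generic captors by hand. A Mockito alternative that avoids a class-level @SuppressWarnings("unchecked") is the @Captor annotation, which builds the generic ArgumentCaptor reflectively. A minimal sketch, assuming mockito-junit-jupiter and the MockitoExtension (the field names mirror this test; the rest is illustrative):
   
       import org.junit.jupiter.api.extension.ExtendWith;
       import org.mockito.ArgumentCaptor;
       import org.mockito.Captor;
       import org.mockito.Mock;
       import org.mockito.junit.jupiter.MockitoExtension;
   
       @ExtendWith(MockitoExtension.class)
       public class KafkaOffsetBackingStoreTest {
           @Mock
           private KafkaBasedLog<byte[], byte[]> storeLog;
   
           // @Captor builds the generic captor for us, so no unchecked
           // cast (and no @SuppressWarnings) is needed on the field.
           @Captor
           private ArgumentCaptor<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback;
       }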



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -142,97 +163,90 @@ public void testStartStop() throws Exception {
        assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
        assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
 
-        store.start();
-        store.stop();
+        /* Clean up */
 
-        PowerMock.verifyAll();
+        store.stop();
     }
 
     @Test
     public void testReloadOnStart() throws Exception {
-        expectConfigure();
-        expectStart(Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty())
-        ));
-        expectStop();
-        expectClusterId();
-
-        PowerMock.replayAll();
+        /* Setup */
+
+        doAnswer(invocation -> {
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            return null;
+        }).when(storeLog).start();
+
+        /* Execution */
 
         store.configure(DEFAULT_DISTRIBUTED_CONFIG);
         store.start();
-        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+
+        /* Assertions */
+
+        // Unless the implementation of KafkaOffsetBackingStore changes there should only ever be 1 data field.
+        List<Field> boxedDataField = ReflectionSupport.findFields(KafkaOffsetBackingStore.class,
+                field -> "data".equals(field.getName()), HierarchyTraversalMode.TOP_DOWN);
+        // Unless the implementation of KafkaOffsetBackingStore changes it should be safe to get the data field
+        // as there should be exactly one.
+        @SuppressWarnings("unchecked")
+        HashMap<ByteBuffer, ByteBuffer> data = (HashMap<ByteBuffer, ByteBuffer>) ReflectionSupport.tryToReadFieldValue(
+                boxedDataField.get(0), store).get();
         assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
         assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
 
-        store.stop();
+        /* Clean up */
 
-        PowerMock.verifyAll();
+        store.stop();
     }
 
     @Test
     public void testGetSet() throws Exception {
-        expectConfigure();
-        expectStart(Collections.emptyList());
-        expectStop();
-
-        // First get() against an empty store
-        final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
-            firstGetReadToEndCallback.getValue().onCompletion(null, null);
-            return null;
-        });
-
-        // Set offsets
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
-        PowerMock.expectLastCall();
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
-        PowerMock.expectLastCall();
+        /* Setup */
 
-        // Second get() should get the produced data and return the new values
-        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Callback<Void>> storeLogCallbackArgumentCaptor = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            // First get() against an empty store
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
+            return null;
+        }).
+        doAnswer(invocation -> {
+            // Second get() should get the produced data and return the new values
             capturedConsumedCallback.getValue().onCompletion(null,
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()));
+                    new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
             capturedConsumedCallback.getValue().onCompletion(null,
-                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()));
-            secondGetReadToEndCallback.getValue().onCompletion(null, null);
+                    new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
             return null;
-        });
-
-        // Third get() should pick up data produced by someone else and return those values
-        final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
+        }).doAnswer(invocation -> {
+            // Third get() should pick up data produced by someone else and return those values
             capturedConsumedCallback.getValue().onCompletion(null,
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty()));
+                    new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+                            new RecordHeaders(), Optional.empty()));
             capturedConsumedCallback.getValue().onCompletion(null,
-                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty()));
-            thirdGetReadToEndCallback.getValue().onCompletion(null, null);
+                    new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+                            new RecordHeaders(), Optional.empty()));
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
             return null;
-        });
+        }).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());

Review Comment:
   I hope I have addressed this in the upcoming commit.
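   
   For readers following the migration: chaining doAnswer(...) like this hands Mockito one answer per consecutive invocation of the stubbed method, which is how the consecutive readToEnd() calls above get different behaviour (the last answer repeats for any further calls). A stripped-down sketch of the pattern, assuming the same storeLog mock and Connect's Callback type; the record-feeding steps are elided:
   
       @SuppressWarnings("unchecked")
       ArgumentCaptor<Callback<Void>> captor = ArgumentCaptor.forClass(Callback.class);
       doAnswer(invocation -> {
           // Serves the first readToEnd(): complete immediately, store still empty.
           captor.getValue().onCompletion(null, null);
           return null;
       }).doAnswer(invocation -> {
           // Serves the second readToEnd(): complete after feeding records (elided).
           captor.getValue().onCompletion(null, null);
           return null;
       }).when(storeLog).readToEnd(captor.capture());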



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -142,97 +163,90 @@ public void testStartStop() throws Exception {
        assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
        assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
 
-        store.start();
-        store.stop();
+        /* Clean up */
 
-        PowerMock.verifyAll();
+        store.stop();
     }
 
     @Test
     public void testReloadOnStart() throws Exception {
-        expectConfigure();
-        expectStart(Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty())
-        ));
-        expectStop();
-        expectClusterId();
-
-        PowerMock.replayAll();
+        /* Setup */
+
+        doAnswer(invocation -> {
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            return null;
+        }).when(storeLog).start();
+
+        /* Execution */
 
         store.configure(DEFAULT_DISTRIBUTED_CONFIG);
         store.start();
-        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+
+        /* Assertions */
+
+        // Unless the implementation of KafkaOffsetBackingStore changes there should only ever be 1 data field.
+        List<Field> boxedDataField = ReflectionSupport.findFields(KafkaOffsetBackingStore.class,
+                field -> "data".equals(field.getName()), HierarchyTraversalMode.TOP_DOWN);
+        // Unless the implementation of KafkaOffsetBackingStore changes it should be safe to get the data field
+        // as there should be exactly one.

Review Comment:
   Addressed in the upcoming commit.
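   
   As a side note, the ReflectionSupport calls above (from org.junit.platform.commons.support) are effectively a drop-in replacement for PowerMock's Whitebox.getInternalState. A sketch of the same idea factored into a helper; the readField name is hypothetical:
   
       import java.lang.reflect.Field;
       import java.util.List;
       import org.junit.platform.commons.support.HierarchyTraversalMode;
       import org.junit.platform.commons.support.ReflectionSupport;
   
       // Hypothetical helper: reads a named (possibly private) field from an
       // instance, much like Whitebox.getInternalState(target, name) did.
       @SuppressWarnings("unchecked")
       static <T> T readField(Object target, String name) throws Exception {
           List<Field> fields = ReflectionSupport.findFields(target.getClass(),
                   f -> name.equals(f.getName()), HierarchyTraversalMode.TOP_DOWN);
           // Assumes exactly one matching field, as the comment above notes.
           return (T) ReflectionSupport.tryToReadFieldValue(fields.get(0), target).get();
       }
   
   With a helper like that, the assertion block would reduce to HashMap<ByteBuffer, ByteBuffer> data = readField(store, "data");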



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -142,97 +163,90 @@ public void testStartStop() throws Exception {
        assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
        assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
 
-        store.start();
-        store.stop();
+        /* Clean up */
 
-        PowerMock.verifyAll();

Review Comment:
   Okay, I have gone through the tests again, and wherever we start or stop the store I have verified that we invoke start or stop on the storeLog. I haven't noticed anything else that needs to be verified, but I am happy to change this if you spot something. I have also removed the setup/execution/assertions section comments.
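   
   Concretely, the lifecycle verification described here would look roughly like this in Mockito (a sketch; the exact invocation counts depend on each test):
   
       import static org.mockito.Mockito.verify;
   
       // Confirm the store delegated its lifecycle to the underlying log.
       verify(storeLog).start();
       verify(storeLog).stop();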



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
