clolov commented on code in PR #12418: URL: https://github.com/apache/kafka/pull/12418#discussion_r937576532
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java: ########## @@ -103,36 +103,57 @@ public class KafkaOffsetBackingStoreTest { private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW"); private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW"); - @Mock - KafkaBasedLog<byte[], byte[]> storeLog; - private KafkaOffsetBackingStore store; - - private Capture<String> capturedTopic = EasyMock.newCapture(); - private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture(); - private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture(); - private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture(); - private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture(); - private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture(); + @SuppressWarnings("unchecked") Review Comment: Hopefully addressed in the upcoming commit. ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java: ########## @@ -142,97 +163,90 @@ public void testStartStop() throws Exception { assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions()); assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor()); - store.start(); - store.stop(); + /* Clean up */ - PowerMock.verifyAll(); + store.stop(); } @Test public void testReloadOnStart() throws Exception { - expectConfigure(); - expectStart(Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(), - new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(), - new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(), - new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(), - new RecordHeaders(), Optional.empty()) - )); - expectStop(); - expectClusterId(); - - PowerMock.replayAll(); + /* Setup */ + + doAnswer(invocation -> { + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(), + new RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(), + new RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(), + new RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(), + new RecordHeaders(), Optional.empty())); + return null; + }).when(storeLog).start(); + + /* Execution */ store.configure(DEFAULT_DISTRIBUTED_CONFIG); store.start(); - HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data"); + + /* Assertions */ + + // Unless the implementation of KafkaOffsetBackingStore changes there should only ever be 1 data field. + List<Field> boxedDataField = ReflectionSupport.findFields(KafkaOffsetBackingStore.class, + field -> "data".equals(field.getName()), HierarchyTraversalMode.TOP_DOWN); + // Unless the implementation of KafkaOffsetBackingStore changes it should be safe to get the data field + // as there should be exactly one. + @SuppressWarnings("unchecked") + HashMap<ByteBuffer, ByteBuffer> data = (HashMap<ByteBuffer, ByteBuffer>) ReflectionSupport.tryToReadFieldValue( + boxedDataField.get(0), store).get(); assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY)); assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY)); - store.stop(); + /* Clean up */ - PowerMock.verifyAll(); + store.stop(); } @Test public void testGetSet() throws Exception { - expectConfigure(); - expectStart(Collections.emptyList()); - expectStop(); - - // First get() against an empty store - final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture(); - storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback)); - PowerMock.expectLastCall().andAnswer(() -> { - firstGetReadToEndCallback.getValue().onCompletion(null, null); - return null; - }); - - // Set offsets - Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); - storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0)); - PowerMock.expectLastCall(); - Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture(); - storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1)); - PowerMock.expectLastCall(); + /* Setup */ - // Second get() should get the produced data and return the new values - final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture(); - storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback)); - PowerMock.expectLastCall().andAnswer(() -> { + @SuppressWarnings("unchecked") + final ArgumentCaptor<Callback<Void>> storeLogCallbackArgumentCaptor = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { + // First get() against an empty store + storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null); + return null; + }). + doAnswer(invocation -> { + // Second get() should get the produced data and return the new values capturedConsumedCallback.getValue().onCompletion(null, - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(), - new RecordHeaders(), Optional.empty())); + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(), + new RecordHeaders(), Optional.empty())); capturedConsumedCallback.getValue().onCompletion(null, - new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(), - new RecordHeaders(), Optional.empty())); - secondGetReadToEndCallback.getValue().onCompletion(null, null); + new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(), + new RecordHeaders(), Optional.empty())); + storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null); return null; - }); - - // Third get() should pick up data produced by someone else and return those values - final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture(); - storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback)); - PowerMock.expectLastCall().andAnswer(() -> { + }).doAnswer(invocation -> { + // Third get() should pick up data produced by someone else and return those values capturedConsumedCallback.getValue().onCompletion(null, - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(), - new RecordHeaders(), Optional.empty())); + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(), + new RecordHeaders(), Optional.empty())); capturedConsumedCallback.getValue().onCompletion(null, - new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(), - new RecordHeaders(), Optional.empty())); - thirdGetReadToEndCallback.getValue().onCompletion(null, null); + new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(), + new RecordHeaders(), Optional.empty())); + storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null); return null; - }); + }).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture()); Review Comment: I hope I have addressed this in the upcoming commit. ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java: ########## @@ -142,97 +163,90 @@ public void testStartStop() throws Exception { assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions()); assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor()); - store.start(); - store.stop(); + /* Clean up */ - PowerMock.verifyAll(); + store.stop(); } @Test public void testReloadOnStart() throws Exception { - expectConfigure(); - expectStart(Arrays.asList( - new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(), - new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(), - new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(), - new RecordHeaders(), Optional.empty()), - new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(), - new RecordHeaders(), Optional.empty()) - )); - expectStop(); - expectClusterId(); - - PowerMock.replayAll(); + /* Setup */ + + doAnswer(invocation -> { + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(), + new RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(), + new RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(), + new RecordHeaders(), Optional.empty())); + capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(), + new RecordHeaders(), Optional.empty())); + return null; + }).when(storeLog).start(); + + /* Execution */ store.configure(DEFAULT_DISTRIBUTED_CONFIG); store.start(); - HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data"); + + /* Assertions */ + + // Unless the implementation of KafkaOffsetBackingStore changes there should only ever be 1 data field. + List<Field> boxedDataField = ReflectionSupport.findFields(KafkaOffsetBackingStore.class, + field -> "data".equals(field.getName()), HierarchyTraversalMode.TOP_DOWN); + // Unless the implementation of KafkaOffsetBackingStore changes it should be safe to get the data field + // as there should be exactly one. Review Comment: Addressed in the upcoming commit. ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java: ########## @@ -142,97 +163,90 @@ public void testStartStop() throws Exception { assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions()); assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor()); - store.start(); - store.stop(); + /* Clean up */ - PowerMock.verifyAll(); Review Comment: Okay, I have gone through the tests again and wherever we start or stop the store I have verified that we are invoking start or stop on the storeLog. I haven't noticed something else which needs to be verified, but I am happy to change this if you notice something. I have also removed the sections for setup/execution/assertions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org