hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1112456303
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); + expectSendRecord(emptyHeaders()); - Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - - expectSendRecord(); - expectSendRecord(); - - PowerMock.replayAll(); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); + + ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2); + + List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues(); + assertEquals(2, capturedValues.size()); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( - String topic, - boolean anyTimes, - Headers headers - ) { + private void expectSendRecord(Headers headers) { if (headers != null) - expectConvertHeadersAndKeyValue(topic, anyTimes, headers); + expectConvertHeadersAndKeyValue(headers); - expectApplyTransformationChain(anyTimes); + expectApplyTransformationChain(); - Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); - - IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect( - producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); + expectTaskGetTopic(); + } - IAnswer<Future<RecordMetadata>> expectResponse = () -> { - synchronized (producerCallbacks) { 
- for (Callback cb : producerCallbacks.getValues()) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null); - } - producerCallbacks.reset(); - } - return null; - }; + private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() { + return verifySendRecord(1); + } - if (anyTimes) - expect.andStubAnswer(expectResponse); - else - expect.andAnswer(expectResponse); + private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int times) { + ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = ArgumentCaptor.forClass(ProducerRecord.class); + ArgumentCaptor<Callback> producerCallbacks = ArgumentCaptor.forClass(Callback.class); + verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture()); - expectTaskGetTopic(anyTimes); + for (Callback cb : producerCallbacks.getAllValues()) { + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), + null); + } return sent; } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() { - return expectSendRecord(TOPIC, true, emptyHeaders()); + private void expectTaskGetTopic() { + when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> { + String connector = invocation.getArgument(0, String.class); + String topic = invocation.getArgument(1, String.class); + return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds()); + }); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() { - return expectSendRecord(TOPIC, false, emptyHeaders()); - } + private void verifyTaskGetTopic() { + ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class); + verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture()); - 
private void expectTaskGetTopic(boolean anyTimes) { - final Capture<String> connectorCapture = EasyMock.newCapture(); - final Capture<String> topicCapture = EasyMock.newCapture(); - IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic( - EasyMock.capture(connectorCapture), - EasyMock.capture(topicCapture))); - if (anyTimes) { - expect.andStubAnswer(() -> new TopicStatus( - topicCapture.getValue(), - new ConnectorTaskId(connectorCapture.getValue(), 0), - Time.SYSTEM.milliseconds())); - } else { - expect.andAnswer(() -> new TopicStatus( - topicCapture.getValue(), - new ConnectorTaskId(connectorCapture.getValue(), 0), - Time.SYSTEM.milliseconds())); - } - if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { - assertEquals("job", connectorCapture.getValue()); - assertEquals(TOPIC, topicCapture.getValue()); - } + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + + verify(admin).createOrFindTopics(newTopicCapture.capture()); + assertEquals(TOPIC, newTopicCapture.getValue().name()); Review Comment: Makes sense. I moved the `createOrFindTopics` verification into a separate method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org