kirktrue commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1621334851
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -1290,4 +1290,4 @@ static class MemberInfo { this.memberEpoch = Optional.empty(); } } -} +} Review Comment: We should revert/fix this change as it's whitespace only. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -53,89 +39,111 @@ import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class ConsumerNetworkThreadTest { + static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; + static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + + private final Time time; + private final ConsumerMetadata metadata; + private final BlockingQueue<ApplicationEvent> applicationEventsQueue; + private final ApplicationEventProcessor applicationEventProcessor; + private final OffsetsRequestManager offsetsRequestManager; + private final HeartbeatRequestManager heartbeatRequestManager; + private final CoordinatorRequestManager coordinatorRequestManager; + private final ConsumerNetworkThread consumerNetworkThread; + private final MockClient client; + private final NetworkClientDelegate networkClientDelegate; + private final NetworkClientDelegate networkClient; + private final RequestManagers requestManagers; + private final CompletableEventReaper applicationEventReaper; + + ConsumerNetworkThreadTest() { + LogContext logContext = new LogContext(); + ConsumerConfig config = mock(ConsumerConfig.class); + this.time = new MockTime(); + this.networkClientDelegate = mock(NetworkClientDelegate.class); + this.requestManagers = mock(RequestManagers.class); + this.offsetsRequestManager = mock(OffsetsRequestManager.class); + this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); + this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); + this.applicationEventsQueue = new LinkedBlockingQueue<>(); + this.metadata = mock(ConsumerMetadata.class); + this.applicationEventProcessor = mock(ApplicationEventProcessor.class); + this.applicationEventReaper = mock(CompletableEventReaper.class); + this.client = new MockClient(time); + + this.networkClient = new NetworkClientDelegate( + time, + config, + logContext, + client + ); - private ConsumerTestBuilder testBuilder; - private Time time; - private ConsumerMetadata metadata; - private NetworkClientDelegate networkClient; - private BlockingQueue<ApplicationEvent> applicationEventsQueue; - private ApplicationEventProcessor applicationEventProcessor; - private OffsetsRequestManager offsetsRequestManager; - private CommitRequestManager commitRequestManager; - private CoordinatorRequestManager coordinatorRequestManager; - private ConsumerNetworkThread consumerNetworkThread; - private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); - private MockClient client; - - @BeforeEach - public void setup() { - testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); - time = testBuilder.time; - metadata = testBuilder.metadata; - networkClient = testBuilder.networkClientDelegate; - client = testBuilder.client; - applicationEventsQueue = testBuilder.applicationEventQueue; - applicationEventProcessor = testBuilder.applicationEventProcessor; - commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); - offsetsRequestManager = testBuilder.offsetsRequestManager; - coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); - consumerNetworkThread = new ConsumerNetworkThread( - testBuilder.logContext, + this.consumerNetworkThread = new ConsumerNetworkThread( + logContext, time, - testBuilder.applicationEventQueue, + applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, - () -> testBuilder.networkClientDelegate, - () -> testBuilder.requestManagers + () -> networkClientDelegate, + () -> requestManagers ); + } + + @BeforeEach + public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { - if (testBuilder != null) { - testBuilder.close(); - consumerNetworkThread.close(Duration.ZERO); - } + if (consumerNetworkThread != null) + consumerNetworkThread.close(); + } + + @Test + public void testEnsureCloseStopsRunningThread() { + // consumerNetworkThread.running is set to true in the constructor + assertTrue(consumerNetworkThread.isRunning()); + + // close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout) + consumerNetworkThread.close(); + assertFalse(consumerNetworkThread.isRunning()); + } + + @ParameterizedTest + @ValueSource(longs = {1, 100, 1000, 4999, 5001}) + public void testConsumerNetworkThreadWaitTimeComputations(long exampleTime) { + List<Optional<? extends RequestManager>> requestManagersList = new ArrayList<>(); + requestManagersList.add(Optional.of(coordinatorRequestManager)); + when(requestManagers.entries()).thenReturn(requestManagersList); Review Comment: Nit, but could this be: ```suggestion when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(coordinatorRequestManager))); ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -149,20 +157,28 @@ public void testStartupAndTearDown() throws InterruptedException { "The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms"); } + @Test + void testRequestManagersArePolledOnce() { + consumerNetworkThread.runOnce(); + requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong()))); + requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong()))); + verify(networkClientDelegate).poll(anyLong(), anyLong()); + } + @Test public void testApplicationEvent() { ApplicationEvent e = new PollEvent(100); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); - verify(applicationEventProcessor, times(1)).process(e); + verify(applicationEventProcessor).process(e); Review Comment: I'm curious as to this change. Is this something that was incorrect as is? 🤔 ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -379,13 +403,4 @@ private MockClient.RequestMatcher offsetCommitRequestMatcher(final Map<TopicPart return true; }; } - - private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() { - final TopicPartition t0 = new TopicPartition("t0", 2); - final TopicPartition t1 = new TopicPartition("t0", 3); - HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>(); - topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L)); - topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L)); - return topicPartitionOffsets; - } -} +} Review Comment: ```suggestion } ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -268,32 +281,31 @@ void testPollResultTimer() { @Test void testMaximumTimeToWait() { + List<Optional<? extends RequestManager>> list = new ArrayList<>(); + list.add(Optional.of(heartbeatRequestManager)); // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(list); Review Comment: Minor, but it seems like this line could be more succinct like: ```suggestion when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -314,7 +326,7 @@ void testEnsureEventsAreCompleted() { coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); - CompletableApplicationEvent<Void> event1 = spy(new AsyncCommitEvent(Collections.emptyMap())); + CompletableApplicationEvent<Void> event1 = mock(AsyncCommitEvent.class); Review Comment: 👍 ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ########## @@ -327,8 +339,11 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } + // Look into this one Review Comment: Can you elaborate on this comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org