kirktrue commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621334851


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1290,4 +1290,4 @@ static class MemberInfo {
             this.memberEpoch = Optional.empty();
         }
     }
-}
+}

Review Comment:
   We should revert/fix this change as it's whitespace only.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -53,89 +39,111 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
 import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class ConsumerNetworkThreadTest {
+    static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+    static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+    private final Time time;
+    private final ConsumerMetadata metadata;
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+    private final ApplicationEventProcessor applicationEventProcessor;
+    private final OffsetsRequestManager offsetsRequestManager;
+    private final HeartbeatRequestManager heartbeatRequestManager;
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final ConsumerNetworkThread consumerNetworkThread;
+    private final MockClient client;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final NetworkClientDelegate networkClient;
+    private final RequestManagers requestManagers;
+    private final CompletableEventReaper applicationEventReaper;
+
+    ConsumerNetworkThreadTest() {
+        LogContext logContext = new LogContext();
+        ConsumerConfig config = mock(ConsumerConfig.class);
+        this.time = new MockTime();
+        this.networkClientDelegate = mock(NetworkClientDelegate.class);
+        this.requestManagers = mock(RequestManagers.class);
+        this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+        this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+        this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.metadata = mock(ConsumerMetadata.class);
+        this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+        this.applicationEventReaper = mock(CompletableEventReaper.class);
+        this.client = new MockClient(time);
+
+        this.networkClient = new NetworkClientDelegate(
+                time,
+                config,
+                logContext,
+                client
+        );
 
-    private ConsumerTestBuilder testBuilder;
-    private Time time;
-    private ConsumerMetadata metadata;
-    private NetworkClientDelegate networkClient;
-    private BlockingQueue<ApplicationEvent> applicationEventsQueue;
-    private ApplicationEventProcessor applicationEventProcessor;
-    private OffsetsRequestManager offsetsRequestManager;
-    private CommitRequestManager commitRequestManager;
-    private CoordinatorRequestManager coordinatorRequestManager;
-    private ConsumerNetworkThread consumerNetworkThread;
-    private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-    private MockClient client;
-
-    @BeforeEach
-    public void setup() {
-        testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-        time = testBuilder.time;
-        metadata = testBuilder.metadata;
-        networkClient = testBuilder.networkClientDelegate;
-        client = testBuilder.client;
-        applicationEventsQueue = testBuilder.applicationEventQueue;
-        applicationEventProcessor = testBuilder.applicationEventProcessor;
-        commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-        offsetsRequestManager = testBuilder.offsetsRequestManager;
-        coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-        consumerNetworkThread = new ConsumerNetworkThread(
-                testBuilder.logContext,
+        this.consumerNetworkThread = new ConsumerNetworkThread(
+                logContext,
                 time,
-                testBuilder.applicationEventQueue,
+                applicationEventsQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
-                () -> testBuilder.networkClientDelegate,
-                () -> testBuilder.requestManagers
+                () -> networkClientDelegate,
+                () -> requestManagers
         );
+    }
+
+    @BeforeEach
+    public void setup() {
         consumerNetworkThread.initializeResources();
     }
 
     @AfterEach
     public void tearDown() {
-        if (testBuilder != null) {
-            testBuilder.close();
-            consumerNetworkThread.close(Duration.ZERO);
-        }
+        if (consumerNetworkThread != null)
+            consumerNetworkThread.close();
+    }
+
+    @Test
+    public void testEnsureCloseStopsRunningThread() {
+        // consumerNetworkThread.running is set to true in the constructor
+        assertTrue(consumerNetworkThread.isRunning());
+
+        // close() should make consumerNetworkThread.running false by calling 
closeInternal(Duration timeout)
+        consumerNetworkThread.close();
+        assertFalse(consumerNetworkThread.isRunning());
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {1, 100, 1000, 4999, 5001})
+    public void testConsumerNetworkThreadWaitTimeComputations(long 
exampleTime) {
+        List<Optional<? extends RequestManager>> requestManagersList = new 
ArrayList<>();
+        requestManagersList.add(Optional.of(coordinatorRequestManager));
+        when(requestManagers.entries()).thenReturn(requestManagersList);

Review Comment:
   Nit, but could this be:
   
   ```suggestion
           when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(coordinatorRequestManager)));
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -149,20 +157,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
                 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
     }
 
+    @Test
+    void testRequestManagersArePolledOnce() {
+        consumerNetworkThread.runOnce();
+        requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).poll(anyLong())));
+        requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).maximumTimeToWait(anyLong())));
+        verify(networkClientDelegate).poll(anyLong(), anyLong());
+    }
+
     @Test
     public void testApplicationEvent() {
         ApplicationEvent e = new PollEvent(100);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
-        verify(applicationEventProcessor, times(1)).process(e);
+        verify(applicationEventProcessor).process(e);

Review Comment:
   I'm curious about this change. Was the original `times(1)` form incorrect as written? 🤔



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -379,13 +403,4 @@ private MockClient.RequestMatcher 
offsetCommitRequestMatcher(final Map<TopicPart
             return true;
         };
     }
-
-    private HashMap<TopicPartition, OffsetAndMetadata> 
mockTopicPartitionOffset() {
-        final TopicPartition t0 = new TopicPartition("t0", 2);
-        final TopicPartition t1 = new TopicPartition("t0", 3);
-        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new 
HashMap<>();
-        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
-        topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
-        return topicPartitionOffsets;
-    }
-}
+}

Review Comment:
   ```suggestion
   }
   
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -268,32 +281,31 @@ void testPollResultTimer() {
 
     @Test
     void testMaximumTimeToWait() {
+        List<Optional<? extends RequestManager>> list = new ArrayList<>();
+        list.add(Optional.of(heartbeatRequestManager));
         // Initial value before runOnce has been called
         assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
+
+        when(requestManagers.entries()).thenReturn(list);

Review Comment:
   Minor, but it seems this line could be more succinct, like:
   
   ```suggestion
           when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -314,7 +326,7 @@ void testEnsureEventsAreCompleted() {
         coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
         
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
         prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-        CompletableApplicationEvent<Void> event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
+        CompletableApplicationEvent<Void> event1 = 
mock(AsyncCommitEvent.class);

Review Comment:
   👍 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -327,8 +339,11 @@ void testEnsureEventsAreCompleted() {
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
+    // Look into this one

Review Comment:
   Can you elaborate on this comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to