lianetm commented on code in PR #16694:
URL: https://github.com/apache/kafka/pull/16694#discussion_r1722269401


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve
         NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(0, pollAgain.unsentRequests.size());
     }
+    
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+    public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) {

Review Comment:
   nit: would `testConsumerAcksReconciledAssignmentAfterAckLost` be a simpler/clearer name?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve
         NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(0, pollAgain.unsentRequests.size());
     }
+    
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+    public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) {
+        // 1. complete reconciliation
+        createHeartbeatStatAndRequestManager();
+        String topic = "topic1";
+        int exceededTimeMs = 5;
+        Set<String> set = Collections.singleton(topic);
+        when(subscriptions.subscription()).thenReturn(set);
+        subscriptions.subscribe(set, Optional.empty());
+        mockReconcilingMemberData();
+        // 2. send heartbeat1 to ack assignment tp0
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        // 3. HB1 times out
+        result.unsentRequests.get(0)
+                .handler()
+                .onFailure(time.milliseconds(), new TimeoutException("timeout"));
+        // 4. heartbeat request manager resets the sentFields to null HeartbeatState.reset()
+        time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
+        assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
+        verify(heartbeatRequestState).reset();
+        // 5. following HB will include tp0 (and act as ack), tp0 != null
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+        ConsumerGroupHeartbeatRequest heartbeatRequest =
+                (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version);
+        
+        assertEquals(Collections.singletonList(topic), heartbeatRequest.data().subscribedTopicNames());

Review Comment:
   In this situation (ack lost), we expect the member to resend the partitions,
not only the topic names, so should we assert that it does? We would probably
need to pass the partitions into `mockReconcilingMemberData`, so that they are
returned in the `currentAssignment` on ln 919, and then assert that the same
partitions are indeed in `heartbeatRequest.data().topicPartitions()`.
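
   To illustrate the behavior I'd expect the test to pin down, here is a minimal
standalone sketch. It is not the Kafka implementation: `SentFieldsTracker` and
`partitionsToSend` are hypothetical names standing in for the heartbeat state,
and partitions are modeled as plain integers. The assumption is only that a
field is omitted while unchanged and sent again once the sent state is reset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class AckLostSketch {

    /**
     * Hypothetical stand-in for the heartbeat state (not Kafka's class):
     * a field is only put on the wire when it differs from what was last sent.
     */
    static final class SentFieldsTracker {
        // null means "nothing recorded as sent"
        private List<Integer> lastSentPartitions;

        /** Returns the partitions to include in the next heartbeat, or null to omit the field. */
        List<Integer> partitionsToSend(List<Integer> currentAssignment) {
            if (Objects.equals(currentAssignment, lastSentPartitions)) {
                return null;
            }
            lastSentPartitions = new ArrayList<>(currentAssignment);
            return lastSentPartitions;
        }

        /** Forgets what was sent, as would happen after a failed or timed-out heartbeat. */
        void reset() {
            lastSentPartitions = null;
        }
    }

    public static void main(String[] args) {
        SentFieldsTracker state = new SentFieldsTracker();
        List<Integer> assignment = Arrays.asList(0); // stands in for tp0

        // HB1 carries the reconciled assignment (the ack).
        List<Integer> hb1 = state.partitionsToSend(assignment);
        // HB1 times out, so the tracker is reset.
        state.reset();
        // HB2 must carry the same partitions again, not just the topic names.
        List<Integer> hb2 = state.partitionsToSend(assignment);

        if (!assignment.equals(hb1) || !assignment.equals(hb2)) {
            throw new AssertionError("partitions should be resent after the ack is lost");
        }
        System.out.println("HB2 partitions: " + hb2);
    }
}
```

   In the real test, the equivalent check would be on the partitions carried by
the second heartbeat request rather than on this toy tracker.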



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve
         NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(0, pollAgain.unsentRequests.size());
     }
+    
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+    public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) {
+        // 1. complete reconciliation
+        createHeartbeatStatAndRequestManager();

Review Comment:
   Since we're here, let's please fix the typo in this helper's name:
createHeartbeat**State**AndRequestManager



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve
         NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(0, pollAgain.unsentRequests.size());
     }
+    
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+    public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) {
+        // 1. complete reconciliation
+        createHeartbeatStatAndRequestManager();
+        String topic = "topic1";
+        int exceededTimeMs = 5;
+        Set<String> set = Collections.singleton(topic);
+        when(subscriptions.subscription()).thenReturn(set);
+        subscriptions.subscribe(set, Optional.empty());
+        mockReconcilingMemberData();
+        // 2. send heartbeat1 to ack assignment tp0
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        // 3. HB1 times out
+        result.unsentRequests.get(0)
+                .handler()
+                .onFailure(time.milliseconds(), new TimeoutException("timeout"));
+        // 4. heartbeat request manager resets the sentFields to null HeartbeatState.reset()
+        time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);

Review Comment:
   I would expect that sleeping for just the interval would be enough, so maybe
we could simplify the test by removing `exceededTimeMs` from this call, along
with the variable itself?


