AndrewJSchofield commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2020709446


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1234,26 +1087,20 @@ public void testRetryAcknowledgementsWithLeaderChange() {
         subscriptions.assignFromSubscribed(partitions);
 
         client.updateMetadata(
-            RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 1),
-                tp -> validLeaderEpoch, topicIds, false));
+                RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 1),

Review Comment:
   In quite a few places in this file, the indentation has been increased but 
the lines have not been changed apart from that. Please can you minimise these 
changes so the PR is much tighter.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2905,4 +2729,48 @@ public void add(BackgroundEvent event) {
             }
         }
     }
+
+    @Test
+    void testFetchWithControlRecords() {
+        buildRequestManager();
+        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap = new HashMap<>();
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements));
+
+        Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsControlRecordMap = new HashMap<>();
+
+        Acknowledgements controlAcknowledgements = Acknowledgements.empty();
+        controlAcknowledgements.addGap(2L);
+        nodeAcknowledgementsControlRecordMap.put(tip0, new NodeAcknowledgements(0, controlAcknowledgements));
+
+        shareConsumeRequestManager.fetch(nodeAcknowledgementsMap, nodeAcknowledgementsControlRecordMap);
+
+        Map<TopicIdPartition, Acknowledgements> fetchAcksToSend = shareConsumeRequestManager.getFetchAcknowledgementsToSend(0);
+        System.out.println(fetchAcksToSend);
+        assertEquals(1, fetchAcksToSend.size());
+        assertEquals(AcknowledgeType.ACCEPT, fetchAcksToSend.get(tip0).get(1L));
+        assertEquals(2, fetchAcksToSend.get(tip0).size());
+        assertNull(fetchAcksToSend.get(tip0).get(3L));
+    }
+
+    void sendFetchAndVerifyResponse(MemoryRecords records,

Review Comment:
   This can be `private` I think.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2905,4 +2729,48 @@ public void add(BackgroundEvent event) {
             }
         }
     }
+
+    @Test
+    void testFetchWithControlRecords() {
+        buildRequestManager();
+        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap = new HashMap<>();
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements));
+
+        Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsControlRecordMap = new HashMap<>();
+
+        Acknowledgements controlAcknowledgements = Acknowledgements.empty();
+        controlAcknowledgements.addGap(2L);
+        nodeAcknowledgementsControlRecordMap.put(tip0, new NodeAcknowledgements(0, controlAcknowledgements));
+
+        shareConsumeRequestManager.fetch(nodeAcknowledgementsMap, nodeAcknowledgementsControlRecordMap);
+
+        Map<TopicIdPartition, Acknowledgements> fetchAcksToSend = shareConsumeRequestManager.getFetchAcknowledgementsToSend(0);
+        System.out.println(fetchAcksToSend);
+        assertEquals(1, fetchAcksToSend.size());
+        assertEquals(AcknowledgeType.ACCEPT, fetchAcksToSend.get(tip0).get(1L));
+        assertEquals(2, fetchAcksToSend.get(tip0).size());
+        assertNull(fetchAcksToSend.get(tip0).get(3L));
+    }
+
+    void sendFetchAndVerifyResponse(MemoryRecords records,
+                                    List<ShareFetchResponseData.AcquiredRecords> acquiredRecords,
+                                    Errors... error) {

Review Comment:
   You are not using the varargs part of this. You could just use `Errors 
error`.
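
   To illustrate the difference, here is a minimal standalone sketch. It is not code from the PR: `VarargsSketch`, `firstErrorVarargs`, `firstError` and the nested `Errors` enum are hypothetical stand-ins (the real type is `org.apache.kafka.common.protocol.Errors`), used only so the example compiles on its own.

```java
public class VarargsSketch {
    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors,
    // reduced to two constants so the sketch is self-contained.
    public enum Errors { NONE, NOT_LEADER_OR_FOLLOWER }

    // Varargs form: callers may pass zero, one, or many values,
    // so the body has to decide what each arity means.
    public static Errors firstErrorVarargs(Errors... errors) {
        return errors.length == 0 ? Errors.NONE : errors[0];
    }

    // Single-parameter form: the signature itself says exactly one value is expected.
    public static Errors firstError(Errors error) {
        return error;
    }

    public static void main(String[] args) {
        // Compiles even though no error is supplied.
        System.out.println(firstErrorVarargs());
        // Compiles, but the second argument is silently ignored.
        System.out.println(firstErrorVarargs(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NONE));
        // The only legal call shape for the single-parameter form.
        System.out.println(firstError(Errors.NONE));
    }
}
```

   If every call site passes exactly one value, the single-parameter form documents that and lets the compiler reject calls with a missing or extra argument.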



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
