AndrewJSchofield commented on code in PR #19295: URL: https://github.com/apache/kafka/pull/19295#discussion_r2020709446
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java: ########## @@ -1234,26 +1087,20 @@ public void testRetryAcknowledgementsWithLeaderChange() { subscriptions.assignFromSubscribed(partitions); client.updateMetadata( - RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 1), - tp -> validLeaderEpoch, topicIds, false)); + RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 1), Review Comment: In quite a few places in this file, the indentation has been increased but the lines have not been changed apart from that. Please can you minimise these changes so the PR is much tighter. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java: ########## @@ -2905,4 +2729,48 @@ public void add(BackgroundEvent event) { } } } + + @Test + void testFetchWithControlRecords() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap = new HashMap<>(); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements)); + + Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsControlRecordMap = new HashMap<>(); + + Acknowledgements controlAcknowledgements = Acknowledgements.empty(); + controlAcknowledgements.addGap(2L); + nodeAcknowledgementsControlRecordMap.put(tip0, new NodeAcknowledgements(0, controlAcknowledgements)); + + shareConsumeRequestManager.fetch(nodeAcknowledgementsMap, nodeAcknowledgementsControlRecordMap); + + Map<TopicIdPartition, Acknowledgements> fetchAcksToSend = shareConsumeRequestManager.getFetchAcknowledgementsToSend(0); + System.out.println(fetchAcksToSend); + assertEquals(1, fetchAcksToSend.size()); + assertEquals(AcknowledgeType.ACCEPT, 
fetchAcksToSend.get(tip0).get(1L)); + assertEquals(2, fetchAcksToSend.get(tip0).size()); + assertNull(fetchAcksToSend.get(tip0).get(3L)); + } + + void sendFetchAndVerifyResponse(MemoryRecords records, Review Comment: This can be `private` I think. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java: ########## @@ -2905,4 +2729,48 @@ public void add(BackgroundEvent event) { } } } + + @Test + void testFetchWithControlRecords() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap = new HashMap<>(); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements)); + + Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsControlRecordMap = new HashMap<>(); + + Acknowledgements controlAcknowledgements = Acknowledgements.empty(); + controlAcknowledgements.addGap(2L); + nodeAcknowledgementsControlRecordMap.put(tip0, new NodeAcknowledgements(0, controlAcknowledgements)); + + shareConsumeRequestManager.fetch(nodeAcknowledgementsMap, nodeAcknowledgementsControlRecordMap); + + Map<TopicIdPartition, Acknowledgements> fetchAcksToSend = shareConsumeRequestManager.getFetchAcknowledgementsToSend(0); + System.out.println(fetchAcksToSend); + assertEquals(1, fetchAcksToSend.size()); + assertEquals(AcknowledgeType.ACCEPT, fetchAcksToSend.get(tip0).get(1L)); + assertEquals(2, fetchAcksToSend.get(tip0).size()); + assertNull(fetchAcksToSend.get(tip0).get(3L)); + } + + void sendFetchAndVerifyResponse(MemoryRecords records, + List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, + Errors... error) { Review Comment: You are not using the varargs part of this. You could just use `Errors error`. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org