ShivsundarR commented on code in PR #19295: URL: https://github.com/apache/kafka/pull/19295#discussion_r2022176927
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java: ########## @@ -2905,4 +2729,48 @@ public void add(BackgroundEvent event) { } } } + + @Test + void testFetchWithControlRecords() { + buildRequestManager(); + shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); + + Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap = new HashMap<>(); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements)); + + Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsControlRecordMap = new HashMap<>(); + + Acknowledgements controlAcknowledgements = Acknowledgements.empty(); + controlAcknowledgements.addGap(2L); + nodeAcknowledgementsControlRecordMap.put(tip0, new NodeAcknowledgements(0, controlAcknowledgements)); + + shareConsumeRequestManager.fetch(nodeAcknowledgementsMap, nodeAcknowledgementsControlRecordMap); + + Map<TopicIdPartition, Acknowledgements> fetchAcksToSend = shareConsumeRequestManager.getFetchAcknowledgementsToSend(0); + System.out.println(fetchAcksToSend); + assertEquals(1, fetchAcksToSend.size()); + assertEquals(AcknowledgeType.ACCEPT, fetchAcksToSend.get(tip0).get(1L)); + assertEquals(2, fetchAcksToSend.get(tip0).size()); + assertNull(fetchAcksToSend.get(tip0).get(3L)); + } + + void sendFetchAndVerifyResponse(MemoryRecords records, + List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, + Errors... error) { Review Comment: There was one test which needed this, the change was missed when I pushed the commit. I have updated the PR with one usage of this varargs part. In future if we add tests with both top level error and acknowledge error, this might be useful as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org