AndrewJSchofield commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2026700007
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1212,7 +1223,8 @@ public void testAcquireWithMultipleBatchesAndMaxFetchRecords() throws Interrupte
                 BATCH_SIZE,
                 10,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records, 10)),
+                fetchPartitionData(records, 10),
+                FETCH_ISOLATION_HWM),

Review Comment:
   nit: indentation (of the added `FETCH_ISOLATION_HWM),` line)

##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1308,7 +1321,8 @@ public void testAcquireWithBatchSizeAndSingleBatch() {
                 2 /* Batch size */,
                 10,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),

Review Comment:
   nit: indentation (of the added `FETCH_ISOLATION_HWM),` line)

##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1175,7 +1185,8 @@ public void testAcquireWithMaxFetchRecords() {
                 BATCH_SIZE,
                 10,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),

Review Comment:
   nit: indentation (of the added `FETCH_ISOLATION_HWM),` line)

##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1339,7 +1353,8 @@ public void testAcquireWithBatchSizeAndMultipleBatches() {
                 5 /* Batch size */,
                 100,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),

Review Comment:
   nit: indentation (of the added `FETCH_ISOLATION_HWM),` line)

##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map<Long, L
         return errorMessage.toString();
     }
 
+    @Test
+    public void testFilterRecordBatchesFromAcquiredRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        List<AcquiredRecords> acquiredRecords1 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches1 = List.of(
+            memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+            memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)),
+            sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1));
+
+        List<AcquiredRecords> acquiredRecords2 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 1),

Review Comment:
   Let's put some variation in the delivery count here.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org