adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2031745262
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map<Long, L return errorMessage.toString(); } + @Test + public void testFilterRecordBatchesFromAcquiredRecords() { + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .build(); + + List<AcquiredRecords> acquiredRecords1 = List.of( + new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1) + ); + List<RecordBatch> recordBatches1 = List.of( + memoryRecordsBuilder(3, 2).build().batches().iterator().next(), + memoryRecordsBuilder(3, 12).build().batches().iterator().next() + ); + assertEquals( + List.of( + new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)), + sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1)); + + List<AcquiredRecords> acquiredRecords2 = List.of( + new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3), + new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 3), + new AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3) + ); + List<RecordBatch> recordBatches2 = List.of( + memoryRecordsBuilder(21, 
5).build().batches().iterator().next(), + memoryRecordsBuilder(5, 31).build().batches().iterator().next() + ); + assertEquals( + List.of( + new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3), + new AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 2), + new AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 3) + + ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, recordBatches2) + ); + + // Record batches is empty. + assertEquals(acquiredRecords2, sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, List.of())); + + List<AcquiredRecords> acquiredRecords3 = List.of( + new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1) + ); + List<RecordBatch> recordBatches3 = List.of( + memoryRecordsBuilder(1, 8).build().batches().iterator().next(), + memoryRecordsBuilder(1, 18).build().batches().iterator().next() + ); + + assertEquals( + List.of( + new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 1) + + ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, recordBatches3) + ); + } + + @Test + public void testAcquireWithReadCommittedIsolationLevel() { + SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .build()); + + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 5, 10).close(); + memoryRecordsBuilder(buffer, 5, 15).close(); + memoryRecordsBuilder(buffer, 15, 20).close(); + memoryRecordsBuilder(buffer, 8, 50).close(); + memoryRecordsBuilder(buffer, 10, 58).close(); + memoryRecordsBuilder(buffer, 5, 70).close(); + + buffer.flip(); + MemoryRecords records = 
MemoryRecords.readableRecords(buffer); + FetchPartitionData fetchPartitionData = fetchPartitionData(records, newAbortedTransactions()); + + // We are mocking the result of function fetchAbortedTransactionRecordBatches. The records present at these offsets need to be archived. + // We won't be utilizing the aborted transactions passed in fetchPartitionData. + when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn( + List.of( + memoryRecordsBuilder(5, 10).build().batches().iterator().next(), + memoryRecordsBuilder(10, 58).build().batches().iterator().next(), + memoryRecordsBuilder(5, 70).build().batches().iterator().next() + ) + ); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords( + sharePartition.acquire( + MEMBER_ID, + 10 /* Batch size */, + 100, + DEFAULT_FETCH_OFFSET, + fetchPartitionData, + FetchIsolation.TXN_COMMITTED), + 45 /* Gap of 15 records will be added to second batch, gap of 2 records will also be added to fourth batch */); + + assertEquals(List.of( + new AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 1), + new AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 1),

Review Comment:
This is because there is a gap of records from 35-49 in the original records to be acquired. This gap is added to this batch since we rely on the client to inform the broker about these natural gaps in the partition log.

```
memoryRecordsBuilder(buffer, 5, 10).close(); // batch from 10-14
memoryRecordsBuilder(buffer, 5, 15).close(); // batch from 15-19
memoryRecordsBuilder(buffer, 15, 20).close(); // batch from 20-34
memoryRecordsBuilder(buffer, 8, 50).close(); // batch from 50-57
```

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org