adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031745262


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition 
sharePartition, Map<Long, L
         return errorMessage.toString();
     }
 
+    @Test
+    public void testFilterRecordBatchesFromAcquiredRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        List<AcquiredRecords> acquiredRecords1 = List.of(
+            new 
AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 
1)
+        );
+        List<RecordBatch> recordBatches1 = List.of(
+            memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+            memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new 
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 
1),
+                new 
AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 
1),
+                new 
AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 
2),
+                new 
AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 
2),
+                new 
AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 
1)),
+            
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, 
recordBatches1));
+
+        List<AcquiredRecords> acquiredRecords2 = List.of(
+            new 
AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 
3),
+            new 
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 
3),
+            new 
AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 
3)
+        );
+        List<RecordBatch> recordBatches2 = List.of(
+            memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+            memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new 
AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 
3),
+                new 
AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 
2),
+                new 
AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 
3)
+
+            ), 
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, 
recordBatches2)
+        );
+
+        // Record batches is empty.
+        assertEquals(acquiredRecords2, 
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, 
List.of()));
+
+        List<AcquiredRecords> acquiredRecords3 = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 
1)
+        );
+        List<RecordBatch> recordBatches3 = List.of(
+            memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+            memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+        );
+
+        assertEquals(
+            List.of(
+                new 
AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 
1),
+                new 
AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 
1),
+                new 
AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 
1)
+
+            ), 
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, 
recordBatches3)
+        );
+    }
+
+    @Test
+    public void testAcquireWithReadCommittedIsolationLevel() {
+        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 5, 15).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        memoryRecordsBuilder(buffer, 8, 50).close();
+        memoryRecordsBuilder(buffer, 10, 58).close();
+        memoryRecordsBuilder(buffer, 5, 70).close();
+
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        FetchPartitionData fetchPartitionData = fetchPartitionData(records, 
newAbortedTransactions());
+
+        // We are mocking the result of function 
fetchAbortedTransactionRecordBatches. The records present at these offsets need 
to be archived.
+        // We won't be utilizing the aborted transactions passed in 
fetchPartitionData.
+        
when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(),
 fetchPartitionData.abortedTransactions.get())).thenReturn(
+            List.of(
+                memoryRecordsBuilder(5, 
10).build().batches().iterator().next(),
+                memoryRecordsBuilder(10, 
58).build().batches().iterator().next(),
+                memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+            )
+        );
+
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+            sharePartition.acquire(
+                MEMBER_ID,
+                10 /* Batch size */,
+                100,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData,
+                FetchIsolation.TXN_COMMITTED),
+            45 /* Gap of 15 records will be added to second batch, gap of 2 
records will also be added to fourth batch */);
+
+        assertEquals(List.of(
+            new 
AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 
1),

Review Comment:
   This is because there is a gap of records from 35-49 in the original records 
to be acquired. This gap is added to this batch since we rely on the client to 
inform the broker about these natural gaps in the partition log.
   ```
   memoryRecordsBuilder(buffer, 5, 10).close(); // batch from 10-14
   memoryRecordsBuilder(buffer, 5, 15).close(); // batch from 15-19
   memoryRecordsBuilder(buffer, 15, 20).close(); // batch from 20-34
   memoryRecordsBuilder(buffer, 8, 50).close();  // batch from 50-57
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to