junrao commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031538464


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2488,6 +2509,190 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(
+        FetchPartitionData fetchPartitionData,
+        FetchIsolation isolationLevel,
+        ShareAcquiredRecords shareAcquiredRecords
+    ) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || fetchPartitionData.abortedTransactions.isEmpty() || fetchPartitionData.abortedTransactions.get().isEmpty())
+            return shareAcquiredRecords;
+
+        // When FetchIsolation.TXN_COMMITTED is used as isolation level by the share group, we need to filter any transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(),
+            shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions.get());
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalAcquiredRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+        List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+        for (RecordBatch recordBatch : recordsToArchive) {
+            // Archive the offsets/batches in the cached state.
+            NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+            archiveRecords(recordBatch.baseOffset(), recordBatch.lastOffset() + 1, subMap, RecordState.ACQUIRED);
+        }
+        return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+    }
+
+    /**
+     * This function filters out the offsets present in the acquired records list that are also a part of batches that need to be archived.
+     * It follows an iterative refinement of acquired records to eliminate batches to be archived.
+     * @param acquiredRecordsList The list containing acquired records. This list is sorted by the firstOffset of the acquired batch.
+     * @param batchesToArchive The list containing record batches to archive. This list is sorted by the baseOffset of the record batch.
+     * @return The list containing filtered acquired records offsets.
+     */
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecordsList,
+        List<RecordBatch> batchesToArchive
+    ) {
+        List<AcquiredRecords> result = new ArrayList<>();
+        Iterator<AcquiredRecords> acquiredRecordsListIter = acquiredRecordsList.iterator();
+        Iterator<RecordBatch> batchesToArchiveIterator = batchesToArchive.iterator();
+        if (!batchesToArchiveIterator.hasNext())
+            return acquiredRecordsList;
+        RecordBatch batchToArchive = batchesToArchiveIterator.next();
+        AcquiredRecords unresolvedAcquiredRecords = null;
+
+        while (unresolvedAcquiredRecords != null || acquiredRecordsListIter.hasNext()) {
+            if (unresolvedAcquiredRecords == null)
+                unresolvedAcquiredRecords = acquiredRecordsListIter.next();
+
+            long unresolvedFirstOffset = unresolvedAcquiredRecords.firstOffset();
+            long unresolvedLastOffset = unresolvedAcquiredRecords.lastOffset();
+            short unresolvedDeliveryCount = unresolvedAcquiredRecords.deliveryCount();
+
+            if (batchToArchive == null) {
+                result.add(unresolvedAcquiredRecords);
+                unresolvedAcquiredRecords = null;
+                continue;
+            }
+
+            // Non-overlap check - unresolvedFirstOffset offsets lie before the batchToArchive offsets. No need to filter out the offsets in such a scenario.
+            if (unresolvedLastOffset < batchToArchive.baseOffset()) {
+                // Offsets in unresolvedAcquiredRecords do not overlap with batchToArchive, hence it should not get filtered out.
+                result.add(unresolvedAcquiredRecords);
+                unresolvedAcquiredRecords = null;
+            }
+
+            // Overlap check - unresolvedFirstOffset offsets overlap with the batchToArchive offsets. We need to filter out the overlapping
+            // offsets in such a scenario.
+            if (unresolvedFirstOffset <= batchToArchive.lastOffset() &&
+                unresolvedLastOffset >= batchToArchive.baseOffset()) {
+                unresolvedAcquiredRecords = null;
+                // Split the unresolvedFirstOffset into parts - before and after the overlapping record batchToArchive.
+                if (unresolvedFirstOffset < batchToArchive.baseOffset()) {
+                    // The offsets in unresolvedAcquiredRecords that are present before batchToArchive's baseOffset should not get filtered out.
+                    result.add(new AcquiredRecords()
+                        .setFirstOffset(unresolvedFirstOffset)
+                        .setLastOffset(batchToArchive.baseOffset() - 1)
+                        .setDeliveryCount(unresolvedDeliveryCount));
+                }
+                if (unresolvedLastOffset > batchToArchive.lastOffset()) {
+                    // The offsets in unresolvedAcquiredRecords that are present after batchToArchive's lastOffset should not get filtered out
+                    // and should be taken forward for further processing since they could potentially contain offsets that need to be archived.
+                    unresolvedAcquiredRecords = new AcquiredRecords()
+                        .setFirstOffset(batchToArchive.lastOffset() + 1)
+                        .setLastOffset(unresolvedLastOffset)
+                        .setDeliveryCount(unresolvedDeliveryCount);
+                }
+            }
+
+            if (unresolvedLastOffset >= batchToArchive.lastOffset()) {

Review Comment:
   Could we add a comment for this case?
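   Something along these lines, perhaps (a sketch only; it assumes the body of this branch advances `batchToArchive` to the next batch to archive, which isn't shown in this hunk):
   ```java
            // The current batchToArchive is fully covered up to its lastOffset by the
            // offsets processed so far, so it cannot overlap any later acquired records.
            // Advance to the next batch to archive, or null if none remain.
            if (unresolvedLastOffset >= batchToArchive.lastOffset()) {
                batchToArchive = batchesToArchiveIterator.hasNext() ? batchesToArchiveIterator.next() : null;
            }
   ```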



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    @ClusterTest
+    public void testReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 8);
+            // The transactions for the 5th and 10th messages were aborted, hence those messages won't be included in the fetched records.
+            assertEquals(8, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                if (messageCounter % 5 == 0)
+                    messageCounter++;
+                assertEquals("Message " + messageCounter, new 
String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testReadUncommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
+            // Even though the transactions for the 5th and 10th messages were aborted, they will be included in the fetched records since IsolationLevel is READ_UNCOMMITTED.
+            assertEquals(10, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                assertEquals("Message " + messageCounter, new 
String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 1");
+
+                ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);

Review Comment:
   Does this force the test to wait for 2.5 secs? It's kind of long.
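   If it does, one option is a short-poll loop that returns as soon as the expected count arrives, so the 2500 ms becomes an upper bound rather than a fixed wait. A sketch (the actual `waitedPoll` implementation isn't shown in this hunk, and `pollUntilCount` is a hypothetical helper name):
   ```java
    private <K, V> List<ConsumerRecord<K, V>> pollUntilCount(ShareConsumer<K, V> consumer,
                                                             int expectedCount,
                                                             long maxWaitMs) {
        List<ConsumerRecord<K, V>> collected = new ArrayList<>();
        long deadline = System.currentTimeMillis() + maxWaitMs;
        // Poll in small increments and exit early once enough records have arrived.
        while (collected.size() < expectedCount && System.currentTimeMillis() < deadline) {
            consumer.poll(Duration.ofMillis(100)).forEach(collected::add);
        }
        return collected;
    }
   ```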



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map<Long, L
         return errorMessage.toString();
     }
 
+    @Test
+    public void testFilterRecordBatchesFromAcquiredRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        List<AcquiredRecords> acquiredRecords1 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches1 = List.of(
+            memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+            memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)),
+            sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1));
+
+        List<AcquiredRecords> acquiredRecords2 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3)
+        );
+        List<RecordBatch> recordBatches2 = List.of(
+            memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+            memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+                new AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 3)
+
+            ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, recordBatches2)
+        );
+
+        // The record batches list is empty.
+        assertEquals(acquiredRecords2, sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, List.of()));
+
+        List<AcquiredRecords> acquiredRecords3 = List.of(
+            new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches3 = List.of(
+            memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+            memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+        );
+
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 1)
+
+            ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, recordBatches3)
+        );
+    }
+
+    @Test
+    public void testAcquireWithReadCommittedIsolationLevel() {
+        SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 5, 15).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        memoryRecordsBuilder(buffer, 8, 50).close();
+        memoryRecordsBuilder(buffer, 10, 58).close();
+        memoryRecordsBuilder(buffer, 5, 70).close();
+
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        FetchPartitionData fetchPartitionData = fetchPartitionData(records, newAbortedTransactions());
+
+        // We are mocking the result of function fetchAbortedTransactionRecordBatches. The records present at these offsets need to be archived.
+        // We won't be utilizing the aborted transactions passed in fetchPartitionData.
+        when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn(
+            List.of(
+                memoryRecordsBuilder(5, 10).build().batches().iterator().next(),
+                memoryRecordsBuilder(10, 58).build().batches().iterator().next(),
+                memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+            )
+        );
+
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+            sharePartition.acquire(
+                MEMBER_ID,
+                10 /* Batch size */,
+                100,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData,
+                FetchIsolation.TXN_COMMITTED),
+            45 /* Gap of 15 records will be added to second batch, gap of 2 records will also be added to fourth batch */);
+
+        assertEquals(List.of(
+            new AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 1),

Review Comment:
   Why do we set the last offset to 49 when the original batch ends at offset 34?
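   Possibly because of the gap noted in the test itself: the data batches cover 10-14, 15-19, 20-34, 50-57, 58-67 and 70-74, so offsets 35-49 form a 15-record gap ("Gap of 15 records will be added to second batch"). If acquisition extends the second acquired batch across that gap up to the next batch's base offset at 50, it would end at 49 rather than 34 (an inference from the test's comment, not confirmed by the PR).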



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1154,7 +1163,8 @@ public void testAcquireWithMaxFetchRecords() {
             BATCH_SIZE,
             10,
             DEFAULT_FETCH_OFFSET,
-            fetchPartitionData(records)),
+            fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),

Review Comment:
   indentation
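   E.g. (aligning the new argument with the previous one):
   ```java
            fetchPartitionData(records),
            FETCH_ISOLATION_HWM),
   ```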



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map<Long, L
         return errorMessage.toString();
     }
 
+    @Test
+    public void testFilterRecordBatchesFromAcquiredRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        List<AcquiredRecords> acquiredRecords1 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches1 = List.of(
+            memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+            memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)),
+            sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1));
+
+        List<AcquiredRecords> acquiredRecords2 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3)
+        );
+        List<RecordBatch> recordBatches2 = List.of(
+            memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+            memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+                new AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 3)
+
+            ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, recordBatches2)
+        );
+
+        // The record batches list is empty.
+        assertEquals(acquiredRecords2, sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, List.of()));
+
+        List<AcquiredRecords> acquiredRecords3 = List.of(
+            new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches3 = List.of(
+            memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+            memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+        );
+
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 1)
+
+            ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, recordBatches3)
+        );
+    }
+
+    @Test
+    public void testAcquireWithReadCommittedIsolationLevel() {
+        SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 5, 15).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        memoryRecordsBuilder(buffer, 8, 50).close();
+        memoryRecordsBuilder(buffer, 10, 58).close();
+        memoryRecordsBuilder(buffer, 5, 70).close();
+
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        FetchPartitionData fetchPartitionData = fetchPartitionData(records, newAbortedTransactions());
+
+        // We are mocking the result of function fetchAbortedTransactionRecordBatches. The records present at these offsets need to be archived.
+        // We won't be utilizing the aborted transactions passed in fetchPartitionData.
+        when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn(
+            List.of(
+                memoryRecordsBuilder(5, 10).build().batches().iterator().next(),
+                memoryRecordsBuilder(10, 58).build().batches().iterator().next(),
+                memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+            )
+        );
+
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+            sharePartition.acquire(
+                MEMBER_ID,
+                10 /* Batch size */,
+                100,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData,
+                FetchIsolation.TXN_COMMITTED),
+            45 /* Gap of 15 records will be added to second batch, gap of 2 records will also be added to fourth batch */);
+
+        assertEquals(List.of(
+            new AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(50).setLastOffset(57).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(68).setLastOffset(69).setDeliveryCount((short) 1)
+        ), acquiredRecordsList);
+        assertEquals(75, sharePartition.nextFetchOffset());
+
+        // Checking cached state.
+        assertEquals(4, sharePartition.cachedState().size());
+        assertTrue(sharePartition.cachedState().containsKey(10L));
+        assertTrue(sharePartition.cachedState().containsKey(20L));
+        assertTrue(sharePartition.cachedState().containsKey(50L));
+        assertTrue(sharePartition.cachedState().containsKey(70L));
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState());
+
+        assertEquals(19L, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(49L, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(69L, sharePartition.cachedState().get(50L).lastOffset());
+        assertEquals(74L, sharePartition.cachedState().get(70L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(70L).batchState());
+
+        assertEquals(MEMBER_ID, sharePartition.cachedState().get(20L).batchMemberId());
+        assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(70L).batchMemberId());
+
+        assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(70L).batchAcquisitionLockTimeoutTask());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
+
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask());
+
+        expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(50L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(51L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(52L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(53L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(54L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(55L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(56L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(57L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(58L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(59L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(60L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(61L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(62L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(63L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(64L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(65L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(66L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(67L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(68L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(69L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(50L).offsetState());
+
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(50L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(51L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(52L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(53L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(54L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(55L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(56L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(57L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(58L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(59L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(60L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(61L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(62L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(63L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(64L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(65L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(66L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(67L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(68L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(69L).acquisitionLockTimeoutTask());
+    }
+
+    @Test
+    public void testContainsAbortMarker() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Record batch is not a control batch.
+        RecordBatch recordBatch = mock(RecordBatch.class);
+        when(recordBatch.isControlBatch()).thenReturn(false);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch but doesn't contain any records..

Review Comment:
   extra .



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map<Long, L
         return errorMessage.toString();
     }
 
+    @Test
+    public void testFilterRecordBatchesFromAcquiredRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        List<AcquiredRecords> acquiredRecords1 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches1 = List.of(
+            memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+            memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)),
+            sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1));
+
+        List<AcquiredRecords> acquiredRecords2 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3)
+        );
+        List<RecordBatch> recordBatches2 = List.of(
+            memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+            memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+                new AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 3)
+
+            ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, recordBatches2)
+        );
+
+        // The record batches list is empty.
+        assertEquals(acquiredRecords2, sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, List.of()));
+
+        List<AcquiredRecords> acquiredRecords3 = List.of(
+            new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches3 = List.of(
+            memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+            memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+        );
+
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 1)
+
+            ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, recordBatches3)
+        );
+    }
+
+    @Test
+    public void testAcquireWithReadCommittedIsolationLevel() {
+        SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 5, 15).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        memoryRecordsBuilder(buffer, 8, 50).close();
+        memoryRecordsBuilder(buffer, 10, 58).close();
+        memoryRecordsBuilder(buffer, 5, 70).close();
+
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        FetchPartitionData fetchPartitionData = fetchPartitionData(records, newAbortedTransactions());
+
+        // We are mocking the result of function fetchAbortedTransactionRecordBatches. The records present at these offsets need to be archived.
+        // We won't be utilizing the aborted transactions passed in fetchPartitionData.
+        when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn(
+            List.of(
+                memoryRecordsBuilder(5, 10).build().batches().iterator().next(),
+                memoryRecordsBuilder(10, 58).build().batches().iterator().next(),
+                memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+            )
+        );
+
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+            sharePartition.acquire(
+                MEMBER_ID,
+                10 /* Batch size */,
+                100,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData,
+                FetchIsolation.TXN_COMMITTED),
+            45 /* Gap of 15 records will be added to second batch, gap of 2 records will also be added to fourth batch */);
+
+        assertEquals(List.of(
+            new AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(50).setLastOffset(57).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(68).setLastOffset(69).setDeliveryCount((short) 1)
+        ), acquiredRecordsList);
+        assertEquals(75, sharePartition.nextFetchOffset());
+
+        // Checking cached state.
+        assertEquals(4, sharePartition.cachedState().size());
+        assertTrue(sharePartition.cachedState().containsKey(10L));
+        assertTrue(sharePartition.cachedState().containsKey(20L));
+        assertTrue(sharePartition.cachedState().containsKey(50L));
+        assertTrue(sharePartition.cachedState().containsKey(70L));
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState());
+
+        assertEquals(19L, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(49L, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(69L, sharePartition.cachedState().get(50L).lastOffset());
+        assertEquals(74L, sharePartition.cachedState().get(70L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(70L).batchState());
+
+        assertEquals(MEMBER_ID, sharePartition.cachedState().get(20L).batchMemberId());
+        assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(70L).batchMemberId());
+
+        assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(70L).batchAcquisitionLockTimeoutTask());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
+
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask());
+
+        expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(50L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(51L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(52L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(53L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(54L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(55L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(56L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(57L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(58L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(59L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(60L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(61L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(62L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(63L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(64L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(65L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(66L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(67L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(68L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(69L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(50L).offsetState());
+
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(50L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(51L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(52L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(53L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(54L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(55L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(56L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(57L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(58L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(59L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(60L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(61L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(62L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(63L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(64L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(65L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(66L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(67L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(68L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(69L).acquisitionLockTimeoutTask());
+    }
+
+    @Test
+    public void testContainsAbortMarker() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Record batch is not a control batch.
+        RecordBatch recordBatch = mock(RecordBatch.class);
+        when(recordBatch.isControlBatch()).thenReturn(false);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch but doesn't contain any records..
+        recordBatch = mock(RecordBatch.class);
+        Iterator batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(false);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch which contains a record of type ControlRecordType.ABORT.
+        recordBatch = mock(RecordBatch.class);
+        batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(true);
+        DefaultRecord record = mock(DefaultRecord.class);
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Buffer has to be created in a way that ControlRecordType.parse(buffer) returns ControlRecordType.ABORT.
+        buffer.putShort((short) 5);
+        buffer.putShort(ControlRecordType.ABORT.type());
+        buffer.putInt(23432); // some field added in version 5
+        buffer.flip();
+        when(record.key()).thenReturn(buffer);
+        when(batchIterator.next()).thenReturn(record);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertTrue(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch which contains a record of type ControlRecordType.COMMIT.
+        recordBatch = mock(RecordBatch.class);
+        batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(true);
+        record = mock(DefaultRecord.class);
+        buffer = ByteBuffer.allocate(4096);
+        // Buffer has to be created in a way that ControlRecordType.parse(buffer) returns ControlRecordType.COMMIT.
+        buffer.putShort((short) 5);
+        buffer.putShort(ControlRecordType.COMMIT.type());
+        buffer.putInt(23432); // some field added in version 5
+        buffer.flip();
+        when(record.key()).thenReturn(buffer);
+        when(batchIterator.next()).thenReturn(record);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+    }
+
+    @Test
+    public void testFetchAbortedTransactionRecordBatchesForOnlyAbortedTransactions() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Case 1 - Creating 10 transactional records in a single batch followed by an ABORT marker record for producerId 1.
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 1, 0);
+        buffer.flip();
+        Records records = MemoryRecords.readableRecords(buffer);
+
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = List.of(
+            new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+        );
+        // Records from 0 to 9 should be archived because they are a part of aborted transactions.
+        List<RecordBatch> actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions);
+        assertEquals(1, actual.size());
+        assertEquals(0, actual.get(0).baseOffset());
+        assertEquals(9, actual.get(0).lastOffset());
+        assertEquals(1, actual.get(0).producerId());
+
+        // Case 2: 3 individual batches, each followed by an ABORT marker record for producerId 1.
+        buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 0);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 2);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 4);
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        abortedTransactions = List.of(
+            new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+            new FetchResponseData.AbortedTransaction().setFirstOffset(2).setProducerId(1),
+            new FetchResponseData.AbortedTransaction().setFirstOffset(4).setProducerId(1)
+        );
+
+        actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions);
+        assertEquals(3, actual.size());
+        assertEquals(0, actual.get(0).baseOffset());
+        assertEquals(0, actual.get(0).lastOffset());
+        assertEquals(1, actual.get(0).producerId());
+        assertEquals(2, actual.get(1).baseOffset());
+        assertEquals(2, actual.get(1).lastOffset());
+        assertEquals(1, actual.get(1).producerId());
+        assertEquals(4, actual.get(2).baseOffset());
+        assertEquals(4, actual.get(2).lastOffset());
+        assertEquals(1, actual.get(2).producerId());
+
+        // Case 3: The producer id of the records is different, so they should not be archived.
+        buffer = ByteBuffer.allocate(1024);
+        // We are creating 10 transactional records followed by an ABORT marker record for producerId 2.
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 2, 0);
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        abortedTransactions = List.of(
+            new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+        );
+
+        actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransactions() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 0);
+        newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 2, 3);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 2, 6);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 9);
+        newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 1, 12);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 15);
+        buffer.flip();
+        Records records = MemoryRecords.readableRecords(buffer);
+
+        // Case 1 - Aborted transactions does not contain the record batch from 4-5 with producer id 2.

Review Comment:
   Hmm, batch 2-3 has producer id 2.
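   For reference, assuming `newTransactionalRecords(buffer, controlType, numRecords, producerId, baseOffset)` writes the data records first and the control marker after them (as Case 2 of the previous test suggests), the setup above would lay out as:
   ```java
    // producerId 1: data 0-1   (ABORT marker at 2)
    // producerId 2: data 3-4   (COMMIT marker at 5)
    // producerId 2: data 6-7   (ABORT marker at 8)
    // producerId 1: data 9-10  (ABORT marker at 11)
    // producerId 1: data 12-13 (COMMIT marker at 14)
    // producerId 1: data 15-16 (ABORT marker at 17)
   ```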



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    @ClusterTest
+    public void testReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 8);
+            // The transactions for the 5th and 10th messages were aborted, hence those messages won't be included in the fetched records.
+            assertEquals(8, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                if (messageCounter % 5 == 0)
+                    messageCounter++;
+                assertEquals("Message " + messageCounter, new 
String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testReadUncommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
+            // Even though the transactions for the 5th and 10th messages were aborted, they will be included in the fetched records since IsolationLevel is READ_UNCOMMITTED.
+            assertEquals(10, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                assertEquals("Message " + messageCounter, new 
String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
1");
+
+                ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
+                assertEquals(1, records.count());
+                ConsumerRecord<byte[], byte[]> record = 
records.iterator().next();
+                assertEquals("Message 1", new String(record.value()));
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Second transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 2");
+
+                records = waitedPoll(shareConsumer, 2500L, 1);
+                assertEquals(1, records.count());
+                record = records.iterator().next();
+                assertEquals("Message 2", new String(record.value()));
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Third transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
3");
+                // Fourth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 4");
+
+                records = waitedPoll(shareConsumer, 2500L, 2);
+                // Message 3 and Message 4 would be returned by this poll.
+                assertEquals(2, records.count());
+                Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = 
records.iterator();
+                record = recordIterator.next();
+                assertEquals("Message 3", new String(record.value()));
+                record = recordIterator.next();
+                assertEquals("Message 4", new String(record.value()));
+                // We will make Message 3 and Message 4 available for 
re-consumption.
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+                shareConsumer.commitSync();
+
+                // We are altering IsolationLevel to READ_COMMITTED now. We 
will only read committed transactions now.
+                alterShareIsolationLevel("group1", "read_committed");
+
+                // Fifth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
5");
+                // Sixth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 6");
+                // Seventh transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 7");
+                // Eighth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
8");
+
+                // Since isolation level is READ_COMMITTED, we can consume 
Message 3 (committed transaction that was released), Message 5 and Message 8.
+                // We cannot consume Message 4 (aborted transaction that was 
released), Message 6 and Message 7 since they were aborted.
+                List<String> messages = new ArrayList<>();
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = 
shareConsumer.poll(Duration.ofMillis(5000));
+                    if (pollRecords.count() > 0) {
+                        for (ConsumerRecord<byte[], byte[]> pollRecord : 
pollRecords)
+                            messages.add(new String(pollRecord.value()));
+                        pollRecords.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                        shareConsumer.commitSync();
+                    }
+                    return messages.size() == 3;
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all 
records post altering share isolation level");
+
+                assertEquals("Message 3", messages.get(0));
+                assertEquals("Message 5", messages.get(1));
+                assertEquals("Message 8", messages.get(2));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                transactionalProducer.close();
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void 
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
1");
+
+                ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
+                assertEquals(1, records.count());
+                ConsumerRecord<byte[], byte[]> record = 
records.iterator().next();
+                assertEquals("Message 1", new String(record.value()));
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Second transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 2");
+
+                // We will not receive any records since the transaction was 
aborted.
+                records = waitedPoll(shareConsumer, 2500L, 0);
+                assertEquals(0, records.count());
+
+                // Third transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
3");
+                // Fourth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 4");
+
+                records = waitedPoll(shareConsumer, 2500L, 1);
+                // Message 3 would be returned by this poll.
+                assertEquals(1, records.count());
+                record = records.iterator().next();
+                assertEquals("Message 3", new String(record.value()));
+                // We will make Message 3 available for re-consumption.
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+                shareConsumer.commitSync();
+
+                // Precautionary poll so that the aborted transaction for 
Message 4 is fetched by the consumer.
+                shareConsumer.poll(Duration.ofMillis(5000));

Review Comment:
   Could we use a `waitUntil`-style check here to avoid waiting the full 5 secs?
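   For example, a minimal sketch reusing the `TestUtils.waitForCondition` pattern from `testAlterReadUncommittedToReadCommittedIsolationLevel` above. Whether redelivery of the released "Message 3" is the right signal to wait on is an assumption (the precautionary poll itself returns nothing under read_committed), so treat this as one possible shape rather than a drop-in fix:
   
   TestUtils.waitForCondition(() -> {
       // Poll in short increments instead of a single 5-second poll. The
       // released "Message 3" is committed, so it should eventually be
       // redelivered; seeing any record again confirms a fetch has
       // progressed past the aborted "Message 4".
       ConsumerRecords<byte[], byte[]> redelivered = shareConsumer.poll(Duration.ofMillis(200));
       return redelivered.count() > 0;
   }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Timed out waiting for a fetch past the aborted transaction");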


