AndrewJSchofield commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2029317076


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2505,182 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(
+        FetchPartitionData fetchPartitionData,
+        FetchIsolation isolationLevel,
+        ShareAcquiredRecords shareAcquiredRecords
+    ) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || fetchPartitionData.abortedTransactions.isEmpty() || fetchPartitionData.abortedTransactions.get().isEmpty())
+            return shareAcquiredRecords;
+
+        // When FetchIsolation.TXN_COMMITTED is used as isolation level by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(),
+            shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions.get());
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalAcquiredRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+        List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+        for (RecordBatch recordBatch : recordsToArchive) {
+            // Archive the offsets/batches in the cached state.
            NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
            archiveRecords(recordBatch.baseOffset(), recordBatch.lastOffset() + 1, subMap, RecordState.ACQUIRED);
+        }
+        return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+    }
+
+    /**
+     * This function filters out the offsets present in the acquired records list that are also a part of batches that need to be archived.
+     * It follows an iterative refinement of acquired records to eliminate batches to be archived.
+     * @param acquiredRecordsList The list containing acquired records. This list is sorted by the firstOffset of the acquired batch.
+     * @param batchesToArchive The list containing record batches to archive. This list is sorted by the baseOffset of the record batch.
+     * @return The list containing filtered acquired records offsets.
+     */
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecordsList,
+        List<RecordBatch> batchesToArchive
+    ) {
+        List<AcquiredRecords> result = new ArrayList<>();
+        for (AcquiredRecords acquiredRecords : acquiredRecordsList) {
+            List<AcquiredRecords> tempAcquiredRecordsList = new ArrayList<>();

Review Comment:
   It's because the next iteration can divide a single AcquiredRecords into multiple pieces, and then the loop runs again. It's this iterative refinement that necessitates the list. It is true that the list often contains only a single element.


