junrao commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2029290363


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2505,182 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(
+        FetchPartitionData fetchPartitionData,
+        FetchIsolation isolationLevel,
+        ShareAcquiredRecords shareAcquiredRecords
+    ) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || fetchPartitionData.abortedTransactions.isEmpty() || fetchPartitionData.abortedTransactions.get().isEmpty())
+            return shareAcquiredRecords;
+
+        // When FetchIsolation.TXN_COMMITTED is used as isolation level by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(),
+            shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions.get());
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalAcquiredRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+        List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+        for (RecordBatch recordBatch : recordsToArchive) {
+            // Archive the offsets/batches in the cached state.
+            NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+            archiveRecords(recordBatch.baseOffset(), recordBatch.lastOffset() + 1, subMap, RecordState.ACQUIRED);
+        }
+        return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+    }
+
+    /**
+     * This function filters out the offsets present in the acquired records list that are also a part of batches that need to be archived.
+     * It follows an iterative refinement of acquired records to eliminate batches to be archived.
+     * @param acquiredRecordsList The list containing acquired records. This list is sorted by the firstOffset of the acquired batch.
+     * @param batchesToArchive The list containing record batches to archive. This list is sorted by the baseOffset of the record batch.
+     * @return The list containing filtered acquired records offsets.
+     */
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecordsList,
+        List<RecordBatch> batchesToArchive
+    ) {
+        List<AcquiredRecords> result = new ArrayList<>();
+        for (AcquiredRecords acquiredRecords : acquiredRecordsList) {
+            List<AcquiredRecords> tempAcquiredRecordsList = new ArrayList<>();

Review Comment:
   Hmm, I don't quite understand this. tempAcquiredRecordsList seems to always 
contain a single AcquiredRecords. Why does it need to be a list?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2505,182 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(
+        FetchPartitionData fetchPartitionData,
+        FetchIsolation isolationLevel,
+        ShareAcquiredRecords shareAcquiredRecords
+    ) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || fetchPartitionData.abortedTransactions.isEmpty() || fetchPartitionData.abortedTransactions.get().isEmpty())
+            return shareAcquiredRecords;
+
+        // When FetchIsolation.TXN_COMMITTED is used as isolation level by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(),
+            shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions.get());
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalAcquiredRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+        List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+        for (RecordBatch recordBatch : recordsToArchive) {
+            // Archive the offsets/batches in the cached state.
+            NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+            archiveRecords(recordBatch.baseOffset(), recordBatch.lastOffset() + 1, subMap, RecordState.ACQUIRED);
+        }
+        return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+    }
+
+    /**
+     * This function filters out the offsets present in the acquired records list that are also a part of batches that need to be archived.
+     * It follows an iterative refinement of acquired records to eliminate batches to be archived.
+     * @param acquiredRecordsList The list containing acquired records. This list is sorted by the firstOffset of the acquired batch.
+     * @param batchesToArchive The list containing record batches to archive. This list is sorted by the baseOffset of the record batch.
+     * @return The list containing filtered acquired records offsets.
+     */
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecordsList,
+        List<RecordBatch> batchesToArchive
+    ) {
+        List<AcquiredRecords> result = new ArrayList<>();
+        for (AcquiredRecords acquiredRecords : acquiredRecordsList) {
+            List<AcquiredRecords> tempAcquiredRecordsList = new ArrayList<>();
+            tempAcquiredRecordsList.add(acquiredRecords);
+            for (RecordBatch batchToArchive : batchesToArchive) {

Review Comment:
   This might be ok for now. But for each acquiredRecords, we are still 
iterating over every batchToArchive. Since both acquiredRecordsList and 
batchesToArchive are in offset order, could we just iterate over them together 
in one pass?
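
   To illustrate the one-pass idea, here is a rough sketch. It is not the PR's 
code: Range is a hypothetical stand-in for both AcquiredRecords and 
RecordBatch (an inclusive offset range), and SinglePassFilter and subtract are 
made-up names. It assumes both lists are sorted by first offset and that the 
ranges within each list do not overlap.

```java
import java.util.ArrayList;
import java.util.List;

final class SinglePassFilter {
    // Hypothetical stand-in for AcquiredRecords / RecordBatch: inclusive [first, last] offsets.
    record Range(long first, long last) { }

    // Removes every offset covered by toArchive from acquired.
    // Assumes both lists are sorted by first offset and internally non-overlapping.
    static List<Range> subtract(List<Range> acquired, List<Range> toArchive) {
        List<Range> result = new ArrayList<>();
        int j = 0; // index into toArchive; only ever moves forward
        for (Range a : acquired) {
            long start = a.first();
            // Skip archive ranges that end before this acquired range begins.
            while (j < toArchive.size() && toArchive.get(j).last() < start) {
                j++;
            }
            // Walk the archive ranges that overlap this acquired range.
            while (j < toArchive.size() && toArchive.get(j).first() <= a.last()) {
                Range b = toArchive.get(j);
                if (b.first() > start) {
                    // Offsets before the archived range survive.
                    result.add(new Range(start, b.first() - 1));
                }
                start = Math.max(start, b.last() + 1);
                if (b.last() > a.last()) {
                    // b may also overlap the next acquired range, so do not advance j.
                    break;
                }
                j++;
            }
            if (start <= a.last()) {
                // Tail of the acquired range after the last overlapping archive range.
                result.add(new Range(start, a.last()));
            }
        }
        return result;
    }
}
```

   Because j never moves backwards, the work is proportional to the combined 
size of the two lists rather than their product.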



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
