apoorvmittal10 commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013454652


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords 
filterAbortedTransactionalAcquiredRecords(FetchPartitionData 
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords 
shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the 
share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = 
filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), 
shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - 
records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        Optional<List<FetchResponseData.AbortedTransaction>> 
abortedTransactions

Review Comment:
   Are there any guarantees for `abortedTransactions` to be in some defined 
order with `firstOffset` for the RecordBatch?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords 
filterAbortedTransactionalAcquiredRecords(FetchPartitionData 
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords 
shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the 
share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = 
filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), 
shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - 
records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalRecords(

Review Comment:
   Shouldn't the method name be consistent with earlier method 
`filterAbortedTransactionalAcquiredRecords` as we are sending the `acquired` 
records list here as well. Hence earlier method can have `maybe` prefix and 
this one as `filterAbortedTransactionalAcquiredRecords`.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords 
filterAbortedTransactionalAcquiredRecords(FetchPartitionData 
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords 
shareAcquiredRecords) {

Review Comment:
   ```suggestion
       private ShareAcquiredRecords 
maybeFilterAbortedTransactionalAcquiredRecords(FetchPartitionData 
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords 
shareAcquiredRecords) {
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords 
filterAbortedTransactionalAcquiredRecords(FetchPartitionData 
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords 
shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the 
share group, we need to filter any

Review Comment:
   Why not to have the below check in the same method and pass the aborted 
transactions to `filterAbortedTransactionalRecords` without optional. This can 
make sure that when it's required to filter then only call goes to further 
methods, and maybe method should have all these pre-checks.
   
   ```
   if (abortedTransactions.isEmpty())
                   return acquiredRecords;



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to