apoorvmittal10 commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013454652
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } + private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { + if (isolationLevel != FetchIsolation.TXN_COMMITTED) + return shareAcquiredRecords; + // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any + // transactions that were aborted/did not commit due to timeout. + List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions); + int acquiredCount = 0; + for (AcquiredRecords records : result) { + acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1); + } + return new ShareAcquiredRecords(result, acquiredCount); + } + + private List<AcquiredRecords> filterAbortedTransactionalRecords( + Iterable<? extends RecordBatch> batches, + List<AcquiredRecords> acquiredRecords, + Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions Review Comment: Are there any guarantees for `abortedTransactions` to be in some defined order with `firstOffset` for the RecordBatch? ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } + private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { + if (isolationLevel != FetchIsolation.TXN_COMMITTED) + return shareAcquiredRecords; + // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any + // transactions that were aborted/did not commit due to timeout. + List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions); + int acquiredCount = 0; + for (AcquiredRecords records : result) { + acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1); + } + return new ShareAcquiredRecords(result, acquiredCount); + } + + private List<AcquiredRecords> filterAbortedTransactionalRecords( Review Comment: Shouldn't the method name be consistent with earlier method `filterAbortedTransactionalAcquiredRecords` as we are sending the `acquired` records list here as well. Hence earlier method can have `maybe` prefix and this one as `filterAbortedTransactionalAcquiredRecords`. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } + private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { Review Comment: ```suggestion private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { ``` ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } + private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { + if (isolationLevel != FetchIsolation.TXN_COMMITTED) + return shareAcquiredRecords; + // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any Review Comment: Why not to have the below check in the same method and pass the aborted transactions to `filterAbortedTransactionalRecords` without optional. This can make sure that when it's required to filter then only call goes to further methods, and maybe method should have all these pre-checks. ``` if (abortedTransactions.isEmpty()) return acquiredRecords; -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org