AndrewJSchofield commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2020777296


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -67,6 +67,8 @@ public void testFromPropsInvalid() {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else if (GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
+            } else if (GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG.equals(name)) {

Review Comment:
   nit: Please move up to the end of the other share configs.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -247,6 +274,7 @@ private Properties createValidGroupConfig() {
         props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
         props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
         props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
+        props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_uncommitted");

Review Comment:
   nit: I would tend to put all of the consumer configs, and then the share 
configs, and then the streams configs, just for neatness.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3239,7 +3239,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         shareFetchRequest.maxWait,
         fetchMinBytes,
         fetchMaxBytes,
-        FetchIsolation.HIGH_WATERMARK,
+        FetchIsolation.of(-1, GroupConfig.defaultShareIsolationLevel),

Review Comment:
   This `-1` can be `FetchRequest.CONSUMER_REPLICA_ID`.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,243 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || fetchPartitionData.abortedTransactions.isEmpty())
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any

Review Comment:
   Isolation level, not type, please.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,243 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || fetchPartitionData.abortedTransactions.isEmpty())
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions.get());
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalAcquiredRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+            List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+            for (RecordBatch recordBatch : recordsToArchive) {
+                // Archive the offsets/batches in the cached state.
+                NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+                archiveAcquiredBatchRecords(subMap, recordBatch);
+            }
+            return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecords,
+        List<RecordBatch> recordsToArchive
+    ) {
+        lock.writeLock().lock();
+        try {
+            List<AcquiredRecords> result = new ArrayList<>();
+
+            for (AcquiredRecords acquiredRecord : acquiredRecords) {
+                List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+                tempAcquiredRecords.add(acquiredRecord);
+                for (RecordBatch recordBatch : recordsToArchive) {
+                    List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+                    for (AcquiredRecords temp : tempAcquiredRecords) {
+                        // Check if record batch overlaps with the acquired records.
+                        if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) {
+                            // Split the acquired record into parts before, inside, and after the overlapping record batch.
+                            if (temp.firstOffset() < recordBatch.baseOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(temp.firstOffset())
+                                    .setLastOffset(recordBatch.baseOffset() - 1)
+                                    .setDeliveryCount((short) 1));
+                            }
+                            if (temp.lastOffset() > recordBatch.lastOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(recordBatch.lastOffset() + 1)
+                                    .setLastOffset(temp.lastOffset())
+                                    .setDeliveryCount((short) 1));
+                            }
+                        } else {
+                            newAcquiredRecords.add(temp);
+                        }
+                    }
+                    tempAcquiredRecords = newAcquiredRecords;
+                }
+                result.addAll(tempAcquiredRecords);
+            }
+            return result;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            // The fetched batch either is exact fetch equivalent batch (mostly), subset
+            // or spans over multiple fetched batches. The state can vary per offset itself from
+            // the fetched batch in case of subset.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+
+                // If startOffset has moved ahead of the in-flight batch, skip the batch.
+                if (inFlightBatch.lastOffset() < startOffset) {
+                    log.trace("All offsets in the inflight batch {} are already archived: {}-{}",
+                        inFlightBatch, groupId, topicIdPartition);
+                    continue;
+                }
+
+                // Determine if the in-flight batch is a full match from the request batch.
+                boolean fullMatch = checkForFullMatch(inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset());
+
+                // Maintain state per offset if the inflight batch is not a full match or the
+                // offset state is managed for this in-flight batch.
+                if (!fullMatch || inFlightBatch.offsetState() != null) {
+                    log.debug("Subset or offset tracked batch record found for record,"
+                            + " batch: {}, request offsets - first: {}, last: {} for the share partition: {}-{}",
+                        inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset(), groupId, topicIdPartition);
+                    if (inFlightBatch.offsetState() == null) {
+                        // The record batch is a subset and requires per offset state hence initialize
+                        // the offsets state in the in-flight batch.
+                        inFlightBatch.maybeInitializeOffsetStateUpdate();
+                    }
+                    archivePerOffsetAcquiredBatchRecords(inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset());
+                    continue;
+                }
+                // The in-flight batch is a full match hence change the state of the complete batch.
+                archiveCompleteAcquiredBatch(inFlightBatch);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archivePerOffsetAcquiredBatchRecords(InFlightBatch inFlightBatch, long startOffsetToArchive, long endOffsetToArchive) {
+        lock.writeLock().lock();
+        try {
+            log.trace("Archiving offset tracked batch: {} for the share partition: {}-{} since it was a part of aborted transaction", inFlightBatch, groupId, topicIdPartition);
+            for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
+                if (offsetState.getKey() < startOffsetToArchive) {
+                    continue;
+                }
+                if (offsetState.getKey() > endOffsetToArchive) {
+                    // No further offsets to process.
+                    break;
+                }
+                if (offsetState.getValue().state != RecordState.ACQUIRED) {
+                    continue;
+                }
+                offsetState.getValue().archive(EMPTY_MEMBER_ID);
+                offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveCompleteAcquiredBatch(InFlightBatch inFlightBatch) {
+        lock.writeLock().lock();
+        try {
+            log.trace("Archiving complete batch: {} for the share partition: {}-{} since it was a part of aborted transaction", inFlightBatch, groupId, topicIdPartition);
+            if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
+                // Change the state of complete batch since the same state exists for the entire inFlight batch.
+                inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
+                inFlightBatch.batchState.cancelAndClearAcquisitionLockTimeoutTask();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private NavigableMap<Long, InFlightBatch> fetchSubMap(RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(recordBatch.baseOffset());
+            if (floorOffset == null) {
+                log.debug("Fetched batch record {} not found for share partition: {}-{}", recordBatch, groupId,
+                    topicIdPartition);
+                throw new InvalidRecordStateException(
+                    "Batch record not found. The request batch offsets are not found in the cache.");
+            }
+            return cachedState.subMap(floorOffset.getKey(), true, recordBatch.lastOffset(), true);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<RecordBatch> fetchAbortedTransactionRecordBatches(
+        Iterable<? extends RecordBatch> batches,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactionsHeap = abortedTransactionsHeap(abortedTransactions);
+            Set<Long> abortedProducerIds = new HashSet<>();
+            List<RecordBatch> recordsToArchive = new ArrayList<>();
+
+            for (RecordBatch currentBatch : batches) {
+                if (currentBatch.hasProducerId()) {
+                    // remove from the aborted transactions queue, all aborted transactions which have begun
+                    // before the current batch's last offset and add the associated producerIds to the
+                    // aborted producer set
+                    if (abortedTransactionsHeap != null) {
+                        while (!abortedTransactionsHeap.isEmpty() && abortedTransactionsHeap.peek().firstOffset() <= currentBatch.lastOffset()) {
+                            FetchResponseData.AbortedTransaction abortedTransaction = abortedTransactionsHeap.poll();
+                            abortedProducerIds.add(abortedTransaction.producerId());
+                        }
+                    }
+                    long producerId = currentBatch.producerId();
+                    if (containsAbortMarker(currentBatch)) {
+                        abortedProducerIds.remove(producerId);
+                    } else if (isBatchAborted(currentBatch, abortedProducerIds)) {
+                        log.debug("Skipping aborted record batch for share partition: {}-{} with producerId {} and " +
+                            "offsets {} to {}", groupId, topicIdPartition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
+                        recordsToArchive.add(currentBatch);
+                    }
+                }
+            }
+            return recordsToArchive;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactionsHeap(List<FetchResponseData.AbortedTransaction> abortedTransactions) {

Review Comment:
   The name `abortedTransactionsHeap` seems very clumsy to me. What you are 
doing is ensuring that the returned list is ordered by increasing first offset. 
So, "sorted" or "ordered" might be useful in the name.
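
   A possible shape for that suggestion (the name `orderedAbortedTransactions` below is purely illustrative, not taken from the PR; the body just mirrors the min-heap construction quoted above):

   private PriorityQueue<FetchResponseData.AbortedTransaction> orderedAbortedTransactions(
       List<FetchResponseData.AbortedTransaction> abortedTransactions
   ) {
       // Ordered by increasing first offset so entries can be drained while the
       // fetched batches are walked in offset order.
       PriorityQueue<FetchResponseData.AbortedTransaction> orderedTransactions = new PriorityQueue<>(
           Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset));
       orderedTransactions.addAll(abortedTransactions);
       return orderedTransactions;
   }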



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -683,6 +692,7 @@ public long nextFetchOffset() {
     *                           if the records are already part of the same fetch batch.
     * @param fetchOffset        The fetch offset for which the records are fetched.
     * @param fetchPartitionData The fetched records for the share partition.
+     * @param isolationType      The isolation level for the share fetch request.

Review Comment:
   nit: It's `isolationLevel` not `isolationType` I think. Even the comment 
agrees with me ;0)



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -267,8 +268,20 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
             .rotate(topicIdPartitions, new PartitionRotateMetadata(sessionEpoch));
 
         CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
-        processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, rotatedTopicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));
-
+        if (groupConfigManager.groupConfig(groupId).isEmpty()) {
+            processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, rotatedTopicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));
+        } else {
+            FetchParams updatedFetchParams = new FetchParams(
+                fetchParams.replicaId,
+                fetchParams.replicaEpoch,
+                fetchParams.maxWaitMs,
+                fetchParams.minBytes,
+                fetchParams.maxBytes,
+                FetchIsolation.of(-1, groupConfigManager.groupConfig(groupId).get().shareIsolationLevel()),

Review Comment:
   This `-1` can be `FetchRequest.CONSUMER_REPLICA_ID`.
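
   A minimal sketch of that change in the `FetchParams` construction above, where only the isolation argument differs (`FetchRequest` here is assumed to be `org.apache.kafka.common.requests.FetchRequest`):

   FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID,
       groupConfigManager.groupConfig(groupId).get().shareIsolationLevel()),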



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -338,6 +338,7 @@ class KafkaApisTest extends Logging {
    cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
    cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
    cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
+    cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG, GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)

Review Comment:
   nit: Please move to the end of the other share group configs.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,243 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {

Review Comment:
   nit: This source file tends to put the parameter list over multiple lines 
when it gets this long.
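
   For illustration, the wrapped form being asked for might look like this (signature only, body unchanged):

   private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(
       FetchPartitionData fetchPartitionData,
       FetchIsolation isolationLevel,
       ShareAcquiredRecords shareAcquiredRecords
   ) {
       // ... existing body ...
   }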



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -212,6 +237,7 @@ public void testFromPropsWithDefaultValue() {
        defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
        defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "2000");
        defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
+        defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_uncommitted");

Review Comment:
   nit: Move to the end of the other share group configs.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,243 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords maybeFilterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || fetchPartitionData.abortedTransactions.isEmpty())
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions.get());
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalAcquiredRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+            List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+            for (RecordBatch recordBatch : recordsToArchive) {
+                // Archive the offsets/batches in the cached state.
+                NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+                archiveAcquiredBatchRecords(subMap, recordBatch);
+            }
+            return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecords,
+        List<RecordBatch> recordsToArchive
+    ) {
+        lock.writeLock().lock();
+        try {
+            List<AcquiredRecords> result = new ArrayList<>();
+
+            for (AcquiredRecords acquiredRecord : acquiredRecords) {
+                List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+                tempAcquiredRecords.add(acquiredRecord);
+                for (RecordBatch recordBatch : recordsToArchive) {
+                    List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+                    for (AcquiredRecords temp : tempAcquiredRecords) {
+                        // Check if record batch overlaps with the acquired records.
+                        if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) {
+                            // Split the acquired record into parts before, inside, and after the overlapping record batch.
+                            if (temp.firstOffset() < recordBatch.baseOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(temp.firstOffset())
+                                    .setLastOffset(recordBatch.baseOffset() - 1)
+                                    .setDeliveryCount((short) 1));
+                            }
+                            if (temp.lastOffset() > recordBatch.lastOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(recordBatch.lastOffset() + 1)
+                                    .setLastOffset(temp.lastOffset())
+                                    .setDeliveryCount((short) 1));
+                            }
+                        } else {
+                            newAcquiredRecords.add(temp);
+                        }
+                    }
+                    tempAcquiredRecords = newAcquiredRecords;
+                }
+                result.addAll(tempAcquiredRecords);
+            }
+            return result;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            // The fetched batch either is exact fetch equivalent batch (mostly), subset
+            // or spans over multiple fetched batches. The state can vary per offset itself from
+            // the fetched batch in case of subset.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+
+                // If startOffset has moved ahead of the in-flight batch, skip the batch.
+                if (inFlightBatch.lastOffset() < startOffset) {
+                    log.trace("All offsets in the inflight batch {} are already archived: {}-{}",
+                        inFlightBatch, groupId, topicIdPartition);
+                    continue;
+                }
+
+                // Determine if the in-flight batch is a full match from the request batch.
+                boolean fullMatch = checkForFullMatch(inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset());
+
+                // Maintain state per offset if the inflight batch is not a full match or the
+                // offset state is managed for this in-flight batch.
+                if (!fullMatch || inFlightBatch.offsetState() != null) {
+                    log.debug("Subset or offset tracked batch record found for record,"
+                            + " batch: {}, request offsets - first: {}, last: {} for the share partition: {}-{}",
+                        inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset(), groupId, topicIdPartition);
+                    if (inFlightBatch.offsetState() == null) {
+                        // The record batch is a subset and requires per offset state hence initialize
+                        // the offsets state in the in-flight batch.
+                        inFlightBatch.maybeInitializeOffsetStateUpdate();
+                    }
+                    archivePerOffsetAcquiredBatchRecords(inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset());
+                    continue;
+                }
+                // The in-flight batch is a full match hence change the state of the complete batch.
+                archiveCompleteAcquiredBatch(inFlightBatch);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archivePerOffsetAcquiredBatchRecords(InFlightBatch inFlightBatch, long startOffsetToArchive, long endOffsetToArchive) {
+        lock.writeLock().lock();
+        try {
+            log.trace("Archiving offset tracked batch: {} for the share partition: {}-{} since it was a part of aborted transaction", inFlightBatch, groupId, topicIdPartition);
+            for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
+                if (offsetState.getKey() < startOffsetToArchive) {
+                    continue;
+                }
+                if (offsetState.getKey() > endOffsetToArchive) {
+                    // No further offsets to process.
+                    break;
+                }
+                if (offsetState.getValue().state != RecordState.ACQUIRED) {
+                    continue;
+                }
+                offsetState.getValue().archive(EMPTY_MEMBER_ID);
+                offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveCompleteAcquiredBatch(InFlightBatch inFlightBatch) {
+        lock.writeLock().lock();
+        try {
+            log.trace("Archiving complete batch: {} for the share partition: {}-{} since it was a part of aborted transaction", inFlightBatch, groupId, topicIdPartition);
+            if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
+                // Change the state of complete batch since the same state exists for the entire inFlight batch.
+                inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
+                inFlightBatch.batchState.cancelAndClearAcquisitionLockTimeoutTask();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private NavigableMap<Long, InFlightBatch> fetchSubMap(RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(recordBatch.baseOffset());
+            if (floorOffset == null) {
+                log.debug("Fetched batch record {} not found for share partition: {}-{}", recordBatch, groupId,
+                    topicIdPartition);
+                throw new InvalidRecordStateException(
+                    "Batch record not found. The request batch offsets are not found in the cache.");
+            }
+            return cachedState.subMap(floorOffset.getKey(), true, recordBatch.lastOffset(), true);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<RecordBatch> fetchAbortedTransactionRecordBatches(
+        Iterable<? extends RecordBatch> batches,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactionsHeap = abortedTransactionsHeap(abortedTransactions);
+            Set<Long> abortedProducerIds = new HashSet<>();
+            List<RecordBatch> recordsToArchive = new ArrayList<>();
+
+            for (RecordBatch currentBatch : batches) {
+                if (currentBatch.hasProducerId()) {
+                    // remove from the aborted transactions queue, all aborted transactions which have begun
+                    // before the current batch's last offset and add the associated producerIds to the
+                    // aborted producer set
+                    if (abortedTransactionsHeap != null) {
+                        while (!abortedTransactionsHeap.isEmpty() && abortedTransactionsHeap.peek().firstOffset() <= currentBatch.lastOffset()) {
+                            FetchResponseData.AbortedTransaction abortedTransaction = abortedTransactionsHeap.poll();
+                            abortedProducerIds.add(abortedTransaction.producerId());
+                        }
+                    }
+                    long producerId = currentBatch.producerId();
+                    if (containsAbortMarker(currentBatch)) {
+                        abortedProducerIds.remove(producerId);
+                    } else if (isBatchAborted(currentBatch, abortedProducerIds)) {
+                        log.debug("Skipping aborted record batch for share partition: {}-{} with producerId {} and " +
+                            "offsets {} to {}", groupId, topicIdPartition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
+                        recordsToArchive.add(currentBatch);
+                    }
+                }
+            }
+            return recordsToArchive;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactionsHeap(List<FetchResponseData.AbortedTransaction> abortedTransactions) {
+        if (abortedTransactions == null || abortedTransactions.isEmpty())

Review Comment:
   I think we know that neither of these conditions is true. I'd prefer to take 
advantage of that knowledge so that the caller does not need to handle the null 
return here with its own conditional logic.
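
   Under that assumption (the helper always returns a queue, possibly empty, and never null), the caller's guard in `fetchAbortedTransactionRecordBatches` collapses to a plain loop, for example:

   while (!abortedTransactionsHeap.isEmpty()
           && abortedTransactionsHeap.peek().firstOffset() <= currentBatch.lastOffset()) {
       abortedProducerIds.add(abortedTransactionsHeap.poll().producerId());
   }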



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            if (abortedTransactions.isEmpty())
+                return acquiredRecords;
+            // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+            List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+            for (RecordBatch recordBatch : recordsToArchive) {
+                // Archive the offsets/batches in the cached state.
+                NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+                archiveAcquiredBatchRecords(subMap, recordBatch);
+            }
+            return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecords,
+        List<RecordBatch> recordsToArchive
+    ) {
+        lock.writeLock().lock();
+        try {
+            List<AcquiredRecords> result = new ArrayList<>();
+
+            for (AcquiredRecords acquiredRecord : acquiredRecords) {
+                List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+                tempAcquiredRecords.add(acquiredRecord);
+                for (RecordBatch recordBatch : recordsToArchive) {
+                    List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+                    for (AcquiredRecords temp : tempAcquiredRecords) {
+                        // Check if record batch overlaps with the acquired records.
+                        if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) {
+                            // Split the acquired record into parts before, inside, and after the overlapping record batch.
+                            if (temp.firstOffset() < recordBatch.baseOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(temp.firstOffset())
+                                    .setLastOffset(recordBatch.baseOffset() - 1)
+                                    .setDeliveryCount((short) 1));
+                            }
+                            if (temp.lastOffset() > recordBatch.lastOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(recordBatch.lastOffset() + 1)
+                                    .setLastOffset(temp.lastOffset())
+                                    .setDeliveryCount((short) 1));
+                            }
+                        } else {
+                            newAcquiredRecords.add(temp);
+                        }
+                    }
+                    tempAcquiredRecords = newAcquiredRecords;
+                }
+                result.addAll(tempAcquiredRecords);
+            }
+            return result;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            // The fetched batch either is exact fetch equivalent batch (mostly), subset
+            // or spans over multiple fetched batches. The state can vary per offset itself from
+            // the fetched batch in case of subset.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+
+                // If startOffset has moved ahead of the in-flight batch, skip the batch.
+                if (inFlightBatch.lastOffset() < startOffset) {
+                    log.trace("All offsets in the inflight batch {} are already archived: {}-{}",
+                        inFlightBatch, groupId, topicIdPartition);
+                    continue;
+                }
+
+                // Determine if the in-flight batch is a full match from the request batch.
+                boolean fullMatch = checkForFullMatch(inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset());
+
+                // Maintain state per offset if the inflight batch is not a full match or the
+                // offset state is managed for this in-flight batch.
+                if (!fullMatch || inFlightBatch.offsetState() != null) {
+                    log.debug("Subset or offset tracked batch record found for record,"
+                            + " batch: {}, request offsets - first: {}, last: {} for the share partition: {}-{}",
+                        inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset(), groupId, topicIdPartition);
+                    if (inFlightBatch.offsetState() == null) {
+                        // The record batch is a subset and requires per offset state hence initialize
+                        // the offsets state in the in-flight batch.
+                        inFlightBatch.maybeInitializeOffsetStateUpdate();
+                    }
+                    archivePerOffsetAcquiredBatchRecords(inFlightBatch, recordBatch.baseOffset(), recordBatch.lastOffset());
+                    continue;
+                }
+                // The in-flight batch is a full match hence change the state of the complete batch.
+                archiveCompleteAcquiredBatch(inFlightBatch);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archivePerOffsetAcquiredBatchRecords(InFlightBatch inFlightBatch, long startOffsetToArchive, long endOffsetToArchive) {
+        lock.writeLock().lock();
+        try {
+            log.trace("Archiving offset tracked batch: {} for the share partition: {}-{} since it was a part of aborted transaction", inFlightBatch, groupId, topicIdPartition);
+            for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
+                if (offsetState.getKey() < startOffsetToArchive) {
+                    continue;
+                }
+                if (offsetState.getKey() > endOffsetToArchive) {
+                    // No further offsets to process.
+                    break;
+                }
+                if (offsetState.getValue().state != RecordState.ACQUIRED) {
+                    continue;
+                }
+                offsetState.getValue().archive(EMPTY_MEMBER_ID);
+                offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveCompleteAcquiredBatch(InFlightBatch inFlightBatch) {
+        lock.writeLock().lock();
+        try {
+            log.trace("Archiving complete batch: {} for the share partition: {}-{} since it was a part of aborted transaction", inFlightBatch, groupId, topicIdPartition);
+            if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
+                // Change the state of complete batch since the same state exists for the entire inFlight batch.
+                inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
+                inFlightBatch.batchState.cancelAndClearAcquisitionLockTimeoutTask();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private NavigableMap<Long, InFlightBatch> fetchSubMap(RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(recordBatch.baseOffset());
+            if (floorOffset == null) {
+                log.debug("Fetched batch record {} not found for share partition: {}-{}", recordBatch, groupId,
+                    topicIdPartition);
+                throw new InvalidRecordStateException(
+                    "Batch record not found. The request batch offsets are not found in the cache.");
+            }
+            return cachedState.subMap(floorOffset.getKey(), true, recordBatch.lastOffset(), true);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<RecordBatch> fetchAbortedTransactionRecordBatches(
+        Iterable<? extends RecordBatch> batches,
+        Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactionsHeap = abortedTransactionsHeap(abortedTransactions.get());
+            Set<Long> abortedProducerIds = new HashSet<>();
+            List<RecordBatch> recordsToArchive = new ArrayList<>();
+
+            for (RecordBatch currentBatch : batches) {
+                if (currentBatch.hasProducerId()) {
+                    // remove from the aborted transactions queue, all aborted transactions which have begun
+                    // before the current batch's last offset and add the associated producerIds to the
+                    // aborted producer set
+                    if (abortedTransactionsHeap != null) {
+                        while (!abortedTransactionsHeap.isEmpty() && abortedTransactionsHeap.peek().firstOffset() <= currentBatch.lastOffset()) {
+                            FetchResponseData.AbortedTransaction abortedTransaction = abortedTransactionsHeap.poll();
+                            abortedProducerIds.add(abortedTransaction.producerId());
+                        }
+                    }
+                    long producerId = currentBatch.producerId();
+                    if (containsAbortMarker(currentBatch)) {
+                        abortedProducerIds.remove(producerId);
+                    } else if (isBatchAborted(currentBatch, abortedProducerIds)) {
+                        log.debug("Skipping aborted record batch for share partition: {}-{} with producerId {} and " +
+                            "offsets {} to {}", groupId, topicIdPartition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
+                        recordsToArchive.add(currentBatch);
+                    }
+                }
+            }
+            return recordsToArchive;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactionsHeap(List<FetchResponseData.AbortedTransaction> abortedTransactions) {

Review Comment:
   This code essentially mirrors the way that aborted transactions are handled 
in `CompletedFetch.abortedTransactions`. If you look in detail at the way the 
information comes out of the replica manager, I think it's a good thing that 
the code being added to `SharePartition` is clearly doing the same thing. It 
would be easy to mess it up in odd cases by re-inventing the wheel.


