shaan150 commented on code in PR #19497:
URL: https://github.com/apache/kafka/pull/19497#discussion_r2047739805


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -650,24 +658,18 @@ public int deleteAllOffsets(
        // Delete all the pending transactional offsets too. Here we only write a tombstone
        // if the topic-partition was not in the main storage because we don't need to write
        // two consecutive tombstones.
-        TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions != null) {
-            openTransactions.forEach(producerId -> {
-                Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
-                if (pendingOffsets != null) {
-                    TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets =
-                        pendingOffsets.offsetsByGroup.get(groupId);
-                    if (pendingGroupOffsets != null) {
-                        pendingGroupOffsets.forEach((topic, offsetsByPartition) -> {
-                            offsetsByPartition.keySet().forEach(partition -> {
-                                if (!hasCommittedOffset(groupId, topic, partition)) {
-                                    records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
-                                    numDeletedOffsets.getAndIncrement();
-                                }
-                            });
-                        });
-                    }
-                }
+        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            openTransactionsByGroupTopicAndPartition.get(groupId);
+        if (openTransactionsByTopic != null) {

Review Comment:
   There are three nested for loops here; this can be avoided.
   
   Something like the following might help (written quickly, so untested):
   
   ```java
   var openTransactionsByTopic = openTransactionsByGroupTopicAndPartition.get(groupId);
   // (Illustrative only: deleteAllOffsets returns an int, so a bare return would not compile as-is.)
   if (openTransactionsByTopic == null) return;
   
   for (var topicEntry : openTransactionsByTopic.entrySet()) {
       String topic = topicEntry.getKey();
       var openTransactionsByPartition = topicEntry.getValue();
   
       for (var partitionEntry : openTransactionsByPartition.entrySet()) {
           int partition = partitionEntry.getKey();
   
           // Single check per partition.
           if (!hasCommittedOffset(groupId, topic, partition)) {
               records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
               numDeletedOffsets.getAndIncrement();
           }
       }
   }
   ```
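   
   (The gain is that the new map is already keyed by topic and partition, so the per-producer lookups into pendingTransactionalOffsets disappear entirely and each open partition gets exactly one hasCommittedOffset check.)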



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1005,21 +1005,41 @@ public void replay(
                 openTransactionsByGroup
                     .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
                     .add(producerId);
+                openTransactionsByGroupTopicAndPartition
+                    .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                    .add(producerId);
             }
         } else {
             if (offsets.remove(groupId, topic, partition) != null) {
                 metrics.decrementNumOffsets();
             }
 
             // Remove all the pending offset commits related to the tombstone.
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.forEach(openProducerId -> {
-                    Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
-                    if (pendingOffsets != null) {
-                        pendingOffsets.remove(groupId, topic, partition);
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic != null) {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition != null) {
+                    TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
+                    if (openTransactions != null) {
+                        openTransactions.forEach(openProducerId -> {
+                            Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
+                            if (pendingOffsets != null) {
+                                pendingOffsets.remove(groupId, topic, partition);
+                            }
+                        });
+
+                        openTransactionsByPartition.remove(partition);
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
                     }
-                });
+                }
             }

Review Comment:
   This whole area is pretty badly structured; it will be difficult to maintain and is potentially fragile.
   
   I would split this out into methods and tidy some of it up. Here is an example of how I'd structure it; bear in mind this code isn't tested and is more of a boilerplate example:
   
   ```java
    public void clearOpenTransactions(final String groupId, final String topic, final int partition) {
        final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = getPartitionMap(groupId, topic);
        if (partitionMap == null) return;
    
        final TimelineHashSet<Long> openProducerIds = partitionMap.get(partition);
        if (openProducerIds == null) return;
    
        removePendingOffsets(openProducerIds, groupId, topic, partition);
    
        partitionMap.remove(partition);
        cleanupIfEmpty(partitionMap, getTopicMap(groupId), topic);
        cleanupIfEmpty(getTopicMap(groupId), openTransactionsByGroupTopicAndPartition, groupId);
    }
    
    private TimelineHashMap<Integer, TimelineHashSet<Long>> getPartitionMap(final String groupId, final String topic) {
        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
            openTransactionsByGroupTopicAndPartition.get(groupId);
        if (topicMap == null) return null;
        return topicMap.get(topic);
    }
    
    private TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> getTopicMap(final String groupId) {
        return openTransactionsByGroupTopicAndPartition.get(groupId);
    }
    
    private void removePendingOffsets(
        final Set<Long> producerIds,
        final String groupId,
        final String topic,
        final int partition) {
        for (final Long producerId : producerIds) {
            final Offsets offsets = pendingTransactionalOffsets.get(producerId);
            if (offsets != null) {
                offsets.remove(groupId, topic, partition);
            }
        }
    }
    
    private <K, V extends Map<?, ?>> void cleanupIfEmpty(final V innerMap, final Map<K, V> outerMap, final K key) {
        if (innerMap != null && innerMap.isEmpty()) {
            outerMap.remove(key);
        }
    }
   ```
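   
   The generic cleanupIfEmpty helper keeps the empty-map pruning in one place rather than repeating it at every level of the nesting.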



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {
 
     /**
      * The open transactions (producer ids) keyed by group.
+     * Tracks whether groups have any open transactions.
      */
     private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
 
+    /**
+     * The open transactions (producer ids) keyed by group id, topic name and partition id.
+     * Tracks whether partitions have any pending transactional offsets.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition;

Review Comment:
   I understand your reasoning behind this addition, but it does introduce significant complexity and carries a high risk of data duplication if not carefully managed. While I appreciate this may serve as a stop-gap for now, I do think it's important that this is revisited going forward. A more maintainable long-term solution, perhaps wrapping this logic in a dedicated structure or abstraction, would help reduce coupling and make it easier to evolve the logic cleanly.
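   
   For illustration only, a dedicated structure along these lines could hide the nested-map bookkeeping behind a small API. Every name here is hypothetical, and plain HashMaps stand in for the TimelineHashMap/TimelineHashSet plus SnapshotRegistry machinery the real class would need:
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical sketch of the "dedicated structure" suggested above.
   final class OpenTransactionTracker {
       // groupId -> topic -> partition -> open producer ids
       private final Map<String, Map<String, Map<Integer, Set<Long>>>> byGroupTopicPartition = new HashMap<>();
   
       void add(final String groupId, final String topic, final int partition, final long producerId) {
           byGroupTopicPartition
               .computeIfAbsent(groupId, __ -> new HashMap<>())
               .computeIfAbsent(topic, __ -> new HashMap<>())
               .computeIfAbsent(partition, __ -> new HashSet<>())
               .add(producerId);
       }
   
       // Removes the producer id and prunes any maps left empty, so callers
       // never see the nested cleanup logic.
       void remove(final String groupId, final String topic, final int partition, final long producerId) {
           final Map<String, Map<Integer, Set<Long>>> byTopic = byGroupTopicPartition.get(groupId);
           if (byTopic == null) return;
           final Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
           if (byPartition == null) return;
           final Set<Long> producerIds = byPartition.get(partition);
           if (producerIds == null) return;
   
           producerIds.remove(producerId);
           if (producerIds.isEmpty()) byPartition.remove(partition);
           if (byPartition.isEmpty()) byTopic.remove(topic);
           if (byTopic.isEmpty()) byGroupTopicPartition.remove(groupId);
       }
   
       // Open producer ids for one partition; an empty set when none are tracked.
       Set<Long> get(final String groupId, final String topic, final int partition) {
           return byGroupTopicPartition
               .getOrDefault(groupId, Map.of())
               .getOrDefault(topic, Map.of())
               .getOrDefault(partition, Set.of());
       }
   }
   ```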



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker(
             return;
         }
 
-        pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.remove(producerId);
-                if (openTransactions.isEmpty()) {
+        pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+            TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
+            if (groupTransactions != null) {
+                groupTransactions.remove(producerId);
+                if (groupTransactions.isEmpty()) {
                     openTransactionsByGroup.remove(groupId);
                 }
             }
+
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic == null) return;
+
+            topicOffsets.forEach((topic, partitionOffsets) -> {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition == null) return;
+
+                partitionOffsets.keySet().forEach(partitionId -> {
+                    TimelineHashSet<Long> partitionTransactions = openTransactionsByPartition.get(partitionId);
+                    if (partitionTransactions != null) {
+                        partitionTransactions.remove(producerId);
+                        if (partitionTransactions.isEmpty()) {
+                            openTransactionsByPartition.remove(partitionId);
+                        }
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
+                    }
+                });
+            });
         });

Review Comment:
   My previous point stands. Here is roughly how I'd do it; the cleanupIfEmpty method from earlier could be reused here if implemented:
   
   ```java
   public void clearOpenTransactionsForProducer(final long producerId, final PendingOffsets pendingOffsets) {
       for (final Map.Entry<String, Map<String, Map<Integer, OffsetAndMetadata>>> groupEntry : pendingOffsets.offsetsByGroup.entrySet()) {
           final String groupId = groupEntry.getKey();
           final Map<String, Map<Integer, OffsetAndMetadata>> topicOffsets = groupEntry.getValue();
   
           removeProducerFromGroupTransactions(groupId, producerId);
   
           final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
               openTransactionsByGroupTopicAndPartition.get(groupId);
           if (topicMap == null) continue;
   
           processTopicOffsets(producerId, groupId, topicOffsets, topicMap);
           cleanupIfEmpty(topicMap, openTransactionsByGroupTopicAndPartition, groupId);
       }
   }
   
   private void removeProducerFromGroupTransactions(final String groupId, final long producerId) {
       final TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
       if (groupTransactions == null) return;
   
       groupTransactions.remove(producerId);
       if (groupTransactions.isEmpty()) {
           openTransactionsByGroup.remove(groupId);
       }
   }
   
   private void processTopicOffsets(
       final long producerId,
       final String groupId,
       final Map<String, Map<Integer, OffsetAndMetadata>> topicOffsets,
       final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap) {
   
       for (final Map.Entry<String, Map<Integer, OffsetAndMetadata>> topicEntry : topicOffsets.entrySet()) {
           final String topic = topicEntry.getKey();
           final Map<Integer, OffsetAndMetadata> partitionOffsets = topicEntry.getValue();
   
           final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
           if (partitionMap == null) continue;
   
           for (final Integer partitionId : partitionOffsets.keySet()) {
               removeProducerFromPartitionMap(producerId, partitionId, partitionMap);
           }
   
           cleanupIfEmpty(partitionMap, topicMap, topic);
       }
   }
   
   private void removeProducerFromPartitionMap(
       final long producerId,
       final int partitionId,
       final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap) {
   
       final TimelineHashSet<Long> partitionTransactions = partitionMap.get(partitionId);
       if (partitionTransactions == null) return;
   
       partitionTransactions.remove(producerId);
   
       if (partitionTransactions.isEmpty()) {
           partitionMap.remove(partitionId);
       }
   }
   ```
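   
   Splitting it this way gives each method a single level of the nesting to worry about, so each piece can be read and reasoned about in isolation.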


