shaan150 commented on code in PR #19497:
URL: https://github.com/apache/kafka/pull/19497#discussion_r2047739805
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -650,24 +658,18 @@ public int deleteAllOffsets(
         // Delete all the pending transactional offsets too. Here we only write a tombstone
         // if the topic-partition was not in the main storage because we don't need to write
         // two consecutive tombstones.
-        TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions != null) {
-            openTransactions.forEach(producerId -> {
-                Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
-                if (pendingOffsets != null) {
-                    TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets =
-                        pendingOffsets.offsetsByGroup.get(groupId);
-                    if (pendingGroupOffsets != null) {
-                        pendingGroupOffsets.forEach((topic, offsetsByPartition) -> {
-                            offsetsByPartition.keySet().forEach(partition -> {
-                                if (!hasCommittedOffset(groupId, topic, partition)) {
-                                    records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
-                                    numDeletedOffsets.getAndIncrement();
-                                }
-                            });
-                        });
-                    }
-                }
+        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            openTransactionsByGroupTopicAndPartition.get(groupId);
+        if (openTransactionsByTopic != null) {

Review Comment:

There are three nested loops here; this can be avoided by doing a single check per topic-partition. Something like the following might help (done quickly, so treat it as a rough sketch):

```java
var openTransactionsByTopic = openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic != null) {
    for (var topicEntry : openTransactionsByTopic.entrySet()) {
        String topic = topicEntry.getKey();
        var openTransactionsByPartition = topicEntry.getValue();

        for (var partitionEntry : openTransactionsByPartition.entrySet()) {
            int partition = partitionEntry.getKey();

            // Single check per partition.
            if (!hasCommittedOffset(groupId, topic, partition)) {
                records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
                numDeletedOffsets.getAndIncrement();
            }
        }
    }
}
```

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1005,21 +1005,41 @@ public void replay(
                 openTransactionsByGroup
                     .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
                     .add(producerId);
+                openTransactionsByGroupTopicAndPartition
+                    .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                    .add(producerId);
             }
         } else {
             if (offsets.remove(groupId, topic, partition) != null) {
                 metrics.decrementNumOffsets();
             }

             // Remove all the pending offset commits related to the tombstone.
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.forEach(openProducerId -> {
-                    Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
-                    if (pendingOffsets != null) {
-                        pendingOffsets.remove(groupId, topic, partition);
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic != null) {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition != null) {
+                    TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
+                    if (openTransactions != null) {
+                        openTransactions.forEach(openProducerId -> {
+                            Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
+                            if (pendingOffsets != null) {
+                                pendingOffsets.remove(groupId, topic, partition);
+                            }
+                        });
+
+                        openTransactionsByPartition.remove(partition);
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
                     }
-                });
+                }
             }

Review Comment:

This whole area is pretty badly structured; it will be difficult to maintain and is potentially fragile. I would split it out into methods and tidy some of it up. Here is an example of how I'd approach it — bear in mind this code isn't tested and is more of a boilerplate example:

```java
public void clearOpenTransactions(final String groupId, final String topic, final int partition) {
    final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = getPartitionMap(groupId, topic);
    if (partitionMap == null) return;

    final TimelineHashSet<Long> openProducerIds = partitionMap.get(partition);
    if (openProducerIds == null) return;

    removePendingOffsets(openProducerIds, groupId, topic, partition);

    partitionMap.remove(partition);
    cleanupIfEmpty(partitionMap, getTopicMap(groupId), topic);
    cleanupIfEmpty(getTopicMap(groupId), openTransactionsByGroupTopicAndPartition, groupId);
}

private TimelineHashMap<Integer, TimelineHashSet<Long>> getPartitionMap(final String groupId, final String topic) {
    final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
        openTransactionsByGroupTopicAndPartition.get(groupId);
    if (topicMap == null) return null;
    return topicMap.get(topic);
}

private TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> getTopicMap(final String groupId) {
    return openTransactionsByGroupTopicAndPartition.get(groupId);
}

private void removePendingOffsets(
    final Set<Long> producerIds,
    final String groupId,
    final String topic,
    final int partition) {
    for (final Long producerId : producerIds) {
        final Offsets offsets = pendingTransactionalOffsets.get(producerId);
        if (offsets != null) {
            offsets.remove(groupId, topic, partition);
        }
    }
}

private <K, V extends Map<?, ?>> void cleanupIfEmpty(final V innerMap, final Map<K, V> outerMap, final K key) {
    if (innerMap != null && innerMap.isEmpty()) {
        outerMap.remove(key);
    }
}
```

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {
     /**
      * The open transactions (producer ids) keyed by group.
+     * Tracks whether groups have any open transactions.
      */
     private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
+
+    /**
+     * The open transactions (producer ids) keyed by group id, topic name and partition id.
+     * Tracks whether partitions have any pending transactional offsets.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition;

Review Comment:

I understand your reasoning behind this addition, but it does introduce significant complexity and carries a high risk of data duplication if not carefully managed. While I appreciate this may serve as a stop-gap for now, I do think it’s important that this is revisited going forward. A more maintainable long-term solution, perhaps wrapping this logic in a dedicated structure or abstraction, would help reduce coupling and make it easier to evolve the logic cleanly.
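To make that concrete, here is a rough, untested sketch of the kind of wrapper meant above. The `OpenTransactionIndex` name and its methods are invented for illustration and are not part of the PR; the point is simply that one object owns both timeline structures, so callers can never update one index without the other:

```java
import java.util.Collections;
import java.util.Set;

import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;

/**
 * Hypothetical wrapper that owns both "open transaction" indexes so the group-level
 * view and the group/topic/partition view cannot drift apart.
 */
final class OpenTransactionIndex {
    private final SnapshotRegistry snapshotRegistry;
    // Producer ids with open transactional offsets, keyed by group id.
    private final TimelineHashMap<String, TimelineHashSet<Long>> byGroup;
    // The same producer ids, keyed by group id, topic name and partition id.
    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> byGroupTopicPartition;

    OpenTransactionIndex(SnapshotRegistry snapshotRegistry) {
        this.snapshotRegistry = snapshotRegistry;
        this.byGroup = new TimelineHashMap<>(snapshotRegistry, 0);
        this.byGroupTopicPartition = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    /** Records producerId as having an open transaction on the given group/topic/partition. */
    void add(String groupId, String topic, int partition, long producerId) {
        byGroup
            .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
            .add(producerId);
        byGroupTopicPartition
            .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
            .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
            .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
            .add(producerId);
    }

    /** Returns the producer ids with an open transaction on the partition, or an empty set. */
    Set<Long> producerIds(String groupId, String topic, int partition) {
        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> byTopic = byGroupTopicPartition.get(groupId);
        if (byTopic == null) return Collections.emptySet();
        TimelineHashMap<Integer, TimelineHashSet<Long>> byPartition = byTopic.get(topic);
        if (byPartition == null) return Collections.emptySet();
        TimelineHashSet<Long> producerIds = byPartition.get(partition);
        return producerIds == null ? Collections.emptySet() : producerIds;
    }

    /** Drops every producer id tracked for the partition, pruning empty inner maps. */
    void removePartition(String groupId, String topic, int partition) {
        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> byTopic = byGroupTopicPartition.get(groupId);
        if (byTopic == null) return;
        TimelineHashMap<Integer, TimelineHashSet<Long>> byPartition = byTopic.get(topic);
        if (byPartition == null) return;
        byPartition.remove(partition);
        if (byPartition.isEmpty()) byTopic.remove(topic);
        if (byTopic.isEmpty()) byGroupTopicPartition.remove(groupId);
    }

    /** Removes the producer from the group-level index, pruning the group entry if empty. */
    void removeProducer(String groupId, long producerId) {
        TimelineHashSet<Long> producerIds = byGroup.get(groupId);
        if (producerIds == null) return;
        producerIds.remove(producerId);
        if (producerIds.isEmpty()) byGroup.remove(groupId);
    }
}
```

Whether that lands in this PR or a follow-up, something along these lines would let replay, replayEndTransactionMarker and deleteAllOffsets go through a single API, and the empty-map pruning would live in one place.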
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker(
             return;
         }
-        pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.remove(producerId);
-                if (openTransactions.isEmpty()) {
+        pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+            TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
+            if (groupTransactions != null) {
+                groupTransactions.remove(producerId);
+                if (groupTransactions.isEmpty()) {
                     openTransactionsByGroup.remove(groupId);
                 }
             }
+
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic == null) return;
+
+            topicOffsets.forEach((topic, partitionOffsets) -> {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition == null) return;
+
+                partitionOffsets.keySet().forEach(partitionId -> {
+                    TimelineHashSet<Long> partitionTransactions = openTransactionsByPartition.get(partitionId);
+                    if (partitionTransactions != null) {
+                        partitionTransactions.remove(producerId);
+                        if (partitionTransactions.isEmpty()) {
+                            openTransactionsByPartition.remove(partitionId);
+                        }
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
+                    }
+                });
+            });
         });

Review Comment:

The previous point stands. I'd do it with something like the attempt below; the cleanupIfEmpty method from earlier could be reused here if implemented:

```java
public void clearOpenTransactionsForProducer(final long producerId, final Offsets pendingOffsets) {
    for (final Map.Entry<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> groupEntry :
            pendingOffsets.offsetsByGroup.entrySet()) {
        final String groupId = groupEntry.getKey();
        final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = groupEntry.getValue();

        removeProducerFromGroupTransactions(groupId, producerId);

        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
            openTransactionsByGroupTopicAndPartition.get(groupId);
        if (topicMap == null) continue;

        processTopicOffsets(producerId, groupId, topicOffsets, topicMap);
        cleanupIfEmpty(topicMap, openTransactionsByGroupTopicAndPartition, groupId);
    }
}

private void removeProducerFromGroupTransactions(final String groupId, final long producerId) {
    final TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
    if (groupTransactions == null) return;

    groupTransactions.remove(producerId);
    if (groupTransactions.isEmpty()) {
        openTransactionsByGroup.remove(groupId);
    }
}

private void processTopicOffsets(
    final long producerId,
    final String groupId,
    final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets,
    final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap) {
    for (final Map.Entry<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicEntry : topicOffsets.entrySet()) {
        final String topic = topicEntry.getKey();
        final TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicEntry.getValue();

        final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
        if (partitionMap == null) continue;

        for (final Integer partitionId : partitionOffsets.keySet()) {
            removeProducerFromPartitionMap(producerId, partitionId, partitionMap);
        }

        cleanupIfEmpty(partitionMap, topicMap, topic);
    }
}

private void removeProducerFromPartitionMap(
    final long producerId,
    final int partitionId,
    final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap) {
    final TimelineHashSet<Long> partitionTransactions = partitionMap.get(partitionId);
    if (partitionTransactions == null) return;

    partitionTransactions.remove(producerId);
    if (partitionTransactions.isEmpty()) {
        partitionMap.remove(partitionId);
    }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org