squah-confluent commented on code in PR #19497: URL: https://github.com/apache/kafka/pull/19497#discussion_r2068324511
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -192,10 +193,165 @@ public OffsetMetadataManager build() { */ private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets; + private final OpenTransactions openTransactions; + /** - * The open transactions (producer ids) keyed by group. + * Tracks open transactions (producer ids) by group id, topic name and partition id. + * It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}. */ - private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup; + private class OpenTransactions { + /** + * The open transactions (producer ids) keyed by group id, topic name and partition id. + * Tracks whether partitions have any pending transactional offsets that have not been deleted. + * + * Values in each level of the map will never be empty collections. + */ + private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup; + + private OpenTransactions() { + this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Adds a producer id to the open transactions for the given group and topic partition. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partition The partition. + * @param producerId The producer id. + * @return {@code true} if the partition did not already have a pending offset from the producer id. 
+ */ + private boolean add(String groupId, String topic, int partition, long producerId) { + return openTransactionsByGroup + .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1)) + .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1)) + .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) + .add(producerId); + } + + /** + * Clears all open transactions for the given group and topic partition. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partition The partition. + */ + private void clear(String groupId, String topic, int partition) { + TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic = + openTransactionsByGroup.get(groupId); + if (openTransactionsByTopic == null) return; + + TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic); + if (openTransactionsByPartition == null) return; + + openTransactionsByPartition.remove(partition); + + if (openTransactionsByPartition.isEmpty()) { + openTransactionsByTopic.remove(topic); + if (openTransactionsByTopic.isEmpty()) { + openTransactionsByGroup.remove(groupId); + } + } + } + + /** + * Returns {@code true} if the given group has any pending transactional offsets. + * + * @param groupId The group id. + * @return {@code true} if the given group has any pending transactional offsets. + */ + private boolean contains(String groupId) { + return openTransactionsByGroup.containsKey(groupId); + } + + /** + * Returns {@code true} if the given group has any pending transactional offsets for the given topic and partition. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partition The partition. + * @return {@code true} if the given group has any pending transactional offsets for the given topic and partition. 
+ */ + private boolean contains(String groupId, String topic, int partition) { + TimelineHashSet<Long> openTransactions = get(groupId, topic, partition); + return openTransactions != null; + } Review Comment: I checked and `containsKey` does basically the same operation as a `get`. https://github.com/apache/kafka/blob/08f6042f7a24753fc96389c6cf4c6042999bbd39/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java#L120-L126 https://github.com/apache/kafka/blob/08f6042f7a24753fc96389c6cf4c6042999bbd39/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java#L139-L150 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org