squah-confluent commented on code in PR #19497:
URL: https://github.com/apache/kafka/pull/19497#discussion_r2068324511


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -192,10 +193,165 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
 
+    private final OpenTransactions openTransactions;
+
     /**
-     * The open transactions (producer ids) keyed by group.
+     * Tracks open transactions (producer ids) by group id, topic name and partition id.
+     * It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
      */
-    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
+    private class OpenTransactions {
+        /**
+         * The open transactions (producer ids) keyed by group id, topic name and partition id.
+         * Tracks whether partitions have any pending transactional offsets that have not been deleted.
+         *
+         * Values in each level of the map will never be empty collections.
+         */
+        private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
+
+        private OpenTransactions() {
+            this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        }
+
+        /**
+         * Adds a producer id to the open transactions for the given group and topic partition.
+         *
+         * @param groupId    The group id.
+         * @param topic      The topic name.
+         * @param partition  The partition.
+         * @param producerId The producer id.
+         * @return {@code true} if the partition did not already have a pending offset from the producer id.
+         */
+        private boolean add(String groupId, String topic, int partition, long producerId) {
+            return openTransactionsByGroup
+                .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                .add(producerId);
+        }
+
+        /**
+         * Clears all open transactions for the given group and topic partition.
+         *
+         * @param groupId    The group id.
+         * @param topic      The topic name.
+         * @param partition  The partition.
+         */
+        private void clear(String groupId, String topic, int partition) {
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroup.get(groupId);
+            if (openTransactionsByTopic == null) return;
+
+            TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+            if (openTransactionsByPartition == null) return;
+
+            openTransactionsByPartition.remove(partition);
+
+            if (openTransactionsByPartition.isEmpty()) {
+                openTransactionsByTopic.remove(topic);
+                if (openTransactionsByTopic.isEmpty()) {
+                    openTransactionsByGroup.remove(groupId);
+                }
+            }
+        }
+
+        /**
+         * Returns {@code true} if the given group has any pending transactional offsets.
+         *
+         * @param groupId The group id.
+         * @return {@code true} if the given group has any pending transactional offsets.
+         */
+        private boolean contains(String groupId) {
+            return openTransactionsByGroup.containsKey(groupId);
+        }
+
+        /**
+         * Returns {@code true} if the given group has any pending transactional offsets for the given topic and partition.
+         *
+         * @param groupId   The group id.
+         * @param topic     The topic name.
+         * @param partition The partition.
+         * @return {@code true} if the given group has any pending transactional offsets for the given topic and partition.
+         */
+        private boolean contains(String groupId, String topic, int partition) {
+            TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
+            return openTransactions != null;
+        }

Review Comment:
   I checked, and `containsKey` performs essentially the same lookup as `get`.
   
https://github.com/apache/kafka/blob/08f6042f7a24753fc96389c6cf4c6042999bbd39/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java#L120-L126
   
https://github.com/apache/kafka/blob/08f6042f7a24753fc96389c6cf4c6042999bbd39/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java#L139-L150
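
To make the equivalence concrete, here is a minimal sketch rather than the PR's code. It uses plain `java.util.HashMap` and `HashSet` as stand-ins for `TimelineHashMap` and `TimelineHashSet`, purely so it runs without Kafka on the classpath. The class name `OpenTransactionsSketch` and the methods `containsViaGet` and `containsViaContainsKey` are hypothetical, and the `get` helper is my assumption about the shape of the one called in the diff. Because no level of the map ever stores a null value, a null check on `get` and a `containsKey` call should give the same answer:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the nested open-transactions map in the diff.
// Plain HashMap/HashSet replace TimelineHashMap/TimelineHashSet (assumption).
class OpenTransactionsSketch {
    private final Map<String, Map<String, Map<Integer, Set<Long>>>> byGroup = new HashMap<>();

    // Mirrors add() from the diff: returns true if the producer id was newly added.
    boolean add(String groupId, String topic, int partition, long producerId) {
        return byGroup
            .computeIfAbsent(groupId, k -> new HashMap<>())
            .computeIfAbsent(topic, k -> new HashMap<>())
            .computeIfAbsent(partition, k -> new HashSet<>())
            .add(producerId);
    }

    // Assumed shape of the get() helper: null if any level is missing.
    Set<Long> get(String groupId, String topic, int partition) {
        Map<String, Map<Integer, Set<Long>>> byTopic = byGroup.get(groupId);
        if (byTopic == null) return null;
        Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
        if (byPartition == null) return null;
        return byPartition.get(partition);
    }

    // Variant used in the diff: a null check on the result of get().
    boolean containsViaGet(String groupId, String topic, int partition) {
        return get(groupId, topic, partition) != null;
    }

    // Alternative: containsKey at the innermost level.
    boolean containsViaContainsKey(String groupId, String topic, int partition) {
        Map<String, Map<Integer, Set<Long>>> byTopic = byGroup.get(groupId);
        if (byTopic == null) return false;
        Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
        return byPartition != null && byPartition.containsKey(partition);
    }
}
```

Both variants walk the same two outer levels and then do one hash lookup at the innermost level, so under these assumptions the choice between them is about readability rather than cost.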


