squah-confluent commented on code in PR #19497: URL: https://github.com/apache/kafka/pull/19497#discussion_r2070405167
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##########
@@ -192,10 +193,165 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
 
+    private final OpenTransactions openTransactions;
+
     /**
-     * The open transactions (producer ids) keyed by group.
+     * Tracks open transactions (producer ids) by group id, topic name and partition id.
+     * It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
      */
-    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
+    private class OpenTransactions {
+        /**
+         * The open transactions (producer ids) keyed by group id, topic name and partition id.
+         * Tracks whether partitions have any pending transactional offsets that have not been deleted.
+         *
+         * Values in each level of the map will never be empty collections.
+         */
+        private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;

Review Comment:
I tried a couple of approaches and settled on inlining `hasPendingTransactionalOffsets` into the offset fetch path. This way, we don't compare the group id and topic name strings for every partition, and we also avoid allocating a compound key.

```
Benchmark                              (partitionCount)  (transactionCount)  Mode  Cnt  Score   Error  Units
TransactionalOffsetFetchBenchmark.run              4000                4000  avgt    5  0.129 ± 0.002  ms/op
```
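For readers following the thread, here is a rough sketch of the idea behind the nested `openTransactionsByGroup` map and the inlined lookup. It is a minimal, hypothetical illustration, not the PR's code: it assumes plain `HashMap`/`HashSet` in place of Kafka's `TimelineHashMap`/`TimelineHashSet` (so there is no snapshot support), and the names `OpenTransactionsSketch`, `add`, `remove`, `partitionsWithPendingOffsets` and `isEmpty` are made up for illustration. It keeps the "never empty collections" invariant by pruning empty levels on removal, and on the fetch path it resolves the group and topic once, then does only an integer-keyed lookup per partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: plain HashMap/HashSet stand in for TimelineHashMap/TimelineHashSet,
// so this has no snapshot/epoch semantics.
class OpenTransactionsSketch {
    // group id -> topic name -> partition id -> open producer ids.
    // Invariant: no level ever holds an empty collection.
    private final Map<String, Map<String, Map<Integer, Set<Long>>>> openTransactionsByGroup = new HashMap<>();

    void add(String groupId, String topic, int partition, long producerId) {
        openTransactionsByGroup
            .computeIfAbsent(groupId, k -> new HashMap<>())
            .computeIfAbsent(topic, k -> new HashMap<>())
            .computeIfAbsent(partition, k -> new HashSet<>())
            .add(producerId);
    }

    void remove(String groupId, String topic, int partition, long producerId) {
        Map<String, Map<Integer, Set<Long>>> topics = openTransactionsByGroup.get(groupId);
        if (topics == null) return;
        Map<Integer, Set<Long>> partitions = topics.get(topic);
        if (partitions == null) return;
        Set<Long> producerIds = partitions.get(partition);
        if (producerIds == null) return;
        producerIds.remove(producerId);
        // Prune empty levels bottom-up so the invariant holds.
        if (producerIds.isEmpty()) partitions.remove(partition);
        if (partitions.isEmpty()) topics.remove(topic);
        if (topics.isEmpty()) openTransactionsByGroup.remove(groupId);
    }

    // Fetch-path style check: the group and topic strings are looked up once per call,
    // not once per partition, and no compound (group, topic, partition) key is built.
    List<Integer> partitionsWithPendingOffsets(String groupId, String topic, List<Integer> partitions) {
        List<Integer> result = new ArrayList<>();
        Map<String, Map<Integer, Set<Long>>> topics = openTransactionsByGroup.get(groupId);
        if (topics == null) return result;
        Map<Integer, Set<Long>> byPartition = topics.get(topic);
        if (byPartition == null) return result;
        for (int p : partitions) {
            // Because empty sets are pruned, key presence means at least one open transaction.
            if (byPartition.containsKey(p)) result.add(p);
        }
        return result;
    }

    boolean isEmpty() {
        return openTransactionsByGroup.isEmpty();
    }
}
```

The trade-off, as described in the comment, is that a flat map keyed by a (group, topic, partition) record would need a key object per lookup and would hash and compare both strings for every partition, whereas the nested layout pays those costs once per group and topic. The `int` partition id is still autoboxed in this sketch.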