AndrewJSchofield commented on code in PR #19666: URL: https://github.com/apache/kafka/pull/19666#discussion_r2086868597
########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -143,6 +191,7 @@ public synchronized ShareSessionKey maybeCreateSession( ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)); sessions.put(session.key(), session); updateNumPartitions(session); + numMembersPerGroup.merge(session.key().groupId(), 1, Integer::sum); Review Comment: Again, I'd prefer `HashMap.compute`. ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -111,6 +158,7 @@ public synchronized ShareSession remove(ShareSession session) { ShareSession removeResult = sessions.remove(session.key()); if (removeResult != null) { numPartitions = numPartitions - session.cachedSize(); + numMembersPerGroup.merge(session.key().groupId(), -1, Integer::sum); Review Comment: I would rather see this use `HashMap.compute` and remove the key if the group transitions to zero. ########## server/src/main/java/org/apache/kafka/server/share/ShareGroupListener.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share; + +import org.apache.kafka.common.Uuid; + +/** + * The ShareGroupListener is used to notify when there is a change in the share group members. + */ +public interface ShareGroupListener { + + /** + * Called when member leaves the group. + * + * @param groupId The id of the group. + * @param memberId The id of the member. + */ + void onMemberLeave(String groupId, Uuid memberId); + + /** + * Called when the group is empty. + * + * @param groupId The id of the group. + */ + void onEmpty(String groupId); Review Comment: `onGroupEmpty` would be preferable I think. ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -51,15 +52,31 @@ public class ShareSessionCache { * Metric for the rate of eviction of share sessions. */ private final Meter evictionsMeter; - - private final int maxEntries; - private long numPartitions = 0; + /** + * The listener for connection disconnect events for the client. + */ private final ConnectionDisconnectListener connectionDisconnectListener; - - // A map of session key to ShareSession. + /** + * Map of session key to ShareSession. + */ private final Map<ShareSessionKey, ShareSession> sessions = new HashMap<>(); - + /** + * Map of groupId to number of members in the group. + */ + private final Map<String, Integer> numMembersPerGroup = new HashMap<>(); Review Comment: The handling of this map has the characteristic that entries are never deleted, even when a group is empty for an extended period. I'm not convinced this is a good principle. ########## core/src/main/java/kafka/server/share/SharePartitionCache.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.share.SharePartitionKey; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +/** + * The SharePartitionCache is used to cache the SharePartition objects. The cache is thread-safe. + */ +public class SharePartitionCache { + + /** + * The map to store the share group id and the set of topic-partitions for that group. + */ + private final Map<String, Set<TopicIdPartition>> groups; + + /** + * The map is used to store the SharePartition objects for each share group topic-partition. + */ + private final Map<SharePartitionKey, SharePartition> partitions; + + SharePartitionCache() { + this.groups = new HashMap<>(); + this.partitions = new ConcurrentHashMap<>(); + } + + public SharePartition get(SharePartitionKey partitionKey) { + return partitions.get(partitionKey); + } + + public synchronized SharePartition remove(SharePartitionKey partitionKey) { + groups.computeIfPresent(partitionKey.groupId(), (k, v) -> { + v.remove(partitionKey.topicIdPartition()); + return v; + }); + return partitions.remove(partitionKey); + } + + public synchronized SharePartition computeIfAbsent(SharePartitionKey partitionKey, Function<SharePartitionKey, SharePartition> mappingFunction) { + groups.putIfAbsent(partitionKey.groupId(), new HashSet<>()); Review Comment: How about `groups.computeIfAbsent(partitionKey.groupId(), new HashSet<>()).add(partitionKey.topicIdPartition());` to avoid an unnecessary lookup in the synchronized block. ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -101,6 +118,36 @@ public synchronized ShareSession remove(ShareSessionKey key) { return null; } + /** + * Maybe remove the session and notify listeners. This is called when the connection is disconnected + * for the client. The session may have already been removed by the client as part of final epoch, + * hence check if the session is still present in the cache. + * + * @param key The share session key. + */ + public synchronized void maybeRemoveAndNotifyListeners(ShareSessionKey key) { Review Comment: I observe that we are calling code synchronized on the share-partition cache when already in a synchronized method of the share-session cache. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org