AndrewJSchofield commented on code in PR #19666:
URL: https://github.com/apache/kafka/pull/19666#discussion_r2086868597


##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -143,6 +191,7 @@ public synchronized ShareSessionKey maybeCreateSession(
                 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
             sessions.put(session.key(), session);
             updateNumPartitions(session);
+            numMembersPerGroup.merge(session.key().groupId(), 1, Integer::sum);

Review Comment:
   Again, I'd prefer `HashMap.compute`.



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -111,6 +158,7 @@ public synchronized ShareSession remove(ShareSession session) {
         ShareSession removeResult = sessions.remove(session.key());
         if (removeResult != null) {
             numPartitions = numPartitions - session.cachedSize();
+            numMembersPerGroup.merge(session.key().groupId(), -1, Integer::sum);

Review Comment:
   I would rather see this use `HashMap.compute` and remove the key when the group's member count drops to zero.
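A minimal sketch of what the `HashMap.compute` approach could look like, assuming a standalone counter class. `GroupMemberCounts` and its method names are hypothetical stand-ins for illustration, not code from the PR. It relies on the documented behaviour that returning `null` from the remapping function removes the mapping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the numMembersPerGroup bookkeeping in ShareSessionCache.
class GroupMemberCounts {
    private final Map<String, Integer> numMembersPerGroup = new HashMap<>();

    // Increment the count, creating the entry on first use.
    public synchronized void memberAdded(String groupId) {
        numMembersPerGroup.compute(groupId, (k, v) -> v == null ? 1 : v + 1);
    }

    // Decrement the count; returning null from the remapping function
    // removes the key, so empty groups do not linger in the map.
    public synchronized void memberRemoved(String groupId) {
        numMembersPerGroup.compute(groupId, (k, v) -> (v == null || v <= 1) ? null : v - 1);
    }

    public synchronized int members(String groupId) {
        return numMembersPerGroup.getOrDefault(groupId, 0);
    }

    public synchronized boolean isTracked(String groupId) {
        return numMembersPerGroup.containsKey(groupId);
    }
}
```

With this shape, a group that loses its last member disappears from the map instead of staying behind with a count of zero.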



##########
server/src/main/java/org/apache/kafka/server/share/ShareGroupListener.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * The ShareGroupListener is used to notify when there is a change in the share group members.
+ */
+public interface ShareGroupListener {
+
+    /**
+     * Called when member leaves the group.
+     *
+     * @param groupId  The id of the group.
+     * @param memberId The id of the member.
+     */
+    void onMemberLeave(String groupId, Uuid memberId);
+
+    /**
+     * Called when the group is empty.
+     *
+     * @param groupId The id of the group.
+     */
+    void onEmpty(String groupId);

Review Comment:
   `onGroupEmpty` would be preferable, I think.



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -51,15 +52,31 @@ public class ShareSessionCache {
      * Metric for the rate of eviction of share sessions.
      */
     private final Meter evictionsMeter;
-
-    private final int maxEntries;
-    private long numPartitions = 0;
+    /**
+     * The listener for connection disconnect events for the client.
+     */
     private final ConnectionDisconnectListener connectionDisconnectListener;
-
-    // A map of session key to ShareSession.
+    /**
+     * Map of session key to ShareSession.
+     */
     private final Map<ShareSessionKey, ShareSession> sessions = new HashMap<>();
-
+    /**
+     * Map of groupId to number of members in the group.
+     */
+    private final Map<String, Integer> numMembersPerGroup = new HashMap<>();

Review Comment:
   Entries in this map are never deleted, even when a group has been empty for an extended period. I'm not convinced this is a good principle.



##########
core/src/main/java/kafka/server/share/SharePartitionCache.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.share.SharePartitionKey;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+/**
+ * The SharePartitionCache is used to cache the SharePartition objects. The cache is thread-safe.
+ */
+public class SharePartitionCache {
+
+    /**
+     * The map to store the share group id and the set of topic-partitions for that group.
+     */
+    private final Map<String, Set<TopicIdPartition>> groups;
+
+    /**
+     * The map is used to store the SharePartition objects for each share group topic-partition.
+     */
+    private final Map<SharePartitionKey, SharePartition> partitions;
+
+    SharePartitionCache() {
+        this.groups = new HashMap<>();
+        this.partitions = new ConcurrentHashMap<>();
+    }
+
+    public SharePartition get(SharePartitionKey partitionKey) {
+        return partitions.get(partitionKey);
+    }
+
+    public synchronized SharePartition remove(SharePartitionKey partitionKey) {
+        groups.computeIfPresent(partitionKey.groupId(), (k, v) -> {
+            v.remove(partitionKey.topicIdPartition());
+            return v;
+        });
+        return partitions.remove(partitionKey);
+    }
+
+    public synchronized SharePartition computeIfAbsent(SharePartitionKey partitionKey, Function<SharePartitionKey, SharePartition> mappingFunction) {
+        groups.putIfAbsent(partitionKey.groupId(), new HashSet<>());

Review Comment:
   How about `groups.computeIfAbsent(partitionKey.groupId(), k -> new HashSet<>()).add(partitionKey.topicIdPartition());` to avoid an unnecessary lookup in the synchronized block?
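A small sketch of the single-lookup pattern, assuming plain `String` values in place of `TopicIdPartition`. `GroupPartitionIndex` and its methods are hypothetical names for illustration only. Note that `Map.computeIfAbsent` takes a mapping function, so the set is created through a lambda rather than passed as a value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the groups map in SharePartitionCache.
class GroupPartitionIndex {
    private final Map<String, Set<String>> groups = new HashMap<>();

    // computeIfAbsent returns the existing or newly created set, so the
    // insert needs one map lookup instead of putIfAbsent followed by get.
    public synchronized boolean add(String groupId, String topicPartition) {
        return groups.computeIfAbsent(groupId, k -> new HashSet<>()).add(topicPartition);
    }

    // Returns a copy so callers cannot mutate the internal set.
    public synchronized Set<String> partitions(String groupId) {
        return new HashSet<>(groups.getOrDefault(groupId, Collections.emptySet()));
    }
}
```

Unlike `putIfAbsent(key, new HashSet<>())`, the lambda form also avoids allocating a throwaway set when the group is already present.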



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -101,6 +118,36 @@ public synchronized ShareSession remove(ShareSessionKey key) {
         return null;
     }
 
+    /**
+     * Maybe remove the session and notify listeners. This is called when the connection is disconnected
+     * for the client. The session may have already been removed by the client as part of final epoch,
+     * hence check if the session is still present in the cache.
+     *
+     * @param key The share session key.
+     */
+    public synchronized void maybeRemoveAndNotifyListeners(ShareSessionKey key) {

Review Comment:
   I observe that we are calling code synchronized on the share-partition cache while already inside a synchronized method of the share-session cache.
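One possible way to avoid the nested locking, sketched under assumptions rather than taken from the PR: update the session state while holding the cache's monitor, then invoke the listeners after releasing it. `SessionCacheSketch`, its `Listener` interface, and the `String` keys are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for ShareSessionCache.
class SessionCacheSketch {
    interface Listener {
        void onMemberLeave(String groupId, String memberId);
    }

    // memberId -> groupId; a simplification of the session map.
    private final Map<String, String> sessions = new HashMap<>();
    private final List<Listener> listeners = new ArrayList<>();

    public synchronized void register(Listener listener) {
        listeners.add(listener);
    }

    public synchronized void put(String memberId, String groupId) {
        sessions.put(memberId, groupId);
    }

    // Not synchronized as a whole: only the state change holds the lock.
    public void maybeRemoveAndNotifyListeners(String memberId) {
        String groupId;
        List<Listener> snapshot;
        synchronized (this) {
            groupId = sessions.remove(memberId);
            if (groupId == null) {
                return; // Session was already removed, nothing to notify.
            }
            snapshot = new ArrayList<>(listeners);
        }
        // Listeners run without this cache's monitor held, so any lock
        // they take is never acquired while this one is held.
        for (Listener listener : snapshot) {
            listener.onMemberLeave(groupId, memberId);
        }
    }
}
```

The trade-off is that a listener may observe the cache after other threads have changed it again, so this only fits if the listeners tolerate that.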



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.