AndrewJSchofield commented on code in PR #19659:
URL: https://github.com/apache/kafka/pull/19659#discussion_r2086341822


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -144,6 +146,11 @@ public class SharePartitionManager implements AutoCloseable {
      */
     private final BrokerTopicStats brokerTopicStats;
 
+    /**
+     * Flag indicating if share groups have been turned on.
+     */
+    private final AtomicBoolean isShareGroupsSupported;

Review Comment:
   This `AtomicBoolean` could equally well live inside the `ShareSessionCache`. 
You are only checking its value before calling the cache methods, and the 
encapsulation would be better if the cache itself policed when mutating 
operations were made.
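
   One way this could look, as a rough sketch only. `GuardedSessionCache` is a hypothetical, simplified stand-in for `ShareSessionCache` (string keys, no partitions or connection tracking), and the `onShareVersionToggle(boolean)` signature on the cache is an assumption, not something taken from the PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for ShareSessionCache; not the real class.
class GuardedSessionCache {
    private final Map<String, Integer> sessions = new HashMap<>();
    // The flag lives with the state it protects, so callers need no check of their own.
    private final AtomicBoolean supportsShareGroups = new AtomicBoolean(true);

    // Assumed hook, invoked when the share version feature changes.
    synchronized void onShareVersionToggle(boolean supported) {
        supportsShareGroups.set(supported);
        if (!supported) {
            sessions.clear();
        }
    }

    // Returns the session key, or null when share groups are off and creation is refused.
    synchronized String maybeCreateSession(String groupId, String memberId) {
        if (!supportsShareGroups.get()) {
            return null;
        }
        String key = groupId + ":" + memberId;
        sessions.put(key, 1);
        return key;
    }

    synchronized int size() {
        return sessions.size();
    }
}
```

   With this shape, the manager would call the cache unconditionally and the cache itself would refuse mutating operations while the feature is off.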



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -144,6 +146,11 @@ public class SharePartitionManager implements AutoCloseable {
      */
     private final BrokerTopicStats brokerTopicStats;
 
+    /**
+     * Flag indicating if share groups have been turned on.
+     */
+    private final AtomicBoolean isShareGroupsSupported;

Review Comment:
   Or even `supportsShareGroups` if you prefer.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -746,6 +766,37 @@ private Consumer<Set<String>> failedShareAcknowledgeMetricsHandler() {
         };
     }
 
+    /**
+     * The handler for share version feature metadata changes.
+     * @param shareVersion the new share version feature
+     */
+    public void onShareVersionToggle(ShareVersion shareVersion) {
+        if (!shareVersion.supportsShareGroups()) {
+            isShareGroupsSupported.set(false);
+            // Remove all share sessions from share session cache.
+            synchronized (cache) {
+                cache.removeAllSessions();
+            }
+            Set<SharePartitionKey> sharePartitionKeys = new HashSet<>(partitionCacheMap.keySet());
+            // Remove all share partitions from partition cache.
+            sharePartitionKeys.forEach(sharePartitionKey ->
+                removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager)
+            );
+        } else {
+            isShareGroupsSupported.set(true);
+        }
+    }
+
+    // Visible for testing.
+    protected Map<SharePartitionKey, SharePartition> partitionCacheMap() {

Review Comment:
   It seems to me that the tests only really want to know the number of 
share-partitions in the cache. I wonder if you could have a method that exposes 
that instead, rather than constructing a hashmap from the internal map.
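
   A minimal sketch of such an accessor, assuming the tests only need the count. `PartitionCacheHolder`, its string keys, and the method name `partitionCacheSize` are hypothetical and chosen only for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified holder; the real manager keys the map by SharePartitionKey.
class PartitionCacheHolder {
    private final Map<String, Object> partitionCacheMap = new ConcurrentHashMap<>();

    void put(String key, Object sharePartition) {
        partitionCacheMap.put(key, sharePartition);
    }

    void remove(String key) {
        partitionCacheMap.remove(key);
    }

    // Visible for testing: exposes only the count, not the internal map or a copy of it.
    int partitionCacheSize() {
        return partitionCacheMap.size();
    }
}
```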



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -144,6 +146,11 @@ public class SharePartitionManager implements AutoCloseable {
      */
     private final BrokerTopicStats brokerTopicStats;
 
+    /**
+     * Flag indicating if share groups have been turned on.
+     */
+    private final AtomicBoolean isShareGroupsSupported;

Review Comment:
   grammatical nit: `areShareGroupsSupported` reads much better.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -455,9 +468,12 @@ public ShareFetchContext newContext(
                         ImplicitLinkedHashCollection<>(shareFetchData.size());
                 shareFetchData.forEach(topicIdPartition ->
                     cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, false)));
-                ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(),
-                    cachedSharePartitions, clientConnectionId);
-                if (responseShareSessionKey == null) {
+                ShareSessionKey responseShareSessionKey = null;
+                if (isShareGroupsSupported.get()) {
+                    responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(),
+                        cachedSharePartitions, clientConnectionId);
+                }
+                if (responseShareSessionKey == null && isShareGroupsSupported.get()) {

Review Comment:
   This condition doesn't seem right when share groups are not supported. 
The key is null but the condition evaluates to false, so we'd have no share 
session key and would still proceed to create a share session context.
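
   To make the three cases explicit, here is a hypothetical, simplified model of the branch. `SessionCreation`, `Outcome`, and `classify` are illustrative names, the string key stands in for `ShareSessionKey`, and how the unsupported case should actually be reported to the caller is not decided here:

```java
// Hypothetical model of the decision in newContext(); not the real code.
class SessionCreation {
    enum Outcome { UNSUPPORTED, NOT_CREATED, CREATED }

    // sessionKey stands in for the result of cache.maybeCreateSession(...),
    // which is null when no session was created.
    static Outcome classify(boolean shareGroupsSupported, String sessionKey) {
        if (!shareGroupsSupported) {
            // Handle the disabled case first so it never falls through to context creation.
            return Outcome.UNSUPPORTED;
        }
        if (sessionKey == null) {
            // Supported, but the cache did not create a session.
            return Outcome.NOT_CREATED;
        }
        return Outcome.CREATED;
    }
}
```

   Only the `CREATED` outcome should go on to build a share session context; the other two need their own handling.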



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2803,6 +2804,59 @@ public void testFetchMessagesRotatePartitions() {
         validateRotatedListEquals(topicIdPartitions, resultShareFetch.topicIdPartitions(), 1);
     }
 
+    @Test
+    public void testIsShareGroupEnabled() {

Review Comment:
   I'm afraid that "is" clashes with my sense of grammatical purity.



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -232,6 +240,23 @@ class BrokerMetadataPublisher(
       if (_firstPublish) {
         finishInitializingReplicaManager()
       }
+
+      if (delta.featuresDelta != null) {
+        try {
+          val newFinalizedFeatures = new FinalizedFeatures(newImage.features.metadataVersionOrThrow, newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
+          // Share version feature has been toggled.
+          if (!(newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) == finalizedShareVersion)) {

Review Comment:
   Wouldn't this test be cleaner replacing the `==` with `!=`?



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -90,6 +90,14 @@ public synchronized int size() {
         return sessions.size();
     }
 
+    /**
+     * Remove all the share sessions from cache.
+     */
+    public synchronized void removeAllSessions() {
+        sessions.clear();
+        numPartitions = 0;

Review Comment:
   I observe that the `connectionIdToSessionMap` is not cleared. I think that's 
harmless and it will gradually empty as the connections close.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
