jeffkbkim commented on code in PR #18020:
URL: https://github.com/apache/kafka/pull/18020#discussion_r1872281239


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscriptionCount.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.modern;
+
+/**
+ * A class which holds two counters: one counts subscriptions by name and
+ * the other counts subscriptions by regex.
+ */
+public class SubscriptionCount {
+    public final int byNameCount;
+    public final int byRegexCount;
+
+    public SubscriptionCount(int byNameCount, int byRegexCount) {
+        this.byNameCount = byNameCount;
+        this.byRegexCount = byRegexCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        SubscriptionCount that = (SubscriptionCount) o;
+
+        if (byNameCount != that.byNameCount) return false;
+        return byRegexCount == that.byRegexCount;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = byNameCount;
+        result = 31 * result + byRegexCount;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "SubscriptionCount(" +
+            "byNameCount=" + byNameCount +
+            ", byRegexCount=" + byRegexCount +
+            ')';
+    }
+
+    public static SubscriptionCount incNameCount(String key, SubscriptionCount count) {

Review Comment:
   can we remove all `String key` parameters in the inc/dec methods?
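   If the `String key` parameters are dropped, the helpers can no longer be passed to `Map.compute` as method references (presumably why they take a key today), but a short lambda that ignores the key works just as well. A minimal sketch, assuming the helpers return a new immutable instance and that the decrement helpers return `null` once both counters reach zero so that `Map.compute` removes the entry. The record form and the `incRegexCount`/`decNameCount`/`decRegexCount` names are hypothetical, not taken from the PR:

```java
import java.util.HashMap;
import java.util.Map;

public class SubscriptionCountSketch {

    // Hypothetical record form of SubscriptionCount (Java 16+): the compiler
    // generates equals, hashCode and toString from the two components.
    record SubscriptionCount(int byNameCount, int byRegexCount) {

        // Returns null when both counters are zero so that Map.compute drops the entry.
        private static SubscriptionCount orNull(int byNameCount, int byRegexCount) {
            return byNameCount == 0 && byRegexCount == 0
                ? null
                : new SubscriptionCount(byNameCount, byRegexCount);
        }

        // A null argument means the topic has no entry in the map yet.
        static SubscriptionCount incNameCount(SubscriptionCount count) {
            return count == null
                ? new SubscriptionCount(1, 0)
                : new SubscriptionCount(count.byNameCount() + 1, count.byRegexCount());
        }

        static SubscriptionCount incRegexCount(SubscriptionCount count) {
            return count == null
                ? new SubscriptionCount(0, 1)
                : new SubscriptionCount(count.byNameCount(), count.byRegexCount() + 1);
        }

        // Assumes the caller only decrements counters that are currently positive.
        static SubscriptionCount decNameCount(SubscriptionCount count) {
            return orNull(count.byNameCount() - 1, count.byRegexCount());
        }

        static SubscriptionCount decRegexCount(SubscriptionCount count) {
            return orNull(count.byNameCount(), count.byRegexCount() - 1);
        }
    }

    public static void main(String[] args) {
        Map<String, SubscriptionCount> counts = new HashMap<>();
        // The lambdas ignore the key, so the helpers themselves do not need it.
        counts.compute("foo", (topic, count) -> SubscriptionCount.incNameCount(count));
        counts.compute("foo", (topic, count) -> SubscriptionCount.incRegexCount(count));
        counts.compute("foo", (topic, count) -> SubscriptionCount.decNameCount(count));
        System.out.println(counts); // {foo=SubscriptionCount[byNameCount=0, byRegexCount=1]}
        counts.compute("foo", (topic, count) -> SubscriptionCount.decRegexCount(count));
        System.out.println(counts); // {}
    }
}
```

   Returning `null` from the decrement path keeps the map free of zero entries without a separate removal step.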



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -798,6 +827,60 @@ private void validateMemberEpoch(
         }
     }
 
+    /**
+     * Computes the subscription type based on the provided information.
+     *
+     * @param subscribedRegularExpressions  The subscribed regular expression count.
+     * @param subscribedTopicNames          The subscribed topic name count.
+     * @param numberOfMembers               The number of members in the group.
+     *
+     * @return The subscription type.
+     */
+    public static SubscriptionType subscriptionType(
+        Map<String, Integer> subscribedRegularExpressions,
+        Map<String, SubscriptionCount> subscribedTopicNames,
+        int numberOfMembers
+    ) {
+        if (subscribedRegularExpressions.isEmpty()) {
+            // If the members do not use regular expressions, the subscription is
+            // considered as homogeneous if all the members are subscribed to the
+            // same topics. Otherwise, it is considered as heterogeneous.
+            for (SubscriptionCount subscriberCount : subscribedTopicNames.values()) {
+                if (subscriberCount.byNameCount != numberOfMembers) {
+                    return HETEROGENEOUS;
+                }
+            }
+            return HOMOGENEOUS;
+        } else {
+            int count = subscribedRegularExpressions.values().iterator().next();
+            if (count == numberOfMembers) {
+                // If all the members are subscribed to a single regular expression
+                // and none of them are subscribed to topic names, the subscription
+                // is considered as homogeneous. If some members are subscribed to
+                // topic names too, the subscription is considered as heterogeneous.
+                for (SubscriptionCount subscriberCount : subscribedTopicNames.values()) {
+                    if (subscriberCount.byRegexCount != 1 || subscriberCount.byNameCount > 0) {
+                        return HETEROGENEOUS;

Review Comment:
   To confirm, homogeneous means that all members are subscribed to the same set of topics, and that they all subscribe using the same method, either by name or by regex?
   
   Also, if we have:
   * 5 members, 1 topic "foo"
   * 4 members subscribed via regex "foo*"
   * 1 member subscribed via name "foo"
   
   My intuition tells me this should be homogeneous because it results in the homogeneous assignment computation, at least if we define homogeneous as "all members are subscribed to the same set of topics".
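   Under that definition, how a member obtained a topic (by name or by regex) does not matter, so homogeneity could in principle be decided directly on each member's resolved topic set. A minimal sketch; the member-to-topics map, the local `SubscriptionType` enum and this `subscriptionType` overload are illustrative stand-ins, not the coordinator's actual data structures:

```java
import java.util.Map;
import java.util.Set;

public class HomogeneitySketch {

    // Local stand-in for org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.
    enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }

    // Hypothetical: classifies the group from each member's resolved topic set,
    // i.e. after regexes have been matched against the topic metadata.
    static SubscriptionType subscriptionType(Map<String, Set<String>> resolvedTopicsByMember) {
        Set<String> first = null;
        for (Set<String> topics : resolvedTopicsByMember.values()) {
            if (first == null) {
                first = topics;
            } else if (!first.equals(topics)) {
                return SubscriptionType.HETEROGENEOUS;
            }
        }
        // An empty group is homogeneous, matching the PR's test with 0 members.
        return SubscriptionType.HOMOGENEOUS;
    }

    public static void main(String[] args) {
        // The example above: m1-m4 use regex "foo*", m5 subscribes to "foo" by name,
        // and every member resolves to the same set {foo}.
        Map<String, Set<String>> members = Map.of(
            "m1", Set.of("foo"),
            "m2", Set.of("foo"),
            "m3", Set.of("foo"),
            "m4", Set.of("foo"),
            "m5", Set.of("foo")
        );
        System.out.println(subscriptionType(members)); // HOMOGENEOUS
    }
}
```

   The trade-off is that this needs every member's resolved topics, whereas the count maps passed to `ConsumerGroup.subscriptionType` in the PR appear designed to avoid materializing them.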
   



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2097,4 +2098,134 @@ public void testCreateGroupTombstoneRecordsWithReplacedMember() {
             records
         );
     }
+
+    @Test
+    public void testSubscriptionType() {
+        assertEquals(
+            HOMOGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                0
+            )
+        );
+
+        assertEquals(
+            HOMOGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Collections.emptyMap(),
+                Map.of("foo", new SubscriptionCount(5, 0)),
+                5
+            )
+        );
+
+        assertEquals(
+            HETEROGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Collections.emptyMap(),
+                Map.of(
+                    "foo", new SubscriptionCount(4, 0),
+                    "bar", new SubscriptionCount(1, 0)
+                ),
+                5
+            )
+        );
+
+        assertEquals(
+            HOMOGENEOUS,
+            ConsumerGroup.subscriptionType(
+                Map.of("foo*", 5),
+                Map.of("foo", new SubscriptionCount(0, 1)),
+                5
+            )
+        );

Review Comment:
   nit: can we do something like
   ```
           assertEquals(
               HOMOGENEOUS,
               ConsumerGroup.subscriptionType(
                   Map.of("foo*", 5),
                   Map.of(
                       "foo", new SubscriptionCount(0, 1),
                       "food", new SubscriptionCount(0, 1)
                   ),
                   5
               )
           );
   ```
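   The suggested case is useful because one regex can resolve to several topics, so the per-topic loop in the regex branch gets exercised with more than one entry. The sketch below extracts only that loop into a standalone predicate, with a minimal stand-in for `SubscriptionCount`; the predicate name is hypothetical, and the remainder of the branch after the loop is not shown in the excerpt, so it is not modeled here:

```java
import java.util.Map;

public class RegexBranchCheckSketch {

    // Minimal stand-in for the PR's SubscriptionCount (two final counters).
    static final class SubscriptionCount {
        final int byNameCount;
        final int byRegexCount;

        SubscriptionCount(int byNameCount, int byRegexCount) {
            this.byNameCount = byNameCount;
            this.byRegexCount = byRegexCount;
        }
    }

    // Same condition as the loop in the regex branch of subscriptionType: every
    // topic must be matched by exactly one regex and by no name subscription.
    static boolean matchedOnlyBySingleRegex(Map<String, SubscriptionCount> subscribedTopicNames) {
        for (SubscriptionCount count : subscribedTopicNames.values()) {
            if (count.byRegexCount != 1 || count.byNameCount > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The suggested input: one regex resolving to both "foo" and "food".
        System.out.println(matchedOnlyBySingleRegex(Map.of(
            "foo", new SubscriptionCount(0, 1),
            "food", new SubscriptionCount(0, 1)
        ))); // true

        // A topic that is also subscribed by name fails the check.
        System.out.println(matchedOnlyBySingleRegex(Map.of(
            "foo", new SubscriptionCount(0, 1),
            "food", new SubscriptionCount(1, 1)
        ))); // false
    }
}
```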



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -59,11 +61,14 @@
 
 import static org.apache.kafka.coordinator.group.Utils.toOptional;
 import static org.apache.kafka.coordinator.group.Utils.toTopicPartitionMap;
+import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;

Review Comment:
   btw, when will we remove `@InterfaceStability.Unstable` from the `SubscriptionType` class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
