jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1235497178


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -171,70 +260,152 @@ GroupMetadataManager build() {
     /**
      * The maximum number of members allowed in a single consumer group.
      */
-    private final int consumerGroupMaxSize;
+    private final int groupMaxSize;
 
     /**
      * The heartbeat interval for consumer groups.
      */
     private final int consumerGroupHeartbeatIntervalMs;
 
     /**
-     * The topics metadata (or image).
+     * The metadata image.
+     */
+    private MetadataImage metadataImage;
+
+    // Rest of the fields are used for the generic group APIs.
+
+    /**
+     * An empty result returned to the state machine. This means that
+     * there are no records to append to the log.
+     *
+     * Package private for testing.
+     */
+    static final CoordinatorResult<CompletableFuture<Errors>, Record> 
EMPTY_RESULT =
+        new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));
+
+    /**
+     * Initial rebalance delay for members joining a generic group.
+     */
+    private final int initialRebalanceDelayMs;
+
+    /**
+     * The timeout used to wait for a new member in milliseconds.
+     */
+    private final int newMemberJoinTimeoutMs;
+
+    /**
+     * The group minimum session timeout.
+     */
+    private final int groupMinSessionTimeoutMs;
+
+    /**
+     * The group maximum session timeout.
+     */
+    private final int groupMaxSessionTimeoutMs;
+
+    /**
+     * The timer to add and cancel group operations.
      */
-    private TopicsImage topicsImage;
+    private final Timer<CompletableFuture<Errors>, Record> timer;
+
+    /**
+     * The time.
+     */
+    private final Time time;
 
     private GroupMetadataManager(
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
         List<PartitionAssignor> assignors,
-        TopicsImage topicsImage,
-        int consumerGroupMaxSize,
-        int consumerGroupHeartbeatIntervalMs
+        MetadataImage metadataImage,
+        TopicPartition topicPartition,
+        int groupMaxSize,
+        int consumerGroupHeartbeatIntervalMs,
+        int initialRebalanceDelayMs,
+        int newMemberJoinTimeoutMs,
+        int groupMinSessionTimeoutMs,
+        int groupMaxSessionTimeoutMs,
+        Timer<CompletableFuture<Errors>, Record> timer,
+        Time time
     ) {
+        this.logContext = logContext;
         this.log = logContext.logger(GroupMetadataManager.class);
         this.snapshotRegistry = snapshotRegistry;
-        this.topicsImage = topicsImage;
+        this.metadataImage = metadataImage;
         this.assignors = 
assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, 
Function.identity()));
+        this.topicPartition = topicPartition;
         this.defaultAssignor = assignors.get(0);
         this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.consumerGroupMaxSize = consumerGroupMaxSize;
+        this.groupMaxSize = groupMaxSize;
         this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
+        this.initialRebalanceDelayMs = initialRebalanceDelayMs;
+        this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs;
+        this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs;
+        this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs;
+        this.timer = timer;
+        this.time = time;
+    }
+
+    /**
+     * When a new metadata image is pushed.
+     *
+     * @param metadataImage The new metadata image.
+     */
+    public void onNewMetadataImage(MetadataImage metadataImage) {
+        this.metadataImage = metadataImage;
     }
 
     /**
      * Gets or maybe creates a consumer group.
      *
      * @param groupId           The group id.
+     * @param groupType         The group type (generic or consumer).
      * @param createIfNotExists A boolean indicating whether the group should 
be
      *                          created if it does not exist.
      *
      * @return A ConsumerGroup.
+     * @throws InvalidGroupIdException  if the group id is invalid.
      * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
      *                                  if the group is not a consumer group.
      *
      * Package private for testing.
      */
-    ConsumerGroup getOrMaybeCreateConsumerGroup(
+    // Package private for testing.
+    Group getOrMaybeCreateGroup(

Review Comment:
   i think we should store generic groups separately. the added benefit here is 
that we wouldn't have to create a new record when a new group was created as 
you have mentioned in the comment below. wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to