squah-confluent commented on code in PR #21565:
URL: https://github.com/apache/kafka/pull/21565#discussion_r2868419294


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2087,8 +2163,17 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
                 targetAssignmentEpoch = groupEpoch;
             }
         } else {
-            targetAssignmentEpoch = group.assignmentEpoch();
-            targetAssignment = group.targetAssignment(updatedMember.memberId());
+            if (instanceId == null) {
+                targetAssignmentEpoch = group.assignmentEpoch();
+                targetAssignment = group.targetAssignment(updatedMember.memberId());
+            } else {
+                targetAssignmentEpoch = group.assignmentEpoch();
+                StreamsGroupMember maybeOldStaticMember = group.staticMember(instanceId);
+                String maybeOldStaticMemberId = maybeOldStaticMember == null ?
+                        updatedMember.memberId() :
+                        maybeOldStaticMember.memberId();
+                targetAssignment = group.targetAssignment(maybeOldStaticMemberId);
+            }

Review Comment:
   In consumer groups, this is written as
   ```
               targetAssignmentEpoch = group.assignmentEpoch();
               targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
   ```
   Could we follow the same pattern?
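
For illustration, here is a minimal sketch of what a two-argument lookup could look like on the streams side. It assumes the overload should keep the fallback behavior of the diff above: use the member ID currently registered for the static instance ID, otherwise the caller's own member ID. `TargetAssignmentSketch`, its fields, and the `String` assignment type are hypothetical stand-ins, not the actual `StreamsGroup` or `ConsumerGroup` code, and the static-member lookup is simplified to a single map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the group state; not the real StreamsGroup.
class TargetAssignmentSketch {
    // Placeholder for an empty assignment.
    static final String EMPTY = "";

    // instanceId -> memberId of the static member currently holding that instance ID
    final Map<String, String> staticMembers = new HashMap<>();
    // memberId -> target assignment (a String here purely for brevity)
    final Map<String, String> targetAssignments = new HashMap<>();

    // Existing single-argument lookup.
    String targetAssignment(String memberId) {
        return targetAssignments.getOrDefault(memberId, EMPTY);
    }

    // Assumed overload: resolves the previous static member inside the group,
    // so the caller no longer needs the instanceId branch.
    String targetAssignment(String memberId, String instanceId) {
        if (instanceId == null) {
            return targetAssignment(memberId);
        }
        String previousMemberId = staticMembers.get(instanceId);
        return targetAssignment(previousMemberId != null ? previousMemberId : memberId);
    }
}
```

With an overload of this shape, the `else` branch in `GroupMetadataManager` would reduce to the two assignments shown in the consumer-group snippet.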



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -500,6 +500,20 @@ public Map<String, String> staticMembers() {
         return Collections.unmodifiableMap(staticMembers);
     }
 
+    /**
+     * @return Returns whether the given instance ID is currently associated with an existing static member.
+     */
+    public boolean staticMemberExist(String instanceId) {
+        String memberId = staticMembers.get(instanceId);
+        if (memberId == null) 
+            return false;
+        
+        StreamsGroupMember member = members.get(memberId);
+        if (member == null)
+            return false;
+        return true;
+    }
+

Review Comment:
   Could we follow the same pattern as `ConsumerGroup.hasStaticMember`?
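
As a rough sketch of the simpler shape this points toward, the added method can collapse to a null-guarded lookup under the `hasStaticMember` name. The sketch keeps the semantics of the diff above (both the static-member mapping and the member itself must exist). Whether `ConsumerGroup.hasStaticMember` checks exactly the same things is not shown here, and `StaticMemberSketch` with its `String`-valued maps is a hypothetical stand-in for `StreamsGroup`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the group state; not the real StreamsGroup.
class StaticMemberSketch {
    // instanceId -> memberId
    final Map<String, String> staticMembers = new HashMap<>();
    // memberId -> member (a String placeholder instead of StreamsGroupMember)
    final Map<String, String> members = new HashMap<>();

    // Returns true only if the instance ID maps to a member that still exists.
    boolean hasStaticMember(String instanceId) {
        if (instanceId == null) {
            return false;
        }
        String memberId = staticMembers.get(instanceId);
        return memberId != null && members.containsKey(memberId);
    }
}
```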



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

