squah-confluent commented on code in PR #21727:
URL: https://github.com/apache/kafka/pull/21727#discussion_r3186988796


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java:
##########
@@ -297,22 +310,118 @@ public TargetAssignmentResult build() throws TaskAssignorException {
                 memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x, x -> MemberAssignment.empty())));
         }
 
+        return newGroupAssignment;
+    }
+
+    /**
+     * Builds the records for the new target assignment, when the set of members and static members
+     * have not changed since the assignment was built.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(GroupAssignment newGroupAssignment) {
+        return buildRecords(newGroupAssignment, Optional.empty(), Optional.empty());
+    }
+
+    /**
+     * Builds the records for the new target assignment, when the set of members and static members
+     * may have changed since the assignment was built.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @param currentMemberIds      The current set of member ids.
+     * @param currentStaticMembers  The current static members.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(
+        GroupAssignment newGroupAssignment,
+        Set<String> currentMemberIds,
+        Map<String, String> currentStaticMembers
+    ) {
+        return buildRecords(newGroupAssignment, Optional.of(currentMemberIds), Optional.of(currentStaticMembers));
+    }
+
+    /**
+     * Builds the records for the new target assignment.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @param currentMemberIds      The current set of member ids, if they may have changed since the assignment was built.
+     * @param currentStaticMembers  The current static members, if they may have changed since the assignment was built.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(
+        GroupAssignment newGroupAssignment,
+        Optional<Set<String>> currentMemberIds,
+        Optional<Map<String, String>> currentStaticMembers
+    ) {
+        Set<String> memberIds = new HashSet<>(members.keySet());

Review Comment:
   I can't quite follow. Could you provide some example code to demonstrate 
what you have in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

