chickenchickenlove commented on code in PR #21727:
URL: https://github.com/apache/kafka/pull/21727#discussion_r3188514981


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java:
##########
@@ -297,22 +310,118 @@ public TargetAssignmentResult build() throws TaskAssignorException {
                 memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x, x -> MemberAssignment.empty())));
         }
 
+        return newGroupAssignment;
+    }
+
+    /**
+     * Builds the records for the new target assignment, when the set of members and static members
+     * have not changed since the assignment was built.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(GroupAssignment newGroupAssignment) {
+        return buildRecords(newGroupAssignment, Optional.empty(), Optional.empty());
+    }
+
+    /**
+     * Builds the records for the new target assignment, when the set of members and static members
+     * may have changed since the assignment was built.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @param currentMemberIds      The current set of member ids.
+     * @param currentStaticMembers  The current static members.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(
+        GroupAssignment newGroupAssignment,
+        Set<String> currentMemberIds,
+        Map<String, String> currentStaticMembers
+    ) {
+        return buildRecords(newGroupAssignment, Optional.of(currentMemberIds), Optional.of(currentStaticMembers));
+    }
+
+    /**
+     * Builds the records for the new target assignment.
+     *
+     * @param newGroupAssignment    The new target assignment.
+     * @param currentMemberIds      The current set of member ids, if they may have changed since the assignment was built.
+     * @param currentStaticMembers  The current static members, if they may have changed since the assignment was built.
+     * @return A TargetAssignmentResult which contains the records to update
+     *         the existing target assignment.
+     */
+    public TargetAssignmentResult buildRecords(
+        GroupAssignment newGroupAssignment,
+        Optional<Set<String>> currentMemberIds,
+        Optional<Map<String, String>> currentStaticMembers
+    ) {
+        Set<String> memberIds = new HashSet<>(members.keySet());

Review Comment:
   @squah-confluent 
   Sorry for my poor explanation. 🙇‍♂️ 
   
   What I had in mind was something like the following. The exact API is not
   important; the idea is to make snapshotting an explicit responsibility
   instead of cloning each collection individually at the call site.
   
   For example:
   ```java
   final class StreamsAssignmentSnapshot {
       // fields
       private final Map<String, StreamsGroupMember> members;
       private final Map<String, String> staticMembers;
       private final Map<String, TasksTuple> targetAssignment;
       private final ConfiguredTopology topology;
       private final Map<String, String> assignmentConfigs;
   
       private StreamsAssignmentSnapshot(
               Map<String, StreamsGroupMember> members,
               Map<String, String> staticMembers,
               Map<String, TasksTuple> targetAssignment,
               ConfiguredTopology topology,
               Map<String, String> assignmentConfigs
       ) {
           this.members = Map.copyOf(members);
           this.staticMembers = Map.copyOf(staticMembers);
           this.targetAssignment = snapshotTargetAssignment(targetAssignment);
           this.topology = topology;
           this.assignmentConfigs = Map.copyOf(assignmentConfigs);
       }
   
       static StreamsAssignmentSnapshot from(
               StreamsGroup group,
               ConfiguredTopology topology,
               Map<String, String> assignmentConfigs
       ) {
           return new StreamsAssignmentSnapshot(
                   group.members(),
                   group.staticMembers(),
                   group.targetAssignment(),
                   topology,
                   assignmentConfigs
           );
       }
   
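       // getters
       Map<String, StreamsGroupMember> members() { return members; }
       Map<String, String> staticMembers() { return staticMembers; }
       Map<String, TasksTuple> targetAssignment() { return targetAssignment; }
       ConfiguredTopology topology() { return topology; }
       Map<String, String> assignmentConfigs() { return assignmentConfigs; }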
   
       // For deep copy. maybe we can use immutable collections?
       private static Map<String, TasksTuple> snapshotTargetAssignment(
               Map<String, TasksTuple> targetAssignment
       ) {
           return targetAssignment.entrySet().stream().collect(Collectors.toUnmodifiableMap(
                   Map.Entry::getKey,
                   entry -> copyTasksTuple(entry.getValue())
           ));
       }
   
       // For deep copy. maybe we can use immutable collections?
       private static TasksTuple copyTasksTuple(TasksTuple tasksTuple) {
           return new TasksTuple(
                   copyTasks(tasksTuple.activeTasks()),
                   copyTasks(tasksTuple.standbyTasks()),
                   copyTasks(tasksTuple.warmupTasks())
           );
       }
   
       // For deep copy. maybe we can use immutable collections?
       private static Map<String, Set<Integer>> copyTasks(
               Map<String, Set<Integer>> tasks
       ) {
           return tasks.entrySet().stream().collect(Collectors.toUnmodifiableMap(
                   Map.Entry::getKey,
                   entry -> Set.copyOf(entry.getValue())
           ));
       }
   }
   ```
   
   Then the caller could either pass the snapshot fields into the existing builder:
   ```java
     StreamsAssignmentSnapshot snapshot = StreamsAssignmentSnapshot.from(
         group,
         configuredTopology,
         assignmentConfigs
     );
   
     TargetAssignmentBuilder assignmentResultBuilder =
         new TargetAssignmentBuilder(
             logContext,
             group.groupId(),
             groupEpoch,
             assignor,
             snapshot.assignmentConfigs()
         )
             .withTime(time)
             .withMembers(snapshot.members())
             .withTopology(snapshot.topology())
             .withStaticMembers(snapshot.staticMembers())
             .withMetadataImage(metadataImage)
             .withTargetAssignment(snapshot.targetAssignment());
   ```
   
   Or the builder could accept the snapshot directly:
   
   ```java
     TargetAssignmentBuilder assignmentResultBuilder =
         new TargetAssignmentBuilder(
             logContext,
             group.groupId(),
             groupEpoch,
             assignor,
             snapshot.assignmentConfigs()
         )
             .withTime(time)
             .withAssignmentSnapshot(snapshot)
             .withMetadataImage(metadataImage);
   ```
   I personally prefer the `withAssignmentSnapshot(...)` shape because it
   communicates that these inputs are already stable snapshots, and it avoids
   adding more individual `withX(...)` calls as the assignor inputs grow.
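   
   To make that shape concrete, here is a minimal, self-contained sketch. `Snapshot` and
   `Builder` are hypothetical, simplified stand-ins (plain string maps instead of
   `StreamsGroupMember`, only two inputs), not the real `TargetAssignmentBuilder` API:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class SnapshotBuilderSketch {
       // Hypothetical, simplified stand-in for StreamsAssignmentSnapshot:
       // member values are plain strings here instead of StreamsGroupMember.
       static final class Snapshot {
           private final Map<String, String> members;
           private final Map<String, String> staticMembers;

           Snapshot(Map<String, String> members, Map<String, String> staticMembers) {
               this.members = Map.copyOf(members);
               this.staticMembers = Map.copyOf(staticMembers);
           }

           Map<String, String> members() { return members; }
           Map<String, String> staticMembers() { return staticMembers; }
       }

       // Hypothetical builder; only the snapshot-related part is shown.
       static final class Builder {
           private Map<String, String> members = Map.of();
           private Map<String, String> staticMembers = Map.of();

           // One call wires in every snapshotted input, so a new input only
           // needs a new snapshot field plus one line here.
           Builder withAssignmentSnapshot(Snapshot snapshot) {
               this.members = snapshot.members();
               this.staticMembers = snapshot.staticMembers();
               return this;
           }

           Map<String, String> members() { return members; }
           Map<String, String> staticMembers() { return staticMembers; }
       }

       public static void main(String[] args) {
           Map<String, String> liveMembers = new HashMap<>();
           liveMembers.put("member-1", "instance-a");

           Builder builder = new Builder()
               .withAssignmentSnapshot(new Snapshot(liveMembers, Map.of("instance-a", "member-1")));

           // A member joining after the snapshot was taken does not leak into the builder.
           liveMembers.put("member-2", "instance-b");
           System.out.println(builder.members().size()); // prints 1
       }
   }
   ```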
   
   The main benefit I see is maintainability. If a future change adds another
   input to the offloaded assignment path, there would be one obvious place to
   decide how that field should be snapshotted. For example, simple maps can use
   a top-level copy, while nested structures such as `targetAssignment` can make
   their deeper copy policy explicit in the snapshot class.
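   
   As a small illustration of why the copy depth matters, the sketch below compares
   `Map.copyOf` with a per-entry copy on a nested task map. `CopyDepthDemo` and the
   subtopology keys are made up for the example; only the JDK calls are real:
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;

   public class CopyDepthDemo {
       // Top-level copy: the returned map is unmodifiable, but its values
       // are still the same Set instances held by the source map.
       static Map<String, Set<Integer>> shallowCopy(Map<String, Set<Integer>> tasks) {
           return Map.copyOf(tasks);
       }

       // Deep copy: every inner set is copied into an unmodifiable set as well.
       static Map<String, Set<Integer>> deepCopy(Map<String, Set<Integer>> tasks) {
           return tasks.entrySet().stream().collect(Collectors.toUnmodifiableMap(
               Map.Entry::getKey,
               entry -> Set.copyOf(entry.getValue())
           ));
       }

       public static void main(String[] args) {
           Map<String, Set<Integer>> live = new HashMap<>();
           live.put("subtopology-0", new HashSet<>(Set.of(0, 1)));

           Map<String, Set<Integer>> shallow = shallowCopy(live);
           Map<String, Set<Integer>> deep = deepCopy(live);

           // Mutate the live state after both snapshots were taken.
           live.get("subtopology-0").add(2);
           live.put("subtopology-1", new HashSet<>(Set.of(0)));

           System.out.println(shallow.get("subtopology-0").size());  // prints 3: inner set is shared
           System.out.println(deep.get("subtopology-0").size());     // prints 2: inner set was copied
           System.out.println(shallow.containsKey("subtopology-1")); // prints false: top level is isolated
       }
   }
   ```
   
   Both copies are isolated from new keys, but only the deep copy is isolated from
   changes to the inner sets, which is the case that matters for a nested structure.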
   
   What do you think?
   



