squah-confluent commented on code in PR #21727:
URL: https://github.com/apache/kafka/pull/21727#discussion_r3186988796
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java:
##########
@@ -297,22 +310,118 @@ public TargetAssignmentResult build() throws
TaskAssignorException {
memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x,
x -> MemberAssignment.empty())));
}
+ return newGroupAssignment;
+ }
+
+ /**
+ * Builds the records for the new target assignment, when the set of
members and static members
+ * have not changed since the assignment was built.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(GroupAssignment
newGroupAssignment) {
+ return buildRecords(newGroupAssignment, Optional.empty(),
Optional.empty());
+ }
+
+ /**
+ * Builds the records for the new target assignment, when the set of
members and static members
+ * may have changed since the assignment was built.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @param currentMemberIds The current set of member ids.
+ * @param currentStaticMembers The current static members.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(
+ GroupAssignment newGroupAssignment,
+ Set<String> currentMemberIds,
+ Map<String, String> currentStaticMembers
+ ) {
+ return buildRecords(newGroupAssignment, Optional.of(currentMemberIds),
Optional.of(currentStaticMembers));
+ }
+
+ /**
+ * Builds the records for the new target assignment.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @param currentMemberIds The current set of member ids, if they may
have changed since the assignment was built.
+ * @param currentStaticMembers The current static members, if they may
have changed since the assignment was built.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(
+ GroupAssignment newGroupAssignment,
+ Optional<Set<String>> currentMemberIds,
+ Optional<Map<String, String>> currentStaticMembers
+ ) {
+ Set<String> memberIds = new HashSet<>(members.keySet());
Review Comment:
I can't quite follow. Could you provide some example code to demonstrate
what you have in mind?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]