chickenchickenlove commented on code in PR #21727:
URL: https://github.com/apache/kafka/pull/21727#discussion_r3188514981
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java:
##########
@@ -297,22 +310,118 @@ public TargetAssignmentResult build() throws TaskAssignorException {
memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x,
x -> MemberAssignment.empty())));
}
+ return newGroupAssignment;
+ }
+
+ /**
+ * Builds the records for the new target assignment, when the set of members and static members
+ * have not changed since the assignment was built.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(GroupAssignment newGroupAssignment) {
+ return buildRecords(newGroupAssignment, Optional.empty(), Optional.empty());
+ }
+
+ /**
+ * Builds the records for the new target assignment, when the set of members and static members
+ * may have changed since the assignment was built.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @param currentMemberIds The current set of member ids.
+ * @param currentStaticMembers The current static members.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(
+ GroupAssignment newGroupAssignment,
+ Set<String> currentMemberIds,
+ Map<String, String> currentStaticMembers
+ ) {
+ return buildRecords(newGroupAssignment, Optional.of(currentMemberIds), Optional.of(currentStaticMembers));
+ }
+
+ /**
+ * Builds the records for the new target assignment.
+ *
+ * @param newGroupAssignment The new target assignment.
+ * @param currentMemberIds The current set of member ids, if they may have changed since the assignment was built.
+ * @param currentStaticMembers The current static members, if they may have changed since the assignment was built.
+ * @return A TargetAssignmentResult which contains the records to update
+ * the existing target assignment.
+ */
+ public TargetAssignmentResult buildRecords(
+ GroupAssignment newGroupAssignment,
+ Optional<Set<String>> currentMemberIds,
+ Optional<Map<String, String>> currentStaticMembers
+ ) {
+ Set<String> memberIds = new HashSet<>(members.keySet());
Review Comment:
@squah-confluent
Sorry for my poor explanation. 🙇‍♂️
What I had in mind was something like the following. The exact API is not
important, but the idea is to make snapshotting an explicit responsibility
instead of cloning each collection individually at the call site.
For example,
```java
final class StreamsAssignmentSnapshot {
    // Immutable copies of the group state that the assignor reads.
    private final Map<String, StreamsGroupMember> members;
    private final Map<String, String> staticMembers;
    private final Map<String, TasksTuple> targetAssignment;
    private final ConfiguredTopology topology;
    private final Map<String, String> assignmentConfigs;

    private StreamsAssignmentSnapshot(
        Map<String, StreamsGroupMember> members,
        Map<String, String> staticMembers,
        Map<String, TasksTuple> targetAssignment,
        ConfiguredTopology topology,
        Map<String, String> assignmentConfigs
    ) {
        // Flat maps only need a top-level copy.
        this.members = Map.copyOf(members);
        this.staticMembers = Map.copyOf(staticMembers);
        // Nested structure, so it gets a deep copy.
        this.targetAssignment = snapshotTargetAssignment(targetAssignment);
        this.topology = topology;
        this.assignmentConfigs = Map.copyOf(assignmentConfigs);
    }

    static StreamsAssignmentSnapshot from(
        StreamsGroup group,
        ConfiguredTopology topology,
        Map<String, String> assignmentConfigs
    ) {
        return new StreamsAssignmentSnapshot(
            group.members(),
            group.staticMembers(),
            group.targetAssignment(),
            topology,
            assignmentConfigs
        );
    }

    // Getters
    Map<String, StreamsGroupMember> members() {
        return members;
    }

    Map<String, String> staticMembers() {
        return staticMembers;
    }

    Map<String, TasksTuple> targetAssignment() {
        return targetAssignment;
    }

    ConfiguredTopology topology() {
        return topology;
    }

    Map<String, String> assignmentConfigs() {
        return assignmentConfigs;
    }

    // Deep copy of the target assignment. Maybe we can use immutable collections instead?
    private static Map<String, TasksTuple> snapshotTargetAssignment(
        Map<String, TasksTuple> targetAssignment
    ) {
        return targetAssignment.entrySet().stream().collect(Collectors.toUnmodifiableMap(
            Map.Entry::getKey,
            entry -> copyTasksTuple(entry.getValue())
        ));
    }

    // Deep copy of a single member's tasks (active, standby, warmup).
    private static TasksTuple copyTasksTuple(TasksTuple tasksTuple) {
        return new TasksTuple(
            copyTasks(tasksTuple.activeTasks()),
            copyTasks(tasksTuple.standbyTasks()),
            copyTasks(tasksTuple.warmupTasks())
        );
    }

    // Deep copy of a subtopology-to-partitions map, including the inner sets.
    private static Map<String, Set<Integer>> copyTasks(
        Map<String, Set<Integer>> tasks
    ) {
        return tasks.entrySet().stream().collect(Collectors.toUnmodifiableMap(
            Map.Entry::getKey,
            entry -> Set.copyOf(entry.getValue())
        ));
    }
}
```
Then the caller could either pass the snapshot fields into the existing
builder:
```java
StreamsAssignmentSnapshot snapshot = StreamsAssignmentSnapshot.from(
    group,
    configuredTopology,
    assignmentConfigs
);

TargetAssignmentBuilder assignmentResultBuilder =
    new TargetAssignmentBuilder(
        logContext,
        group.groupId(),
        groupEpoch,
        assignor,
        snapshot.assignmentConfigs()
    )
        .withTime(time)
        .withMembers(snapshot.members())
        .withTopology(snapshot.topology())
        .withStaticMembers(snapshot.staticMembers())
        .withMetadataImage(metadataImage)
        .withTargetAssignment(snapshot.targetAssignment());
```
Or the builder could accept the snapshot directly:
```java
TargetAssignmentBuilder assignmentResultBuilder =
    new TargetAssignmentBuilder(
        logContext,
        group.groupId(),
        groupEpoch,
        assignor,
        snapshot.assignmentConfigs()
    )
        .withTime(time)
        .withAssignmentSnapshot(snapshot)
        .withMetadataImage(metadataImage);
```
I personally prefer the withAssignmentSnapshot(...) shape because it
communicates that these inputs are already stable snapshots, and it avoids
adding more individual withX(...) calls as the assignor inputs grow.
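
To make that second shape a bit more concrete, here is a minimal, self-contained sketch of how a `withAssignmentSnapshot(...)` method could simply delegate to the existing setters. `Snapshot` and `Builder` are simplified stand-ins made up for illustration, with `String` values in place of the real Kafka types; the real `TargetAssignmentBuilder` would look different.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotBuilderSketch {
    // Hypothetical stand-in for the proposed StreamsAssignmentSnapshot.
    public static final class Snapshot {
        private final Map<String, String> members;
        private final Map<String, String> staticMembers;

        public Snapshot(Map<String, String> members, Map<String, String> staticMembers) {
            // Copy once here so callers never hand live collections to the builder.
            this.members = Map.copyOf(members);
            this.staticMembers = Map.copyOf(staticMembers);
        }

        public Map<String, String> members() { return members; }
        public Map<String, String> staticMembers() { return staticMembers; }
    }

    // Hypothetical stand-in for TargetAssignmentBuilder.
    public static final class Builder {
        private Map<String, String> members = Map.of();
        private Map<String, String> staticMembers = Map.of();

        public Builder withMembers(Map<String, String> members) {
            this.members = members;
            return this;
        }

        public Builder withStaticMembers(Map<String, String> staticMembers) {
            this.staticMembers = staticMembers;
            return this;
        }

        // One call wires every snapshotted input; a new field is added here only.
        public Builder withAssignmentSnapshot(Snapshot snapshot) {
            return withMembers(snapshot.members())
                .withStaticMembers(snapshot.staticMembers());
        }

        public Map<String, String> members() { return members; }
        public Map<String, String> staticMembers() { return staticMembers; }
    }

    public static void main(String[] args) {
        Map<String, String> liveMembers = new HashMap<>();
        liveMembers.put("member-1", "instance-a");

        Snapshot snapshot = new Snapshot(liveMembers, Map.of());
        Builder builder = new Builder().withAssignmentSnapshot(snapshot);

        // A later change to the live map is not visible through the builder.
        liveMembers.put("member-2", "instance-b");
        System.out.println(builder.members().size()); // prints 1
    }
}
```

With this shape the individual `withX(...)` setters can stay for tests, while the offloaded path only ever goes through the snapshot.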
The main benefit I see is maintainability. If a future change adds another
input to the offloaded assignment path, there would be one obvious place to
decide how that field should be snapshotted. For example, simple maps can use a
top-level copy, while nested structures such as targetAssignment can make their
deeper copy policy explicit in the snapshot class.
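
As a small illustration of why the copy depth matters, the sketch below compares a top-level copy with a deep copy of a nested map. The class name and the `subtopology-0` key are made up for the example, and a plain `Map<String, Set<Integer>>` stands in for the task maps inside `TasksTuple`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class SnapshotCopyDemo {
    // Top-level copy: the outer map is frozen, but the inner sets are still shared.
    public static Map<String, Set<Integer>> shallowCopy(Map<String, Set<Integer>> tasks) {
        return Map.copyOf(tasks);
    }

    // Deep copy: each inner set is copied into an unmodifiable set as well.
    public static Map<String, Set<Integer>> deepCopy(Map<String, Set<Integer>> tasks) {
        return tasks.entrySet().stream().collect(Collectors.toUnmodifiableMap(
            Map.Entry::getKey,
            entry -> Set.copyOf(entry.getValue())
        ));
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> live = new HashMap<>();
        live.put("subtopology-0", new HashSet<>(Set.of(0, 1)));

        Map<String, Set<Integer>> shallow = shallowCopy(live);
        Map<String, Set<Integer>> deep = deepCopy(live);

        // Mutate the live inner set after both snapshots were taken.
        live.get("subtopology-0").add(2);

        System.out.println(shallow.get("subtopology-0").contains(2)); // prints true
        System.out.println(deep.get("subtopology-0").contains(2));    // prints false
    }
}
```

The shallow snapshot still observes the later mutation through the shared inner set, which is the kind of leak an explicit deep-copy policy for targetAssignment would rule out.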
What do you think?