ableegoldman commented on code in PR #16201:
URL: https://github.com/apache/kafka/pull/16201#discussion_r1628527914


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -53,6 +55,76 @@ public final class TaskAssignmentUtils {
 
     private TaskAssignmentUtils() {}
 
+    /**
+     * Return an {@code AssignmentError} for a task assignment created for an application.
+     *
+     * @param applicationState The application for which this task assignment is being assessed.
+     * @param taskAssignment   The task assignment that will be validated.
+     *
+     * @return {@code AssignmentError.NONE} if the assignment created for this application is valid,
+     *         or another {@code AssignmentError} otherwise.
+     */
+    public static AssignmentError validateTaskAssignment(final ApplicationState applicationState,
+                                                         final TaskAssignment taskAssignment) {
+        final Collection<KafkaStreamsAssignment> assignments = taskAssignment.assignment();
+        final Map<TaskId, ProcessId> activeTasksInOutput = new HashMap<>();
+        final Map<TaskId, ProcessId> standbyTasksInOutput = new HashMap<>();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : assignment.tasks().values()) {
+                if (activeTasksInOutput.containsKey(task.id()) && task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    LOG.error("Assignment is invalid: active task {} was assigned to multiple KafkaStreams clients: {} and {}",
+                        task.id(), assignment.processId().id(), activeTasksInOutput.get(task.id()).id());
+                    return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+                }
+
+                if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+                    activeTasksInOutput.put(task.id(), assignment.processId());
+                } else {
+                    standbyTasksInOutput.put(task.id(), assignment.processId());
+                }
+            }
+        }
+
+        for (final TaskInfo task : applicationState.allTasks().values()) {
+            if (!task.isStateful() && standbyTasksInOutput.containsKey(task.id())) {
+                LOG.error("Assignment is invalid: standby task for stateless task {} was assigned to KafkaStreams client {}",
+                    task.id(), standbyTasksInOutput.get(task.id()).id());
+                return AssignmentError.INVALID_STANDBY_TASK;
+            }
+        }
+
+        final Map<ProcessId, KafkaStreamsState> clientStates = applicationState.kafkaStreamsStates(false);
+        final Set<ProcessId> clientsInOutput = assignments.stream().map(KafkaStreamsAssignment::processId)
+            .collect(Collectors.toSet());
+        for (final Map.Entry<ProcessId, KafkaStreamsState> entry : clientStates.entrySet()) {
+            final ProcessId processIdInInput = entry.getKey();
+            if (!clientsInOutput.contains(processIdInInput)) {
+                LOG.error("Assignment is invalid: KafkaStreams client {} has no assignment", processIdInInput.id());
+                return AssignmentError.MISSING_PROCESS_ID;
+            }
+        }
+
+        for (final ProcessId processIdInOutput : clientsInOutput) {
+            if (!clientStates.containsKey(processIdInOutput)) {
+                LOG.error("Assignment is invalid: the KafkaStreams client {} is unknown", processIdInOutput.id());
+                return AssignmentError.UNKNOWN_PROCESS_ID;
+            }
+        }
+
+        final Set<TaskId> taskIdsInInput = applicationState.allTasks().keySet();
+        for (final KafkaStreamsAssignment assignment : assignments) {
+            for (final KafkaStreamsAssignment.AssignedTask task : assignment.tasks().values()) {
+                if (!taskIdsInInput.contains(task.id())) {
+                    LOG.error("Assignment is invalid: task {} assigned to KafkaStreams client {} was unknown",
+                        task.id(), assignment.processId().id());
+                    return AssignmentError.UNKNOWN_TASK_ID;

Review Comment:
   Just noticed we're doing exactly the same loop here as we are at the beginning of this method. Can we just move the contents into that loop?
   
   (Not a performance thing; I just think it's easier to read if we don't have a bunch of individual loops where I have to re-check the loop conditions each time.)


