[ https://issues.apache.org/jira/browse/KAFKA-4117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-4117:
---------------------------------
    Status: Patch Available  (was: In Progress)

Cleanup StreamPartitionAssignor behavior
----------------------------------------

                Key: KAFKA-4117
                URL: https://issues.apache.org/jira/browse/KAFKA-4117
            Project: Kafka
         Issue Type: Bug
         Components: streams
           Reporter: Guozhang Wang
           Assignee: Guozhang Wang
             Labels: architecture

I went through the whole assignment logic once again, and I feel it has become a bit loose. I want to clean it up, probably in another PR, but let me dump my thoughts here on the appropriate logic.

Some background:

1. Each {{KafkaStreams}} instance contains a clientId; if not specified, the default value is applicationId-1/2/etc. when there are multiple instances inside the same JVM. One instance contains multiple threads, where each thread-clientId is constructed as clientId-StreamThread-1/2/etc., and the thread-clientId is used as the embedded consumer's clientId as well as its metrics tag.

2. However, since one instance can contain multiple threads, and hence multiple consumers, the streams library needs to take capacity into consideration at the granularity of instances, not threads, when doing partition assignment. Therefore we create a {{UUID.randomUUID()}} as the processId and encode it in the subscription metadata bytes; the leader then knows whether multiple consumer members actually belong to the same instance (i.e. belong to threads of that instance), so that when assigning partitions it can balance among instances. NOTE that in production we recommend one thread per instance, so consumersByClient will only have one consumer per client (i.e. instance). (A sketch of this encoding follows the list.)

3. In addition, we historically hard-coded the partition grouper logic, where each task is assigned only one partition per subscribed topic. For example, if we have topicA with 5 partitions and topicB with 10 partitions, we will create 10 tasks: the first five tasks each contain one partition from each topic, while the last five tasks each contain only one partition from topicB. Therefore the TaskId class contains the groupId of the sub-topology and the partition number, so that taskId(group, 1) gets partition 1 of topicA and partition 1 of topicB. We later exposed this for users to customize, so that more than one partition of a topic can be assigned to the same task; as a result the partition field in TaskId no longer indicates which partitions are assigned, and we added {{AssignedPartitions}} to capture which partitions are assigned to which tasks. (A grouping sketch also follows the list.)

4. While doing the assignment, the leader is also responsible for creating the changelog / repartition topics, where the number of partitions of each such topic equals the number of tasks that need to write to it; these are wrapped in {{stateChangelogTopicToTaskIds}} and {{internalSourceTopicToTaskIds}} respectively. After such topics are created, the leader also needs to "augment" the received cluster metadata with these topics to 1) check for copartitioning, and 2) maintain it for QueryableState's discovery function.
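To make item 2 concrete, here is a minimal sketch of encoding the processId into the subscription's userData bytes so the leader can group members by instance. The class name and the bare 16-byte layout are assumptions for illustration; the real encoding in Streams ({{SubscriptionInfo}}) carries additional fields such as a version number and task sets.

{code:java}
import java.nio.ByteBuffer;
import java.util.UUID;

// Illustrative only: encode/decode a 128-bit processId UUID as the
// consumer subscription's userData, so the assignment leader can tell
// which consumer members belong to the same KafkaStreams instance.
public final class ProcessIdEncoding {

    // Serialize the UUID into 16 bytes.
    public static ByteBuffer encode(UUID processId) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(processId.getMostSignificantBits());
        buf.putLong(processId.getLeastSignificantBits());
        buf.rewind();
        return buf;
    }

    // The leader decodes each member's userData to recover its processId;
    // members with equal processIds are threads of one instance.
    public static UUID decode(ByteBuffer userData) {
        return new UUID(userData.getLong(), userData.getLong());
    }
}
{code}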
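And a minimal sketch of the hard-coded grouping described in item 3, reproducing the topicA/topicB example; task ids and partitions are plain strings here rather than the real {{TaskId}} and {{TopicPartition}} classes.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: within one topic group (sub-topology), create as
// many tasks as the maximum partition count over the group's topics,
// and give task (group, p) partition p of every topic that has one.
public final class DefaultGroupingSketch {

    public static Map<String, List<String>> partitionGroups(
            int topicGroupId, Map<String, Integer> partitionCounts) {
        int maxPartitions = partitionCounts.values().stream()
                .mapToInt(Integer::intValue).max().orElse(0);

        Map<String, List<String>> tasks = new HashMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            List<String> partitions = new ArrayList<>();
            for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
                if (p < e.getValue()) {              // topic has partition p
                    partitions.add(e.getKey() + "-" + p);
                }
            }
            tasks.put(topicGroupId + "_" + p, partitions);  // taskId(group, p)
        }
        return tasks;
    }

    public static void main(String[] args) {
        // topicA has 5 partitions and topicB has 10 -> 10 tasks; the first
        // five each get one partition from both topics, the last five only
        // a partition from topicB.
        System.out.println(partitionGroups(0, Map.of("topicA", 5, "topicB", 10)));
    }
}
{code}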
The current implementation is mixed up with all this legacy logic and has gotten quite messy, and I'm thinking of making a pass over StreamPartitionAssignor and cleaning it up a bit. More precisely:

1. Read and parse the subscription information to construct the clientMetadata map, where each metadata entry contains the {{Set<String> consumerMemberIds}}, {{ClientState<TaskId> state}}, and {{HostInfo hostInfo}}.

2. Access the (sub-)topology to create the corresponding changelog / repartition topics and construct the {{stateChangelogTopicToTaskIds}} and {{internalSourceTopicToTaskIds}}.

3. Call {{streamThread.partitionGrouper.partitionGroups}} to get the map from the created tasks to their assigned partitions.

4. Call {{TaskAssignor.assign}} (which now takes the whole clientMetadata map) to assign tasks to clients, from which we also get the partitions assigned to each client.

5. For each client, assign tasks to its hosted consumers in a round-robin manner (as we do now) using the {{clientMetadata.consumerMemberIds}} map. (A round-robin sketch follows at the end of this description.)

6. Check co-partitioning of the assigned partitions, and maintain the {{Cluster}} metadata locally on the leader.

7. Construct the assignment info, where activeTasks is now a map from {{TaskId}} to a list of {{TopicPartition}}s, since otherwise we would not know which partitions are assigned to which tasks.

8. For non-leaders, when receiving the assignment, also construct the Cluster metadata from the decoded assignment information, and maintain the AssignmentInfo locally for constructing the tasks.

And some minor improvements:

1. The default thread-clientIds {{applicationId-x-StreamThread-y}} may still conflict with each other across multiple JVMs / machines, which is bad for metrics collection / debugging across hosts. We can change the default clientId to {{applicationId-processId}}, where processId is the generated UUID, so that the default thread-clientId becomes {{applicationId-UUID-StreamThread-y}}. (A naming sketch follows at the end of this description.)

2. The {{TaskId.partition}} field no longer indicates which partitions are actually assigned to the task, but we still need to keep its topicGroupId field, since it indicates which sub-topology the task belongs to and is hence helpful for debugging. So maybe we can rename the partition field to something like sequence?
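A minimal sketch of the round-robin distribution in step 5, under the assumption that tasks and member ids are plain strings; the real code would operate on {{TaskId}} and each client's {{consumerMemberIds}} set.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: spread one client's assigned tasks over its
// consumer member ids in round-robin order.
public final class RoundRobinSketch {

    public static Map<String, List<String>> assignTasksToConsumers(
            List<String> consumerMemberIds, List<String> clientTasks) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String member : consumerMemberIds) {
            assignment.put(member, new ArrayList<>());
        }
        int i = 0;
        for (String task : clientTasks) {
            // Cycle through the client's consumers, one task at a time.
            String member = consumerMemberIds.get(i % consumerMemberIds.size());
            assignment.get(member).add(task);
            i++;
        }
        return assignment;
    }
}
{code}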
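And a minimal sketch of the default naming proposed in minor improvement 1; the helper names here are hypothetical.

{code:java}
import java.util.UUID;

// Illustrative only: clientId becomes applicationId-processId, so each
// thread-clientId is applicationId-UUID-StreamThread-y and is unique
// across JVMs / machines.
public final class ClientIdSketch {

    public static String defaultClientId(String applicationId, UUID processId) {
        return applicationId + "-" + processId;
    }

    public static String threadClientId(String clientId, int threadIndex) {
        return clientId + "-StreamThread-" + threadIndex;
    }

    public static void main(String[] args) {
        String clientId = defaultClientId("my-app", UUID.randomUUID());
        System.out.println(threadClientId(clientId, 1));
        // e.g. my-app-123e4567-e89b-12d3-a456-426614174000-StreamThread-1
    }
}
{code}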