[
https://issues.apache.org/jira/browse/KAFKA-4117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-4117:
---------------------------------
Resolution: Fixed
Fix Version/s: 0.10.2.0
Status: Resolved (was: Patch Available)
Issue resolved by pull request 2012
[https://github.com/apache/kafka/pull/2012]
> Cleanup StreamPartitionAssignor behavior
> ----------------------------------------
>
> Key: KAFKA-4117
> URL: https://issues.apache.org/jira/browse/KAFKA-4117
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Labels: architecture
> Fix For: 0.10.2.0
>
>
> I went through the whole assignment logic once again and I feel the logic
> has now become a bit messy, and I want to clean it up, probably in another
> PR, but will just dump my thoughts here on the appropriate logic:
> Some background:
> 1. Each {{KafkaStreams}} instance contains a clientId; if not specified, the
> default value is applicationId-1/2/etc. when there are multiple instances
> inside the same JVM. One instance contains multiple threads, where the
> thread-clientId is constructed as clientId-StreamThread-1/2/etc., and the
> thread-clientId is used as the embedded consumer's clientId as well as the
> metrics tag.
> 2. However, since one instance can contain multiple threads, and hence
> multiple consumers, the Streams library needs to take capacity into account
> at the granularity of instances, not threads, when considering partition
> assignment. Therefore we create a {{UUID.randomUUID()}} as the processId and
> encode it in the subscription metadata bytes, and the leader then knows
> whether multiple consumer members actually belong to the same instance (i.e.
> to threads of that instance), so that when assigning partitions it can
> balance among instances (see the processId sketch after this list). NOTE
> that in production we recommend one thread per instance, so
> consumersByClient will only have one consumer per client (i.e. instance).
> 3. In addition, historically we hard-coded the partition grouper logic,
> where each task is assigned only one partition of each of its subscribed
> topics. For example, if we have topicA with 5 partitions and topicB with 10
> partitions, we will create 10 tasks, with the first five tasks each
> containing one partition from topicA and one from topicB, while the last
> five tasks each contain only one partition from topicB (see the grouping
> sketch after this list). Therefore the TaskId class contains the groupId of
> the sub-topology and the partition, so that taskId(group, 1) gets partition1
> of topicA and partition1 of topicB. We later exposed this to users for
> customization, so that more than one partition of a topic can be assigned to
> the same task; as a result, the partition field in the TaskId no longer
> indicates which partitions are assigned, and we added {{AssignedPartitions}}
> to capture which partitions are assigned to which tasks.
> 4. While doing the assignment, the leader is also responsible for creating
> these changelog / repartition topics, and the number of partitions of these
> topics equals the number of tasks that need to write to them, which are
> wrapped in {{stateChangelogTopicToTaskIds}} and
> {{internalSourceTopicToTaskIds}} respectively. After such topics are created,
> the leader also needs to "augment" the received cluster metadata with these
> topics in order to 1) check for copartitioning, and 2) maintain it for
> QueryableState's discovery function.
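> For illustration, here is a minimal, self-contained sketch of the
> processId-based grouping from point 2. The encoding and class names below
> are simplified stand-ins, not the actual subscription format or internal
> classes:
> {code:java}
> import java.nio.ByteBuffer;
> import java.util.*;
>
> public class ProcessGroupingSketch {
>
>     // Simplified stand-in for the subscription userData: just the processId.
>     static ByteBuffer encodeSubscription(UUID processId) {
>         ByteBuffer buf = ByteBuffer.allocate(16);
>         buf.putLong(processId.getMostSignificantBits());
>         buf.putLong(processId.getLeastSignificantBits());
>         buf.flip();
>         return buf;
>     }
>
>     static UUID decodeSubscription(ByteBuffer buf) {
>         return new UUID(buf.getLong(), buf.getLong());
>     }
>
>     public static void main(String[] args) {
>         // Two threads (consumers) of the same instance share one processId.
>         UUID processA = UUID.randomUUID();
>         UUID processB = UUID.randomUUID();
>
>         Map<String, ByteBuffer> subscriptions = new HashMap<>();
>         subscriptions.put("consumer-1", encodeSubscription(processA));
>         subscriptions.put("consumer-2", encodeSubscription(processA));
>         subscriptions.put("consumer-3", encodeSubscription(processB));
>
>         // The leader groups consumer members by processId, so capacity is
>         // balanced per instance rather than per thread.
>         Map<UUID, Set<String>> consumersByClient = new HashMap<>();
>         for (Map.Entry<String, ByteBuffer> e : subscriptions.entrySet()) {
>             UUID processId = decodeSubscription(e.getValue());
>             consumersByClient
>                     .computeIfAbsent(processId, k -> new HashSet<>())
>                     .add(e.getKey());
>         }
>
>         System.out.println(consumersByClient); // two entries, one per instance
>     }
> }
> {code}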
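> And here is a grouping sketch for point 3, again with a simplified
> hypothetical {{TaskId}} rather than the real class; the last line also shows
> how the changelog partition count from point 4 falls out of the task count:
> {code:java}
> import java.util.*;
>
> public class PartitionGroupingSketch {
>
>     // Simplified stand-in for the Streams TaskId (topicGroupId + partition).
>     static class TaskId {
>         final int topicGroupId;
>         final int partition;
>         TaskId(int topicGroupId, int partition) {
>             this.topicGroupId = topicGroupId;
>             this.partition = partition;
>         }
>         public String toString() { return topicGroupId + "_" + partition; }
>     }
>
>     public static void main(String[] args) {
>         Map<String, Integer> partitionsPerTopic = new LinkedHashMap<>();
>         partitionsPerTopic.put("topicA", 5);
>         partitionsPerTopic.put("topicB", 10);
>
>         // Default grouping: one task per partition number, taking that
>         // partition from every subscribed topic that has it.
>         int maxPartitions = Collections.max(partitionsPerTopic.values());
>         Map<TaskId, List<String>> assignedPartitions = new LinkedHashMap<>();
>         for (int p = 0; p < maxPartitions; p++) {
>             List<String> partitions = new ArrayList<>();
>             for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
>                 if (p < topic.getValue()) {
>                     partitions.add(topic.getKey() + "-" + p);
>                 }
>             }
>             assignedPartitions.put(new TaskId(0, p), partitions);
>         }
>
>         // 10 tasks: 0_0..0_4 get topicA-p and topicB-p, 0_5..0_9 only topicB-p.
>         for (Map.Entry<TaskId, List<String>> e : assignedPartitions.entrySet()) {
>             System.out.println(e.getKey() + " -> " + e.getValue());
>         }
>
>         // Point 4: the changelog topic for this sub-topology gets one
>         // partition per task that writes to it, i.e. 10 partitions here.
>         System.out.println("changelog partitions = " + assignedPartitions.size());
>     }
> }
> {code}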
> The current implementation is mixed with all this legacy logic and gets
> quite messy, so I'm thinking of making a pass over the StreamPartitionAssignor
> and cleaning it up a bit. More precisely (a rough sketch of this flow follows
> the list):
> 1. Read and parse the subscription information to construct the
> clientMetadata map, where each entry contains the {{Set<String>
> consumerMemberIds}}, {{ClientState<TaskId> state}}, and {{HostInfo hostInfo}}.
> 2. Access the (sub-)topology to create the corresponding changelog /
> repartition topics and construct the {{stateChangelogTopicToTaskIds}} and
> {{internalSourceTopicToTaskIds}}.
> 3. Call {{streamThread.partitionGrouper.partitionGroups}} to get the map from
> created tasks to their assigned partitions.
> 4. Call {{TaskAssignor.assign}} (which now takes the whole clientMetadata
> map) to assign tasks to clients, and hence we get the partitions assigned to
> each client.
> 5. For each client, use a round-robin manner (as we do now) to assign tasks
> to its hosted consumers using the {{clientMetadata.consumerMemberIds}} map.
> 6. Check co-partitioning of assigned partitions, and maintain the {{Cluster}}
> metadata locally on the leader.
> 7. Construct the assignment info, where activeTasks is also a map from
> {{TaskId}} to list of {{TopicPartitions}} since otherwise we will not know
> which partitions are assigned to which tasks.
> 8. For non-leaders, when receiving the assignment, construct the Cluster
> metadata from the decoded assignment information as well, and maintain the
> AssignmentInfo locally for constructing the tasks.
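> To make the intended flow concrete, below is a very rough, self-contained
> sketch of steps 1-5 and 7 above. All of the types ({{ClientMetadata}},
> {{ClientState}}, {{HostInfo}}), the task assignment and the round-robin
> distribution are simplified placeholders for the real internals, just to
> show the shape of the data movement:
> {code:java}
> import java.util.*;
>
> public class AssignmentFlowSketch {
>
>     // --- simplified placeholder types, not the real Streams internals ---
>     static class HostInfo {
>         final String host;
>         final int port;
>         HostInfo(String host, int port) { this.host = host; this.port = port; }
>     }
>
>     static class ClientState {
>         final Set<String> activeTasks = new LinkedHashSet<>();
>     }
>
>     static class ClientMetadata {
>         final Set<String> consumerMemberIds = new TreeSet<>();
>         final ClientState state = new ClientState();
>         HostInfo hostInfo;
>     }
>
>     public static void main(String[] args) {
>         // Step 1: parse subscriptions into a clientMetadata map keyed by processId.
>         Map<UUID, ClientMetadata> clientMetadata = new HashMap<>();
>         UUID processId = UUID.randomUUID();
>         ClientMetadata meta = new ClientMetadata();
>         meta.consumerMemberIds.add("consumer-1");
>         meta.consumerMemberIds.add("consumer-2");
>         meta.hostInfo = new HostInfo("localhost", 8080);
>         clientMetadata.put(processId, meta);
>
>         // Steps 2+3: assume the topology yielded these tasks and partitions
>         // (normally produced by partitionGrouper.partitionGroups()).
>         Map<String, List<String>> partitionsForTask = new LinkedHashMap<>();
>         partitionsForTask.put("0_0", Arrays.asList("topicA-0", "topicB-0"));
>         partitionsForTask.put("0_1", Arrays.asList("topicA-1", "topicB-1"));
>
>         // Step 4: assign all tasks to the single client (stand-in for
>         // TaskAssignor.assign over the whole clientMetadata map).
>         meta.state.activeTasks.addAll(partitionsForTask.keySet());
>
>         // Step 5: round-robin the client's tasks over its consumer member ids.
>         Map<String, Map<String, List<String>>> activeTasksPerConsumer = new HashMap<>();
>         List<String> consumers = new ArrayList<>(meta.consumerMemberIds);
>         int i = 0;
>         for (String taskId : meta.state.activeTasks) {
>             String consumer = consumers.get(i++ % consumers.size());
>             activeTasksPerConsumer
>                     .computeIfAbsent(consumer, k -> new LinkedHashMap<>())
>                     .put(taskId, partitionsForTask.get(taskId));
>         }
>
>         // Step 7: activeTasks per consumer is a map from taskId to its
>         // partitions, so followers can tell which partitions belong to which task.
>         System.out.println(activeTasksPerConsumer);
>     }
> }
> {code}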
> And some minor improvements:
> 1. The default thread-clientIds {{applicationId-x-StreamThread-y}} may still
> conflict with each other across multiple JVMs / machines, which is bad for
> metrics collection / debugging across hosts. We can modify the default
> clientId to {{applicationId-processId}}, where processId is the generated
> UUID, so that the default thread-clientId becomes
> {{applicationId-UUID-StreamThread-y}} (see the sketch below).
> 2. The {{TaskId.partition}} field no longer indicates which partitions are
> actually assigned to this task, but we still need to keep its topicGroupId
> field as it indicates which sub-topology this task belongs to, which is
> helpful for debugging. So maybe we can rename the partition field to
> something like sequence?
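> A tiny sketch of the naming change proposed in point 1 (the exact format is
> of course up for discussion, and the applicationId / processId here are just
> example values):
> {code:java}
> import java.util.UUID;
>
> public class ClientIdSketch {
>     public static void main(String[] args) {
>         String applicationId = "my-app";
>         UUID processId = UUID.randomUUID();
>
>         // Proposed default: applicationId-processId, unique across JVMs / machines.
>         String clientId = applicationId + "-" + processId;
>
>         // Thread-level clientIds then become applicationId-UUID-StreamThread-y.
>         for (int y = 1; y <= 2; y++) {
>             System.out.println(clientId + "-StreamThread-" + y);
>         }
>     }
> }
> {code}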
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)