The current assignment is "round robin", i.e., after all tasks are created, we go through them task by task and assign each one to the threads in turn.
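To make the round-robin idea concrete, here is a minimal sketch for the 3 topics x 8 partitions / 12 threads case from the question. It is not the actual Kafka Streams assignor code. It assumes each topic forms its own independent sub-topology (so 24 tasks in total) and that tasks are ordered by sub-topology and then by partition; the class and method names (`RoundRobinSketch`, `createTasks`, `assignRoundRobin`) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: NOT the real Kafka Streams assignment code.
final class RoundRobinSketch {

    // Builds task ids in the "<subTopology>_<partition>" form.
    // Assumption: tasks are ordered by sub-topology first, then by partition.
    static List<String> createTasks(int subTopologies, int partitionsPerTopic) {
        List<String> tasks = new ArrayList<>();
        for (int s = 0; s < subTopologies; s++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                tasks.add(s + "_" + p);
            }
        }
        return tasks;
    }

    // Hands out the tasks one at a time, cycling through the threads.
    static List<List<String>> assignRoundRobin(List<String> tasks, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            perThread.get(i % numThreads).add(tasks.get(i));
        }
        return perThread;
    }

    public static void main(String[] args) {
        // 3 independent sub-topologies (topics A, B, C), 8 partitions each, 12 threads.
        List<List<String>> assignment = assignRoundRobin(createTasks(3, 8), 12);
        for (int t = 0; t < assignment.size(); t++) {
            System.out.println("thread-" + t + " -> " + assignment.get(t));
        }
    }
}
```

Under these assumptions every thread ends up with 2 of the 24 tasks, and no thread gets more than one task from the same topic. Which thread gets which task depends entirely on the assumed task ordering, so treat the concrete pairing as an artifact of the sketch.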
Note, though, that the assignment algorithm might change at any point, so
you should not rely on it. We also cannot know whether one topic has less
traffic than the others, and thus must blindly assume (which is of course a
simplification) that all topics have the same traffic. At the moment, we
only consider the difference between stateless and stateful tasks.

-Matthias

On 10/6/20 3:57 AM, Pushkar Deole wrote:
> Matthias,
>
> I am just wondering how the tasks will be spread across threads in case I
> have fewer threads than the number of partitions. Specifically, taking my
> use case, I have 3 input topics with 8 partitions each and I can configure
> 12 threads, so how will the topic partitions below be distributed among the
> 12 threads?
> Note that topic C is generally idle and carries traffic only sometimes, so
> I would want partitions from topic C to be evenly distributed so all
> partitions from topic C don't get assigned to only some of the threads.
>
> Topic A - 8 partitions
> Topic B - 8 partitions
> Topic C - 8 partitions
>
> On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> That is correct.
>>
>> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
>> for the first sub-topology and 6 tasks for the second sub-topology and
>> you can run up to 11 threads, each executing one task.
>>
>>
>> -Matthias
>>
>> On 9/4/20 1:30 AM, Pushkar Deole wrote:
>>> Matthias,
>>>
>>> Let's say we have independent sub-topologies like the ones below: in this
>>> case, will Streams create tasks equal to the total number of partitions
>>> from topicA and topicB, and can we assign a stream thread count that is
>>> the sum of the partitions of the two topics?
>>>
>>> builder.stream("topicA").filter().to();
>>> builder.stream("topicB").filter().to();
>>>
>>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> Well, it depends on your program.
>>>>
>>>> The reason for the current task creation strategy is joins: if you have
>>>> two input topics that you want to join, the join happens on a
>>>> per-partition basis, i.e., topicA-p0 is joined to topicB-p0 etc., and thus
>>>> both partitions must be assigned to the same task (to get co-partitioned
>>>> data processed together).
>>>>
>>>> Note that the following program would create independent tasks, as it
>>>> consists of two independent sub-topologies:
>>>>
>>>> builder.stream("topicA").filter().to();
>>>> builder.stream("topicB").filter().to();
>>>>
>>>> However, the next program would be one sub-topology and thus we apply
>>>> the "join" rule (as we don't really know if you actually execute a join
>>>> or not when we create tasks):
>>>>
>>>> KStream s1 = builder.stream("topicA");
>>>> builder.stream("topicB").merge(s1).filter().to();
>>>>
>>>>
>>>> Having said that, I agree that it would be a nice improvement to be more
>>>> clever about it. However, it is not easy to do. There is actually a related
>>>> ticket: https://issues.apache.org/jira/browse/KAFKA-9282
>>>>
>>>>
>>>> Hope this helps.
>>>> -Matthias
>>>>
>>>> On 9/2/20 11:09 PM, Pushkar Deole wrote:
>>>>> Hi,
>>>>>
>>>>> I came across articles where it is explained how parallelism is handled
>>>>> in Kafka Streams. This is what I collected:
>>>>> When the streams application is reading from multiple topics, the topic
>>>>> with the maximum number of partitions is considered for instantiating
>>>>> stream tasks, so 1 task is instantiated per partition.
>>>>> Now, if the stream task is reading from multiple topics, then the
>>>>> partitions of multiple topics are shared among those stream tasks.
>>>>>
>>>>> For example, if Topic A and B have 5 partitions each, then 5 tasks are
>>>>> instantiated and assigned to 5 stream threads, where each task is
>>>>> assigned 1 partition from Topic A and Topic B.
>>>>>
>>>>> The question here is: if I would want 1 task to be created for each
>>>>> partition from the input topic then is this possible? e.g. I would want
>>>>> to have 5 tasks for topic A and 5 for B and then would want 10 threads to
>>>>> handle those. How can this be achieved?
>>>>>