Well, there are many what-ifs and I am not sure if there is general advice.
Maybe a more generic response: do you actually observe a concrete issue
with the task assignment that impacts your app measurably? Or might this be
a case of premature optimization?

-Matthias

On 10/6/20 10:13 AM, Pushkar Deole wrote:
> So, what do you suggest to address topic C with less traffic? Should
> we create a separate StreamsBuilder and build a separate topology for topic
> C so we can configure the number of threads as per our requirement for that
> topic?
>
> On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> The current assignment would be "round robin". Ie, after all tasks are
>> created, we just take task-by-task and assign one to the threads
>> one-by-one.
>>
>> Note though, that the assignment algorithm might change at any point, so
>> you should not rely on it.
>>
>> We are also not able to know if one topic has less traffic than others
>> and thus must blindly assume (which is of course a simplification) that
>> all topics have the same traffic. We only consider the difference
>> between stateless and stateful tasks atm.
>>
>> -Matthias
>>
>> On 10/6/20 3:57 AM, Pushkar Deole wrote:
>>> Matthias,
>>>
>>> I am just wondering how the tasks will be spread across threads in case I
>>> have fewer threads than the number of partitions. Specifically taking my
>>> use case, I have 3 input topics with 8 partitions each and I can configure
>>> 12 threads, so how will the topic partitions below be distributed among
>>> the 12 threads?
>>> Note that topic C is generally idle and carries traffic only sometimes, so
>>> I would want partitions from topic C to be evenly distributed so all
>>> partitions from topic C don't get assigned to only some of the threads.
>>>
>>> Topic A - 8 partitions
>>> Topic B - 8 partitions
>>> Topic C - 8 partitions
>>>
>>> On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> That is correct.
>>>>
>>>> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
>>>> for the first sub-topology and 6 tasks for the second sub-topology and
>>>> you can run up to 11 threads, each executing one task.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 9/4/20 1:30 AM, Pushkar Deole wrote:
>>>>> Matthias,
>>>>>
>>>>> Let's say we have independent sub-topologies like the ones below. In
>>>>> this case, will Streams create tasks equal to the total number of
>>>>> partitions from topicA and topicB, and can we set a stream thread count
>>>>> that is the sum of the partitions of the two topics?
>>>>>
>>>>> builder.stream("topicA").filter().to();
>>>>> builder.stream("topicB").filter().to();
>>>>>
>>>>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>
>>>>>> Well, it depends on your program.
>>>>>>
>>>>>> The reason for the current task creation strategy is joins: If you have
>>>>>> two input topics that you want to join, the join happens on a
>>>>>> per-partition basis, ie, topicA-p0 is joined to topicB-p0 etc and thus
>>>>>> both partitions must be assigned to the same task (to get co-partitioned
>>>>>> data processed together).
>>>>>>
>>>>>> Note that the following program would create independent tasks as it
>>>>>> consists of two independent sub-topologies:
>>>>>>
>>>>>> builder.stream("topicA").filter().to();
>>>>>> builder.stream("topicB").filter().to();
>>>>>>
>>>>>> However, the next program would be one sub-topology and thus we apply
>>>>>> the "join" rule (as we don't really know if you actually execute a join
>>>>>> or not when we create tasks):
>>>>>>
>>>>>> KStream s1 = builder.stream("topicA");
>>>>>> builder.stream("topicB").merge(s1).filter().to();
>>>>>>
>>>>>>
>>>>>> Having said that, I agree that it would be a nice improvement to be more
>>>>>> clever about it. However, it is not easy to do.
>>>>>> There is actually a related ticket:
>>>>>> https://issues.apache.org/jira/browse/KAFKA-9282
>>>>>>
>>>>>>
>>>>>> Hope this helps.
>>>>>> -Matthias
>>>>>>
>>>>>> On 9/2/20 11:09 PM, Pushkar Deole wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I came across articles where it is explained how parallelism is handled
>>>>>>> in Kafka Streams. This is what I collected:
>>>>>>> When the streams application is reading from multiple topics, the topic
>>>>>>> with the maximum number of partitions is considered for instantiating
>>>>>>> stream tasks, so 1 task is instantiated per partition.
>>>>>>> Now, if the stream task is reading from multiple topics then the
>>>>>>> partitions of multiple topics are shared among those stream tasks.
>>>>>>>
>>>>>>> For example, if Topic A and B have 5 partitions each then 5 tasks are
>>>>>>> instantiated and assigned to 5 stream threads where each task is
>>>>>>> assigned 1 partition from Topic A and Topic B.
>>>>>>>
>>>>>>> The question here is: if I would want 1 task to be created for each
>>>>>>> partition from the input topic then is this possible? e.g. I would want
>>>>>>> to have 5 tasks for topic A and 5 for B and then would want 10 threads
>>>>>>> to handle those. How can this be achieved?
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
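
To make the task counting and the "round robin" description in this thread concrete, here is a minimal plain-Java sketch. It is only a simplified model under stated assumptions, not the actual Kafka Streams assignor (which, as noted above, may change at any point and also distinguishes stateless from stateful tasks). The class and method names (RoundRobinSketch, createTasks, assign) are made up for illustration. It assumes one task per input partition of each independent sub-topology and hands the tasks to threads one-by-one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model only: NOT the real Kafka Streams assignment code.
final class RoundRobinSketch {

    // Assumption: each independent sub-topology gets one task per input
    // partition; tasks are labeled "<subTopologyIndex>_<partition>".
    static List<String> createTasks(int[] partitionsPerSubTopology) {
        List<String> tasks = new ArrayList<>();
        for (int s = 0; s < partitionsPerSubTopology.length; s++) {
            for (int p = 0; p < partitionsPerSubTopology[s]; p++) {
                tasks.add(s + "_" + p);
            }
        }
        return tasks;
    }

    // Take task-by-task and assign one to the threads one-by-one,
    // wrapping around when the last thread is reached.
    static Map<Integer, List<String>> assign(List<String> tasks, int numThreads) {
        Map<Integer, List<String>> byThread = new LinkedHashMap<>();
        for (int t = 0; t < numThreads; t++) {
            byThread.put(t, new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            byThread.get(i % numThreads).add(tasks.get(i));
        }
        return byThread;
    }

    public static void main(String[] args) {
        // Numbers from the thread: topics A, B, C with 8 partitions each, 12 threads.
        List<String> tasks = createTasks(new int[] {8, 8, 8});
        assign(tasks, 12).forEach((thread, assigned) ->
                System.out.println("thread-" + thread + " -> " + assigned));
    }
}
```

In this model the 24 tasks give every thread two tasks, and the eight tasks of the third sub-topology (topic C) each land on a different thread. Since the real algorithm is not guaranteed to behave this way, it is safer to measure the actual assignment than to rely on this picture.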