I looked at the task assignment and it looked random for some threads. For example, I have 3 topics with 24 partitions each and 3 instances of the application, so each instance is assigned 8 partitions per topic, i.e. 24 partitions in total across the 3 topics.
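For reference, here is a minimal sketch of a plain round-robin spread of tasks over threads, as described in the quoted reply further down. It is only an illustration: the names RoundRobinSketch, buildTasks and assign are made up, the topic-by-topic task ordering is an assumption, and the real Kafka Streams assignor may order or balance tasks differently and can change between versions.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {
    // Hypothetical helper: builds task labels such as "A_0", topic by topic.
    static List<String> buildTasks(String[] topics, int partitionsPerTopic) {
        List<String> tasks = new ArrayList<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                tasks.add(topic + "_" + p);
            }
        }
        return tasks;
    }

    // Hands out the tasks one by one, cycling over the threads.
    static List<List<String>> assign(List<String> tasks, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            perThread.get(i % numThreads).add(tasks.get(i));
        }
        return perThread;
    }

    public static void main(String[] args) {
        // 3 topics with 8 partitions each on one instance = 24 tasks (assumed: one task per partition).
        List<String> tasks = buildTasks(new String[] {"A", "B", "C"}, 8);
        for (int threads : new int[] {8, 12}) {
            System.out.println(threads + " threads:");
            List<List<String>> perThread = assign(tasks, threads);
            for (int t = 0; t < perThread.size(); t++) {
                System.out.println("  thread-" + t + " -> " + perThread.get(t));
            }
        }
    }
}
```

Under these assumptions, 8 threads each get one task per topic, while with 12 threads the first four threads get no task from topic C at all. An uneven mix like the one I observed is therefore possible even with a simple round-robin.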
When I set 8 stream threads, I expected each thread to be assigned 1 partition from each topic; however, some of the threads were assigned partitions from only 2 of the topics. Since topic C is not carrying traffic, the threads that did not get a partition from topic C ended up more heavily loaded than the others.

Topic A Topic

On Wed, Oct 7, 2020 at 11:45 PM Matthias J. Sax <mj...@apache.org> wrote:

> Well, there are many what-ifs and I am not sure if there is general advice.
>
> Maybe a more generic response: do you actually observe a concrete issue
> with the task assignment that impacts your app measurably? Or might this
> be a case of premature optimization?
>
> -Matthias
>
> On 10/6/20 10:13 AM, Pushkar Deole wrote:
> > So, what do you suggest to address topic C, which has less traffic? Should
> > we create a separate StreamsBuilder and build a separate topology for topic
> > C so we can configure the number of threads as per our requirement for that
> > topic?
> >
> > On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> The current assignment would be "round robin". I.e., after all tasks are
> >> created, we just take task-by-task and assign one to the threads
> >> one-by-one.
> >>
> >> Note though, that the assignment algorithm might change at any point, so
> >> you should not rely on it.
> >>
> >> We are also not able to know if one topic has less traffic than others
> >> and thus must blindly assume (which is of course a simplification) that
> >> all topics have the same traffic. We only consider the difference
> >> between stateless and stateful tasks atm.
> >>
> >> -Matthias
> >>
> >> On 10/6/20 3:57 AM, Pushkar Deole wrote:
> >>> Matthias,
> >>>
> >>> I am just wondering how the tasks will be spread across threads in case I
> >>> have fewer threads than the number of partitions.
> >>> Specifically, taking my use case, I have 3 input topics with 8 partitions
> >>> each and I can configure 12 threads, so how will the partitions of the
> >>> topics below be distributed among the 12 threads?
> >>> Note that topic C is generally idle and carries traffic only sometimes, so
> >>> I would want partitions from topic C to be evenly distributed so all
> >>> partitions from topic C don't get assigned to only some of the threads.
> >>>
> >>> Topic A - 8 partitions
> >>> Topic B - 8 partitions
> >>> Topic C - 8 partitions
> >>>
> >>> On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>
> >>>> That is correct.
> >>>>
> >>>> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
> >>>> for the first sub-topology and 6 tasks for the second sub-topology and
> >>>> you can run up to 11 threads, each executing one task.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 9/4/20 1:30 AM, Pushkar Deole wrote:
> >>>>> Matthias,
> >>>>>
> >>>>> Let's say we have independent sub-topologies like the ones below. In this
> >>>>> case, will Streams create tasks equal to the total number of partitions
> >>>>> from topicA and topicB, and can we set a stream thread count that is the
> >>>>> sum of the partitions of the two topics?
> >>>>>
> >>>>> builder.stream("topicA").filter().to();
> >>>>> builder.stream("topicB").filter().to();
> >>>>>
> >>>>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>
> >>>>>> Well, it depends on your program.
> >>>>>>
> >>>>>> The reason for the current task creation strategy is joins: If you have
> >>>>>> two input topics that you want to join, the join happens on a
> >>>>>> per-partition basis, i.e., topicA-p0 is joined to topicB-p0 etc., and thus
> >>>>>> both partitions must be assigned to the same task (to get co-partitioned
> >>>>>> data processed together).
> >>>>>>
> >>>>>> Note that the following program would create independent tasks as it
> >>>>>> consists of two independent sub-topologies:
> >>>>>>
> >>>>>> builder.stream("topicA").filter().to();
> >>>>>> builder.stream("topicB").filter().to();
> >>>>>>
> >>>>>> However, the next program would be one sub-topology and thus we apply
> >>>>>> the "join" rule (as we don't really know if you actually execute a join
> >>>>>> or not when we create tasks):
> >>>>>>
> >>>>>> KStream s1 = builder.stream("topicA");
> >>>>>> builder.stream("topicB").merge(s1).filter().to();
> >>>>>>
> >>>>>>
> >>>>>> Having said that, I agree that it would be a nice improvement to be more
> >>>>>> clever about it. However, it is not easy to do. There is actually a related
> >>>>>> ticket: https://issues.apache.org/jira/browse/KAFKA-9282
> >>>>>>
> >>>>>>
> >>>>>> Hope this helps.
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 9/2/20 11:09 PM, Pushkar Deole wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I came across articles where it is explained how parallelism is handled in
> >>>>>>> Kafka Streams. This is what I collected:
> >>>>>>> When the streams application is reading from multiple topics, the topic
> >>>>>>> with the maximum number of partitions is considered for instantiating
> >>>>>>> stream tasks, so 1 task is instantiated per partition.
> >>>>>>> Now, if the stream task is reading from multiple topics then the partitions
> >>>>>>> of multiple topics are shared among those stream tasks.
> >>>>>>>
> >>>>>>> For example, if Topic A and B have 5 partitions each, then 5 tasks are
> >>>>>>> instantiated and assigned to 5 stream threads where each task is assigned 1
> >>>>>>> partition from Topic A and Topic B.
> >>>>>>>
> >>>>>>> The question here is: if I would want 1 task to be created for each
> >>>>>>> partition from the input topic then is this possible? e.g.
> >>>>>>> I would want to
> >>>>>>> have 5 tasks for topic A and 5 for B and then would want 10 threads to
> >>>>>>> handle those. How can this be achieved?
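
The task counts discussed in this thread can be sketched roughly as follows. This assumes that each sub-topology gets as many tasks as the largest partition count among its input topics, which is how the original question describes it and is consistent with the replies. TaskCountSketch and countTasks are hypothetical names for illustration, not part of the Kafka Streams API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class TaskCountSketch {
    // One inner list per sub-topology; each entry is the partition count
    // of one input topic of that sub-topology.
    static int countTasks(List<List<Integer>> subTopologies) {
        int total = 0;
        for (List<Integer> partitionCounts : subTopologies) {
            // Assumed rule: co-partitioned inputs share a task, so the
            // largest partition count decides the number of tasks.
            total += Collections.max(partitionCounts);
        }
        return total;
    }

    public static void main(String[] args) {
        // Two independent sub-topologies: topicA (5 partitions), topicB (6 partitions).
        int independent = countTasks(Arrays.asList(Arrays.asList(5), Arrays.asList(6)));
        // One sub-topology reading both topics (e.g. via merge), 5 partitions each.
        int merged = countTasks(Arrays.asList(Arrays.asList(5, 5)));
        System.out.println("independent sub-topologies: " + independent + " tasks");
        System.out.println("single merged sub-topology: " + merged + " tasks");
    }
}
```

Since each task is executed by one thread, the task count is also the largest number of stream threads that can do useful work.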