Matthias,

Any recommendations?
Also, while running a performance test, I observed that the partitions assigned to the stream threads keep changing. Why would this happen when none of the instances is going down? For example, I see partitions moving between stream-thread consumers. The values marked in bold (*13* and *14* below) are partition numbers of the input topics; when I checked a few hours ago, StreamThread-12 showed different partition numbers and even different topics:

GROUP                   TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                           HOST            CLIENT-ID
analytics-event-filter  analytics-engagement           *13*       308973          308980          7    analytics-event-filter-StreamThread-12-consumer-965a5a1c-e3f0-4d4e-bc54-1e4e375d2ac8  /10.200.27.207  analytics-event-filter-StreamThread-12-consumer
analytics-event-filter  analytics-agent-account-state  *14*       88053           88057           4    analytics-event-filter-StreamThread-12-consumer-965a5a1c-e3f0-4d4e-bc54-1e4e375d2ac8  /10.200.27.207  analytics-event-filter-StreamThread-12-consumer

On Fri, Oct 9, 2020 at 8:20 AM Pushkar Deole <pdeole2...@gmail.com> wrote:

> I looked at the task assignment, and for some threads it looked random.
> For example, I have 3 topics with 24 partitions each and 3 instances of the application, so each instance is assigned 8 partitions per topic, i.e. 24 partitions in total across the 3 topics.
>
> When I set 8 stream threads, I expected each thread to be assigned 1 partition from each topic; however, some of the threads were assigned partitions from only 2 of the topics.
> Since topic C carries no traffic, the threads that were not assigned a partition from topic C were more heavily loaded than the others.
>
> Topic A
> Topic
>
> On Wed, Oct 7, 2020 at 11:45 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Well, there are many what-ifs, and I am not sure there is general advice.
>>
>> Maybe a more generic response: do you actually observe a concrete issue with the task assignment that measurably impacts your app? Or might this be a case of premature optimization?
>>
>> -Matthias
>>
>> On 10/6/20 10:13 AM, Pushkar Deole wrote:
>>> So, what do you suggest to address topic C, which has less traffic?
>>> Should we create a separate StreamsBuilder and build a separate topology for topic C, so that we can configure the number of threads for that topic as we need?
>>>
>>> On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> The current assignment would be "round robin", i.e., after all tasks are created, we just take them task by task and assign them to the threads one by one.
>>>>
>>>> Note, though, that the assignment algorithm might change at any point, so you should not rely on it.
>>>>
>>>> We are also not able to know whether one topic has less traffic than others, and thus must blindly assume (which is of course a simplification) that all topics have the same traffic. At the moment, the only distinction we make is between stateless and stateful tasks.
>>>>
>>>> -Matthias
>>>>
>>>> On 10/6/20 3:57 AM, Pushkar Deole wrote:
>>>>> Matthias,
>>>>>
>>>>> I am wondering how the tasks will be spread across threads if I have fewer threads than partitions. Specifically, in my use case I have 3 input topics with 8 partitions each and can configure 12 threads, so how will the partitions of the topics below be distributed among the 12 threads?
>>>>> Note that topic C is generally idle and carries traffic only occasionally, so I want the partitions of topic C to be distributed evenly, rather than all of them being assigned to only some of the threads.
>>>>>
>>>>> Topic A - 8 partitions
>>>>> Topic B - 8 partitions
>>>>> Topic C - 8 partitions
>>>>>
>>>>> On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>
>>>>>> That is correct.
>>>>>>
>>>>>> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks for the first sub-topology and 6 tasks for the second sub-topology, and you can run up to 11 threads, each executing one task.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 9/4/20 1:30 AM, Pushkar Deole wrote:
>>>>>>> Matthias,
>>>>>>>
>>>>>>> Let's say we have independent sub-topologies like the following. In this case, will Streams create as many tasks as the total number of partitions of topicA and topicB, and can we set the stream thread count to the sum of the partitions of the two topics?
>>>>>>>
>>>>>>> builder.stream("topicA").filter().to();
>>>>>>> builder.stream("topicB").filter().to();
>>>>>>>
>>>>>>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>
>>>>>>>> Well, it depends on your program.
>>>>>>>>
>>>>>>>> The reason for the current task-creation strategy is joins: if you have two input topics that you want to join, the join happens on a per-partition basis, i.e., topicA-p0 is joined to topicB-p0, etc., and thus both partitions must be assigned to the same task (so that co-partitioned data is processed together).
>>>>>>>>
>>>>>>>> Note that the following program would create independent tasks, as it consists of two independent sub-topologies:
>>>>>>>>
>>>>>>>> builder.stream("topicA").filter().to();
>>>>>>>> builder.stream("topicB").filter().to();
>>>>>>>>
>>>>>>>> However, the next program would be one sub-topology, and thus we apply the "join" rule (as, when we create tasks, we don't really know whether you actually execute a join or not):
>>>>>>>>
>>>>>>>> KStream s1 = builder.stream("topicA");
>>>>>>>> builder.stream("topicB").merge(s1).filter().to();
>>>>>>>>
>>>>>>>> Having said that, I agree that it would be a nice improvement to be smarter about this. However, it is not easy to do. There is actually a related ticket: https://issues.apache.org/jira/browse/KAFKA-9282
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 9/2/20 11:09 PM, Pushkar Deole wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I came across articles that explain how parallelism is handled in Kafka Streams. This is what I gathered:
>>>>>>>>> When a Streams application reads from multiple topics, the topic with the maximum number of partitions determines how many stream tasks are instantiated, i.e., one task per partition.
>>>>>>>>> If a stream task reads from multiple topics, the partitions of those topics are shared among the stream tasks.
>>>>>>>>>
>>>>>>>>> For example, if topics A and B have 5 partitions each, then 5 tasks are instantiated and assigned to 5 stream threads, where each task is assigned 1 partition from topic A and 1 partition from topic B.
>>>>>>>>>
>>>>>>>>> The question is: if I want 1 task to be created for each partition of each input topic, is this possible? E.g., I want 5 tasks for topic A and 5 for topic B, and then 10 threads to handle them. How can this be achieved?
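As a rough illustration of the task-creation rule discussed in this thread (one task per partition of each sub-topology; a sub-topology that reads several topics gets as many tasks as its source topic with the most partitions, and task N reads partition N of every source topic that has one), here is a toy model in plain Java. It is a sketch under stated assumptions, not Kafka Streams code: the class `TaskCreationModel`, the method `createTasks`, and modeling a sub-topology as a map from topic name to partition count are all hypothetical. Task ids are written as `<sub-topology>_<partition>` (e.g. `0_0`).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model (not Kafka's actual code) of task creation as described in the thread.
class TaskCreationModel {

    // Each sub-topology is modeled as an ordered map: source topic -> partition count.
    // Returns task id ("<sub-topology>_<partition>") -> partitions ("topic-pN") that task reads.
    static Map<String, List<String>> createTasks(List<Map<String, Integer>> subTopologies) {
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        for (int s = 0; s < subTopologies.size(); s++) {
            Map<String, Integer> sources = subTopologies.get(s);
            // The source topic with the most partitions determines the task count.
            int maxPartitions = 0;
            for (int count : sources.values()) {
                maxPartitions = Math.max(maxPartitions, count);
            }
            // Task p reads partition p of every source topic that has a partition p.
            for (int p = 0; p < maxPartitions; p++) {
                List<String> partitions = new ArrayList<>();
                for (Map.Entry<String, Integer> e : sources.entrySet()) {
                    if (p < e.getValue()) {
                        partitions.add(e.getKey() + "-p" + p);
                    }
                }
                tasks.put(s + "_" + p, partitions);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Two independent sub-topologies, like the two separate filter().to() pipelines.
        Map<String, Integer> a = new LinkedHashMap<>();
        a.put("topicA", 5);
        Map<String, Integer> b = new LinkedHashMap<>();
        b.put("topicB", 6);
        System.out.println(createTasks(List.of(a, b)).size()); // 11

        // One sub-topology reading both topics, like the merge() example.
        Map<String, Integer> merged = new LinkedHashMap<>();
        merged.put("topicA", 5);
        merged.put("topicB", 6);
        Map<String, List<String>> tasks = createTasks(List.of(merged));
        System.out.println(tasks.size());     // 6
        System.out.println(tasks.get("0_0")); // [topicA-p0, topicB-p0]
    }
}
```

In this model the two independent pipelines yield 5 + 6 = 11 tasks, matching the "up to 11 threads" answer in the thread, while the `merge()` variant yields a single sub-topology with 6 tasks, and task `0_0` reads both `topicA-p0` and `topicB-p0`.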
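To see how the "round robin" description could play out for the 3-topic, 8-partition, 12-thread case asked about on 10/6, the following toy model lists the tasks sorted by sub-topology and then partition, and hands task i to thread i % numThreads. The class `RoundRobinModel`, the sort order, and the strict modulo rule are assumptions made for illustration only; as Matthias notes, the real assignor may differ and may change at any time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a plain round-robin task-to-thread distribution.
// It is NOT Kafka's assignor; it only illustrates the idea described in the thread.
class RoundRobinModel {

    // Builds task ids "<sub-topology>_<partition>" in sorted order:
    // all tasks of sub-topology 0 first, then sub-topology 1, and so on.
    static List<String> taskIds(int numSubTopologies, int partitionsEach) {
        List<String> ids = new ArrayList<>();
        for (int s = 0; s < numSubTopologies; s++) {
            for (int p = 0; p < partitionsEach; p++) {
                ids.add(s + "_" + p);
            }
        }
        return ids;
    }

    // Assigns task i to thread (i % numThreads), one task at a time.
    static List<List<String>> assign(List<String> tasks, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            threads.get(i % numThreads).add(tasks.get(i));
        }
        return threads;
    }

    public static void main(String[] args) {
        // Topics A, B, C with 8 partitions each, modeled as sub-topologies 0, 1, 2 (C is 2).
        List<List<String>> threads = assign(taskIds(3, 8), 12);
        for (int t = 0; t < threads.size(); t++) {
            System.out.println("thread " + t + ": " + threads.get(t));
        }
    }
}
```

Under these assumptions, with 12 threads, threads 0-3 receive no task for topic C while threads 4-11 receive one each; with 8 threads, every thread gets exactly one task per topic. The behavior reported on Oct 9 shows that the actual assignment did not follow this simple model, which is consistent with the advice not to rely on the algorithm.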