So, what do you suggest for handling topic C, which has less traffic? Should we create a separate StreamsBuilder and build a separate topology for topic C, so that we can configure the number of threads for that topic according to our requirements?
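One way to explore the split proposed above is to run two Kafka Streams applications, each with its own StreamsBuilder, topology and `num.stream.threads`. The minimal sketch below, under that assumption, only builds the two configurations with `java.util.Properties` so it runs without the Kafka jars. The class name `SplitAppsConfig`, the application IDs, the broker address and the 10/2 split of the 12 threads are all hypothetical; in a real application each `Properties` object would be passed to `new KafkaStreams(topology, props)` together with a topology built from its own StreamsBuilder.

```java
import java.util.Properties;

// Hypothetical sketch: configuration for two separate Kafka Streams
// applications, one for the busy topics A and B and one for the mostly idle
// topic C. Only the Properties are built here; each would be passed to
// new KafkaStreams(topology, props) with a topology from its own StreamsBuilder.
public class SplitAppsConfig {

    // Uses the standard Kafka Streams config keys; the values are placeholders.
    static Properties streamsConfig(String applicationId, String bootstrapServers, int numThreads) {
        Properties props = new Properties();
        // application.id is also used as the consumer group id, so each
        // separate topology needs its own value.
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Thread count for this application only.
        props.put("num.stream.threads", Integer.toString(numThreads));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical split of the 12 threads mentioned in the thread.
        Properties topicsAB = streamsConfig("app-topics-a-b", "localhost:9092", 10);
        Properties topicC = streamsConfig("app-topic-c", "localhost:9092", 2);
        System.out.println(topicsAB.getProperty("num.stream.threads"));
        System.out.println(topicC.getProperty("num.stream.threads"));
    }
}
```

The trade-off of this design is operational: two applications mean two consumer groups and two sets of internal topics to manage, in exchange for sizing each thread pool independently.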
On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax <mj...@apache.org> wrote:

> The current assignment would be "round robin", i.e., after all tasks are
> created, we just take them task by task and assign them to the threads
> one by one.
>
> Note, though, that the assignment algorithm might change at any point, so
> you should not rely on it.
>
> We are also not able to know if one topic has less traffic than others
> and thus must blindly assume (which is of course a simplification) that
> all topics have the same traffic. At the moment, we only consider the
> difference between stateless and stateful tasks.
>
> -Matthias
>
> On 10/6/20 3:57 AM, Pushkar Deole wrote:
> > Matthias,
> >
> > I am just wondering how the tasks will be spread across threads in case I
> > have fewer threads than the number of partitions. Specifically, taking my
> > use case: I have 3 input topics with 8 partitions each and I can configure
> > 12 threads, so how will the partitions of the topics below be distributed
> > among the 12 threads?
> > Note that topic C is generally idle and carries traffic only sometimes, so
> > I would want the partitions of topic C to be evenly distributed, so that
> > they don't all get assigned to only some of the threads.
> >
> > Topic A - 8 partitions
> > Topic B - 8 partitions
> > Topic C - 8 partitions
> >
> > On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> That is correct.
> >>
> >> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
> >> for the first sub-topology and 6 tasks for the second sub-topology, and
> >> you can run up to 11 threads, each executing one task.
> >>
> >> -Matthias
> >>
> >> On 9/4/20 1:30 AM, Pushkar Deole wrote:
> >>> Matthias,
> >>>
> >>> Let's say we have independent sub-topologies like the ones below. In
> >>> this case, will Streams create as many tasks as the total number of
> >>> partitions of topicA and topicB, and can we set the stream thread count
> >>> to the sum of the partitions of the two topics?
> >>>
> >>> builder.stream("topicA").filter().to();
> >>> builder.stream("topicB").filter().to();
> >>>
> >>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>
> >>>> Well, it depends on your program.
> >>>>
> >>>> The reason for the current task creation strategy is joins: if you have
> >>>> two input topics that you want to join, the join happens on a
> >>>> per-partition basis, i.e., topicA-p0 is joined to topicB-p0, etc., and
> >>>> thus both partitions must be assigned to the same task (to get
> >>>> co-partitioned data processed together).
> >>>>
> >>>> Note that the following program would create independent tasks, as it
> >>>> consists of two independent sub-topologies:
> >>>>
> >>>> builder.stream("topicA").filter().to();
> >>>> builder.stream("topicB").filter().to();
> >>>>
> >>>> However, the next program would be one sub-topology, and thus we apply
> >>>> the "join" rule (as we don't really know whether you actually execute a
> >>>> join or not when we create tasks):
> >>>>
> >>>> KStream s1 = builder.stream("topicA");
> >>>> builder.stream("topicB").merge(s1).filter().to();
> >>>>
> >>>> Having said that, I agree that it would be a nice improvement to be
> >>>> more clever about it. However, it is not easy to do. There is actually
> >>>> a related ticket: https://issues.apache.org/jira/browse/KAFKA-9282
> >>>>
> >>>> Hope this helps.
> >>>> -Matthias
> >>>>
> >>>> On 9/2/20 11:09 PM, Pushkar Deole wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I came across articles that explain how parallelism is handled in
> >>>>> Kafka Streams.
> >>>>> This is what I collected:
> >>>>> When the Streams application is reading from multiple topics, the
> >>>>> topic with the maximum number of partitions is considered for
> >>>>> instantiating stream tasks, so 1 task is instantiated per partition.
> >>>>> Now, if the stream task is reading from multiple topics, then the
> >>>>> partitions of those topics are shared among the stream tasks.
> >>>>>
> >>>>> For example, if topics A and B have 5 partitions each, then 5 tasks
> >>>>> are instantiated and assigned to 5 stream threads, where each task is
> >>>>> assigned 1 partition from topic A and 1 partition from topic B.
> >>>>>
> >>>>> The question here is: if I want 1 task to be created for each
> >>>>> partition of each input topic, is this possible? E.g., I would want
> >>>>> 5 tasks for topic A and 5 for topic B, and then 10 threads to handle
> >>>>> them. How can this be achieved?
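To make the two rules discussed in this thread concrete, here is a simplified Java model, written as an assumption rather than taken from the Kafka Streams source: each sub-topology gets as many tasks as the largest partition count among its source topics (task `s_p` reads partition `p` of each of its source topics that has one), and tasks are then handed to the threads of a single instance round robin, in creation order. It ignores multiple instances, sticky and stateful/stateless-aware assignment, and standby tasks, and, as Matthias points out, the real assignment algorithm may change at any time. The names `TaskModel`, `createTasks` and `assignRoundRobin` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of task creation and round-robin assignment.
// Not Kafka Streams' actual implementation.
public class TaskModel {

    // Each inner map is one sub-topology: source topic name -> partition count.
    // Returns task ids "subtopology_partition" -> topic-partitions the task reads.
    static Map<String, List<String>> createTasks(List<Map<String, Integer>> subTopologies) {
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        for (int s = 0; s < subTopologies.size(); s++) {
            Map<String, Integer> sources = subTopologies.get(s);
            // One task per partition of the largest source topic.
            int maxPartitions = sources.values().stream().mapToInt(Integer::intValue).max().orElse(0);
            for (int p = 0; p < maxPartitions; p++) {
                List<String> inputs = new ArrayList<>();
                for (Map.Entry<String, Integer> e : sources.entrySet()) {
                    // Task s_p reads partition p of every source topic that has it.
                    if (p < e.getValue()) {
                        inputs.add(e.getKey() + "-" + p);
                    }
                }
                tasks.put(s + "_" + p, inputs);
            }
        }
        return tasks;
    }

    // Round robin: the i-th task in creation order goes to thread i % numThreads.
    static List<List<String>> assignRoundRobin(List<String> taskIds, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (int i = 0; i < taskIds.size(); i++) {
            threads.get(i % numThreads).add(taskIds.get(i));
        }
        return threads;
    }

    public static void main(String[] args) {
        // Two independent sub-topologies (topicA: 5, topicB: 6): 5 + 6 = 11 tasks.
        var independent = createTasks(List.of(Map.of("topicA", 5), Map.of("topicB", 6)));
        System.out.println(independent.size()); // 11

        // The same topics merged into one sub-topology: max(5, 6) = 6 tasks.
        Map<String, Integer> merged = new LinkedHashMap<>();
        merged.put("topicA", 5);
        merged.put("topicB", 6);
        var mergedTasks = createTasks(List.of(merged));
        System.out.println(mergedTasks.size());     // 6
        System.out.println(mergedTasks.get("0_0")); // [topicA-0, topicB-0]
        System.out.println(mergedTasks.get("0_5")); // [topicB-5]

        // Topics A, B, C with 8 partitions each as independent sub-topologies, 12 threads.
        var threeTopics = createTasks(List.of(Map.of("A", 8), Map.of("B", 8), Map.of("C", 8)));
        var threads = assignRoundRobin(new ArrayList<>(threeTopics.keySet()), 12);
        for (int t = 0; t < threads.size(); t++) {
            System.out.println("thread " + t + ": " + threads.get(t));
        }
    }
}
```

In this model, the three independent 8-partition topics give 24 tasks, so each of the 12 threads gets two tasks; topic C's tasks `2_0` to `2_7` land on threads 4 to 11, while threads 0 to 3 get none. Because the model, like the assignor Matthias describes, knows nothing about traffic, an idle topic C still occupies slots on the same threads as the busy topics.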