Matthias,

I am just wondering how the tasks will be spread across threads if I have fewer threads than partitions. Specifically, in my use case I have 3 input topics with 8 partitions each, and I can configure 12 threads. How will the partitions of the topics below be distributed among the 12 threads? Note that topic C is generally idle and carries traffic only occasionally, so I would want the partitions of topic C to be evenly distributed, so that they don't all get assigned to just a few of the threads.
Topic A - 8 partitions
Topic B - 8 partitions
Topic C - 8 partitions

On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:

> That is correct.
>
> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
> for the first sub-topology and 6 tasks for the second sub-topology, and
> you can run up to 11 threads, each executing one task.
>
>
> -Matthias
>
> On 9/4/20 1:30 AM, Pushkar Deole wrote:
> > Matthias,
> >
> > Let's say we have independent sub-topologies like the ones below. In this
> > case, will Streams create as many tasks as the total number of partitions
> > of topicA and topicB, and can we set the stream thread count to the sum
> > of the partitions of the two topics?
> >
> > builder.stream("topicA").filter().to();
> > builder.stream("topicB").filter().to();
> >
> > On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Well, it depends on your program.
> >>
> >> The reason for the current task creation strategy is joins: if you have
> >> two input topics that you want to join, the join happens on a
> >> per-partition basis, i.e., topicA-p0 is joined to topicB-p0, etc., and thus
> >> both partitions must be assigned to the same task (to get co-partitioned
> >> data processed together).
> >>
> >> Note that the following program would create independent tasks, as it
> >> consists of two independent sub-topologies:
> >>
> >> builder.stream("topicA").filter().to();
> >> builder.stream("topicB").filter().to();
> >>
> >> However, the next program would be one sub-topology, and thus we apply
> >> the "join" rule (as we don't really know whether you actually execute a
> >> join or not when we create tasks):
> >>
> >> KStream s1 = builder.stream("topicA");
> >> builder.stream("topicB").merge(s1).filter().to();
> >>
> >>
> >> Having said that, I agree that it would be a nice improvement to be more
> >> clever about it. However, it is not easy to do.
> >> There is actually a related ticket:
> >> https://issues.apache.org/jira/browse/KAFKA-9282
> >>
> >>
> >> Hope this helps.
> >> -Matthias
> >>
> >> On 9/2/20 11:09 PM, Pushkar Deole wrote:
> >>> Hi,
> >>>
> >>> I came across articles that explain how parallelism is handled in
> >>> Kafka Streams. This is what I gathered:
> >>> When the Streams application is reading from multiple topics, the topic
> >>> with the maximum number of partitions is considered for instantiating
> >>> stream tasks, so 1 task is instantiated per partition.
> >>> Now, if the stream task is reading from multiple topics, then the
> >>> partitions of the multiple topics are shared among those stream tasks.
> >>>
> >>> For example, if Topic A and B have 5 partitions each, then 5 tasks are
> >>> instantiated and assigned to 5 stream threads, where each task is
> >>> assigned 1 partition from Topic A and 1 partition from Topic B.
> >>>
> >>> The question here is: if I want 1 task to be created for each partition
> >>> of each input topic, is this possible? E.g., I would want to have 5
> >>> tasks for topic A and 5 for B, and then 10 threads to handle those.
> >>> How can this be achieved?
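To make the arithmetic behind the 3-topics/12-threads question concrete, here is a minimal, hypothetical Java sketch (plain JDK, no Kafka dependency). It follows the rule described in this thread: each sub-topology gets one task per partition of its widest input topic, and task ids are written as <sub-topology>_<partition> in the style Kafka Streams uses (e.g. "0_3"). It then deals the tasks out round-robin across 12 threads. The class and method names (TaskSpreadSketch, createTasks, assignRoundRobin), the sub-topology numbering A=0, B=1, C=2, and the round-robin policy are all assumptions for illustration. This is NOT Kafka Streams' actual assignor, which decides task placement internally and is not guaranteed to produce this spread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: this is NOT Kafka Streams' real task assignor.
public class TaskSpreadSketch {

    // One task per partition of the widest source topic in each sub-topology,
    // named "<subTopology>_<partition>" in the style of Kafka Streams task ids.
    static List<String> createTasks(int[][] subTopologies) {
        List<String> tasks = new ArrayList<>();
        for (int s = 0; s < subTopologies.length; s++) {
            int maxPartitions = 0;
            for (int partitions : subTopologies[s]) {
                maxPartitions = Math.max(maxPartitions, partitions);
            }
            for (int p = 0; p < maxPartitions; p++) {
                tasks.add(s + "_" + p);
            }
        }
        return tasks;
    }

    // Assumed policy: deal tasks out round-robin in sub-topology order, so that
    // consecutive tasks of the same sub-topology land on different threads.
    static List<List<String>> assignRoundRobin(List<String> tasks, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            threads.get(i % numThreads).add(tasks.get(i));
        }
        return threads;
    }

    public static void main(String[] args) {
        // Topics A, B and C (8 partitions each) read by three independent sub-topologies.
        int[][] independent = {{8}, {8}, {8}};
        // The same three topics merged into a single sub-topology ("join" rule applies).
        int[][] merged = {{8, 8, 8}};

        System.out.println("independent: " + createTasks(independent).size() + " tasks");
        System.out.println("merged: " + createTasks(merged).size() + " tasks");

        List<List<String>> threads = assignRoundRobin(createTasks(independent), 12);
        for (int t = 0; t < threads.size(); t++) {
            System.out.println("thread-" + t + ": " + threads.get(t));
        }
    }
}
```

With three independent sub-topologies the sketch yields 24 tasks, 2 per thread, and under this assumed policy the eight topic C tasks (2_0 to 2_7) land on threads 4 to 11, one each. If the three topics were merged into one sub-topology, only 8 tasks would exist, so 4 of the 12 threads would have nothing to do.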