Well, there are many what-ifs and I am not sure if there is general advice.

Maybe a more generic response: do you actually observe a concrete issue
with the task assignment that impacts your app measurable? Or might this
be a case of premature optimization?

-Matthias

On 10/6/20 10:13 AM, Pushkar Deole wrote:
> So, what do you suggest to address the topic C with lesser traffic? Should
> we create a separate StreamBuilder and build a separate topology for topic
> C so we can configure number of threads as per our requirement for that
> topic?
> 
> On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> The current assignment would be "round robin". Ie, after all tasks are
>> created, we just take task-by-task and assign one to the threads
>> one-by-one.
>>
>> Note though, that the assignment algorithm might change at any point, so
>> you should not rely on it.
>>
>> We are also not able to know if one topic has less traffic than others
>> and thus must blindly assume (what is of course a simplification) that
>> all topics have the same traffic. We only consider the difference
>> between stateless and stateful tasks atm.
>>
>> -Matthias
>>
>> On 10/6/20 3:57 AM, Pushkar Deole wrote:
>>> Matthias,
>>>
>>> I am just wondering how the tasks will be spread across threads in case I
>>> have lesser threads than the number of partitions. Specifically taking my
>>> use case, I have 3 inputs topics with 8 partitions each and I can
>> configure
>>> 12 threads, so how below topics partitions will be distributed among 12
>>> threads.
>>> Note that topic C is generally idle and carries traffic only sometimes,
>> so
>>> I would want partitions from topic C to be evenly distributed so all
>>> partitions from topic C don't get assigned to only some of the threads.
>>>
>>> Topic A - 8 partitions
>>> Topic B - 8 partitions
>>> Topic C - 8 partitions
>>>
>>> On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> That is correct.
>>>>
>>>> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
>>>> for the first sub-topology and 6 tasks for the second sub-topology and
>>>> you can run up to 11 threads, each executing one task.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 9/4/20 1:30 AM, Pushkar Deole wrote:
>>>>> Matthias,
>>>>>
>>>>> Let's say we have independent sub topologies like: in this case, will
>> the
>>>>> streams create tasks equal to the total number of partitions from
>> topicA
>>>>> and topicB, and can we assign stream thread count that is sum of the
>>>>> partition of the two topics?
>>>>>
>>>>> builder.stream("topicA").filter().to();
>>>>> builder.stream("topicB").filter().to();
>>>>>
>>>>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>>>
>>>>>> Well, it depends on your program.
>>>>>>
>>>>>> The reason for the current task creating strategy are joins: If you
>> have
>>>>>> two input topic that you want to join, the join happens on a
>>>>>> per-partition basis, ie, topic-p0 is joined to topicB-p0 etc and thus
>>>>>> both partitions must be assigned to the same task (to get
>> co-partitioned
>>>>>> data processed together).
>>>>>>
>>>>>> Note, that the following program would create independent tasks as it
>>>>>> consist of two independent sub-topologies:
>>>>>>
>>>>>> builder.stream("topicA").filter().to();
>>>>>> builder.stream("topicB").filter().to();
>>>>>>
>>>>>> However, the next program would be one sub-topology and thus we apply
>>>>>> the "join" rule (as we don't really know if you actually execute a
>> join
>>>>>> or not when we create tasks):
>>>>>>
>>>>>> KStream s1 = builder.stream("topicA");
>>>>>> builser.stream("topicB").merge(s1).filter().to();
>>>>>>
>>>>>>
>>>>>> Having said that, I agree that it would be a nice improvement to be
>> more
>>>>>> clever about it. However, it not easy to do. There is actually a
>> related
>>>>>> ticket: https://issues.apache.org/jira/browse/KAFKA-9282
>>>>>>
>>>>>>
>>>>>> Hope this helps.
>>>>>>   -Matthias
>>>>>>
>>>>>> On 9/2/20 11:09 PM, Pushkar Deole wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I came across articles where it is explained how parallelism is
>> handled
>>>>>> in
>>>>>>> kafka streams. This is what I collected:
>>>>>>> When the streams application is reading from multiple topics, the
>> topic
>>>>>>> with maximum number of partitions is considered for instantiating
>>>> stream
>>>>>>> tasks so 1 task is instantiated per partition.
>>>>>>> Now, if the stream task is reading from multiple topics then the
>>>>>> partitions
>>>>>>> of multiple topics are shared among those stream tasks.
>>>>>>>
>>>>>>> For example, Topic A and B has 5 partitions each then 5 tasks are
>>>>>>> instantiated and assigned to 5 stream threads where each task is
>>>>>> assigned 1
>>>>>>> partition from Topic A and Topic B.
>>>>>>>
>>>>>>> The question here is : if I would want 1 task to be created for each
>>>>>>> partition from the input topic then is this possible? e.g. I would
>> want
>>>>>> to
>>>>>>> have 5 tasks for topic A and 5 for B and then would want 10 threads
>> to
>>>>>>> handle those. How can this be achieved?
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 

Reply via email to