Matthias,

Any recommendations?

Also, while running a performance test, I observed that the partitions
assigned to stream threads keep changing. Why would this happen when the
instances are not going down?

E.g., I see partitions moving between stream thread consumers. The numbers
highlighted in bold are partition numbers of the input topics. When I
checked a few hours ago, StreamThread-12 showed different partition numbers
and even different topics:

GROUP                   TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                           HOST            CLIENT-ID
analytics-event-filter  analytics-engagement           *13*       308973          308980          7    analytics-event-filter-StreamThread-12-consumer-965a5a1c-e3f0-4d4e-bc54-1e4e375d2ac8  /10.200.27.207  analytics-event-filter-StreamThread-12-consumer
analytics-event-filter  analytics-agent-account-state  *14*       88053           88057           4    analytics-event-filter-StreamThread-12-consumer-965a5a1c-e3f0-4d4e-bc54-1e4e375d2ac8  /10.200.27.207  analytics-event-filter-StreamThread-12-consumer

On Fri, Oct 9, 2020 at 8:20 AM Pushkar Deole <pdeole2...@gmail.com> wrote:

> I looked at the task assignment, and for some threads it looked random.
> E.g., I have 3 topics with 24 partitions each and 3 instances of the
> application, so each instance is assigned 8 partitions per topic, i.e.,
> 24 partitions in total across the 3 topics.
>
> When I set 8 stream threads, I expected each thread to be assigned 1
> partition from each topic; however, some of the threads were assigned
> partitions from only 2 of the topics. Since topic C is not carrying
> traffic, the threads that did not get a partition from topic C ended up
> more heavily loaded than the others.
>
> Topic A
> Topic
>
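A rough sketch of the arithmetic in the message above, assuming the three topics are consumed by independent sub-topologies (so each partition becomes its own task) and using made-up relative traffic weights per partition (1 for topics A and B, 0 for the idle topic C). The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical back-of-the-envelope model; not part of Kafka Streams.
public class ThreadLoadSketch {

    // Tasks per thread on one instance, assuming one task per partition
    // (independent sub-topologies) and partitions split evenly over instances.
    static int tasksPerThread(int topics, int partitionsPerTopic, int instances, int threadsPerInstance) {
        int partitionsPerTopicPerInstance = partitionsPerTopic / instances;
        int tasksPerInstance = topics * partitionsPerTopicPerInstance;
        return tasksPerInstance / threadsPerInstance;
    }

    // Sums an assumed per-partition traffic weight over the tasks a thread owns.
    static double threadLoad(List<String> taskTopics, Map<String, Double> trafficPerPartition) {
        double load = 0.0;
        for (String topic : taskTopics) {
            load += trafficPerPartition.getOrDefault(topic, 0.0);
        }
        return load;
    }

    public static void main(String[] args) {
        // 3 topics x 24 partitions, 3 instances, 8 threads per instance.
        System.out.println("tasks per thread: " + tasksPerThread(3, 24, 3, 8));

        // Assumed relative traffic per partition: A and B busy, C idle.
        Map<String, Double> traffic = Map.of("A", 1.0, "B", 1.0, "C", 0.0);
        System.out.println("one task per topic: " + threadLoad(List.of("A", "B", "C"), traffic));
        System.out.println("no task from topic C: " + threadLoad(List.of("A", "B", "B"), traffic));
    }
}
```

Under these assumptions, a thread holding three tasks from topics A and B carries 3 units of work, versus 2 for a thread with one task per topic.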
> On Wed, Oct 7, 2020 at 11:45 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> Well, there are many what-ifs, and I am not sure there is general
>> advice.
>>
>> Maybe a more generic response: do you actually observe a concrete issue
>> with the task assignment that measurably impacts your app? Or might this
>> be a case of premature optimization?
>>
>> -Matthias
>>
>> On 10/6/20 10:13 AM, Pushkar Deole wrote:
>> > So, what do you suggest to address topic C, which has less traffic?
>> > Should we create a separate StreamsBuilder and build a separate
>> > topology for topic C, so we can configure the number of threads for
>> > that topic as we need?
>> >
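If the separate-topology approach asked about above is used, each StreamsBuilder's topology would typically run as its own Kafka Streams application with its own configuration. A minimal sketch of the two configurations, using only java.util.Properties; the application ids, broker address and thread counts are placeholders, and this is not a recommendation made in the thread.

```java
import java.util.Properties;

// Hypothetical sketch: one Properties object per Kafka Streams application,
// so the busy topics (A, B) and the mostly idle topic C can get different
// thread counts. Application ids, broker address and thread counts are placeholders.
public class SeparateAppsConfigSketch {

    // "application.id", "bootstrap.servers" and "num.stream.threads" are
    // standard Kafka Streams config keys (also available as StreamsConfig constants).
    static Properties streamsProps(String applicationId, String bootstrapServers, int threads) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("num.stream.threads", threads);
        return props;
    }

    public static void main(String[] args) {
        // Distinct application ids: the id is also used as the consumer group id,
        // so the two topologies run as two independent applications.
        Properties busyTopics = streamsProps("analytics-filter-ab", "localhost:9092", 8);
        Properties idleTopic = streamsProps("analytics-filter-c", "localhost:9092", 2);

        // With the kafka-streams dependency on the classpath, each would be passed to
        // new KafkaStreams(topology, props) together with its own builder's topology.
        System.out.println(busyTopics.getProperty("application.id") + " threads=" + busyTopics.get("num.stream.threads"));
        System.out.println(idleTopic.getProperty("application.id") + " threads=" + idleTopic.get("num.stream.threads"));
    }
}
```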
>> > On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >
>> >> The current assignment would be "round robin", i.e., after all tasks
>> >> are created, we just take the tasks one by one and assign them to the
>> >> threads in turn.
>> >>
>> >> Note, though, that the assignment algorithm might change at any point,
>> >> so you should not rely on it.
>> >>
>> >> We are also not able to know if one topic has less traffic than others
>> >> and thus must blindly assume (which is of course a simplification) that
>> >> all topics have the same traffic. At the moment, we only consider the
>> >> difference between stateless and stateful tasks.
>> >>
>> >> -Matthias
>> >>
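A simplified model of the round-robin idea described above, applied to the 3 topics x 8 partitions, 12 threads case from Pushkar's earlier question. It assumes tasks are visited in sorted order (all tasks of sub-topology 0, then sub-topology 1, and so on); as Matthias notes, the real algorithm may change, and it may order tasks differently or take previous assignments into account. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of "round robin" task-to-thread assignment.
// Assumes tasks are visited in sorted order; not the actual Kafka Streams assignor.
public class RoundRobinSketch {

    // Task ids use the "<subtopology>_<partition>" form, e.g. "2_5".
    static List<String> tasks(int subtopologies, int partitionsEach) {
        List<String> ids = new ArrayList<>();
        for (int s = 0; s < subtopologies; s++) {
            for (int p = 0; p < partitionsEach; p++) {
                ids.add(s + "_" + p);
            }
        }
        return ids;
    }

    // Hands out tasks one by one: task k goes to thread k % threads.
    static List<List<String>> assign(List<String> tasks, int threads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int k = 0; k < tasks.size(); k++) {
            perThread.get(k % threads).add(tasks.get(k));
        }
        return perThread;
    }

    public static void main(String[] args) {
        // Topics A, B, C as sub-topologies 0, 1, 2 with 8 partitions each, 12 threads.
        List<List<String>> result = assign(tasks(3, 8), 12);
        for (int t = 0; t < result.size(); t++) {
            System.out.println("thread " + t + " -> " + result.get(t));
        }
    }
}
```

In this model, threads 0 to 3 get tasks only from topics A and B (e.g. thread 0 gets 0_0 and 1_4), while topic C's tasks land on threads 4 to 11; with topic C idle, threads 0 to 3 would do roughly twice the work of the others. With 8 threads, the same model gives every thread exactly one task per topic.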
>> >> On 10/6/20 3:57 AM, Pushkar Deole wrote:
>> >>> Matthias,
>> >>>
>> >>> I am wondering how the tasks will be spread across threads if I have
>> >>> fewer threads than partitions. Taking my use case specifically: I have
>> >>> 3 input topics with 8 partitions each, and I can configure 12 threads.
>> >>> How will the partitions of the topics below be distributed among the
>> >>> 12 threads?
>> >>> Note that topic C is generally idle and carries traffic only
>> >>> sometimes, so I would want the partitions of topic C to be evenly
>> >>> distributed, so that they don't all end up on only some of the
>> >>> threads.
>> >>>
>> >>> Topic A - 8 partitions
>> >>> Topic B - 8 partitions
>> >>> Topic C - 8 partitions
>> >>>
>> >>> On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>
>> >>>> That is correct.
>> >>>>
>> >>>> If topicA has 5 partitions and topicB has 6 partitions, you get 5
>> >>>> tasks for the first sub-topology and 6 tasks for the second
>> >>>> sub-topology, and you can run up to 11 threads, each executing one
>> >>>> task.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
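The task-count rule from the reply above can be sketched as follows: each sub-topology gets as many tasks as the largest partition count among its source topics, and the per-sub-topology counts are summed. This is a hypothetical model for illustration, not Kafka Streams code.

```java
import java.util.List;

// Hypothetical model of the number of tasks Kafka Streams creates:
// per sub-topology, the maximum partition count among its source topics.
public class TaskCountSketch {

    // Each inner list holds the partition counts of one sub-topology's source topics.
    static int taskCount(List<List<Integer>> subtopologies) {
        int total = 0;
        for (List<Integer> sourcePartitions : subtopologies) {
            int max = 0;
            for (int partitions : sourcePartitions) {
                max = Math.max(max, partitions);
            }
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        // Two independent sub-topologies: topicA (5 partitions), topicB (6 partitions).
        System.out.println(taskCount(List.of(List.of(5), List.of(6))));
        // One sub-topology reading both topics (e.g. via merge()).
        System.out.println(taskCount(List.of(List.of(5, 6))));
    }
}
```

The first case gives 11 tasks and the second 6. That task count is the number of threads that can all be busy at once; with `num.stream.threads` set higher, the extra threads would have no task to run.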
>> >>>> On 9/4/20 1:30 AM, Pushkar Deole wrote:
>> >>>>> Matthias,
>> >>>>>
>> >>>>> Let's say we have independent sub-topologies like the ones below. In
>> >>>>> this case, will Kafka Streams create as many tasks as the total
>> >>>>> number of partitions of topicA and topicB, and can we set the stream
>> >>>>> thread count to the sum of the partitions of the two topics?
>> >>>>>
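>> >>>>> // filter predicate and output topic names are placeholders
>> >>>>> builder.stream("topicA").filter((key, value) -> value != null).to("topicA-out");
>> >>>>> builder.stream("topicB").filter((key, value) -> value != null).to("topicB-out");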
>> >>>>>
>> >>>> On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >>>>>
>> >>>>>> Well, it depends on your program.
>> >>>>>>
>> >>>>>> The reason for the current task-creation strategy is joins: if you
>> >>>>>> have two input topics that you want to join, the join happens on a
>> >>>>>> per-partition basis, i.e., topicA-p0 is joined to topicB-p0, etc.,
>> >>>>>> and thus both partitions must be assigned to the same task (to get
>> >>>>>> co-partitioned data processed together).
>> >>>>>>
>> >>>>>> Note that the following program would create independent tasks, as
>> >>>>>> it consists of two independent sub-topologies:
>> >>>>>>
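>> >>>>>> // filter predicate and output topic names are placeholders
>> >>>>>> builder.stream("topicA").filter((key, value) -> value != null).to("topicA-out");
>> >>>>>> builder.stream("topicB").filter((key, value) -> value != null).to("topicB-out");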
>> >>>>>>
>> >>>>>> However, the next program would be one sub-topology, and thus we
>> >>>>>> apply the "join" rule (as we don't really know whether you actually
>> >>>>>> execute a join or not when we create tasks):
>> >>>>>>
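>> >>>>>> // filter predicate and output topic name are placeholders
>> >>>>>> KStream<String, String> s1 = builder.stream("topicA");
>> >>>>>> KStream<String, String> s2 = builder.stream("topicB");
>> >>>>>> s2.merge(s1).filter((key, value) -> value != null).to("output");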
>> >>>>>>
>> >>>>>>
>> >>>>>> Having said that, I agree that it would be a nice improvement to be
>> >>>>>> more clever about it. However, it is not easy to do. There is
>> >>>>>> actually a related ticket:
>> >>>>>> https://issues.apache.org/jira/browse/KAFKA-9282
>> >>>>>>
>> >>>>>>
>> >>>>>> Hope this helps.
>> >>>>>>   -Matthias
>> >>>>>>
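A hypothetical sketch of the per-partition grouping described above for a single sub-topology (for example the merge() program): task i receives partition i of every source topic that has one. For an actual join, Kafka Streams requires the inputs to be co-partitioned with equal partition counts; unequal counts are used here only to show what happens to the extra partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how partitions are grouped into tasks inside ONE
// sub-topology: task i gets partition i of every source topic that has it,
// so co-partitioned data lands in the same task.
public class PartitionGroupingSketch {

    // sourceTopics maps topic name -> number of partitions.
    static Map<Integer, List<String>> groupPartitions(Map<String, Integer> sourceTopics) {
        int maxPartitions = 0;
        for (int n : sourceTopics.values()) {
            maxPartitions = Math.max(maxPartitions, n);
        }
        Map<Integer, List<String>> tasks = new LinkedHashMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            List<String> group = new ArrayList<>();
            for (Map.Entry<String, Integer> topic : sourceTopics.entrySet()) {
                if (p < topic.getValue()) {
                    group.add(topic.getKey() + "-" + p);
                }
            }
            tasks.put(p, group);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // topicA and topicB merged into one sub-topology (partition counts assumed).
        Map<String, Integer> sources = new LinkedHashMap<>();
        sources.put("topicA", 5);
        sources.put("topicB", 6);
        groupPartitions(sources).forEach((task, partitions) ->
                System.out.println("task 0_" + task + " -> " + partitions));
    }
}
```

With topicA at 5 partitions and topicB at 6, this yields 6 tasks, where task 0_5 holds only topicB-5.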
>> >>>>>> On 9/2/20 11:09 PM, Pushkar Deole wrote:
>> >>>>>>> Hi,
>> >>>>>>>
>> >>>>>>> I came across articles that explain how parallelism is handled in
>> >>>>>>> Kafka Streams. This is what I gathered:
>> >>>>>>> When a Streams application reads from multiple topics, the topic
>> >>>>>>> with the maximum number of partitions determines how many stream
>> >>>>>>> tasks are instantiated: 1 task per partition.
>> >>>>>>> If a stream task reads from multiple topics, the partitions of
>> >>>>>>> those topics are shared among the stream tasks.
>> >>>>>>>
>> >>>>>>> For example, if topics A and B have 5 partitions each, 5 tasks are
>> >>>>>>> instantiated and assigned to 5 stream threads, where each task is
>> >>>>>>> assigned 1 partition from topic A and 1 from topic B.
>> >>>>>>>
>> >>>>>>> The question here is: if I want 1 task to be created for each
>> >>>>>>> partition of each input topic, is this possible? E.g., I would want
>> >>>>>>> 5 tasks for topic A and 5 for topic B, and then 10 threads to
>> >>>>>>> handle them. How can this be achieved?
>> >>>>>>>