I don't think it has anything to do with your specific topology, but it
might be
that the "stickiness" is overriding the "data parallelism balance" in the
current
assignment algorithm. There are a lot of different factors to optimize for,
so we
end up making tradeoffs with a rough hierarchy of these factors:

1) task balance (number of tasks)
2) stickiness (instance had this task before, helps with state restoration)
3) data parallelism (distributing tasks of the same subtopology across
different instances)

In other words, within the constraint that instances should have an equal
number of total tasks (proportional to their number of stream threads,
differing by at most 1), the next highest criterion is "which instance had
this task last". So, if you
bring up your
instances one at a time, the first one will end up with all the tasks
initially. When you
bring up the second, we will go through the tasks and assign them to their
previous
owner if possible, ie, until that previous owner is full (the balance
constraint). Since
we assign tasks in subtopology order, eg t1p0, t1p1, t2p0, t2p1, t3 ... the
"first" tasks
will presumably all end up on your first instance. It sounds like in your
case the "first"
tasks are the ones for the input topic.
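
To make that concrete, here's a toy, completely made-up sketch of that priority
order (this is NOT the actual StickyTaskAssignor code, just an illustration of
how the balance cap plus stickiness can leave all the "first" tasks on the
first instance):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StickyAssignmentSketch {
    public static void main(String[] args) {
        // 8 sub-topologies x 6 partitions = 48 tasks, created in subtopology
        // order (t0p0, t0p1, ..., t1p0, ...), mirroring the order described above.
        List<String> tasks = new ArrayList<>();
        for (int sub = 0; sub < 8; sub++) {
            for (int part = 0; part < 6; part++) {
                tasks.add("t" + sub + "p" + part);
            }
        }

        List<String> instances = Arrays.asList("instanceA", "instanceB", "instanceC");

        // Pretend instanceA was started first and previously owned every task.
        Map<String, String> previousOwner = new HashMap<>();
        for (String task : tasks) {
            previousOwner.put(task, "instanceA");
        }

        // 1) balance cap: no instance may take more than ceil(48 / 3) = 16 tasks
        int capacity = (int) Math.ceil((double) tasks.size() / instances.size());

        Map<String, List<String>> assignment = new HashMap<>();
        instances.forEach(i -> assignment.put(i, new ArrayList<>()));

        for (String task : tasks) {
            String prev = previousOwner.get(task);
            if (prev != null && assignment.get(prev).size() < capacity) {
                // 2) stickiness: the previous owner keeps the task while it has room
                assignment.get(prev).add(task);
            } else {
                // otherwise fall back to whichever instance is least loaded
                instances.stream()
                        .min(Comparator.comparingInt(i -> assignment.get(i).size()))
                        .ifPresent(i -> assignment.get(i).add(task));
            }
        }

        // instanceA keeps the first 16 tasks (all of sub-topologies 0 and 1, plus
        // part of 2), which is how the heavy input-topic tasks can pile up on a
        // single instance.
        assignment.forEach((i, t) -> System.out.println(i + " -> " + t));
    }
}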

I hope that made sense and didn't just confuse things further -- if you're
interested in
the assignment code I'm referring to, take a look at StickyTaskAssignor. The
good news
is that KIP-441 will definitely address this limitation of the current
assignment algorithm.

On Mon, Mar 23, 2020 at 7:57 AM Stephen Young
<stephendeyo...@yahoo.co.uk.invalid> wrote:

> Thanks for your help Sophie and Matthias.
>
> In my cloud environment I'm using kafka version 2.2.1. I've tested this
> locally with 2.4.1 and I can see the same issue with 3 local instances. As
> I add more local instances I start to see better balancing.
>
> I was wondering if the issue could be because my kafka streams app reads
> the input topic as a ktable. I want this so it is simple for producers to
> send deletes and updates to records in the topic (by sending null values
> for the relevant keys, etc)
> and also so my streams app automatically recalculates various aggregations
> in all of the sub-topologies. Could this be the cause of the problem?
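>
> Roughly what I mean, as a sketch (the topic name, serdes and the particular
> aggregation are placeholders, not my real code):
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Grouped;
> import org.apache.kafka.streams.kstream.KTable;
>
> StreamsBuilder builder = new StreamsBuilder();
>
> // Reading the input as a KTable means a record with a null value acts as a
> // delete, and any update to a key automatically flows into the aggregations.
> KTable<String, String> input =
>     builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
>
> // One of several aggregations, one per sub-topology in the real app.
> KTable<String, Long> countsByGroup = input
>     .groupBy((key, value) -> KeyValue.pair(value, value),
>              Grouped.with(Serdes.String(), Serdes.String()))
>     .count();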
>
> Stephen
>
> On Fri, 20 Mar 2020 at 17:33, Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
> > Although it's not the main objective, one side effect of KIP-441 should be
> > improved balance of the final stable assignment. By warming up standbys
> > before switching them over to active tasks we can achieve stickiness
> > without
> > sacrificing balance in the followup rebalance.
> >
> > This work is targeted for the next release, so if you do still observe
> > issues in
> > newer versions I'd recommend trying out 2.6 when it comes out.
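> >
> > For reference, the knobs proposed in KIP-441 for tuning the warm-up
> > behaviour look roughly like this (a sketch only -- the config names come
> > from the KIP and could still change before 2.6 is released):
> >
> > import java.util.Properties;
> > import org.apache.kafka.streams.StreamsConfig;
> >
> > Properties props = new Properties();
> > // extra replicas used only to warm up state before taking over as active
> > props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
> > // a task counts as "caught up" once its lag is below this many records
> > props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);
> > // how often to trigger a follow-up rebalance to check on warm-up progress
> > props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 600_000L);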
> >
> > You can read up on the details and track the progress of this KIP in the
> > KIP document:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > JIRA: https://issues.apache.org/jira/browse/KAFKA-6145?src=confmacro
> >
> > Cheers,
> > Sophie
> >
> > On Fri, Mar 20, 2020 at 10:20 AM Matthias J. Sax <mj...@apache.org>
> > wrote:
> >
> > > Partition assignment, or more specifically "task placement" for Kafka
> > > Streams, is a hard-coded algorithm (cf.
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
> > > ).
> > > The algorithm actually tries to assign different tasks from the same
> > > sub-topology to different instances and thus, your 6 input topic
> > > partitions should ideally get balanced over your 3 instances (ie, 2 each,
> > > one for each thread).
> > >
> > > However, the algorithm needs to trade off load balancing and stickiness
> > > (to avoid unnecessary, expensive state migration) and thus, the
> > > placement strategy is best effort only. Also, in older versions there
> > > was some issue that got fixed in newer versions (ie, 2.0.x and newer).
> > > Not sure what version you are on (as you linked to the 1.0 docs, maybe an
> > > upgrade resolves your issue?).
> > >
> > > Compare:
> > >
> > >  - https://issues.apache.org/jira/browse/KAFKA-6039
> > >  - https://issues.apache.org/jira/browse/KAFKA-7144
> > >
> > > If you still observe issues in newer versions, please comment on the
> > > tickets or create a new ticket describing the problem. Or even better,
> > > do a PR to help improve the "task placement" algorithm. :)
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 3/20/20 6:47 AM, Stephen Young wrote:
> > > > Thanks Guozhang. That's really helpful!
> > > >
> > > > Are you able to explain a bit more about how it would work for my use
> > > > case? As I understand it this 'repartition' method enables us to
> > > > materialize a stream to a new topic with a custom partitioning strategy.
> > > >
> > > > But my problem is not how the topic is partitioned. My issue is that the
> > > > partitions of the source topic need to be spread equally amongst all the
> > > > available threads. How could 'repartition' help with this?
> > > >
> > > > Stephen
> > > >
> > > > On 2020/03/19 23:20:54, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >> Hi Stephen,
> > > >>
> > > >> We've deprecated the partition-grouper API due to its drawbacks in
> > > >> upgrade compatibility (consider if you want to change the num.partitions
> > > >> while evolving your application), and instead we're working on KIP-221
> > > >> for the same purpose as your use case:
> > > >>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
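> > > >>
> > > >> To give a rough feel for the shape of the API being discussed there
> > > >> (a sketch based on the KIP only -- names may change before it ships,
> > > >> and it assumes a StreamsBuilder called "builder" already exists):
> > > >>
> > > >> import org.apache.kafka.streams.kstream.KStream;
> > > >> import org.apache.kafka.streams.kstream.Repartitioned;
> > > >>
> > > >> KStream<String, String> stream = builder.stream("input-topic");
> > > >>
> > > >> // Explicitly repartition onto a named internal topic with its own
> > > >> // partition count, instead of relying on through() and a manually
> > > >> // created topic.
> > > >> KStream<String, String> repartitioned = stream.repartition(
> > > >>     Repartitioned.<String, String>as("balanced-input").withNumberOfPartitions(12));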
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Wed, Mar 18, 2020 at 7:48 AM Stephen Young
> > > >> <wintersg...@googlemail.com.invalid> wrote:
> > > >>
> > > >>> I have a question about partition assignment for a kafka streams app.
> > > >>> As I understand it the more complex your topology is the greater the
> > > >>> number of internal topics kafka streams will create. In my case the app
> > > >>> has 8 graphs in the topology. There are 6 partitions for each graph
> > > >>> (this matches the number of partitions of the input topic). So there
> > > >>> are 48 partitions that the app needs to handle. These get balanced
> > > >>> equally across all 3 servers where the app is running (each server also
> > > >>> has 2 threads so there are 6 available instances of the app).
> > > >>>
> > > >>> The problem for me is that the partitions of the input topic have the
> > > >>> heaviest workload. But these 6 partitions are not distributed evenly
> > > >>> amongst the instances. They are just considered 6 partitions amongst the
> > > >>> 48 the app needs to balance. But this means if a server gets most or all
> > > >>> of these 6 partitions, it ends up exhausting all of the resources on
> > > >>> that server.
> > > >>>
> > > >>> Is there a way of equally balancing these 6 specific partitions amongst
> > > >>> the available instances? I thought writing a custom partition grouper
> > > >>> might help here:
> > > >>>
> > > >>>
> > > >>>
> > >
> >
> https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#partition-grouper
> > > >>>
> > > >>> But the advice seems to be to not do this, otherwise you risk breaking
> > > >>> the app.
> > > >>>
> > > >>> Thanks!
> > > >>>
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > >
> > >
> >
>
