> example: launching 4 processes on 4 different machines with 4 threads per
> process on 12 partition topic will have each machine with 3 assigned
> threads and one doing nothing. more over no matter what number of threads
> each process will have , as long as it is bigger then 3, the end result
> will stay the same with 3 assigned threads per machine, and the rest of
> them doing nothing.
> 
> Ideally, I would want something like consumer set/ensemble/{what ever word
> not group} that will be used to denote a group of threads on a machine,
> so that when specific threads request to join a consumer group they will be
> elected so that they are balanced across the machine denoted by the
> consumer set/ensemble identifier.
> 
> will partition.assignment.strategy="roundrobin" help with that?

You can give it a try. It does have the constraint the subscription is
identical across your machines. i.e., it should work in your above
scenario (4 process on 4 machines with 4 threads per process). The
partition assignment will assign partitions to threads in a
round-robin manner. The difference of max(owned) and min(owned) will
be exactly one.

We can discuss improving partition assignment strategies in the 0.9
release with the new consumer.

On Thu, Oct 30, 2014 at 08:52:40AM +0200, Shlomi Hazan wrote:
> Jun, Joel,
> 
> The issue here is exactly which threads are left out, and which threads are
> assigned partitions.
> Maybe I am missing something but what I want is to balance consuming
> threads across machines/processes, regardless of the amount of threads the
> machine launches (side effect: this way if you have more threads than
> partitions you get a reserve force awaiting to charge in).
> 
> example: launching 4 processes on 4 different machines with 4 threads per
> process on 12 partition topic will have each machine with 3 assigned
> threads and one doing nothing. more over no matter what number of threads
> each process will have , as long as it is bigger then 3, the end result
> will stay the same with 3 assigned threads per machine, and the rest of
> them doing nothing.
> 
> Ideally, I would want something like consumer set/ensemble/{what ever word
> not group} that will be used to denote a group of threads on a machine,
> so that when specific threads request to join a consumer group they will be
> elected so that they are balanced across the machine denoted by the
> consumer set/ensemble identifier.
> 
> will partition.assignment.strategy="roundrobin" help with that?
> 10x,
> Shlomi
> 
> On Thu, Oct 30, 2014 at 4:00 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> 
> > Shlomi,
> >
> > If you are on trunk, and your consumer subscriptions are identical
> > then you can try a slightly different partition assignment strategy.
> > Try setting partition.assignment.strategy="roundrobin" in your
> > consumer config.
> >
> > Thanks,
> >
> > Joel
> >
> > On Wed, Oct 29, 2014 at 06:29:30PM -0700, Jun Rao wrote:
> > > By consumer, I actually mean consumer threads (the thread # you used when
> > > creating consumer streams). So, if you have 4 consumers, each with 4
> > > threads, 4 of the threads will not get any data with 12 partitions. It
> > > sounds like that's not what you get?  What's the output of the
> > > ConsumerOffsetChecker (see http://kafka.apache.org/documentation.html)?
> > >
> > > For consumer.id, you don't need to set it in general. We generate some
> > uuid
> > > automatically.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Oct 28, 2014 at 4:59 AM, Shlomi Hazan <shl...@viber.com> wrote:
> > >
> > > > Jun,
> > > >
> > > > I hear you say "partitions are evenly distributed among all consumers
> > in
> > > > the same group", yet I did bump into a case where launching a process
> > with
> > > > X high level consumer API threads took over all partitions, sending
> > > > existing consumers to be unemployed.
> > > >
> > > > According to the claim above, and if I am not mistaken:
> > > > on a topic T with 12 partitions and 3 consumers C1-C3 on the same group
> > > > with 4 threads each,
> > > > adding a new consumer C4 with 12 threads should yield the following
> > > > balance:
> > > > C1-C3 each relinquish a single partition holding only 3 partitions
> > each.
> > > > C4 holds the 3 partitions relinquished by C1-C3.
> > > > Yet, in the case I described what happened is that C4 gained all 12
> > > > partitions and sent C1-C3 out of business with 0 partitions each.
> > > > Now maybe I overlooked something but I think I did see that happen.
> > > >
> > > > BTW
> > > > What key is used to distinguish one consumer from another? "
> > consumer.id"?
> > > > docs for "consumer.id" are "Generated automatically if not set."
> > > > What is the best practice for setting it's value? leave empty? is
> > server
> > > > host name good enough? what are the considerations?
> > > > When using the high level consumer API, are all threads identified as
> > the
> > > > same consumer? I guess they are, right?...
> > > >
> > > > Thanks,
> > > > Shlomi
> > > >
> > > >
> > > > On Tue, Oct 28, 2014 at 4:21 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > You can take a look at the "consumer rebalancing algorithm" part in
> > > > > http://kafka.apache.org/documentation.html. Basically, partitions
> > are
> > > > > evenly distributed among all consumers in the same group. If there
> > are
> > > > more
> > > > > consumers in a group than partitions, some consumers will never get
> > any
> > > > > data.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Oct 27, 2014 at 4:14 AM, Shlomi Hazan <shl...@viber.com>
> > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > Using Kafka's high consumer API I have bumped into a situation
> > where
> > > > > > launching a consumer process P1 with X consuming threads on a topic
> > > > with
> > > > > X
> > > > > > partition kicks out all other existing consumer threads that
> > consumed
> > > > > prior
> > > > > > to launching the process P.
> > > > > > That is, consumer process P is stealing all partitions from all
> > other
> > > > > > consumer processes.
> > > > > >
> > > > > > While understandable, it makes it hard to size & deploy a cluster
> > with
> > > > a
> > > > > > number of partitions that will both allow balancing of consumption
> > > > across
> > > > > > consuming processes, dividing the partitions across consumers by
> > > > setting
> > > > > > each consumer with it's share of the total number of partitions on
> > the
> > > > > > consumed topic, and on the other hand provide room for growth and
> > > > > addition
> > > > > > of new consumers to help with increasing traffic into the cluster
> > and
> > > > the
> > > > > > topic.
> > > > > >
> > > > > > This stealing effect forces me to have more partitions then really
> > > > needed
> > > > > > at the moment, planning for future growth, or stick to what I need
> > and
> > > > > > trust the option to add partitions which comes with a price in
> > terms of
> > > > > > restarting consumers, bumping into out of order messages (hash
> > > > > > partitioning) etc.
> > > > > >
> > > > > > Is this policy of stealing is intended, or did I just jump to
> > > > > conclusions?
> > > > > > what is the way to cope with the sizing question?
> > > > > >
> > > > > > Shlomi
> > > > > >
> > > > >
> > > >
> >
> >

Reply via email to