Hi Jason,

In my test case, I set enable.auto.commit=false, and then I need to call
commitSync() in
onPartitionsAssigned() to deal with consumer rebalances.
Actually, I place it inside for (TopicPartition partition : partitions),
so that commitSync() will not execute when partitions are first assigned.
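
As a side note, the commit-on-rebalance pattern can be sketched without a
broker like this. TopicPartition and RebalanceListener below are hypothetical
stand-ins for Kafka's TopicPartition and ConsumerRebalanceListener, so the
sketch is self-contained; with the real client the listener is passed to
consumer.subscribe(topics, listener) and the commit is consumer.commitSync():

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Self-contained simulation of committing offsets from a rebalance listener.
// The types are stand-ins for the Kafka classes of the same names.
public class RebalanceSketch {
    record TopicPartition(String topic, int partition) {}

    interface RebalanceListener {
        void onPartitionsRevoked(Collection<TopicPartition> partitions);
        void onPartitionsAssigned(Collection<TopicPartition> partitions);
    }

    static long simulate() {
        Map<TopicPartition, Long> committed = new HashMap<>();
        Map<TopicPartition, Long> position = new HashMap<>();

        RebalanceListener listener = new RebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Last chance to commit before partitions move to another consumer;
                // with the real client this would be consumer.commitSync().
                for (TopicPartition tp : partitions) {
                    committed.put(tp, position.getOrDefault(tp, 0L));
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing has been consumed yet, so no commit is needed here.
            }
        };

        TopicPartition p0 = new TopicPartition("test", 0);
        position.put(p0, 42L);                     // pretend we consumed up to 42
        listener.onPartitionsRevoked(List.of(p0)); // a rebalance begins
        return committed.get(p0);
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints 42
    }
}
```

The same shape works whichever callback the commit ends up in.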

If I set enable.auto.commit=true in my test case,
then when the consumer rebalanced I checked the offset position, and it did
not overlap with what was consumed before.
My guess is that with enable.auto.commit=true, an offset commit is triggered
when a rebalance happens or wakeup() is called.
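
For context, these are the two configurations being compared (a sketch; the
interval value shown is just the default, and the behavior notes are my
understanding of the 0.9 consumer, not authoritative):

```properties
# Manual commits: the application calls commitSync()/commitAsync() itself.
enable.auto.commit=false

# Auto commits: offsets are committed periodically in the background; in the
# 0.9 consumer they are, as far as I know, also committed before a rebalance
# completes, which would explain the non-overlapping positions observed.
enable.auto.commit=true
auto.commit.interval.ms=5000
```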

These are just my thoughts; is that right?

-Ken

2016-03-09 2:06 GMT+08:00 Jason Gustafson <ja...@confluent.io>:

> Hey Ken,
>
> Whether to use subscribe or assign depends mainly on whether you need to
> use consumer groups to distribute the topic load. If you use subscribe(),
> then the partitions for the subscribed topics will be divided among all
> consumers sharing the same groupId. With assign(), you have to provide the
> partitions explicitly, and there won't be any coordination with other
> consumers. We expect most users will use subscribe() to take advantage of
> consumer groups, but there are some cases where you need finer control. If
> you're migrating from the old high level API, that is definitely the way to
> go.
>
> By the way, I'm a little puzzled why you'd want to commitSync() in the
> onPartitionsAssigned() callback. When partitions are first assigned, you
> wouldn't expect local offsets to be different from whatever was last
> committed since there hasn't been any consumption yet. It's common,
> however, for users to commit offsets in the onPartitionsRevoked() callback
> when they are using a manual commit policy. This is basically the last
> chance to commit offsets before the group rebalances.
>
> -Jason
>
> On Tue, Mar 8, 2016 at 9:02 AM, Ken Cheng <kk543...@gmail.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for your detailed explanation.
> > Facing this situation, I'd like to discuss it a little more.
> > I tried two approaches to avoid it in Kafka 0.9.0.1, and both work correctly.
> >
> > 1. Use subscribe(topics, listener) and
> > implement onPartitionsAssigned(partitions), manually calling
> > consumer.commitSync() when onPartitionsAssigned is called
> > 2. Use consumer.assign(topicPartitions) and assign the partition numbers
> > directly
> >
> > I think the first approach is better. It is more flexible for dynamic
> > partition situations and suitable for a persistent consumer service.
> > If migrating from the high-level API to the new consumer, this approach is
> > more straightforward.
> >
> > The second approach is only for specific requirements, and it has to manage
> > more detailed information.
> > It is suitable for a job with a clear target, or a web service fetching a
> > given range of offsets.
> >
> > What do you think?
> >
> >
> >
> > 2016-03-08 1:54 GMT+08:00 Jason Gustafson <ja...@confluent.io>:
> >
> > > Hi Ken,
> > >
> > > It looks like what happened is this:
> > >
> > > 1. First thread joins the group and is assigned partitions 0 and 1
> > > 2. First thread races through a bunch of messages from these partitions.
> > > 3. Second thread joins and is assigned partition 1 (leaving partition 0
> > > for the first thread)
> > > 4. Both threads read whatever is left from their respective partitions
> > >
> > > This is normal (expected) behavior. It might seem a little odd that the
> > > first thread gets assigned both partitions initially even though both
> > > threads were started at about the same time. This is the result of the
> > > coordinator somewhat optimistically creating the group and assigning
> > > partitions immediately when the first member joins. When the second
> > > member joins, it forces a partition rebalance, but there will be some
> > > latency before the first member rejoins. We have considered adding a
> > > delay on initial group creation to give time for more members to join,
> > > but as of yet, we haven't implemented it.
> > >
> > > -Jason
> > >
> > > On Sun, Mar 6, 2016 at 7:11 PM, Ken Cheng <kk543...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm trying the new consumer features on my local PC and a VM,
> > > > with reference code from
> > > > http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> > > > but the execution result is different.
> > > >
> > > > I use two consumer threads with the same group.id consuming one topic
> > > > with 2 partitions,
> > > > but one partition is assigned to two consumer instances!
> > > >
> > > >
> > > >
> > > > e.g.
> > > >
> > > > If you check the attachment exelog.txt:
> > > >
> > > > 1. You can find the term "0: {partition=0, offset=0}" twice, meaning
> > > > that an instance polled the same offset again.
> > > > 2. Both "1: {partition=1, offset=0}" and "0: {partition=1, offset=0}"
> > > > can be found, meaning the same partition was assigned to two consumer
> > > > instances.
> > > >
> > > > This is really strange to me.
> > > > Does anyone know why?
> > > >
> > > > Please give me some help or tips. Thanks.
> > > >
> > > > My broker setup:
> > > > Kafka version: Confluent Platform 2.0.1, standalone
> > > > ZooKeeper version: 3.4.6, 3-node cluster
> > > > Topic partitions: 2
> > > >
> > > > Best Regards,
> > > >
> > > > Ken
> > > >
> > > >
> > > >
> > >
> >
>
