Hi Jason, In my test case I set enable.auto.commit=false, and then I need to call commitSync() in onPartitionsAssigned() to avoid problems when the consumer rebalances. Actually, I place it inside for (TopicPartition partition : partitions), so that commitSync() will not execute when partitions are first assigned.
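For reference, a minimal sketch of the listener arrangement I described, using the 0.9 new-consumer API (this is an untested sketch since it needs a running broker; the bootstrap server, topic name, and group id are placeholders):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListenerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "test-group");              // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"), // placeholder topic
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // The arrangement described above: commit inside the loop over
                    // the assigned partitions, so nothing runs when the collection
                    // reflects a first assignment with nothing consumed yet.
                    for (TopicPartition partition : partitions) {
                        consumer.commitSync();
                    }
                }
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // The usual manual-commit pattern: last chance to commit
                    // before the group rebalances.
                    consumer.commitSync();
                }
            });
    }
}
```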
If I set enable.auto.commit=true in my test case, then when the consumer rebalanced I checked the offset position and it did not overlap as it did before. I guess that with enable.auto.commit=true, an offset commit is triggered when a rebalance happens and wakeup() is called. These are just my thoughts; is that right? -Ken 2016-03-09 2:06 GMT+08:00 Jason Gustafson <ja...@confluent.io>: > Hey Ken, > > Whether to use subscribe or assign depends mainly on whether you need to > use consumer groups to distribute the topic load. If you use subscribe(), > then the partitions for the subscribed topics will be divided among all > consumers sharing the same groupId. With assign(), you have to provide the > partitions explicitly, and there won't be any coordination with other > consumers. We expect most users will use subscribe() to take advantage of > consumer groups, but there are some cases where you need finer control. If > you're migrating from the old high level API, that is definitely the way to > go. > > By the way, I'm a little puzzled why you'd want to commitSync() in the > onPartitionsAssigned() callback. When partitions are first assigned, you > wouldn't expect local offsets to be different from whatever was last > committed since there hasn't been any consumption yet. It's common, > however, for users to commit offsets in the onPartitionsRevoked() callback > when they are using a manual commit policy. This is basically the last > chance to commit offsets before the group rebalances. > > -Jason > > On Tue, Mar 8, 2016 at 9:02 AM, Ken Cheng <kk543...@gmail.com> wrote: > > > Hi Jason, > > > > Thanks for your detailed explanation. > > Given this situation, I'd like to discuss a little more. > > I tried two approaches to avoid it in Kafka 0.9.0.1, and both work correctly. > > > > 1. Using subscribe(topics, listener) and > > implementing onPartitionsAssigned(partitions), which manually runs > > consumer.commitSync() when onPartitionsAssigned is called > > 2. 
Using consumer.assign(topicPartitions) and assigning the partition numbers > > directly. > > > > I think the first approach is better. It is more flexible for dynamic > > partition situations and suitable for a persistent consumer service. > > If migrating from the old high-level API to the new consumer, this approach is > > more straightforward. > > > > The second approach is only useful for specific requirements, but it has to manage > > more detailed information. > > It is suitable for a job with a clear target, or for a web service fetching a given range of > > offsets. > > > > What do you think? > > > > 2016-03-08 1:54 GMT+08:00 Jason Gustafson <ja...@confluent.io>: > > > > > Hi Ken, > > > > > > It looks like what happened is this: > > > > > > 1. First thread joins the group and is assigned partitions 0 and 1 > > > 2. First thread races through a bunch of messages from these > partitions. > > > 3. Second thread joins and is assigned partition 1 (leaving partition 0 > > for > > > the first thread) > > > 4. Both threads read whatever is left from their respective partitions > > > > > > This is normal (expected) behavior. It might seem a little odd that the > > > first thread gets assigned both partitions initially even though both > > > threads were started at about the same time. This is the result of the > > > coordinator somewhat optimistically creating the group and assigning > > > partitions immediately when the first member joins. When the second > > member > > > joins, it forces a partition rebalance, but there will be some latency > > > before the first member rejoins. We have considered adding a delay on > > > initial group creation to give time for more members to join, but as of > > > yet, we haven't implemented it. 
> > > -Jason > > > On Sun, Mar 6, 2016 at 7:11 PM, Ken Cheng <kk543...@gmail.com> wrote: > > > > hi, > > > > I'm trying the new consumer features on my local PC and VM, > > > > referencing code from > > > > http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client > > > > , but the execution result is different. > > > > I use 2 consumer threads with the same group.id to consume one topic with 2 > > > > partitions, but one partition is assigned to two consumer instances! > > > > e.g. > > > > If you check the attachment exelog.txt: > > > > 1. you can find the term "0: {partition=0, offset=0}" twice, meaning that an > > > > instance polled the same offset again > > > > 2. "1: {partition=1, offset=0}" and "0: {partition=1, offset=0}" will both be > > > > found, meaning the same partition was assigned to two consumer instances. > > > > It is really strange to me; > > > > does anyone know why? > > > > Please give me some help or tips, thanks. > > > > My broker setup: > > > > Kafka version: Confluent Platform 2.0.1, standalone > > > > ZooKeeper version: 3.4.6, 3-node cluster > > > > topic partitions: 2 > > > > Best Regards, > > > > Ken
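As an aside, the assignment sequence Jason describes in the quoted thread (the first member is optimistically given both partitions, then a rebalance splits them when the second member joins) can be simulated with a toy range-style assignment in plain Java. This is an illustrative sketch of the idea only, not Kafka's actual assignor code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignSketch {
    // Toy range-style assignment: sort members, divide partitions evenly,
    // and give earlier members the leftover partitions.
    public static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, List<Integer>> out = new TreeMap<>();
        int per = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = per + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) parts.add(next++);
            out.put(sorted.get(i), parts);
        }
        return out;
    }

    public static void main(String[] args) {
        // Step 1 of the sequence: only the first member has joined,
        // so it is assigned both partitions.
        System.out.println(assign(List.of("consumer-1"), 2));
        // Step 3: the second member joins, forcing a rebalance,
        // after which each member holds one partition.
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 2));
    }
}
```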