Hey Ken,

Whether to use subscribe() or assign() depends mainly on whether you need consumer groups to distribute the topic load. If you use subscribe(), the partitions of the subscribed topics are divided among all consumers sharing the same group.id. With assign(), you have to provide the partitions explicitly, and there is no coordination with other consumers. We expect most users to use subscribe() to take advantage of consumer groups, but there are some cases where you need finer control. If you're migrating from the old high-level API, subscribe() is definitely the way to go.
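To make the difference concrete, here's a minimal sketch (the topic name, group.id, and bootstrap servers below are just placeholders, not anything from your setup):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// subscribe(): the coordinator divides the topic's partitions among all
// consumers sharing this group.id.
consumer.subscribe(Arrays.asList("my-topic"));

// assign(): you name the partitions yourself; there is no group
// coordination with other consumers.
// consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0)));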
By the way, I'm a little puzzled why you'd want to call commitSync() in the onPartitionsAssigned() callback. When partitions are first assigned, you wouldn't expect the local offsets to differ from whatever was last committed, since there hasn't been any consumption yet. It is common, however, for users to commit offsets in the onPartitionsRevoked() callback when they are using a manual commit policy. That is basically the last chance to commit offsets before the group rebalances (there's a quick sketch of this pattern at the very bottom of this mail, below the quoted thread).

-Jason

On Tue, Mar 8, 2016 at 9:02 AM, Ken Cheng <kk543...@gmail.com> wrote:

> Hi Jason,
>
> Thanks for your detailed explanation.
> Given this situation, I'd like to discuss it a little more.
> I tried two approaches to avoid it in Kafka 0.9.0.1, and both work
> correctly:
>
> 1. Using subscribe(topics, listener) and implementing
> onPartitionsAssigned(partitions), which manually runs
> consumer.commitSync() when onPartitionsAssigned is called
> 2. Using consumer.assign(topicPartitions) and assigning the partition
> numbers directly
>
> I think the first approach is better. It is more flexible for dynamic
> partition situations and suitable for a long-running consumer service.
> If you migrate from the high-level API to the new consumer, this
> approach is also more straightforward.
>
> The second approach is only for specific requirements, since you have
> to manage more of the details yourself. It suits jobs with a clear
> target, or a web service fetching a given range of offsets.
>
> What do you think?
>
>
> 2016-03-08 1:54 GMT+08:00 Jason Gustafson <ja...@confluent.io>:
>
> > Hi Ken,
> >
> > It looks like what happened is this:
> >
> > 1. The first thread joins the group and is assigned partitions 0 and 1.
> > 2. The first thread races through a bunch of messages from these
> > partitions.
> > 3. The second thread joins and is assigned partition 1 (leaving
> > partition 0 for the first thread).
> > 4. Both threads read whatever is left from their respective partitions.
> >
> > This is normal (expected) behavior. It might seem a little odd that the
> > first thread gets assigned both partitions initially even though both
> > threads were started at about the same time. This is the result of the
> > coordinator somewhat optimistically creating the group and assigning
> > partitions immediately when the first member joins. When the second
> > member joins, it forces a partition rebalance, but there will be some
> > latency before the first member rejoins. We have considered adding a
> > delay on initial group creation to give time for more members to join,
> > but as of yet, we haven't implemented it.
> >
> > -Jason
> >
> > On Sun, Mar 6, 2016 at 7:11 PM, Ken Cheng <kk543...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'm trying the new consumer features on my local PC and a VM,
> > > referencing the code from
> > > http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> > > but the execution result is different.
> > >
> > > I use 2 consumer threads with the same group.id to consume one topic
> > > with 2 partitions, but one partition was assigned to two consumer
> > > instances!
> > >
> > > e.g.
> > > If you check the attachment exelog.txt:
> > > 1. You can find the term "0: {partition=0, offset=0}" twice, which
> > > means the instances polled the same offset again.
> > > 2. Both "1: {partition=1, offset=0}" and "0: {partition=1, offset=0}"
> > > can be found, which means the same partition was assigned to two
> > > consumer instances.
> > >
> > > This is really strange to me.
> > > Does anyone know why?
> > > Please give me some help or tips. Thanks.
> > >
> > > My broker setup:
> > > Kafka version: Confluent Platform 2.0.1, standalone
> > > ZooKeeper version: 3.4.6, 3-node cluster
> > > Topic partitions: 2
> > >
> > > Best Regards,
> > > Ken
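P.S. Here is the onPartitionsRevoked() commit pattern mentioned above. It's only a rough sketch: the topic name is a placeholder, "props" is the usual consumer configuration (as in the earlier snippet), and the offset bookkeeping is illustrative, not a complete program.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Last chance to commit before the partitions move to other members.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to commit here: nothing has been consumed yet.
    }
});

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // ... process the record, then track the next offset to commit ...
        currentOffsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    }
}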