I second the need for having the consumer context passed to the rebalance callback. I have run into issues several times because of that.
About subscribe vs assign - I have not read through your spark code yet (will do by eod), so I am not sure what you mean (other than I do agree that new partitions should be consumed automatically). I guess we can continue this discussion on the spark list then :-)

Thanks Mansi.

On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Mansi, I'd agree that the fact that everything is tied up in poll seems like the source of the awkward behavior.
>
> Regarding assign vs subscribe, most people using the spark integration are just going to want to provide a topic name, not go figure out a bunch of partitions. They're also going to be surprised if things suddenly blow up once a partition is added, or if that partition doesn't start being consumed (we already have that second issue today).
>
> That's why separating the behavior of auto offset reset seems like the best idea I've heard so far.
>
> Consumer rebalance listeners are still probably going to be necessary for people who are storing offsets externally.
>
> On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <mansis...@maprtech.com> wrote:
>
>> Guozhang
>>
>> Sorry for joining the party a little late. I have been thinking about this whole awkward behavior of having to call poll(0) to actually make the underlying subscriptions take effect. Is the core reason for this design the fact that poll is also the actual heartbeat, and you want to make the listener group assignments through poll so that timeouts and reassignments can all go through poll? So I think clubbing liveness with poll (which in effect clubs consumer group assignments, and hence metadata fetch, with poll) is the real cause of this design. Were there issues where you were seeing active consumers not calling poll that led to this design choice? I tried to look for a relevant JIRA but could not find one - can you please point me to something if you have it handy?
>>
>> Btw this would also mean that your proposal to do the actual assignments through seek might not be ideal, since there can still be an indefinite time between seek and poll (just like between subscribe and poll) and the consumer could time out even before the first poll is called?
>>
>> @Cody in your case, if you really have only one consumer and it is going to get all the partitions of the topic anyway, then you might as well subscribe using the "assign" call instead of the "subscribe" call. That will at least make your code cleaner, and I do not think you are gaining anything from the listener group functionality anyway?
>>
>> - Mansi.
>>
>> On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> In order to do anything meaningful with the consumer itself in the rebalance callback (e.g. commit offsets), you would need to hold on to the consumer reference; admittedly it sounds a bit awkward, but by design we chose not to enforce it in the interface itself.
>>>
>>> Guozhang
>>>
>>> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> So what about my comments regarding the consumer rebalance listener interface not providing access to a consumer? I can probably work around it, but it seems odd.
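To make the "hold on to the consumer reference" pattern above concrete, here is a rough sketch of a rebalance listener that is handed the consumer explicitly through its constructor and commits externally tracked offsets when partitions are revoked. The class and method names are illustrative only (they are not from this thread), and the sketch assumes the 0.9 Java consumer API.

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical listener that keeps a reference to the consumer that owns it,
    // because the callback itself is not passed one.
    public class OffsetCommittingListener implements ConsumerRebalanceListener {
        private final KafkaConsumer<String, String> consumer;
        private final Map<TopicPartition, OffsetAndMetadata> pending = new ConcurrentHashMap<>();

        public OffsetCommittingListener(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        // Called by the application after it has processed a record.
        public void recordProcessed(TopicPartition tp, long offset) {
            pending.put(tp, new OffsetAndMetadata(offset + 1));
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Commit whatever has been processed before the partitions move elsewhere.
            if (!pending.isEmpty()) {
                consumer.commitSync(pending);
                pending.clear();
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // No-op here; seek logic could live in this callback instead.
        }
    }

It would be wired up with consumer.subscribe(topics, new OffsetCommittingListener(consumer)). Because the listener needs the consumer at construction time, it cannot easily have a zero-argument constructor or be serialized on its own, which is exactly the awkwardness Cody points out above.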
>>>> On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>>>>
>>>>> One thing proposed by Jason:
>>>>>
>>>>> If you want to only reset offsets upon initialization, and by initialization you mean "no committed offset", you can do sth. like the following in the rebalance callback:
>>>>>
>>>>>     @Override
>>>>>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>>>>>         for (TopicPartition partition : partitions)
>>>>>             if (consumer.committed(partition) == null)
>>>>>                 consumer.seekToBeginning(partition);
>>>>>     }
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Filed https://issues.apache.org/jira/browse/KAFKA-3370.
>>>>>>
>>>>>> On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>
>>>>>>> That sounds like an interesting way of addressing the problem; we can continue further discussions on the JIRA.
>>>>>>>
>>>>>>> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Cody:
>>>>>>>>
>>>>>>>> More specifically, you do not need the "listTopics" function if you already know your subscribed topics; just using "partitionsFor" is sufficient.
>>>>>>>>
>>>>>>>> About the fix, I'm thinking of adding two more options to auto.offset.reset, namely "earliest-on-start" and "latest-on-start", which set the reset position ONLY at startup. The reason is that the seekToXX calls were not designed to do such initialization but to be called during the lifetime of the consumer, and we'd better provide the right solution for this.
>>>>>>>>
>>>>>>>> I can file the JIRA right away and start further discussions there. But let me know if you have any other ideas.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Yeah, I think I understood what you were saying. What I'm saying is that if there were a way to just fetch metadata without doing the rest of the work poll() does, it wouldn't be necessary. I guess I can do listTopics to get all metadata for all topics and then parse it.
>>>>>>>>>
>>>>>>>>> Regarding running a single instance, that is the case for what I'm talking about.
>>>>>>>>>
>>>>>>>>> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Cody,
>>>>>>>>>>
>>>>>>>>>> What I meant for a special case of `seekToXX` is that, today, when the function is called with no partition parameters, it will try to execute the logic on all "assigned" partitions for the consumer. And once that is done, the subsequent poll() will not throw the exception, since it knows those partitions need their offsets reset.
>>>>>>>>>> However, in your case there are no assigned partitions yet, and hence `seekToXX` will not take effect on any partitions. The assignment is wrapped in the poll() call, as you mentioned. One way to solve it is to let `seekToXX()` with no parameters do the coordination and get the assigned partitions if there are any subscribed topics, so that the subsequent poll() will know those partitions need their offsets reset. Does that make sense?
>>>>>>>>>>
>>>>>>>>>> As for now, another way I can think of is to get the partition info beforehand and call `seekToBeginning` on all partitions. But that only works if the consumer knows it is going to get all the partitions assigned to itself (i.e. you are only running a single instance).
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Another unfortunate thing about ConsumerRebalanceListener is that in order to do meaningful work in the callback, you need a reference to the consumer that called it. But that reference isn't provided to the callback, which means the listener implementation needs to hold a reference to the consumer. Seems like this makes it unnecessarily awkward to serialize the listener or provide a 0-arg constructor for it.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I thought about ConsumerRebalanceListener, but seeking to the beginning any time there's a rebalance, for whatever reason, is not necessarily the same thing as seeking to the beginning before first starting the consumer.
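As a rough sketch of the single-instance workaround mentioned just above (fetch the partition metadata up front, assign every partition manually, then seek), something like the following should behave as described. The broker address and topic name are placeholders; manual assignment will not pick up partitions added to the topic later, which is the trade-off discussed earlier in the thread, and newer clients accept a Collection in seekToBeginning rather than the 0.9-style varargs used here.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class AssignFromBeginning {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "example");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "none"); // keep hard errors for out-of-range offsets

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Fetch partition metadata without poll(), then assign everything to this one instance.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("testtwo")) { // topic name is a placeholder
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);

            // With manual assignment the partitions are known immediately,
            // so seekToBeginning has something to act on before the first poll.
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
            consumer.poll(0);
        }
    }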
>>>>>>>>>>>> On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Cody,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Use ConsumerRebalanceListener to achieve that:
>>>>>>>>>>>>>
>>>>>>>>>>>>>     ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>
>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>>>>>>>>>>>>>             consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>     };
>>>>>>>>>>>>>
>>>>>>>>>>>>>     consumer.subscribe(topics, listener);
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> That suggestion doesn't work, for pretty much the same reason - at the time poll is first called, there is no reset policy and no committed offset, so NoOffsetForPartitionException is thrown.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I feel like the underlying problem isn't so much that seekToEnd needs special-case behavior. It's more that topic metadata fetches, consumer position fetches, and message fetches are all lumped together under a single poll() call, with no way to do them individually if necessary.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What does "work" in this situation is to just catch the exception (which leaves the consumer in a state where topics are assigned) and then seek. But that is not exactly an elegant interface.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     consumer.subscribe(topics)
>>>>>>>>>>>>>>     try {
>>>>>>>>>>>>>>       consumer.poll(0)
>>>>>>>>>>>>>>     } catch {
>>>>>>>>>>>>>>       case x: Throwable =>
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>     consumer.seekToBeginning()
>>>>>>>>>>>>>>     consumer.poll(0)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Cody,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The problem with that code is in calling `seekToBeginning()` right after `subscribe(topic)`.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Since the `subscribe` call is lazily evaluated, by the time `seekToBeginning()` is called no partition is assigned yet, and hence it is effectively a no-op.
>>>>>>>>>>>>>>> Try
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     consumer.subscribe(topics)
>>>>>>>>>>>>>>>     consumer.poll(0); // get assigned partitions
>>>>>>>>>>>>>>>     consumer.seekToBeginning()
>>>>>>>>>>>>>>>     consumer.poll(0)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> to see if that works.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think it is a valid issue that can be fixed in the new consumer: upon calling seekToEnd/Beginning with no parameters while no assignment has been done yet, do the coordination behind the scenes. It would, though, change the behavior of these functions, as they would no longer always be lazily evaluated.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Using the 0.9 consumer, I would like to start consuming at the beginning or end, without specifying auto.offset.reset.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This does not seem to be possible:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     val kafkaParams = Map[String, Object](
>>>>>>>>>>>>>>>>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>>>>>>>>>>>>>>>>       "key.deserializer" -> classOf[StringDeserializer],
>>>>>>>>>>>>>>>>       "value.deserializer" -> classOf[StringDeserializer],
>>>>>>>>>>>>>>>>       "group.id" -> "example",
>>>>>>>>>>>>>>>>       "auto.offset.reset" -> "none"
>>>>>>>>>>>>>>>>     ).asJava
>>>>>>>>>>>>>>>>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>>>>>>>>>>>>>>>>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>>>>>>>>>>>>>>>>     consumer.subscribe(topics)
>>>>>>>>>>>>>>>>     consumer.seekToBeginning()
>>>>>>>>>>>>>>>>     consumer.poll(0)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Results in:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
>>>>>>>>>>>>>>>>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>>>>>>>>>>>>>>>>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>>>>>>>>>>>>>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>>>>>>>>>>>>>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>>>>>>>>>>>>>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>>>>>>>>>>>>>>>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm assuming this is because, at the time seekToBeginning() is called, subscriptions.assignedPartitions isn't populated. But polling in order to assign topic partitions results in an error, which creates a chicken-or-the-egg situation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't want to set auto.offset.reset, because I want a hard error if the offsets are out of range at any other time during consumption.
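For completeness, here is a rough, compilable Java version of the "catch the exception, then seek" workaround described above, keeping subscribe() so that consumer-group management still applies. It is a sketch only: the class name, broker address, and topic are placeholders, and it relies on the behavior noted in the thread that the first poll(0) completes the group join and partition assignment before throwing NoOffsetForPartitionException.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
    import org.apache.kafka.common.TopicPartition;

    public class SubscribeFromBeginning {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "example");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "none"); // fail hard on out-of-range offsets later

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("testtwo")); // topic name is a placeholder

            try {
                // The first poll joins the group and gets partitions assigned, but throws
                // because there is no committed offset and no reset policy.
                consumer.poll(0);
            } catch (NoOffsetForPartitionException expected) {
                // Expected on the very first run; the assignment has already happened.
            }

            // Now that partitions are assigned, seeking works (0.9 varargs signature).
            consumer.seekToBeginning(consumer.assignment().toArray(new TopicPartition[0]));

            // Subsequent polls return records starting from the beginning of each partition.
            consumer.poll(1000).forEach(record ->
                    System.out.println(record.partition() + ":" + record.offset() + " " + record.value()));
        }
    }

As the thread notes, catching the exception is not an elegant interface; KAFKA-3370 was filed to discuss a proper way to express "reset only on start" (the proposed "earliest-on-start" / "latest-on-start" options).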