Ah, that makes more sense. I have no idea about the limitations of your use case, but maybe you could expose a different interface to users.
    interface RebalanceListener<K, V> {
        void onPartitionsAssigned(Consumer<K, V> consumer, Collection<TopicPartition> partitions);
        void onPartitionsRevoked(Consumer<K, V> consumer, Collection<TopicPartition> partitions);
    }

Then you can adapt it internally in the call to subscribe(). Would that work?

Also, we already have partitionsFor() to fetch topic metadata. And you can look at the pause/resume API, which lets you call poll() without consuming messages. We'd probably want to understand why those are insufficient before considering new APIs.

-Jason

On Mon, Mar 14, 2016 at 12:17 PM, Cody Koeninger <c...@koeninger.org> wrote:

Regarding the rebalance listener: in the case of the Spark integration, a job can fail and be restarted from checkpoint in a new JVM. That means you need to be able to reconstruct objects. Any reasonable rebalance listener can't have a 0-arg constructor, because it needs a reference to the consumer in order to do anything useful. I can get around this by making the user provide a 0-arg function that returns a fully configured and subscribed Kafka consumer, so that they get to do the dance of constructing a consumer before constructing the listener. I don't think the current rebalance listener is a dealbreaker; I just think it is an unnecessarily awkward API.

Regarding poll(), I honestly don't see what the problem is with keeping poll() behavior as is, but also having public methods to, e.g., fetch metadata, for people who need that functionality without actually consuming messages.

On Mon, Mar 14, 2016 at 1:01 PM, Jason Gustafson <ja...@confluent.io> wrote:

Late arrival to this discussion. I'm not really sure I see the problem with accessing the consumer in the rebalance listener. Before, we passed the consumer instance as a separate argument, but that was only because the rebalance listener had to be passed by class name before a reference to the consumer was available.
After we changed the API to pass it in subscribe() instead, getting a reference to the consumer shouldn't be a problem. Maybe I'm missing something?

As for poll() doing everything: it has definitely caused some confusion, but I'm a little doubtful that splitting out the functionality is going to help as much as people think. A heartbeat() API has come up several times, for example, but you have to figure out how it's going to affect rebalancing. And rebalancing affects which partitions need to be fetched or can be committed. They're all so intertwined that I'm not sure you can divide them up without creating a bigger problem than the one you're trying to solve. At this point, with 0.10 shortly on the way, it seems unlikely that incompatible changes to the API will be accepted. However, if someone can propose a compatible solution that addresses some of the concerns mentioned, we'd love to hear about it!

-Jason

On Mon, Mar 14, 2016 at 9:21 AM, Cody Koeninger <c...@koeninger.org> wrote:

Honestly, the fact that everything is hidden inside poll() has been confusing people since last year, e.g.

https://issues.apache.org/jira/browse/KAFKA-2359

I can try to formulate a KIP for this, but it seems clear that I'm not the only one giving this feedback, and I may not understand all the other use cases that have been brought up.

On Sun, Mar 13, 2016 at 3:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody,

We do not have an umbrella JIRA for this. Instead, API changes in the consumer go through individual JIRA tickets or KIPs.

If you feel strongly about a specific change to the consumer API, please feel free to create a new KIP with the detailed motivation and proposed modifications.

Guozhang

On Fri, Mar 11, 2016 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:

Is there a KIP or JIRA related to "working on improving these cases with improved APIs"?

I saw that there was some discussion of it in KIP-41, but that seemed to have been resolved in favor of keeping everything inside of poll().

On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody, Mansi:

All good points! Let me try to answer them one by one.

About this specific issue: as I suggested in the JIRA, we can separate two cases that the auto.offset.reset config currently handles together. The first is resetting the offset when a partition is first initialized for fetching. The second is the fetch offset going out of range. These two scenarios are indeed quite different, and it's reasonable to treat them differently.

About passing a consumer context to the rebalance callback's constructor: we left that to the user for flexibility. If you want to use Kafka to commit offsets, for example, then you pass the consumer reference to the callback. If you use an external service to store offsets, you can pass a JDBC connector to the callback instead. For some data mirroring you can even pass another producer client into it. Always enforcing the consumer context could be convenient for some use cases (i.e. you would not need to pass the argument to the constructor yourself), but not necessarily for all.
About wrapping coordination protocols (partition assignment, heartbeat) inside poll() behind the scenes: we implemented the APIs this way in order to abstract the underlying details away from users. It also provides a simple "single-thread poll loop" design pattern in the new consumer. We realize that this does make some use cases more awkward, and we are working on improving those cases with improved APIs as well. Let us know if you have any suggestions about this.

Guozhang

On Thu, Mar 10, 2016 at 7:53 AM, Mansi Shah <mansis...@maprtech.com> wrote:

I second the need for having a consumer context passed to the rebalance callback. I have run into issues several times because of that.

About subscribe vs. assign: I have not read through your Spark code yet (will do by EOD), so I am not sure what you mean, other than that I do agree that new partitions should be consumed automatically. I guess we can continue this discussion on the Spark list then :-)

Thanks
Mansi.

On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:

Mansi, I'd agree that the fact that everything is tied up in poll seems like the source of the awkward behavior.

Regarding assign vs. subscribe, most people using the Spark integration are just going to want to provide a topic name, not go figure out a bunch of partitions.
They're also going to be surprised if things suddenly blow up once a partition is added, or if that partition doesn't start being consumed (we already have that second issue today).

That's why separating the behavior of auto offset reset seems like the best idea I've heard so far.

Consumer rebalance listeners are still probably going to be necessary for people who are storing offsets externally.

On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <mansis...@maprtech.com> wrote:

Guozhang,

Sorry for joining the party a little late. I have been thinking about this whole awkward behavior of having to call poll(0) to make the underlying subscriptions actually take effect. Is the core reason for this design that poll is also the actual heartbeat? Do you want to make the listener group assignments through poll so that timeouts and reassignments can all go through poll? If so, I think coupling liveness with poll is the real cause of this design, because it in effect also couples consumer group assignment, and hence metadata fetch, with poll. Were there issues where you were seeing active consumers not calling poll that led to this design choice? I tried to look for a relevant JIRA but could not find one. Can you please point me to something if you have it handy?

By the way, this would also mean that your proposal to do the actual assignments through seek might not be ideal. There can still be indefinite time between seek and poll (just like between subscribe and poll), so couldn't the consumer time out even before the first poll is called?

@Cody: in your case, if you really have only one consumer and it is going to get all the partitions of the topic anyway, then you might as well subscribe using the "assign" call instead of the "subscribe" call. That will at least make your code cleaner, and I do not think you are gaining anything from the listener group functionality anyway?

- Mansi.

On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:

In order to do anything meaningful with the consumer itself in the rebalance callback (e.g. commit offsets), you would need to hold on to the consumer reference. Admittedly it sounds a bit awkward, but by design we chose not to enforce it in the interface itself.

Guozhang

On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:

So what about my comments regarding the consumer rebalance listener interface not providing access to a consumer? I can probably work around it, but it seems odd.
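Jason's suggestion at the top of the thread is meant to address this complaint. A library can accept a consumer-aware listener from users and adapt it internally in subscribe(). The sketch below is a minimal illustration under stated assumptions, not Kafka code. `PartitionCallback` stands in for Kafka's `ConsumerRebalanceListener`, the type parameter `C` stands in for `Consumer<K, V>`, and partitions are plain strings rather than `TopicPartition`, so it compiles without kafka-clients. `ListenerAdapter` and `RecordingListener` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for Kafka's ConsumerRebalanceListener: its callbacks receive only
// the partitions, never the consumer. Partitions are plain strings here.
interface PartitionCallback {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// The consumer-aware shape Jason suggests; C plays the role of Consumer<K, V>.
interface RebalanceListener<C> {
    void onPartitionsAssigned(C consumer, Collection<String> partitions);
    void onPartitionsRevoked(C consumer, Collection<String> partitions);
}

// Hypothetical helper a wrapper's subscribe() could call. It closes over the
// consumer, so the user's listener never has to store a reference itself.
final class ListenerAdapter {
    private ListenerAdapter() {}

    static <C> PartitionCallback adapt(C consumer, RebalanceListener<C> listener) {
        return new PartitionCallback() {
            @Override
            public void onPartitionsRevoked(Collection<String> partitions) {
                listener.onPartitionsRevoked(consumer, partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<String> partitions) {
                listener.onPartitionsAssigned(consumer, partitions);
            }
        };
    }
}

// Example user listener with a 0-arg constructor, so it could be rebuilt
// reflectively (e.g. after restarting from a checkpoint in a new JVM).
final class RecordingListener implements RebalanceListener<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(String consumer, Collection<String> partitions) {
        events.add("assigned " + partitions + " via " + consumer);
    }

    @Override
    public void onPartitionsRevoked(String consumer, Collection<String> partitions) {
        events.add("revoked " + partitions + " via " + consumer);
    }
}
```

In a real wrapper, the adapted callback would be passed to the consumer's two-argument subscribe(topics, listener), with the Kafka types substituted for the stand-ins. Because the user's class only receives the consumer as a method argument, it can keep a 0-arg constructor, which is the property Cody needs for restarting from a checkpoint.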
On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

One thing proposed by Jason:

If you only want to reset the offset upon initialization, and by initialization you mean "no committed offset", you can do something like the following in the rebalance callback:

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            // committed() returns null when this group has no committed offset for the partition.
            if (consumer.committed(partition) == null) {
                consumer.seekToBeginning(partition);
            }
        }
    }

Guozhang

On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Filed https://issues.apache.org/jira/browse/KAFKA-3370.

On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:

That sounds like an interesting way of addressing the problem. We can continue further discussion on the JIRA.

On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody:

More specifically, you do not need the "listTopics" function if you already know your subscribed topics. Just using "partitionsFor" is sufficient.

About the fix, I'm thinking of adding two more options to auto.offset.reset, namely "earliest-on-start" and "latest-on-start", which set the reset position ONLY at startup. The reason is that seekToXX was not actually designed for this kind of initialization. It is meant to be called during the lifetime of the consumer, and we'd better provide the right solution for this case.

I can file the JIRA right away and start further discussion there. But let me know if you have any other ideas.
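The semantics of the proposed on-start values can be sketched as a small decision function. This is an illustration only and makes several assumptions. `ResetPolicy`, `ResetAction`, and `OffsetResetRule` are hypothetical names. `NONE`, `EARLIEST`, and `LATEST` mirror the existing auto.offset.reset values "none", "earliest", and "latest". The two `*_ON_START` entries model the "earliest-on-start" / "latest-on-start" proposal and are not real config values. Following Guozhang's later message, "initializing" means the partition has no committed offset.

```java
// NONE, EARLIEST and LATEST mirror the existing auto.offset.reset values
// "none", "earliest" and "latest". EARLIEST_ON_START and LATEST_ON_START
// model the values proposed in this thread; they are not real config values.
enum ResetPolicy { NONE, EARLIEST, LATEST, EARLIEST_ON_START, LATEST_ON_START }

enum ResetAction { SEEK_EARLIEST, SEEK_LATEST, FAIL }

final class OffsetResetRule {
    private OffsetResetRule() {}

    /**
     * Decides what to do when a partition has no usable fetch position.
     *
     * @param policy       the configured reset policy
     * @param initializing true if the partition has no committed offset yet
     *                     (first start); false if an existing position fell
     *                     out of range during consumption
     */
    static ResetAction resolve(ResetPolicy policy, boolean initializing) {
        switch (policy) {
            case EARLIEST:
                return ResetAction.SEEK_EARLIEST;
            case LATEST:
                return ResetAction.SEEK_LATEST;
            case EARLIEST_ON_START:
                return initializing ? ResetAction.SEEK_EARLIEST : ResetAction.FAIL;
            case LATEST_ON_START:
                return initializing ? ResetAction.SEEK_LATEST : ResetAction.FAIL;
            case NONE:
            default:
                // Surface an error instead of silently moving the position.
                return ResetAction.FAIL;
        }
    }
}
```

Under this model, EARLIEST_ON_START gives Cody what his original message asks for. The consumer starts from the beginning on first run, but an out-of-range position later in consumption is still a hard error.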

Guozhang

On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:

Yeah, I think I understood what you were saying. What I'm saying is that if there were a way to just fetch metadata without doing the rest of the work poll() does, it wouldn't be necessary. I guess I can call listTopics to get all metadata for all topics and then parse it.

Regarding running a single instance, that is the case I'm talking about.

On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody,

What I meant by a special case of `seekToXX` is this. Today, when the function is called with no partition parameters, it tries to execute the logic on all "assigned" partitions of the consumer. Once that is done, the subsequent poll() will not throw the exception, since it knows those partitions need their offsets reset.

However, in your case there are no assigned partitions yet, so `seekToXX` will not take effect on any partition. As you mentioned, the assignment is wrapped in the poll() call. One way to solve this is to let `seekToXX()` with no parameters do the coordination and get the assigned partitions if there are any subscribed topics. The subsequent poll() would then know that those partitions need their offsets reset. Does that make sense?

For now, another way I can think of is to get the partition info beforehand and call `seekToBeginning` on all partitions. But that only works if the consumer knows it is going to get all the partitions assigned to itself (i.e. you are only running a single instance).

Guozhang

On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:

Another unfortunate thing about ConsumerRebalanceListener is that in order to do meaningful work in the callback, you need a reference to the consumer that called it. But that reference isn't provided to the callback, which means the listener implementation needs to hold a reference to the consumer. This seems to make it unnecessarily awkward to serialize the listener or to provide a 0-arg constructor for it.

On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:

I thought about ConsumerRebalanceListener, but seeking to the beginning any time there's a rebalance for whatever reason is not necessarily the same thing as seeking to the beginning before first starting the consumer.
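Cody distinguishes seeking on every rebalance from seeking only before the first start. One way a listener could honor that distinction within a single process is to remember which partitions it has already initialized, and rewind only the ones it sees for the first time. The sketch below is a minimal illustration under assumptions. `FirstAssignmentTracker` is a hypothetical helper, and partitions are plain strings standing in for `TopicPartition` so the code compiles without kafka-clients.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: remembers which partitions this process has already
// initialized, so that a later rebalance handing a partition back does not
// rewind it again. Partitions are plain strings standing in for TopicPartition.
// Intended use: in onPartitionsAssigned, seek to the beginning only for the
// partitions returned by newlyAssigned(partitions).
final class FirstAssignmentTracker {
    private final Set<String> initialized = new HashSet<>();

    /** Returns, in the order given, the partitions seen for the first time. */
    List<String> newlyAssigned(Collection<String> assigned) {
        List<String> fresh = new ArrayList<>();
        for (String partition : assigned) {
            // Set.add returns false when the partition was already recorded.
            if (initialized.add(partition)) {
                fresh.add(partition);
            }
        }
        return fresh;
    }
}
```

The set lives only in memory. After a restart in a new JVM (Cody's Spark checkpoint case) it starts out empty, so by itself it does not distinguish "first start ever" from "first start of this process". The committed()-is-null check Guozhang relays from Jason later in the thread reads the group's committed offsets, so it survives restarts.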

On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:

Cody,

Use ConsumerRebalanceListener to achieve that:

    ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // nothing to do when partitions are taken away
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // rewind every newly assigned partition to its earliest offset
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
        }
    };

    consumer.subscribe(topics, listener);

On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

That suggestion doesn't work, for pretty much the same reason. At the time poll is first called, there is no reset policy and no committed offset, so NoOffsetForPartitionException is thrown.

I feel like the underlying problem isn't so much that seekToEnd needs special-case behavior. It's more that topic metadata fetches, consumer position fetches, and message fetches are all lumped together under a single poll() call, with no way to do them individually if necessary.

What does "work" in this situation is to just catch the exception (which leaves the consumer in a state where topics are assigned) and then seek. But that is not exactly an elegant interface.

    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException

    consumer.subscribe(topics)
    try {
      consumer.poll(0)  // joins the group, so partitions get assigned
    } catch {
      // Expected: no committed offset and no reset policy yet
      case _: NoOffsetForPartitionException =>
    }
    consumer.seekToBeginning()
    consumer.poll(0)

On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Cody,

The problem with that code is calling `seekToBeginning()` right after `subscribe(topics)`.

Since the `subscribe` call is lazily evaluated, no partition is assigned yet by the time `seekToBeginning()` is called, so it is effectively a no-op.

Try

    consumer.subscribe(topics)
    consumer.poll(0)  // get assigned partitions
    consumer.seekToBeginning()
    consumer.poll(0)

to see if that works.

I think it is a valid issue that can be fixed in the new consumer. When seekToEnd/seekToBeginning is called with no parameters and no assignment has been done yet, the consumer could do the coordination behind the scenes. That would, however, change the behavior of these functions, since they would no longer always be lazily evaluated.

Guozhang

On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

Using the 0.9 consumer, I would like to start consuming at the beginning or end without specifying auto.offset.reset.

This does not seem to be possible:

    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.serialization.StringDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "none"
    ).asJava
    val topics = conf.getString("kafka.topics").split(",").toList.asJava
    val consumer = new KafkaConsumer[String, String](kafkaParams)
    consumer.subscribe(topics)
    consumer.seekToBeginning()
    consumer.poll(0)

Results in:

    Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
        at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
        at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)

I'm assuming this is because subscriptions.assignedPartitions isn't populated yet when seekToBeginning() is called. But polling in order to assign topic partitions results in an error, which creates a chicken-or-the-egg situation.

I don't want to set auto.offset.reset, because I want a hard error if the offsets are out of range at any other time during consumption.