I can't get initial subscriptions without poll. As far as I can tell, I won't get updated subscriptions (because a partition was added or another topic matching the pattern was added) without poll either, right?
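To spell out the behavior I'm describing, here is a toy model, not the Kafka client itself. `FakeCluster` and `FakeConsumer` are hypothetical stand-ins I made up for illustration; the only thing they model is my understanding that subscribe() just records intent and that the assignment is (re)computed inside poll().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Toy model only: none of these classes are part of the Kafka client. */
public class LazySubscriptionSketch {

    /** Hypothetical cluster state: the topic-partitions that currently exist. */
    static final class FakeCluster {
        final Set<String> topicPartitions = new TreeSet<>();

        void addPartition(String topic, int partition) {
            topicPartitions.add(topic + "-" + partition);
        }
    }

    /** Hypothetical consumer whose assignment is refreshed only by poll(). */
    static final class FakeConsumer {
        private final FakeCluster cluster;
        private final Set<String> assignment = new TreeSet<>();
        private Pattern pattern;

        FakeConsumer(FakeCluster cluster) {
            this.cluster = cluster;
        }

        /** Lazy: remembers the pattern, does not touch the assignment. */
        void subscribe(Pattern pattern) {
            this.pattern = pattern;
        }

        /** Returns a read-only snapshot of the current assignment. */
        Set<String> assignment() {
            return Collections.unmodifiableSet(new TreeSet<>(assignment));
        }

        /** The only place where the assignment is recomputed. */
        void poll() {
            assignment.clear();
            if (pattern == null) {
                return;
            }
            for (String tp : cluster.topicPartitions) {
                String topic = tp.substring(0, tp.lastIndexOf('-'));
                if (pattern.matcher(topic).matches()) {
                    assignment.add(tp);
                }
            }
        }
    }

    /** Records the assignment size before and after each poll(). */
    static List<Integer> demo() {
        FakeCluster cluster = new FakeCluster();
        cluster.addPartition("events", 0);

        FakeConsumer consumer = new FakeConsumer(cluster);
        List<Integer> sizes = new ArrayList<>();

        consumer.subscribe(Pattern.compile("events.*"));
        sizes.add(consumer.assignment().size()); // nothing assigned before the first poll

        consumer.poll();
        sizes.add(consumer.assignment().size()); // initial assignment appears

        cluster.addPartition("events", 1);         // a partition is added
        cluster.addPartition("events-archive", 0); // a new topic matches the pattern
        sizes.add(consumer.assignment().size());   // unchanged until the next poll

        consumer.poll();
        sizes.add(consumer.assignment().size()); // both changes are picked up
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 1, 3]
    }
}
```

If the real consumer behaves like this model, anything that needs an up-to-date assignment has to go through poll() first, which is the coupling I'm complaining about.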
I'll take a look at those jiras.

On Mon, Mar 14, 2016 at 4:56 PM, Jason Gustafson <ja...@confluent.io> wrote:
> The offset API is definitely a gap at the moment. I think there were some
> problems with the old consumer's API and we wanted to make sure we didn't
> make the same mistakes. Unfortunately, I'm not sure anyone has had the time
> to give this the attention it needs. Here's a couple JIRAS if you want to
> have a look:
>
> https://issues.apache.org/jira/browse/KAFKA-2076
> https://issues.apache.org/jira/browse/KAFKA-2500
>
> For now, the workaround above should work. However, you should be able to
> skip the call to poll() between the seek since position() will block to get
> the new offset.
>
> -Jason
>
> On Mon, Mar 14, 2016 at 2:37 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Sorry, by metadata I also meant the equivalent of the old
>> OffsetRequest api, which partitionsFor doesn't give you. I understand
>> why you didn't want to expose the broken "offsets before a certain
>> time" api, but I don't understand why equivalent functionality for
>> first or last offset isn't available in the new consumer.
>>
>> For instance, I need to know the log end offset, preferably without
>> consuming messages.
>>
>> So it sounds like you're suggesting something like this:
>>
>> val kc = KafkaConsumer(...)
>> kc.subscribe(somePattern)
>>
>> while(...) {
>>   kc.assignment.foreach(tp => kc.pause(tp))
>>   kc.seekToEnd()
>>   kc.poll(0)
>>   val logEnd = kc.assignment.map(tp => kc.position(tp))
>>   ... do something with logEnd
>> }
>>
>> Seems like that will work.
>>
>> Once timestamp indexing is implemented and an actual getOffsetsBefore
>> api could work, are people still going to have to mutate consumer
>> position just to get information? Is this a case of just wanting to
>> hide as much as possible, even if that means making some things harder
>> than they were with the old simple consumer?
>>
>> On Mon, Mar 14, 2016 at 2:35 PM, Jason Gustafson <ja...@confluent.io> wrote:
>> > Ah, that makes more sense. I have no idea about the limitations of your use
>> > case, but maybe you could expose a different interface to users.
>> >
>> > interface RebalanceListener {
>> >   void onPartitionsAssigned(Consumer<K, V> consumer,
>> >                             Collection<TopicPartition> partitions);
>> >   void onPartitionsRevoked(Consumer<K, V> consumer,
>> >                            Collection<TopicPartition> partitions);
>> > }
>> >
>> > Then you can adapt it internally in the call to subscribe(). Would that
>> > work?
>> >
>> > Also, we already have partitionsFor() to fetch topic metadata. And you can
>> > look at the pause/resume API which lets you call poll() without consuming
>> > messages. We'd probably want to understand why those are insufficient
>> > before considering new APIs.
>> >
>> > -Jason
>> >
>> > On Mon, Mar 14, 2016 at 12:17 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >
>> >> Regarding the rebalance listener, in the case of the spark
>> >> integration, it is possible a job can fail and be restarted from
>> >> checkpoint in a new jvm. That means that you need to be able to
>> >> reconstruct objects. Any reasonable rebalance listener can't have a
>> >> 0-arg constructor, because it needs a reference to the consumer in
>> >> order to do anything useful. I can get around this by making the user
>> >> provide a 0-arg function to return a fully configured + subscribed
>> >> Kafka consumer, so that they get to do the dance of constructing a
>> >> consumer before constructing the listener. I don't think the current
>> >> rebalance listener is a dealbreaker, I just think it is an
>> >> unnecessarily awkward API.
>> >>
>> >> Regarding poll(), I honestly don't see what the problem is with
>> >> keeping poll() behavior as is, but also having public methods to e.g.
>> >> fetch metadata, for people that need that functionality without
>> >> actually consuming messages.
>> >>
>> >> On Mon, Mar 14, 2016 at 1:01 PM, Jason Gustafson <ja...@confluent.io> wrote:
>> >> > Late arrival to this discussion. I'm not really sure I see the problem with
>> >> > accessing the consumer in the rebalance listener. Before we passed the
>> >> > consumer instance as a separate argument, but that was only because the
>> >> > rebalance listener had to be passed by classname before a reference to the
>> >> > consumer was available. After we changed the API to pass it in subscribe()
>> >> > instead, getting a reference to the consumer shouldn't be a problem. Maybe
>> >> > I'm missing something?
>> >> >
>> >> > As for poll() doing everything... It's definitely caused some confusion,
>> >> > but I'm a little doubtful that trying to split out the functionality is
>> >> > going to help as much as people think. A heartbeat() API has come up
>> >> > several times, for example, but you have to figure out how it's going to
>> >> > affect rebalancing. And rebalancing affects which partitions need to be
>> >> > fetched or can be committed. They're all so intertwined that I'm not sure
>> >> > you can divide them up without creating a bigger problem than the one
>> >> > you're trying to solve. At this point, with 0.10 shortly on the way, it
>> >> > seems unlikely that incompatible changes to the API will be accepted.
>> >> > However, if someone can propose a compatible solution which addresses some
>> >> > of the concerns mentioned, we'd love to hear about it!
>> >> >
>> >> > -Jason
>> >> >
>> >> > On Mon, Mar 14, 2016 at 9:21 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >
>> >> >> Honestly the fact that everything is hidden inside poll() has been
>> >> >> confusing people since last year, e.g.
>> >> >>
>> >> >> https://issues.apache.org/jira/browse/KAFKA-2359
>> >> >>
>> >> >> I can try to formulate a KIP for this, but it seems clear that I'm not
>> >> >> the only one giving this feedback, and I may not understand all the
>> >> >> other use cases that have been brought up.
>> >> >>
>> >> >> On Sun, Mar 13, 2016 at 3:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> > Cody,
>> >> >> >
>> >> >> > We do not have an umbrella JIRA for this, but rather a case-by-case JIRA
>> >> >> > ticket / KIP for API changes in consumer.
>> >> >> >
>> >> >> > If you feel strong about some specific change on the consumer API, please
>> >> >> > feel free to create a new KIP with the detailed motivation and proposed
>> >> >> > modifications.
>> >> >> >
>> >> >> > Guozhang
>> >> >> >
>> >> >> > On Fri, Mar 11, 2016 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> >
>> >> >> >> Is there a KIP or Jira related to " working on improving these cases
>> >> >> >> with improved APIs " ?
>> >> >> >>
>> >> >> >> I saw that there was some discussion of it in KIP-41, but that seemed
>> >> >> >> to have been resolved in favor of keeping everything inside of poll()
>> >> >> >>
>> >> >> >> On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> >> > Cody, Mansi:
>> >> >> >> >
>> >> >> >> > All good points! Let me try to answer them one-by-one.
>> >> >> >> >
>> >> >> >> > About this specific issue, as I suggested in the JIRA we can separate the
>> >> >> >> > case about resetting offset upon initializing a partition to fetch, from
>> >> >> >> > the case that fetching offset out-of-range in the auto.offset.reset config.
>> >> >> >> > These two scenarios are indeed quite different and it's reasonable treating
>> >> >> >> > them differently.
>> >> >> >> >
>> >> >> >> > About passing a consumer context to the rebalance callback's constructor,
>> >> >> >> > we left it for user's flexibility: if you want to use Kafka to commit
>> >> >> >> > offsets, for example, then you pass the consumer reference to the callback;
>> >> >> >> > if you use an external service to store offsets, you can pass a JDBC
>> >> >> >> > connector, for example, to the callback; for some data mirroring you can
>> >> >> >> > even pass in another producer client into it. Always enforcing the consumer
>> >> >> >> > context could be convenient (i.e. you do not need to pass in the argument
>> >> >> >> > to the constructor yourself) for some use cases, but not necessarily all.
>> >> >> >> >
>> >> >> >> > About wrapping coordination protocols (partition assignment, heartbeat)
>> >> >> >> > inside "poll()" behind the scene, we implemented the APIs in this way in
>> >> >> >> > order to abstract the underlying details from the users, and also to
>> >> >> >> > provide a simple "single-thread-poll-loop" design pattern in the new
>> >> >> >> > Consumer. We realized that it does actually make some of the use cases more
>> >> >> >> > awkward, and are working on improving these cases with improved APIs as
>> >> >> >> > well. Let us know if you have any suggestions about this.
>> >> >> >> >
>> >> >> >> > Guozhang
>> >> >> >> >
>> >> >> >> > On Thu, Mar 10, 2016 at 7:53 AM, Mansi Shah <mansis...@maprtech.com> wrote:
>> >> >> >> >
>> >> >> >> >> I second the need for having a consumer context passed to rebalance
>> >> >> >> >> callback. I have ran into issues several times because of that.
>> >> >> >> >> >> >> >> >> >> About - subscribe vs assign - I have not read through your >> spark >> >> code >> >> >> >> yet >> >> >> >> >> (will do by eod), so I am not sure what you mean (other than I >> do >> >> >> agree >> >> >> >> >> that new partitions should be consumed automatically). I guess >> we >> >> can >> >> >> >> >> continue this discussion on the spark list then :-) >> >> >> >> >> >> >> >> >> >> Thanks >> >> >> >> >> Mansi. >> >> >> >> >> >> >> >> >> >> On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger < >> >> c...@koeninger.org> >> >> >> >> >> wrote: >> >> >> >> >> >> >> >> >> >> > Mansi, I'd agree that the fact that everything is tied up in >> >> poll >> >> >> >> >> > seems like the source of the awkward behavior. >> >> >> >> >> > >> >> >> >> >> > Regarding assign vs subscribe, most people using the spark >> >> >> integration >> >> >> >> >> > are just going to want to provide a topic name, not go figure >> >> out a >> >> >> >> >> > bunch of partitions. They're also going to be surprised if >> >> things >> >> >> >> >> > suddenly blow up once a partition is added, or that partition >> >> >> doesn't >> >> >> >> >> > start being consumed (we already have that second issue >> today). >> >> >> >> >> > >> >> >> >> >> > Thats why separating the behavior of auto offset reset seems >> >> like >> >> >> the >> >> >> >> >> > best idea I've heard so far. >> >> >> >> >> > >> >> >> >> >> > Consumer rebalance listeners are still probably going to be >> >> >> necessary >> >> >> >> >> > for people who are storing offsets externally. >> >> >> >> >> > >> >> >> >> >> > On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah < >> >> >> mansis...@maprtech.com> >> >> >> >> >> > wrote: >> >> >> >> >> > > Guozhang >> >> >> >> >> > > >> >> >> >> >> > > Sorry for joining the party a little late. 
I have been >> >> thinking >> >> >> >> about >> >> >> >> >> > this >> >> >> >> >> > > whole awkward behavior of having to call poll(0) to >> actually >> >> make >> >> >> >> the >> >> >> >> >> > > underlying subscriptions take effect. Is the core reason >> for >> >> this >> >> >> >> >> design >> >> >> >> >> > > the fact that poll is also the actual heartbeat and you >> want >> >> to >> >> >> make >> >> >> >> >> the >> >> >> >> >> > > listener group assignments through poll - so that timeouts >> and >> >> >> >> >> > > reassignments can all go through poll? So I think clubbing >> >> >> liveness >> >> >> >> >> with >> >> >> >> >> > > poll (which in effect clubs consumer group assignments and >> >> hence >> >> >> >> >> metadata >> >> >> >> >> > > fetch with poll) is the real cause of this design. Were >> there >> >> >> issues >> >> >> >> >> > where >> >> >> >> >> > > you were seeing active consumers not calling poll that led >> to >> >> >> this >> >> >> >> >> design >> >> >> >> >> > > choice? I tried to look for a relevant JIRA but could not >> find >> >> >> one - >> >> >> >> >> can >> >> >> >> >> > > you please point me to something if you have it handy? >> >> >> >> >> > > >> >> >> >> >> > > Btw this would also means that your proposal to do the >> actual >> >> >> >> >> assignments >> >> >> >> >> > > through seek might not be ideal since there can still be >> >> >> indefinite >> >> >> >> >> time >> >> >> >> >> > > between seek and poll (just like between subscribe and >> poll) >> >> and >> >> >> the >> >> >> >> >> > > consumer could timeout even before the first poll is >> called? >> >> >> >> >> > > >> >> >> >> >> > > >> >> >> >> >> > > @Cody in your case if you really have only one consumer and >> >> it is >> >> >> >> going >> >> >> >> >> > to >> >> >> >> >> > > get all the partitions of the topic anyway - then you >> might as >> >> >> well >> >> >> >> >> > > subscribe using "assign" call instead of "subscribe" call. 
>> >> That >> >> >> will >> >> >> >> >> make >> >> >> >> >> > > at least your code cleaner and I do not think you are >> gaining >> >> >> >> anything >> >> >> >> >> > with >> >> >> >> >> > > the listener group functionality anyway? >> >> >> >> >> > > >> >> >> >> >> > > - Mansi. >> >> >> >> >> > > >> >> >> >> >> > > >> >> >> >> >> > > >> >> >> >> >> > > On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang < >> >> >> wangg...@gmail.com> >> >> >> >> >> > wrote: >> >> >> >> >> > > >> >> >> >> >> > >> In order to do anything meaningful with the consumer >> itself >> >> in >> >> >> >> >> rebalance >> >> >> >> >> > >> callback (e.g. commit offset), you would need to hold on >> the >> >> >> >> consumer >> >> >> >> >> > >> reference; admittedly it sounds a bit awkward, but by >> design >> >> we >> >> >> >> choose >> >> >> >> >> > to >> >> >> >> >> > >> not enforce it in the interface itself. >> >> >> >> >> > >> >> >> >> >> >> > >> Guozhang >> >> >> >> >> > >> >> >> >> >> >> > >> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger < >> >> >> c...@koeninger.org >> >> >> >> > >> >> >> >> >> > wrote: >> >> >> >> >> > >> >> >> >> >> >> > >> > So what about my comments regarding the consumer >> rebalance >> >> >> >> listener >> >> >> >> >> > >> > interface not providing access to a consumer? I can >> >> probably >> >> >> >> work >> >> >> >> >> > around >> >> >> >> >> > >> > it, but it seems odd. >> >> >> >> >> > >> > On Mar 9, 2016 5:04 PM, "Guozhang Wang" < >> >> wangg...@gmail.com> >> >> >> >> wrote: >> >> >> >> >> > >> > >> >> >> >> >> > >> > > One thing proposed by Jason: >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > If you want to only reset offset upon initialization, >> >> and by >> >> >> >> >> > >> > initialization >> >> >> >> >> > >> > > you mean "no committed offset", you can do sth. like >> the >> >> >> >> following >> >> >> >> >> > in >> >> >> >> >> > >> > > rebalance callback. 
>> >> >> >> >> > >> > > >> >> >> >> >> > >> > > @Override >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > public void >> >> >> >> >> > >> > onPartitionsAssigned(Collection<TopicPartition> >> >> >> >> >> > >> > > partitions) { >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > for (TopicPartition partition : >> >> >> partitions) >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > if >> >> (consumer.committed(partition) == >> >> >> >> null) >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > >> >> >> >> consumer.seekToBeginning(partition); >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > } >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > Guozhang >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang < >> >> >> >> wangg...@gmail.com >> >> >> >> >> > >> >> >> >> >> > >> > wrote: >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > > Filed >> https://issues.apache.org/jira/browse/KAFKA-3370 >> >> . >> >> >> >> >> > >> > > > >> >> >> >> >> > >> > > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger < >> >> >> >> >> > c...@koeninger.org> >> >> >> >> >> > >> > > wrote: >> >> >> >> >> > >> > > > >> >> >> >> >> > >> > > >> That sounds like an interesting way of addressing >> the >> >> >> >> problem, >> >> >> >> >> > can >> >> >> >> >> > >> > > >> continue further discussions on the JIRA >> >> >> >> >> > >> > > >> >> >> >> >> >> > >> > > >> >> >> >> >> >> > >> > > >> >> >> >> >> >> > >> > > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang < >> >> >> >> >> > wangg...@gmail.com> >> >> >> >> >> > >> > > wrote: >> >> >> >> >> > >> > > >> > Cody: >> >> >> >> >> > >> > > >> > >> >> >> >> >> > >> > > >> > More specifically, you do not need the >> "listTopics" >> >> >> >> function >> >> >> >> >> if >> >> >> >> >> > >> you >> >> >> >> >> > >> > > >> already >> >> >> >> >> > >> > > >> > know your subscribed topics, just use >> >> "partitionsFor" >> >> >> is >> >> >> >> >> > >> sufficient. 
>> >> >> >> >> > >> > > >> > >> >> >> >> >> > >> > > >> > About the fix, I'm thinking of adding two more >> >> options >> >> >> in >> >> >> >> the >> >> >> >> >> > >> > > >> > auto.offset.rest, say namely "earliest-on-start" >> and >> >> >> >> >> > >> > > "latest-on-start", >> >> >> >> >> > >> > > >> > which sets the reset position ONLY at starting >> up. >> >> The >> >> >> >> reason >> >> >> >> >> > is >> >> >> >> >> > >> > that >> >> >> >> >> > >> > > >> the >> >> >> >> >> > >> > > >> > seekToXX was actually not designed to do such >> >> >> >> initialization >> >> >> >> >> > but >> >> >> >> >> > >> for >> >> >> >> >> > >> > > >> > calling during the lifetime of the consumer, and >> >> we'd >> >> >> >> better >> >> >> >> >> > >> provide >> >> >> >> >> > >> > > the >> >> >> >> >> > >> > > >> > right solution to do so. >> >> >> >> >> > >> > > >> > >> >> >> >> >> > >> > > >> > I can file the JIRA right away and start further >> >> >> >> discussions >> >> >> >> >> > >> there. >> >> >> >> >> > >> > > But >> >> >> >> >> > >> > > >> let >> >> >> >> >> > >> > > >> > me know if you have any other ideas. >> >> >> >> >> > >> > > >> > >> >> >> >> >> > >> > > >> > Guozhang >> >> >> >> >> > >> > > >> > >> >> >> >> >> > >> > > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger < >> >> >> >> >> > >> c...@koeninger.org >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > >> wrote: >> >> >> >> >> > >> > > >> > >> >> >> >> >> > >> > > >> >> Yeah, I think I understood what you were saying. >> >> What >> >> >> >> I'm >> >> >> >> >> > saying >> >> >> >> >> > >> > is >> >> >> >> >> > >> > > >> >> that if there were a way to just fetch metadata >> >> >> without >> >> >> >> >> doing >> >> >> >> >> > the >> >> >> >> >> > >> > > rest >> >> >> >> >> > >> > > >> >> of the work poll() does, it wouldn't be >> >> necessary. 
I >> >> >> >> guess >> >> >> >> >> I >> >> >> >> >> > can >> >> >> >> >> > >> > do >> >> >> >> >> > >> > > >> >> listTopics to get all metadata for all topics >> and >> >> then >> >> >> >> parse >> >> >> >> >> > it. >> >> >> >> >> > >> > > >> >> >> >> >> >> >> > >> > > >> >> Regarding running a single instance, that is the >> >> case >> >> >> for >> >> >> >> >> what >> >> >> >> >> > >> I'm >> >> >> >> >> > >> > > >> >> talking about. >> >> >> >> >> > >> > > >> >> >> >> >> >> >> > >> > > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang < >> >> >> >> >> > >> wangg...@gmail.com> >> >> >> >> >> > >> > > >> wrote: >> >> >> >> >> > >> > > >> >> > Cody, >> >> >> >> >> > >> > > >> >> > >> >> >> >> >> > >> > > >> >> > What I meant for a special case of `seekToXX` >> is >> >> >> that, >> >> >> >> >> today >> >> >> >> >> > >> when >> >> >> >> >> > >> > > the >> >> >> >> >> > >> > > >> >> > function is called with no partition >> parameters. >> >> It >> >> >> >> will >> >> >> >> >> > try to >> >> >> >> >> > >> > > >> execute >> >> >> >> >> > >> > > >> >> the >> >> >> >> >> > >> > > >> >> > logic on all "assigned" partitions for the >> >> consumer. >> >> >> >> And >> >> >> >> >> > once >> >> >> >> >> > >> > that >> >> >> >> >> > >> > > is >> >> >> >> >> > >> > > >> >> done, >> >> >> >> >> > >> > > >> >> > the subsequent poll() will not throw the >> >> exception >> >> >> >> since >> >> >> >> >> it >> >> >> >> >> > >> knows >> >> >> >> >> > >> > > >> those >> >> >> >> >> > >> > > >> >> > partitions needs to reset offsets. >> >> >> >> >> > >> > > >> >> > >> >> >> >> >> > >> > > >> >> > However for your case, there is no assigned >> >> >> partitions >> >> >> >> >> yet, >> >> >> >> >> > and >> >> >> >> >> > >> > > hence >> >> >> >> >> > >> > > >> >> > `seekToXX` will not take effects on any >> >> partitions. >> >> >> The >> >> >> >> >> > >> > assignment >> >> >> >> >> > >> > > is >> >> >> >> >> > >> > > >> >> > wrapped in the poll() call as you mentioned. 
>> And >> >> one >> >> >> >> way >> >> >> >> >> to >> >> >> >> >> > >> solve >> >> >> >> >> > >> > > it >> >> >> >> >> > >> > > >> is >> >> >> >> >> > >> > > >> >> to >> >> >> >> >> > >> > > >> >> > let the `seekToXX()` with no parameters do the >> >> >> >> >> coordination >> >> >> >> >> > and >> >> >> >> >> > >> > get >> >> >> >> >> > >> > > >> the >> >> >> >> >> > >> > > >> >> > assigned partitions if there are any >> subscribed >> >> >> >> topics, so >> >> >> >> >> > that >> >> >> >> >> > >> > the >> >> >> >> >> > >> > > >> >> > subsequent poll() will know those partitions >> need >> >> >> >> >> resetting >> >> >> >> >> > >> > > offsets. >> >> >> >> >> > >> > > >> Does >> >> >> >> >> > >> > > >> >> > that make sense? >> >> >> >> >> > >> > > >> >> > >> >> >> >> >> > >> > > >> >> > As for now another way I can think of is to >> get >> >> the >> >> >> >> >> > partition >> >> >> >> >> > >> > info >> >> >> >> >> > >> > > >> >> > beforehand and call `seekToBeginning` on all >> >> >> >> partitions. >> >> >> >> >> But >> >> >> >> >> > >> that >> >> >> >> >> > >> > > >> only >> >> >> >> >> > >> > > >> >> > works if the consumer knows it is going to get >> >> all >> >> >> the >> >> >> >> >> > >> partitions >> >> >> >> >> > >> > > >> >> assigned >> >> >> >> >> > >> > > >> >> > to itself (i.e. you are only running a single >> >> >> >> instance). 
>> >> >> >> >> > >> > > >> >> > >> >> >> >> >> > >> > > >> >> > Guozhang >> >> >> >> >> > >> > > >> >> > >> >> >> >> >> > >> > > >> >> > >> >> >> >> >> > >> > > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody >> Koeninger < >> >> >> >> >> > >> > c...@koeninger.org >> >> >> >> >> > >> > > > >> >> >> >> >> > >> > > >> >> wrote: >> >> >> >> >> > >> > > >> >> > >> >> >> >> >> > >> > > >> >> >> Another unfortunate thing about >> >> >> >> ConsumerRebalanceListener >> >> >> >> >> > is >> >> >> >> >> > >> > that >> >> >> >> >> > >> > > in >> >> >> >> >> > >> > > >> >> >> order to do meaningful work in the callback, >> you >> >> >> need >> >> >> >> a >> >> >> >> >> > >> > reference >> >> >> >> >> > >> > > to >> >> >> >> >> > >> > > >> >> >> the consumer that called it. But that >> reference >> >> >> isn't >> >> >> >> >> > >> provided >> >> >> >> >> > >> > to >> >> >> >> >> > >> > > >> the >> >> >> >> >> > >> > > >> >> >> callback, which means the listener >> >> implementation >> >> >> >> needs >> >> >> >> >> to >> >> >> >> >> > >> hold >> >> >> >> >> > >> > a >> >> >> >> >> > >> > > >> >> >> reference to the consumer. Seems like this >> >> makes >> >> >> it >> >> >> >> >> > >> > unnecessarily >> >> >> >> >> > >> > > >> >> >> awkward to serialize or provide a 0 arg >> >> constructor >> >> >> >> for >> >> >> >> >> the >> >> >> >> >> > >> > > >> listener. 
>> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> > >> > > >> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody >> Koeninger < >> >> >> >> >> > >> > > c...@koeninger.org> >> >> >> >> >> > >> > > >> >> wrote: >> >> >> >> >> > >> > > >> >> >> > I thought about ConsumerRebalanceListener, >> but >> >> >> >> seeking >> >> >> >> >> to >> >> >> >> >> > >> the >> >> >> >> >> > >> > > >> >> >> > beginning any time there's a rebalance for >> >> >> whatever >> >> >> >> >> > reason >> >> >> >> >> > >> is >> >> >> >> >> > >> > > not >> >> >> >> >> > >> > > >> >> >> > necessarily the same thing as seeking to >> the >> >> >> >> beginning >> >> >> >> >> > >> before >> >> >> >> >> > >> > > >> first >> >> >> >> >> > >> > > >> >> >> > starting the consumer. >> >> >> >> >> > >> > > >> >> >> > >> >> >> >> >> > >> > > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C < >> >> >> >> >> > >> > kamaltar...@gmail.com> >> >> >> >> >> > >> > > >> >> wrote: >> >> >> >> >> > >> > > >> >> >> >> Cody, >> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> >> > >> > > >> >> >> >> Use ConsumerRebalanceListener to achieve >> >> that, >> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> >> > >> > > >> >> >> >> ConsumerRebalanceListener listener = new >> >> >> >> >> > >> > > >> ConsumerRebalanceListener() >> >> >> >> >> > >> > > >> >> { >> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> >> > >> > > >> >> >> >> @Override >> >> >> >> >> > >> > > >> >> >> >> public void >> >> >> >> >> > >> > > >> >> onPartitionsRevoked(Collection<TopicPartition> >> >> >> >> >> > >> > > >> >> >> >> partitions) { >> >> >> >> >> > >> > > >> >> >> >> } >> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> >> > >> > > >> >> >> >> @Override >> >> >> >> >> > >> > > >> >> >> >> public void >> >> >> >> >> > >> > > >> >> onPartitionsAssigned(Collection<TopicPartition> >> >> >> >> >> > >> > > >> >> >> >> partitions) { >> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> >> > >> > consumer.seekToBeginning(partitions.toArray(new >> >> >> >> >> > >> > > >> >> >> >> 
TopicPartition[0])); >> >> >> >> >> > >> > > >> >> >> >> } >> >> >> >> >> > >> > > >> >> >> >> }; >> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> >> > >> > > >> >> >> >> consumer.subscribe(topics, listener); >> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> >> > >> > > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody >> >> Koeninger >> >> >> < >> >> >> >> >> > >> > > >> c...@koeninger.org> >> >> >> >> >> > >> > > >> >> >> wrote: >> >> >> >> >> > >> > > >> >> >> >> >> >> >> >> >> > >> > > >> >> >> >>> That suggestion doesn't work, for pretty >> >> much >> >> >> the >> >> >> >> >> same >> >> >> >> >> > >> > reason >> >> >> >> >> > >> > > - >> >> >> >> >> > >> > > >> at >> >> >> >> >> > >> > > >> >> the >> >> >> >> >> > >> > > >> >> >> >>> time poll is first called, there is no >> reset >> >> >> >> policy >> >> >> >> >> > and no >> >> >> >> >> > >> > > >> committed >> >> >> >> >> > >> > > >> >> >> >>> offset, so NoOffsetForPartitionException >> is >> >> >> thrown >> >> >> >> >> > >> > > >> >> >> >>> >> >> >> >> >> > >> > > >> >> >> >>> I feel like the underlying problem isn't >> so >> >> >> much >> >> >> >> that >> >> >> >> >> > >> > > seekToEnd >> >> >> >> >> > >> > > >> >> needs >> >> >> >> >> > >> > > >> >> >> >>> special case behavior. It's more that >> >> topic >> >> >> >> >> metadata >> >> >> >> >> > >> > > fetches, >> >> >> >> >> > >> > > >> >> >> >>> consumer position fetches, and message >> >> fetches >> >> >> are >> >> >> >> >> all >> >> >> >> >> > >> > lumped >> >> >> >> >> > >> > > >> >> together >> >> >> >> >> > >> > > >> >> >> >>> under a single poll() call, with no way >> to >> >> do >> >> >> them >> >> >> >> >> > >> > > individually >> >> >> >> >> > >> > > >> if >> >> >> >> >> > >> > > >> >> >> >>> necessary. 
>> >> >> >> >> > >> > > >> >> >> >>> >> >> >> >> >> > >> > > >> >> >> >>> What does "work" in this situation is to >> >> just >> >> >> >> catch >> >> >> >> >> the >> >> >> >> >> > >> > > >> exception >> >> >> >> >> > >> > > >> >> >> >>> (which leaves the consumer in a state >> where >> >> >> topics >> >> >> >> >> are >> >> >> >> >> > >> > > >> assigned) and >> >> >> >> >> > >> > > >> >> >> >>> then seek. But that is not exactly an >> >> elegant >> >> >> >> >> > interface. >> >> >> >> >> > >> > > >> >> >> >>> >> >> >> >> >> > >> > > >> >> >> >>> consumer.subscribe(topics) >> >> >> >> >> > >> > > >> >> >> >>> try { >> >> >> >> >> > >> > > >> >> >> >>> consumer.poll(0) >> >> >> >> >> > >> > > >> >> >> >>> } catch { >> >> >> >> >> > >> > > >> >> >> >>> case x: Throwable => >> >> >> >> >> > >> > > >> >> >> >>> } >> >> >> >> >> > >> > > >> >> >> >>> consumer.seekToBeginning() >> >> >> >> >> > >> > > >> >> >> >>> consumer.poll(0) >> >> >> >> >> > >> > > >> >> >> >>> >> >> >> >> >> > >> > > >> >> >> >>> >> >> >> >> >> > >> > > >> >> >> >>> >> >> >> >> >> > >> > > >> >> >> >>> >> >> >> >> >> > >> > > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang >> >> Wang >> >> >> < >> >> >> >> >> > >> > > >> wangg...@gmail.com> >> >> >> >> >> > >> > > >> >> >> wrote: >> >> >> >> >> > >> > > >> >> >> >>> > Hi Cody, >> >> >> >> >> > >> > > >> >> >> >>> > >> >> >> >> >> > >> > > >> >> >> >>> > The problem with that code is in >> >> >> >> >> `seekToBeginning()` >> >> >> >> >> > >> > > followed >> >> >> >> >> > >> > > >> by >> >> >> >> >> > >> > > >> >> >> >>> > `subscribe(topic)`. 
>> >> >> >> >> > >> > > >> >> >> >>> > >> >> >> >> >> > >> > > >> >> >> >>> > Since `subscribe` call is lazy >> evaluated, >> >> by >> >> >> the >> >> >> >> >> time >> >> >> >> >> > >> > > >> >> >> `seekToBeginning()` >> >> >> >> >> > >> > > >> >> >> >>> > is called no partition is assigned yet, >> >> and >> >> >> >> hence >> >> >> >> >> it >> >> >> >> >> > is >> >> >> >> >> > >> > > >> >> effectively >> >> >> >> >> > >> > > >> >> >> an >> >> >> >> >> > >> > > >> >> >> >>> > no-op. >> >> >> >> >> > >> > > >> >> >> >>> > >> >> >> >> >> > >> > > >> >> >> >>> > Try >> >> >> >> >> > >> > > >> >> >> >>> > >> >> >> >> >> > >> > > >> >> >> >>> > consumer.subscribe(topics) >> >> >> >> >> > >> > > >> >> >> >>> > consumer.poll(0); // get assigned >> >> >> >> partitions >> >> >> >> >> > >> > > >> >> >> >>> > consumer.seekToBeginning() >> >> >> >> >> > >> > > >> >> >> >>> > consumer.poll(0) >> >> >> >> >> > >> > > >> >> >> >>> > >> >> >> >> >> > >> > > >> >> >> >>> > to see if that works. >> >> >> >> >> > >> > > >> >> >> >>> > >> >> >> >> >> > >> > > >> >> >> >>> > I think it is a valid issue that can be >> >> >> fixed in >> >> >> >> >> the >> >> >> >> >> > new >> >> >> >> >> > >> > > >> consumer >> >> >> >> >> > >> > > >> >> >> that, >> >> >> >> >> > >> > > >> >> >> >>> > upon calling seekToEnd/Beginning with >> no >> >> >> >> parameter, >> >> >> >> >> > >> while >> >> >> >> >> > >> > no >> >> >> >> >> > >> > > >> >> >> assigned is >> >> >> >> >> > >> > > >> >> >> >>> > done yet, do the coordination behind >> the >> >> >> scene; >> >> >> >> it >> >> >> >> >> > will >> >> >> >> >> > >> > > though >> >> >> >> >> > >> > > >> >> >> change the >> >> >> >> >> > >> > > >> >> >> >>> > behavior of the functions as they are >> no >> >> >> longer >> >> >> >> >> > always >> >> >> >> >> > >> > > lazily >> >> >> >> >> > >> > > >> >> >> evaluated. 
> Guozhang
>
> On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Using the 0.9 consumer, I would like to start consuming at the
>> beginning or end, without specifying auto.offset.reset.
>>
>> This does not seem to be possible:
>>
>>     val kafkaParams = Map[String, Object](
>>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>>       "key.deserializer" -> classOf[StringDeserializer],
>>       "value.deserializer" -> classOf[StringDeserializer],
>>       "group.id" -> "example",
>>       "auto.offset.reset" -> "none"
>>     ).asJava
>>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>>     consumer.subscribe(topics)
>>     consumer.seekToBeginning()
>>     consumer.poll(0)
>>
>> Results in:
>>
>> Exception in thread "main"
>> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>> Undefined offset with no reset policy for partition: testtwo-4
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>     at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>>
>> I'm assuming this is because, at the time seekToBeginning() is called,
>> subscriptions.assignedPartitions isn't populated. But polling in
>> order to assign topicpartitions results in an error, which creates a
>> chicken-or-the-egg situation.
>>
>> I don't want to set auto.offset.reset, because I want a hard error if
>> the offsets are out of range at any other time during consumption.