Honestly, the fact that everything is hidden inside poll() has been confusing people since last year, e.g. https://issues.apache.org/jira/browse/KAFKA-2359. I can try to formulate a KIP for this, but it seems clear that I'm not the only one giving this feedback, and I may not understand all the other use cases that have been brought up.

On Sun, Mar 13, 2016 at 3:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody,

We do not have an umbrella JIRA for this, but rather a case-by-case JIRA ticket / KIP for API changes in the consumer.

If you feel strongly about some specific change to the consumer API, please feel free to create a new KIP with the detailed motivation and proposed modifications.

Guozhang

On Fri, Mar 11, 2016 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:

Is there a KIP or JIRA related to "working on improving these cases with improved APIs"?

I saw that there was some discussion of it in KIP-41, but that seemed to have been resolved in favor of keeping everything inside of poll().

On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody, Mansi:

All good points! Let me try to answer them one by one.

About this specific issue: as I suggested in the JIRA, we can separate the case of resetting the offset upon initializing a partition to fetch from the case of the fetch offset being out of range under the auto.offset.reset config. These two scenarios are indeed quite different, and it's reasonable to treat them differently.

About passing a consumer context to the rebalance callback's constructor: we left it open for the user's flexibility. If you want to use Kafka to commit offsets, for example, then you pass the consumer reference to the callback; if you use an external service to store offsets, you can pass a JDBC connector, for example, to the callback; for some data mirroring you can even pass another producer client into it. Always enforcing the consumer context could be convenient (i.e. you do not need to pass in the argument to the constructor yourself) for some use cases, but not necessarily all.

About wrapping the coordination protocols (partition assignment, heartbeat) inside poll() behind the scenes: we implemented the APIs this way in order to abstract the underlying details from users, and also to provide a simple "single-thread-poll-loop" design pattern in the new consumer. We realize that it does make some of the use cases more awkward, and are working on improving these cases with improved APIs as well. Let us know if you have any suggestions about this.

Guozhang

On Thu, Mar 10, 2016 at 7:53 AM, Mansi Shah <mansis...@maprtech.com> wrote:

I second the need for having a consumer context passed to the rebalance callback. I have run into issues several times because of that.

About subscribe vs. assign: I have not read through your Spark code yet (will do by EOD), so I am not sure what you mean (other than I do agree that new partitions should be consumed automatically). I guess we can continue this discussion on the Spark list then :-)

Thanks
Mansi.

On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:

Mansi, I'd agree that the fact that everything is tied up in poll seems like the source of the awkward behavior.

Regarding assign vs. subscribe, most people using the Spark integration are just going to want to provide a topic name, not go figure out a bunch of partitions. They're also going to be surprised if things suddenly blow up once a partition is added, or if that partition doesn't start being consumed (we already have that second issue today).
That's why separating the behavior of auto offset reset seems like the best idea I've heard so far.

Consumer rebalance listeners are still probably going to be necessary for people who are storing offsets externally.

On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <mansis...@maprtech.com> wrote:

Guozhang,

Sorry for joining the party a little late. I have been thinking about this whole awkward behavior of having to call poll(0) to actually make the underlying subscriptions take effect. Is the core reason for this design the fact that poll is also the actual heartbeat, and you want to make the listener group assignments through poll, so that timeouts and reassignments can all go through poll? So I think clubbing liveness with poll (which in effect clubs consumer group assignments, and hence metadata fetches, with poll) is the real cause of this design. Were there issues where you were seeing active consumers not calling poll that led to this design choice? I tried to look for a relevant JIRA but could not find one; can you please point me to something if you have it handy?

By the way, this would also mean that your proposal to do the actual assignments through seek might not be ideal, since there can still be an indefinite time between seek and poll (just like between subscribe and poll), and the consumer could time out even before the first poll is called.

@Cody: in your case, if you really have only one consumer and it is going to get all the partitions of the topic anyway, then you might as well subscribe using the "assign" call instead of the "subscribe" call. That will at least make your code cleaner, and I do not think you are gaining anything from the listener group functionality anyway?

- Mansi.

On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:

In order to do anything meaningful with the consumer itself in the rebalance callback (e.g. commit offsets), you would need to hold on to the consumer reference; admittedly it sounds a bit awkward, but by design we chose not to enforce it in the interface itself.

Guozhang

On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:

So what about my comments regarding the consumer rebalance listener interface not providing access to a consumer? I can probably work around it, but it seems odd.

On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

One thing proposed by Jason:

If you want to only reset the offset upon initialization, and by initialization you mean "no committed offset", you can do something like the following in the rebalance callback:

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            if (consumer.committed(partition) == null)
                consumer.seekToBeginning(partition);
    }

Guozhang

On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Filed https://issues.apache.org/jira/browse/KAFKA-3370.

On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:

That sounds like an interesting way of addressing the problem; we can continue further discussions on the JIRA.

On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody:

More specifically, you do not need the "listTopics" function if you already know your subscribed topics; "partitionsFor" is sufficient.

About the fix, I'm thinking of adding two more options to auto.offset.reset, namely "earliest-on-start" and "latest-on-start", which set the reset position ONLY at startup.
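[Editor's note: the "-on-start" options discussed above were only a proposal in this thread, not a shipped Kafka config. A minimal stdlib-only sketch of the proposed semantics, with hypothetical Policy/Action names chosen for illustration: the on-start variants reset only when a partition has no committed offset at startup, and still fail hard on a later out-of-range fetch, whereas plain "earliest"/"latest" also silently reset on out-of-range.]

```java
import java.util.Optional;

public class ResetPolicySketch {
    enum Policy { NONE, EARLIEST, LATEST, EARLIEST_ON_START, LATEST_ON_START }
    enum Action { SEEK_TO_BEGINNING, SEEK_TO_END, FAIL }

    // committed: the committed offset for the partition, if any.
    // outOfRange: true for a mid-consumption out-of-range fetch,
    // false when initializing a partition with no committed offset.
    static Action resolve(Policy policy, Optional<Long> committed, boolean outOfRange) {
        if (!outOfRange && committed.isPresent())
            throw new IllegalArgumentException("no reset needed");
        switch (policy) {
            case EARLIEST:          return Action.SEEK_TO_BEGINNING;
            case LATEST:            return Action.SEEK_TO_END;
            case EARLIEST_ON_START: return outOfRange ? Action.FAIL : Action.SEEK_TO_BEGINNING;
            case LATEST_ON_START:   return outOfRange ? Action.FAIL : Action.SEEK_TO_END;
            default:                return Action.FAIL; // NONE: always a hard error
        }
    }

    public static void main(String[] args) {
        // Startup with no committed offset: -on-start behaves like earliest.
        System.out.println(resolve(Policy.EARLIEST_ON_START, Optional.empty(), false));
        // Out-of-range during consumption: -on-start fails hard, like "none".
        System.out.println(resolve(Policy.EARLIEST_ON_START, Optional.of(42L), true));
    }
}
```

This separation is exactly what Cody asks for later in the thread: seek-to-beginning behavior at first start, combined with a hard error if offsets go out of range during consumption.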
The reason is that seekToXX was not actually designed for such initialization, but for calling during the lifetime of the consumer, and we'd better provide the right solution for that.

I can file the JIRA right away and start further discussions there. But let me know if you have any other ideas.

Guozhang

On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:

Yeah, I think I understood what you were saying. What I'm saying is that if there were a way to just fetch metadata without doing the rest of the work poll() does, it wouldn't be necessary. I guess I can do listTopics to get all metadata for all topics and then parse it.

Regarding running a single instance, that is the case for what I'm talking about.

On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody,

What I meant about a special case of seekToXX is that today, when the function is called with no partition parameters, it will try to execute the logic on all "assigned" partitions of the consumer. And once that is done, the subsequent poll() will not throw the exception, since it knows those partitions need their offsets reset.

However, in your case there are no assigned partitions yet, and hence seekToXX will not take effect on any partitions. The assignment is wrapped in the poll() call, as you mentioned. One way to solve this is to let seekToXX() with no parameters do the coordination and get the assigned partitions if there are any subscribed topics, so that the subsequent poll() will know those partitions need their offsets reset. Does that make sense?

For now, another way I can think of is to get the partition info beforehand and call seekToBeginning on all partitions. But that only works if the consumer knows it is going to get all the partitions assigned to itself (i.e. you are only running a single instance).

Guozhang

On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:

Another unfortunate thing about ConsumerRebalanceListener is that in order to do meaningful work in the callback, you need a reference to the consumer that called it. But that reference isn't provided to the callback, which means the listener implementation needs to hold a reference to the consumer. This makes it unnecessarily awkward to serialize the listener or give it a 0-arg constructor.

On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:

I thought about ConsumerRebalanceListener, but seeking to the beginning any time there's a rebalance, for whatever reason, is not necessarily the same thing as seeking to the beginning before first starting the consumer.
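[Editor's note: a small sketch of the constructor-injection pattern discussed above. The listener cannot get the consumer from the callback signature, so it is handed its context up front. `SeekableConsumer` and the string "partition" names are stand-ins for the real KafkaConsumer/TopicPartition types, chosen so the sketch is self-contained; the seek-only-if-never-committed logic mirrors Jason's snippet earlier in the thread.]

```java
import java.util.List;

public class ListenerSketch {
    // Stand-in for the small slice of the consumer API the listener needs.
    interface SeekableConsumer {
        Long committed(String partition);       // null when nothing was ever committed
        void seekToBeginning(String partition);
    }

    // Mirrors a ConsumerRebalanceListener that receives its consumer
    // via the constructor rather than via the callback.
    static class ResetOnFirstAssignListener {
        private final SeekableConsumer consumer; // injected, not provided by the callback

        ResetOnFirstAssignListener(SeekableConsumer consumer) {
            this.consumer = consumer;
        }

        void onPartitionsAssigned(List<String> partitions) {
            for (String p : partitions)
                if (consumer.committed(p) == null) // only partitions never consumed before
                    consumer.seekToBeginning(p);
        }
    }

    public static void main(String[] args) {
        // Demo with an in-memory stand-in consumer: t-0 has a committed
        // offset, t-1 does not, so only t-1 gets seeked to the beginning.
        java.util.Map<String, Long> committedOffsets = new java.util.HashMap<>();
        committedOffsets.put("t-0", 5L);
        java.util.List<String> seeks = new java.util.ArrayList<>();
        SeekableConsumer fake = new SeekableConsumer() {
            public Long committed(String p) { return committedOffsets.get(p); }
            public void seekToBeginning(String p) { seeks.add(p); }
        };
        new ResetOnFirstAssignListener(fake).onPartitionsAssigned(List.of("t-0", "t-1"));
        System.out.println(seeks);
    }
}
```

The cost Cody points out remains visible here: the listener holds a consumer reference, so it has no 0-arg constructor and is awkward to serialize.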
On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:

Cody,

Use ConsumerRebalanceListener to achieve that:

    ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
        }
    };

    consumer.subscribe(topics, listener);

On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

That suggestion doesn't work, for pretty much the same reason: at the time poll is first called, there is no reset policy and no committed offset, so NoOffsetForPartitionException is thrown.

I feel like the underlying problem isn't so much that seekToEnd needs special-case behavior. It's more that topic metadata fetches, consumer position fetches, and message fetches are all lumped together under a single poll() call, with no way to do them individually if necessary.

What does "work" in this situation is to just catch the exception (which leaves the consumer in a state where topics are assigned) and then seek. But that is not exactly an elegant interface.

    consumer.subscribe(topics)
    try {
      consumer.poll(0)
    } catch {
      case x: Throwable =>
    }
    consumer.seekToBeginning()
    consumer.poll(0)

On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Cody,

The problem with that code is in `seekToBeginning()` followed by `subscribe(topic)`.

Since the `subscribe` call is lazily evaluated, by the time `seekToBeginning()` is called no partition is assigned yet, and hence it is effectively a no-op.

Try

    consumer.subscribe(topics)
    consumer.poll(0)  // get assigned partitions
    consumer.seekToBeginning()
    consumer.poll(0)

to see if that works.

I think it is a valid issue that can be fixed in the new consumer: upon calling seekToEnd/Beginning with no parameters while no assignment has been done yet, do the coordination behind the scenes. It would, though, change the behavior of those functions, as they would no longer always be lazily evaluated.

Guozhang

On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

Using the 0.9 consumer, I would like to start consuming at the beginning or end, without specifying auto.offset.reset.
This does not seem to be possible:

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "none"
    ).asJava
    val topics = conf.getString("kafka.topics").split(",").toList.asJava
    val consumer = new KafkaConsumer[String, String](kafkaParams)
    consumer.subscribe(topics)
    consumer.seekToBeginning()
    consumer.poll(0)

Results in:

    Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
        at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
        at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)

I'm assuming this is because, at the time seekToBeginning() is called, subscriptions.assignedPartitions isn't populated. But polling in order to assign topic-partitions results in an error, which creates a chicken-or-the-egg situation.

I don't want to set auto.offset.reset, because I want a hard error if the offsets are out of range at any other time during consumption.