Cody: More specifically, you do not need the "listTopics" function if you already know your subscribed topics; just using "partitionsFor" is sufficient.
About the fix, I'm thinking of adding two more options to auto.offset.reset, namely "earliest-on-start" and "latest-on-start", which would reset the position ONLY at startup. The reason is that seekToXX was not designed for this kind of initialization. It was meant to be called during the lifetime of the consumer, so we had better provide a proper solution for this case. I can file the JIRA right away and start further discussion there. But let me know if you have any other ideas.

Guozhang

On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, I think I understood what you were saying. What I'm saying is
> that if there were a way to just fetch metadata without doing the rest
> of the work poll() does, it wouldn't be necessary. I guess I can do
> listTopics to get all metadata for all topics and then parse it.
>
> Regarding running a single instance, that is the case for what I'm
> talking about.
>
> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Cody,
>>
>> What I meant by a special case of `seekToXX` is this: today, when the
>> function is called with no partition parameters, it executes the
>> logic on all "assigned" partitions of the consumer. Once that is done,
>> the subsequent poll() will not throw the exception, since it knows those
>> partitions need their offsets reset.
>>
>> However, in your case there are no assigned partitions yet, so
>> `seekToXX` will not take effect on any partition. The assignment is
>> wrapped in the poll() call, as you mentioned. One way to solve this is
>> to let `seekToXX()` with no parameters do the coordination and get the
>> assigned partitions if there are any subscribed topics, so that the
>> subsequent poll() knows those partitions need their offsets reset. Does
>> that make sense?
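To make the proposed "earliest-on-start" / "latest-on-start" options concrete, here is a minimal, dependency-free Java sketch of the decision a consumer would make when a partition has no usable position. The enum values, `resolve`, and `NoResetAllowedException` are hypothetical. They are not auto.offset.reset values or classes in the Kafka client. `atStartup` stands for "first positioning of this partition, with no committed offset", as opposed to "the position went out of range later during consumption".

```java
// Hypothetical sketch of the proposed on-start reset policies.
// None of these names exist in the Kafka client; they only illustrate the semantics.
class OnStartResetPolicy {

    enum Policy { NONE, EARLIEST, LATEST, EARLIEST_ON_START, LATEST_ON_START }

    enum Target { EARLIEST, LATEST }

    // Stand-in for the error the consumer would raise (cf. NoOffsetForPartitionException).
    static class NoResetAllowedException extends RuntimeException {
        NoResetAllowedException(String message) { super(message); }
    }

    // atStartup: true when a partition is positioned for the first time (no committed
    // offset); false when its position later turns out to be invalid or out of range.
    static Target resolve(Policy policy, boolean atStartup) {
        switch (policy) {
            case EARLIEST:
                return Target.EARLIEST;
            case LATEST:
                return Target.LATEST;
            case EARLIEST_ON_START:
                if (atStartup) return Target.EARLIEST;
                break;
            case LATEST_ON_START:
                if (atStartup) return Target.LATEST;
                break;
            default: // NONE
                break;
        }
        // NONE always ends up here; the *_ON_START policies end up here after startup,
        // which gives the hard error on out-of-range offsets asked for in the original post.
        throw new NoResetAllowedException(
                "no offset reset allowed for " + policy + (atStartup ? " at startup" : " after startup"));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Policy.EARLIEST_ON_START, true)); // EARLIEST
        try {
            resolve(Policy.EARLIEST_ON_START, false);
        } catch (NoResetAllowedException e) {
            System.out.println(e.getMessage()); // no offset reset allowed for EARLIEST_ON_START after startup
        }
    }
}
```

The design choice is that once the consumer is running, the on-start policies fall through to the same error path as "none". That preserves the hard failure on out-of-range offsets while still allowing an explicit starting position.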
>>
>> For now, another way I can think of is to get the partition info
>> beforehand and call `seekToBeginning` on all partitions. But that only
>> works if the consumer knows it is going to get all the partitions assigned
>> to itself (i.e. you are running only a single instance).
>>
>> Guozhang
>>
>> On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Another unfortunate thing about ConsumerRebalanceListener is that in
>>> order to do meaningful work in the callback, you need a reference to
>>> the consumer that called it. But that reference isn't provided to the
>>> callback, which means the listener implementation needs to hold a
>>> reference to the consumer. That seems to make it unnecessarily awkward
>>> to serialize the listener or to give it a zero-argument constructor.
>>>
>>> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> I thought about ConsumerRebalanceListener, but seeking to the
>>>> beginning any time there's a rebalance, for whatever reason, is not
>>>> necessarily the same thing as seeking to the beginning before first
>>>> starting the consumer.
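One way to address the "a rebalance is not the same as startup" concern is to guard the seek so that it runs only on the first non-empty assignment. Below is a minimal sketch, assuming a single consumer instance per group (the case discussed in this thread). `SeekOnFirstAssignment` is a hypothetical helper, and its type parameter `P` stands in for Kafka's `TopicPartition` so the sketch compiles without the Kafka client. It does not solve the other complaint: the seek action still has to capture a reference to the consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: runs a seek action for the first non-empty assignment only.
// P stands in for org.apache.kafka.common.TopicPartition so this compiles on its own.
// Assumes one consumer instance per group, as in this thread.
class SeekOnFirstAssignment<P> {
    private final Consumer<Collection<P>> seekAction; // e.g. wraps consumer.seekToBeginning(...)
    private boolean seekDone = false;

    SeekOnFirstAssignment(Consumer<Collection<P>> seekAction) {
        this.seekAction = seekAction;
    }

    // Intended to be called from ConsumerRebalanceListener.onPartitionsAssigned.
    void onAssigned(Collection<P> partitions) {
        if (seekDone || partitions.isEmpty()) {
            return; // later rebalance (keep current positions) or nothing assigned yet
        }
        seekDone = true;
        seekAction.accept(partitions);
    }

    public static void main(String[] args) {
        List<List<String>> seeks = new ArrayList<>();
        SeekOnFirstAssignment<String> guard =
                new SeekOnFirstAssignment<>(parts -> seeks.add(new ArrayList<>(parts)));

        guard.onAssigned(Arrays.asList("testtwo-0", "testtwo-1")); // first assignment: seek
        guard.onAssigned(Arrays.asList("testtwo-0"));              // later rebalance: ignored
        System.out.println(seeks); // [[testtwo-0, testtwo-1]]
    }
}
```

In a listener like the one Kamal posted in this thread, `onPartitionsAssigned` would call `guard.onAssigned(partitions)`. The seek action would wrap `consumer.seekToBeginning(...)`, which in 0.9 takes varargs, hence `partitions.toArray(new TopicPartition[0])`. This wiring has not been verified against a 0.9 broker.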
>>>>
>>>> On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
>>>>
>>>>> Cody,
>>>>>
>>>>> Use ConsumerRebalanceListener to achieve that:
>>>>>
>>>>> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>>>>>
>>>>>     @Override
>>>>>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>>>>>         // Runs on every assignment, including later rebalances.
>>>>>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>>>>>     }
>>>>> };
>>>>>
>>>>> consumer.subscribe(topics, listener);
>>>>>
>>>>> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> That suggestion doesn't work, for pretty much the same reason: at the
>>>>>> time poll is first called, there is no reset policy and no committed
>>>>>> offset, so NoOffsetForPartitionException is thrown.
>>>>>>
>>>>>> I feel like the underlying problem isn't so much that seekToEnd needs
>>>>>> special-case behavior. It's more that topic metadata fetches,
>>>>>> consumer position fetches, and message fetches are all lumped together
>>>>>> under a single poll() call, with no way to do them individually if
>>>>>> necessary.
>>>>>>
>>>>>> What does "work" in this situation is to just catch the exception
>>>>>> (which leaves the consumer in a state where topics are assigned) and
>>>>>> then seek. But that is not exactly an elegant interface.
>>>>>>
>>>>>> import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
>>>>>>
>>>>>> consumer.subscribe(topics)
>>>>>> try {
>>>>>>   consumer.poll(0)  // assigns partitions, then throws: no committed offset, no reset policy
>>>>>> } catch {
>>>>>>   case _: NoOffsetForPartitionException =>  // expected; the partitions are now assigned
>>>>>> }
>>>>>> consumer.seekToBeginning()  // now marks the assigned partitions for reset
>>>>>> consumer.poll(0)
>>>>>>
>>>>>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> The problem with that code is calling `seekToBeginning()` right after
>>>>>>> `subscribe(topic)`.
>>>>>>>
>>>>>>> Since the `subscribe` call is lazily evaluated, by the time
>>>>>>> `seekToBeginning()` is called no partition has been assigned yet, so it
>>>>>>> is effectively a no-op.
>>>>>>>
>>>>>>> Try
>>>>>>>
>>>>>>> consumer.subscribe(topics)
>>>>>>> consumer.poll(0) // get assigned partitions
>>>>>>> consumer.seekToBeginning()
>>>>>>> consumer.poll(0)
>>>>>>>
>>>>>>> to see if that works.
>>>>>>>
>>>>>>> I think this is a valid issue that could be fixed in the new consumer:
>>>>>>> when seekToEnd/seekToBeginning is called with no parameters before any
>>>>>>> assignment has been made, the consumer would do the coordination behind
>>>>>>> the scenes. That would, however, change the behavior of these functions,
>>>>>>> since they would no longer always be lazily evaluated.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> Using the 0.9 consumer, I would like to start consuming at the
>>>>>>>> beginning or end, without specifying auto.offset.reset.
>>>>>>>>
>>>>>>>> This does not seem to be possible:
>>>>>>>>
>>>>>>>> val kafkaParams = Map[String, Object](
>>>>>>>>   "bootstrap.servers" -> conf.getString("kafka.brokers"),
>>>>>>>>   "key.deserializer" -> classOf[StringDeserializer],
>>>>>>>>   "value.deserializer" -> classOf[StringDeserializer],
>>>>>>>>   "group.id" -> "example",
>>>>>>>>   "auto.offset.reset" -> "none"
>>>>>>>> ).asJava
>>>>>>>> val topics = conf.getString("kafka.topics").split(",").toList.asJava
>>>>>>>> val consumer = new KafkaConsumer[String, String](kafkaParams)
>>>>>>>> consumer.subscribe(topics)
>>>>>>>> consumer.seekToBeginning()
>>>>>>>> consumer.poll(0)
>>>>>>>>
>>>>>>>> Results in:
>>>>>>>>
>>>>>>>> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
>>>>>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>>>>>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>>>>>>>     at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>>>>>>>>
>>>>>>>> I'm assuming this is because, at the time seekToBeginning() is called,
>>>>>>>> subscriptions.assignedPartitions isn't populated. But polling in
>>>>>>>> order to assign topic partitions results in an error, which creates a
>>>>>>>> chicken-and-egg situation.
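The chicken-and-egg ordering can be reproduced with a small toy model. This is NOT Kafka's implementation. `ToyConsumer` and its methods are hypothetical and only mirror the behavior described in this thread: `subscribe` is lazy, `seekToBeginning()` with no arguments marks only partitions that are already assigned, and `poll` both assigns partitions and requires a position for each one. The model assumes a single instance that receives every partition, no committed offsets, a reset policy of "none", and an earliest offset of 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (not Kafka's real code) of the ordering problem described in this thread.
class ToyConsumer {
    static class NoOffsetException extends RuntimeException {
        NoOffsetException(String partition) {
            super("Undefined offset with no reset policy for partition: " + partition);
        }
    }

    private final Map<String, Integer> partitionCounts;        // topic -> partition count (stand-in for metadata)
    private final List<String> subscribed = new ArrayList<>();
    private final Set<String> assigned = new LinkedHashSet<>();
    private final Set<String> pendingReset = new HashSet<>();   // partitions marked by seekToBeginning()
    private final Map<String, Long> positions = new LinkedHashMap<>();

    ToyConsumer(Map<String, Integer> partitionCounts) {
        this.partitionCounts = partitionCounts;
    }

    // Lazy, as described in this thread: only records the topics.
    void subscribe(List<String> topics) {
        subscribed.addAll(topics);
    }

    // Marks only partitions that are already assigned; a no-op before the first poll().
    void seekToBeginning() {
        pendingReset.addAll(assigned);
    }

    // Assigns partitions if needed, then requires a position for each of them.
    void poll() {
        if (assigned.isEmpty()) {
            for (String topic : subscribed) {
                for (int p = 0; p < partitionCounts.getOrDefault(topic, 0); p++) {
                    assigned.add(topic + "-" + p);
                }
            }
        }
        for (String tp : assigned) {
            if (positions.containsKey(tp)) {
                continue;
            }
            if (pendingReset.remove(tp)) {
                positions.put(tp, 0L);             // earliest offset assumed to be 0
            } else {
                throw new NoOffsetException(tp);   // no committed offset and no reset policy
            }
        }
    }

    Set<String> assignment() { return assigned; }

    Map<String, Long> positions() { return positions; }

    public static void main(String[] args) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("testtwo", 2);

        // Order from the original post: subscribe, seekToBeginning, poll.
        ToyConsumer naive = new ToyConsumer(metadata);
        naive.subscribe(Arrays.asList("testtwo"));
        naive.seekToBeginning(); // nothing assigned yet, so nothing is marked
        try {
            naive.poll();
        } catch (NoOffsetException e) {
            System.out.println("naive: " + e.getMessage());
        }

        // Workaround from the thread: poll (swallowing the error) to get an assignment, then seek.
        ToyConsumer workaround = new ToyConsumer(metadata);
        workaround.subscribe(Arrays.asList("testtwo"));
        try {
            workaround.poll();
        } catch (NoOffsetException e) {
            // partitions are assigned now, even though poll() failed
        }
        workaround.seekToBeginning();
        workaround.poll();
        System.out.println("workaround: " + workaround.positions()); // workaround: {testtwo-0=0, testtwo-1=0}
    }
}
```

The first consumer fails with the same kind of error as the stack trace above. The second one succeeds only because the failed poll left the partitions assigned, which is the catch-then-seek workaround discussed in this thread.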
>>>>>>>>
>>>>>>>> I don't want to set auto.offset.reset, because I want a hard error if
>>>>>>>> the offsets are out of range at any other time during consumption.
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>
>> --
>> -- Guozhang

--
-- Guozhang