Filed https://issues.apache.org/jira/browse/KAFKA-3370.
On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:

That sounds like an interesting way of addressing the problem; we can continue further discussion on the JIRA.


On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody:

More specifically, you do not need the "listTopics" function if you already know your subscribed topics; just using "partitionsFor" is sufficient.

About the fix, I'm thinking of adding two more options to auto.offset.reset, namely "earliest-on-start" and "latest-on-start", which set the reset position ONLY at startup. The reason is that seekToXX was not designed for that kind of initialization but for calling during the lifetime of the consumer, and we'd better provide the right solution for it.

I can file the JIRA right away and start further discussions there. But let me know if you have any other ideas.

Guozhang


On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:

Yeah, I think I understood what you were saying. What I'm saying is that if there were a way to just fetch metadata without doing the rest of the work poll() does, it wouldn't be necessary. I guess I can do listTopics to get all metadata for all topics and then parse it.

Regarding running a single instance, that is the case for what I'm talking about.


On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody,

What I meant about the special case of `seekToXX` is that, today, when the function is called with no partition parameters, it will try to execute the logic on all "assigned" partitions of the consumer. Once that is done, the subsequent poll() will not throw the exception, since it knows those partitions need their offsets reset.

In your case, however, there are no assigned partitions yet, and hence `seekToXX` will not take effect on any partitions. The assignment is wrapped in the poll() call, as you mentioned. One way to solve this is to let `seekToXX()` with no parameters do the coordination and get the assigned partitions if there are any subscribed topics, so that the subsequent poll() will know those partitions need their offsets reset. Does that make sense?

For now, another way I can think of is to get the partition info beforehand and call `seekToBeginning` on all partitions. But that only works if the consumer knows it is going to get all the partitions assigned to itself (i.e. you are only running a single instance).

Guozhang


On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:

Another unfortunate thing about ConsumerRebalanceListener is that in order to do meaningful work in the callback, you need a reference to the consumer that called it. But that reference isn't provided to the callback, which means the listener implementation needs to hold a reference to the consumer itself. This makes it unnecessarily awkward to serialize the listener or give it a 0-arg constructor.
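A minimal illustrative sketch (not from the thread) of the pattern Cody is describing: a standalone ConsumerRebalanceListener has to take the consumer in its constructor just to be able to seek in the callback, which is what makes a 0-arg constructor or serialization awkward. The class name is invented; the calls assume the 0.9 consumer API (varargs seekToBeginning) discussed in this thread.

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical listener: it cannot have a 0-arg constructor, because the
    // callback is not handed the consumer that triggered it.
    public class SeekToBeginningListener implements ConsumerRebalanceListener {
        private final Consumer<?, ?> consumer; // must be injected by the caller

        public SeekToBeginningListener(Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // nothing to do on revocation in this sketch
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // 0.9.0 seekToBeginning takes varargs, hence the array conversion
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
        }
    }

Wiring it up would look like consumer.subscribe(topics, new SeekToBeginningListener(consumer)), which is the circular reference Cody is pointing at.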
On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:

I thought about ConsumerRebalanceListener, but seeking to the beginning any time there's a rebalance, for whatever reason, is not necessarily the same thing as seeking to the beginning before first starting the consumer.


On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:

Cody,

Use ConsumerRebalanceListener to achieve that:

    ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
        }
    };

    consumer.subscribe(topics, listener);


On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

That suggestion doesn't work, for pretty much the same reason: at the time poll is first called, there is no reset policy and no committed offset, so NoOffsetForPartitionException is thrown.

I feel like the underlying problem isn't so much that seekToEnd needs special-case behavior. It's more that topic metadata fetches, consumer position fetches, and message fetches are all lumped together under a single poll() call, with no way to do them individually if necessary.

What does "work" in this situation is to just catch the exception (which leaves the consumer in a state where topics are assigned) and then seek. But that is not exactly an elegant interface.

    consumer.subscribe(topics)
    try {
      consumer.poll(0)
    } catch {
      case x: Throwable =>
    }
    consumer.seekToBeginning()
    consumer.poll(0)


On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Cody,

The problem with that code is in `subscribe(topics)` followed immediately by `seekToBeginning()`.

Since the `subscribe` call is lazily evaluated, by the time `seekToBeginning()` is called no partition is assigned yet, and hence it is effectively a no-op.

Try

    consumer.subscribe(topics)
    consumer.poll(0); // get assigned partitions
    consumer.seekToBeginning()
    consumer.poll(0)

to see if that works.

I think it is a valid issue that could be fixed in the new consumer: when seekToEnd/Beginning is called with no parameters while no assignment has been done yet, do the coordination behind the scenes. That would, though, change the behavior of these functions, as they would no longer always be lazily evaluated.

Guozhang
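A minimal sketch (not from the thread, in Java) of the other route Guozhang mentions upthread: fetch the partition metadata with partitionsFor(), assign() the partitions manually, then seek. Because assign() is not lazily evaluated the way subscribe() is, seekToBeginning() has partitions to act on and the first poll() does not need a reset policy. As Guozhang notes, this only fits the case where a single consumer instance is meant to own all partitions; the broker address and topic name below are placeholders.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class AssignAndSeekToBeginning {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");             // placeholder
            props.put("group.id", "example");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("auto.offset.reset", "none");                       // keep hard errors for out-of-range offsets

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Fetch topic metadata without poll(), then assign the partitions manually.
            String topic = "testtwo";                                     // placeholder topic
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);

            // Partitions are assigned now, so this marks them for reset to the
            // earliest offset (0.9.0 API: seekToBeginning takes varargs).
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));

            ConsumerRecords<String, String> records = consumer.poll(0);
            System.out.println("fetched " + records.count() + " records");
            consumer.close();
        }
    }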
> >> >> >>> > > >> >> >>> > > >> >> >>> > Guozhang > >> >> >>> > > >> >> >>> > > >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger < > >> c...@koeninger.org> > >> >> >>> wrote: > >> >> >>> > > >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the > >> >> >>> >> beginning or end, without specifying auto.offset.reset. > >> >> >>> >> > >> >> >>> >> This does not seem to be possible: > >> >> >>> >> > >> >> >>> >> val kafkaParams = Map[String, Object]( > >> >> >>> >> "bootstrap.servers" -> conf.getString("kafka.brokers"), > >> >> >>> >> "key.deserializer" -> classOf[StringDeserializer], > >> >> >>> >> "value.deserializer" -> classOf[StringDeserializer], > >> >> >>> >> "group.id" -> "example", > >> >> >>> >> "auto.offset.reset" -> "none" > >> >> >>> >> ).asJava > >> >> >>> >> val topics = > >> >> conf.getString("kafka.topics").split(",").toList.asJava > >> >> >>> >> val consumer = new KafkaConsumer[String, > String](kafkaParams) > >> >> >>> >> consumer.subscribe(topics) > >> >> >>> >> consumer.seekToBeginning() > >> >> >>> >> consumer.poll(0) > >> >> >>> >> > >> >> >>> >> > >> >> >>> >> Results in: > >> >> >>> >> > >> >> >>> >> Exception in thread "main" > >> >> >>> >> > org.apache.kafka.clients.consumer.NoOffsetForPartitionException: > >> >> >>> >> Undefined offset with no reset policy for partition: testtwo-4 > >> >> >>> >> at > >> >> >>> >> > >> >> >>> > >> >> > >> > org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288) > >> >> >>> >> at > >> >> >>> >> > >> >> >>> > >> >> > >> > org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167) > >> >> >>> >> at > >> >> >>> >> > >> >> >>> > >> >> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302) > >> >> >>> >> at > >> >> >>> >> > >> >> >>> > >> >> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895) > >> >> >>> >> at > >> >> >>> >> > >> >> >>> > >> >> > >> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) > >> >> >>> >> at > >> >> example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25) > >> >> >>> >> > >> >> >>> >> > >> >> >>> >> I'm assuming this is because, at the time seekToBeginning() is > >> >> called, > >> >> >>> >> subscriptions.assignedPartitions isn't populated. But > polling in > >> >> >>> >> order to assign topicpartitions results in an error, which > >> creates a > >> >> >>> >> chicken-or-the-egg situation. > >> >> >>> >> > >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard > >> error > >> >> if > >> >> >>> >> the offsets are out of range at any other time during > >> consumption. > >> >> >>> >> > >> >> >>> > > >> >> >>> > > >> >> >>> > > >> >> >>> > -- > >> >> >>> > -- Guozhang > >> >> >>> > >> >> > >> > > >> > > >> > > >> > -- > >> > -- Guozhang > >> > > > > > > > > -- > > -- Guozhang > -- -- Guozhang