That suggestion doesn't work, for pretty much the same reason: at the time poll() is first called, there is no reset policy and no committed offset, so NoOffsetForPartitionException is thrown.
I feel like the underlying problem isn't so much that seekToEnd needs special-case behavior. It's more that topic metadata fetches, consumer position fetches, and message fetches are all lumped together under a single poll() call, with no way to do them individually if necessary.

What does "work" in this situation is to just catch the exception (which leaves the consumer in a state where topics are assigned) and then seek. But that is not exactly an elegant interface:

consumer.subscribe(topics)
try {
  consumer.poll(0)
} catch {
  case x: Throwable =>
}
consumer.seekToBeginning()
consumer.poll(0)

(A self-contained sketch of this workaround follows the quoted thread below.)

On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Cody,
>
> The problem with that code is in `seekToBeginning()` followed by
> `subscribe(topic)`.
>
> Since the `subscribe` call is lazily evaluated, by the time
> `seekToBeginning()` is called no partition is assigned yet, and hence it
> is effectively a no-op.
>
> Try
>
> consumer.subscribe(topics)
> consumer.poll(0) // get assigned partitions
> consumer.seekToBeginning()
> consumer.poll(0)
>
> to see if that works.
>
> I think it is a valid issue that could be fixed in the new consumer:
> upon calling seekToEnd/seekToBeginning with no parameters while no
> assignment has been done yet, do the coordination behind the scenes. It
> would, though, change the behavior of those functions, as they would no
> longer always be lazily evaluated.
>
>
> Guozhang
>
>
> On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Using the 0.9 consumer, I would like to start consuming at the
>> beginning or end, without specifying auto.offset.reset.
>>
>> This does not seem to be possible:
>>
>> val kafkaParams = Map[String, Object](
>>   "bootstrap.servers" -> conf.getString("kafka.brokers"),
>>   "key.deserializer" -> classOf[StringDeserializer],
>>   "value.deserializer" -> classOf[StringDeserializer],
>>   "group.id" -> "example",
>>   "auto.offset.reset" -> "none"
>> ).asJava
>> val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> val consumer = new KafkaConsumer[String, String](kafkaParams)
>> consumer.subscribe(topics)
>> consumer.seekToBeginning()
>> consumer.poll(0)
>>
>> Results in:
>>
>> Exception in thread "main"
>> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>> Undefined offset with no reset policy for partition: testtwo-4
>>   at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>>   at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>   at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>>
>> I'm assuming this is because, at the time seekToBeginning() is called,
>> subscriptions.assignedPartitions isn't populated. But polling in
>> order to assign TopicPartitions results in an error, which creates a
>> chicken-or-the-egg situation.
>>
>> I don't want to set auto.offset.reset, because I want a hard error if
>> the offsets are out of range at any other time during consumption.
>>
>
>
> --
> -- Guozhang
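For reference, here is a minimal self-contained sketch of the catch-then-seek workaround against the 0.9 consumer. The broker address, topic name, and object name are placeholders, and it catches only NoOffsetForPartitionException rather than Throwable; it also relies on 0.9's varargs seekToBeginning() applying to all assigned partitions when called with no arguments:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.{KafkaConsumer, NoOffsetForPartitionException}

object SeekToBeginningWorkaround {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "example")
    // "none" keeps the hard error for out-of-range offsets later in consumption
    props.put("auto.offset.reset", "none")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList("testtwo")) // placeholder topic

    // The first poll joins the group and gets partitions assigned. With
    // reset policy "none" and no committed offsets it throws, but the
    // assignment survives, so seeking afterwards works.
    try {
      consumer.poll(0)
    } catch {
      case _: NoOffsetForPartitionException => // expected on first run
    }

    consumer.seekToBeginning() // 0.9 varargs: no args means all assigned partitions
    val records = consumer.poll(0)
    println(s"fetched ${records.count()} records from the beginning")
    consumer.close()
  }
}

Catching the specific exception instead of Throwable means genuine failures (broker unreachable, deserialization errors, etc.) still propagate instead of being silently swallowed.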