Hi Cody,

The problem with that code is in calling `seekToBeginning()` right after `subscribe(topics)`.
Since the `subscribe` call is lazily evaluated, no partition has been
assigned yet by the time `seekToBeginning()` is called, and hence it is
effectively a no-op.

Try

    consumer.subscribe(topics)
    consumer.poll(0);  // get assigned partitions
    consumer.seekToBeginning()
    consumer.poll(0)

to see if that works.

I think it is a valid issue that can be fixed in the new consumer: upon
calling seekToEnd()/seekToBeginning() with no parameter while no assignment
has been done yet, do the coordination behind the scenes. It will, though,
change the behavior of the functions, as they would no longer always be
lazily evaluated.

Guozhang

On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Using the 0.9 consumer, I would like to start consuming at the
> beginning or end, without specifying auto.offset.reset.
>
> This does not seem to be possible:
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> conf.getString("kafka.brokers"),
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "example",
>   "auto.offset.reset" -> "none"
> ).asJava
> val topics = conf.getString("kafka.topics").split(",").toList.asJava
> val consumer = new KafkaConsumer[String, String](kafkaParams)
> consumer.subscribe(topics)
> consumer.seekToBeginning()
> consumer.poll(0)
>
>
> Results in:
>
> Exception in thread "main"
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
> Undefined offset with no reset policy for partition: testtwo-4
>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>     at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>
>
> I'm assuming this is because, at the time seekToBeginning() is called,
> subscriptions.assignedPartitions isn't populated. But polling in
> order to assign topicpartitions results in an error, which creates a
> chicken-or-the-egg situation.
>
> I don't want to set auto.offset.reset, because I want a hard error if
> the offsets are out of range at any other time during consumption.
>

--
-- Guozhang