Cody, Use ConsumerRebalanceListener to achieve that,
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { consumer.seekToBeginning(partitions.toArray(new TopicPartition[0])); } }; consumer.subscribe(topics, listener); On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote: > That suggestion doesn't work, for pretty much the same reason - at the > time poll is first called, there is no reset policy and no committed > offset, so NoOffsetForPartitionException is thrown > > I feel like the underlying problem isn't so much that seekToEnd needs > special case behavior. It's more that topic metadata fetches, > consumer position fetches, and message fetches are all lumped together > under a single poll() call, with no way to do them individually if > necessary. > > What does "work" in this situation is to just catch the exception > (which leaves the consumer in a state where topics are assigned) and > then seek. But that is not exactly an elegant interface. > > consumer.subscribe(topics) > try { > consumer.poll(0) > } catch { > case x: Throwable => > } > consumer.seekToBeginning() > consumer.poll(0) > > > > > On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > Hi Cody, > > > > The problem with that code is in `seekToBeginning()` followed by > > `subscribe(topic)`. > > > > Since `subscribe` call is lazy evaluated, by the time `seekToBeginning()` > > is called no partition is assigned yet, and hence it is effectively an > > no-op. > > > > Try > > > > consumer.subscribe(topics) > > consumer.poll(0); // get assigned partitions > > consumer.seekToBeginning() > > consumer.poll(0) > > > > to see if that works. > > > > I think it is a valid issue that can be fixed in the new consumer that, > > upon calling seekToEnd/Beginning with no parameter, while no assigned is > > done yet, do the coordination behind the scene; it will though change the > > behavior of the functions as they are no longer always lazily evaluated. > > > > > > Guozhang > > > > > > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> > wrote: > > > >> Using the 0.9 consumer, I would like to start consuming at the > >> beginning or end, without specifying auto.offset.reset. > >> > >> This does not seem to be possible: > >> > >> val kafkaParams = Map[String, Object]( > >> "bootstrap.servers" -> conf.getString("kafka.brokers"), > >> "key.deserializer" -> classOf[StringDeserializer], > >> "value.deserializer" -> classOf[StringDeserializer], > >> "group.id" -> "example", > >> "auto.offset.reset" -> "none" > >> ).asJava > >> val topics = conf.getString("kafka.topics").split(",").toList.asJava > >> val consumer = new KafkaConsumer[String, String](kafkaParams) > >> consumer.subscribe(topics) > >> consumer.seekToBeginning() > >> consumer.poll(0) > >> > >> > >> Results in: > >> > >> Exception in thread "main" > >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException: > >> Undefined offset with no reset policy for partition: testtwo-4 > >> at > >> > org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288) > >> at > >> > org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895) > >> at > >> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) > >> at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25) > >> > >> > >> I'm assuming this is because, at the time seekToBeginning() is called, > >> subscriptions.assignedPartitions isn't populated. But polling in > >> order to assign topicpartitions results in an error, which creates a > >> chicken-or-the-egg situation. > >> > >> I don't want to set auto.offset.reset, because I want a hard error if > >> the offsets are out of range at any other time during consumption. > >> > > > > > > > > -- > > -- Guozhang >