I thought about ConsumerRebalanceListener, but seeking to the beginning any time there's a rebalance for whatever reason is not necessarily the same thing as seeking to the beginning before first starting the consumer.
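
To make the distinction concrete, here is a rough sketch of a listener-side guard that rewinds a partition only the first time this process sees it assigned. SeekOnce is a hypothetical helper, not part of the Kafka client, and the type parameter P stands in for TopicPartition so the sketch compiles without the client jar. It only remembers state inside one JVM, so a partition that moves here from another group member would still be rewound. It is therefore still not equivalent to seeking once before the consumer first starts.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of the Kafka client): remembers which
 * partitions have already been rewound so that a later rebalance does
 * not rewind them again. P stands in for TopicPartition.
 */
final class SeekOnce<P> {
    // Partitions this process has already rewound.
    private final Set<P> alreadySeeked = new HashSet<>();
    // Assumed to wrap something like consumer.seekToBeginning(...).
    private final Consumer<Collection<P>> seekAction;

    SeekOnce(Consumer<Collection<P>> seekAction) {
        this.seekAction = seekAction;
    }

    /** Intended to be called from onPartitionsAssigned. */
    void onAssigned(Collection<P> partitions) {
        List<P> fresh = new ArrayList<>();
        for (P p : partitions) {
            // Set.add returns false if the partition was seen before.
            if (alreadySeeked.add(p)) {
                fresh.add(p);
            }
        }
        if (!fresh.isEmpty()) {
            seekAction.accept(fresh);
        }
    }
}
```

With the 0.9 client, the seek action would presumably be something like ps -> consumer.seekToBeginning(ps.toArray(new TopicPartition[0])), with onPartitionsAssigned delegating to onAssigned(partitions).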

On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
> Cody,
>
> Use ConsumerRebalanceListener to achieve that,
>
> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>     }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>     }
> };
>
> consumer.subscribe(topics, listener);
>
> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> That suggestion doesn't work, for pretty much the same reason - at the
>> time poll is first called, there is no reset policy and no committed
>> offset, so NoOffsetForPartitionException is thrown
>>
>> I feel like the underlying problem isn't so much that seekToEnd needs
>> special case behavior. It's more that topic metadata fetches,
>> consumer position fetches, and message fetches are all lumped together
>> under a single poll() call, with no way to do them individually if
>> necessary.
>>
>> What does "work" in this situation is to just catch the exception
>> (which leaves the consumer in a state where topics are assigned) and
>> then seek. But that is not exactly an elegant interface.
>>
>>   consumer.subscribe(topics)
>>   try {
>>     consumer.poll(0)
>>   } catch {
>>     case x: Throwable =>
>>   }
>>   consumer.seekToBeginning()
>>   consumer.poll(0)
>>
>>
>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > Hi Cody,
>> >
>> > The problem with that code is in `seekToBeginning()` followed by
>> > `subscribe(topic)`.
>> >
>> > Since the `subscribe` call is lazily evaluated, by the time
>> > `seekToBeginning()` is called no partition is assigned yet, and hence
>> > it is effectively a no-op.
>> >
>> > Try
>> >
>> >   consumer.subscribe(topics)
>> >   consumer.poll(0); // get assigned partitions
>> >   consumer.seekToBeginning()
>> >   consumer.poll(0)
>> >
>> > to see if that works.
>> >
>> > I think it is a valid issue that can be fixed in the new consumer that,
>> > upon calling seekToEnd/Beginning with no parameter, while no assignment
>> > is done yet, do the coordination behind the scenes; it will though change
>> > the behavior of the functions as they are no longer always lazily
>> > evaluated.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >
>> >> Using the 0.9 consumer, I would like to start consuming at the
>> >> beginning or end, without specifying auto.offset.reset.
>> >>
>> >> This does not seem to be possible:
>> >>
>> >>   val kafkaParams = Map[String, Object](
>> >>     "bootstrap.servers" -> conf.getString("kafka.brokers"),
>> >>     "key.deserializer" -> classOf[StringDeserializer],
>> >>     "value.deserializer" -> classOf[StringDeserializer],
>> >>     "group.id" -> "example",
>> >>     "auto.offset.reset" -> "none"
>> >>   ).asJava
>> >>   val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> >>   val consumer = new KafkaConsumer[String, String](kafkaParams)
>> >>   consumer.subscribe(topics)
>> >>   consumer.seekToBeginning()
>> >>   consumer.poll(0)
>> >>
>> >>
>> >> Results in:
>> >>
>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
>> >>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>> >>     at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>> >>     at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>> >>
>> >>
>> >> I'm assuming this is because, at the time seekToBeginning() is called,
>> >> subscriptions.assignedPartitions isn't populated. But polling in
>> >> order to assign topicpartitions results in an error, which creates a
>> >> chicken-or-the-egg situation.
>> >>
>> >> I don't want to set auto.offset.reset, because I want a hard error if
>> >> the offsets are out of range at any other time during consumption.
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>