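Cody,

You can use a ConsumerRebalanceListener to achieve that. The seek is issued
from the assignment callback, at which point the partitions are known:
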
    ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // nothing to do on revocation
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // partitions are assigned by now, so the seek is no longer a no-op
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
        }
    };
    consumer.subscribe(topics, listener);

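
To illustrate why the placement of the seek matters, here is a toy model in
plain Java. It is only a sketch and not Kafka code: LazySeekModel, ToyConsumer
and their methods are hypothetical stand-ins made up for illustration, and
partitions are plain strings. It assumes, as discussed in this thread, that
subscribe() is lazy, that assignment happens inside poll(), and that the
assignment callback runs before positions are checked.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: none of these types are Kafka classes.
final class LazySeekModel {

    /** Stand-in for a consumer with no committed offsets and no reset policy. */
    static final class ToyConsumer {
        private final List<String> pending = new ArrayList<>();
        private final List<String> assigned = new ArrayList<>();
        private final Map<String, Long> positions = new HashMap<>();
        private Consumer<Collection<String>> onAssigned = p -> { };

        // subscribe() only records intent; nothing is assigned yet.
        void subscribe(List<String> partitions, Consumer<Collection<String>> listener) {
            pending.addAll(partitions);
            onAssigned = listener;
        }

        // Seeking only affects partitions that are currently assigned.
        void seekToBeginning(Collection<String> partitions) {
            for (String p : partitions) {
                if (assigned.contains(p)) {
                    positions.put(p, 0L);
                }
            }
        }

        // poll() assigns, fires the callback, then requires a position.
        void poll() {
            if (!pending.isEmpty()) {
                assigned.addAll(pending);
                pending.clear();
                onAssigned.accept(new ArrayList<>(assigned));
            }
            for (String p : assigned) {
                if (!positions.containsKey(p)) {
                    throw new IllegalStateException(
                        "Undefined offset with no reset policy for partition: " + p);
                }
            }
        }

        List<String> assignment() {
            return new ArrayList<>(assigned);
        }
    }

    // Seek right after subscribe: nothing is assigned, so the seek is a no-op.
    static boolean seekBeforePollWorks() {
        ToyConsumer c = new ToyConsumer();
        c.subscribe(Collections.singletonList("testtwo-4"), p -> { });
        c.seekToBeginning(c.assignment());
        try {
            c.poll();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    // Seek from the assignment callback: positions exist before the check.
    static boolean seekInListenerWorks() {
        ToyConsumer c = new ToyConsumer();
        c.subscribe(Collections.singletonList("testtwo-4"), c::seekToBeginning);
        try {
            c.poll();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("seek before poll works: " + seekBeforePollWorks());
        System.out.println("seek in listener works: " + seekInListenerWorks());
    }
}
```

In this model the first variant fails the position check inside poll(), while
the listener variant passes it, which is the same ordering the real listener
above relies on.
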
On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <[email protected]> wrote:
> That suggestion doesn't work, for pretty much the same reason - at the
> time poll is first called, there is no reset policy and no committed
> offset, so NoOffsetForPartitionException is thrown
>
> I feel like the underlying problem isn't so much that seekToEnd needs
> special case behavior. It's more that topic metadata fetches,
> consumer position fetches, and message fetches are all lumped together
> under a single poll() call, with no way to do them individually if
> necessary.
>
> What does "work" in this situation is to just catch the exception
> (which leaves the consumer in a state where topics are assigned) and
> then seek. But that is not exactly an elegant interface.
>
> consumer.subscribe(topics)
> try {
>   consumer.poll(0)
> } catch {
>   case x: Throwable =>
> }
> consumer.seekToBeginning()
> consumer.poll(0)
>
>
>
>
> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <[email protected]> wrote:
> > Hi Cody,
> >
> > The problem with that code is in `subscribe(topics)` followed by
> > `seekToBeginning()`.
> >
> > Since the `subscribe` call is lazily evaluated, by the time
> > `seekToBeginning()` is called no partition is assigned yet, and hence it
> > is effectively a no-op.
> >
> > Try
> >
> > consumer.subscribe(topics)
> > consumer.poll(0); // get assigned partitions
> > consumer.seekToBeginning()
> > consumer.poll(0)
> >
> > to see if that works.
> >
> > I think this is a valid issue that could be fixed in the new consumer:
> > when seekToEnd/seekToBeginning is called with no parameters and no
> > assignment has been done yet, the consumer could do the coordination
> > behind the scenes. That would change the behavior of these functions,
> > though, since they would no longer always be lazily evaluated.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <[email protected]> wrote:
> >
> >> Using the 0.9 consumer, I would like to start consuming at the
> >> beginning or end, without specifying auto.offset.reset.
> >>
> >> This does not seem to be possible:
> >>
> >> val kafkaParams = Map[String, Object](
> >>   "bootstrap.servers" -> conf.getString("kafka.brokers"),
> >>   "key.deserializer" -> classOf[StringDeserializer],
> >>   "value.deserializer" -> classOf[StringDeserializer],
> >>   "group.id" -> "example",
> >>   "auto.offset.reset" -> "none"
> >> ).asJava
> >> val topics = conf.getString("kafka.topics").split(",").toList.asJava
> >> val consumer = new KafkaConsumer[String, String](kafkaParams)
> >> consumer.subscribe(topics)
> >> consumer.seekToBeginning()
> >> consumer.poll(0)
> >>
> >>
> >> Results in:
> >>
> >> Exception in thread "main"
> >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
> >> Undefined offset with no reset policy for partition: testtwo-4
> >>   at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> >>   at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> >>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> >>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> >>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> >>   at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> >>
> >>
> >> I'm assuming this is because, at the time seekToBeginning() is called,
> >> subscriptions.assignedPartitions isn't populated. But polling in
> >> order to assign topicpartitions results in an error, which creates a
> >> chicken-or-the-egg situation.
> >>
> >> I don't want to set auto.offset.reset, because I want a hard error if
> >> the offsets are out of range at any other time during consumption.
> >>
> >
> >
> >
> > --
> > -- Guozhang
>