Cody,

Use ConsumerRebalanceListener to achieve that,

ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
                consumer.seekToBeginning(partitions.toArray(new
TopicPartition[0]));
            }
        };

consumer.subscribe(topics, listener);

On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

> That suggestion doesn't work, for pretty much the same reason - at the
> time poll is first called, there is no reset policy and no committed
> offset, so NoOffsetForPartitionException is thrown
>
> I feel like the underlying problem isn't so much that seekToEnd needs
> special case behavior.  It's more that  topic metadata fetches,
> consumer position fetches, and message fetches are all lumped together
> under a single poll() call, with no way to do them individually if
> necessary.
>
> What does "work" in this situation is to just catch the exception
> (which leaves the consumer in a state where topics are assigned) and
> then seek.  But that is not exactly an elegant interface.
>
>     consumer.subscribe(topics)
>     try {
>       consumer.poll(0)
>     } catch {
>       case x: Throwable =>
>     }
>     consumer.seekToBeginning()
>     consumer.poll(0)
>
>
>
>
> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Hi Cody,
> >
> > The problem with that code is in `seekToBeginning()` followed by
> > `subscribe(topic)`.
> >
> > Since `subscribe` call is lazy evaluated, by the time `seekToBeginning()`
> > is called no partition is assigned yet, and hence it is effectively an
> > no-op.
> >
> > Try
> >
> >     consumer.subscribe(topics)
> >     consumer.poll(0);  // get assigned partitions
> >     consumer.seekToBeginning()
> >     consumer.poll(0)
> >
> > to see if that works.
> >
> > I think it is a valid issue that can be fixed in the new consumer that,
> > upon calling seekToEnd/Beginning with no parameter, while no assigned is
> > done yet, do the coordination behind the scene; it will though change the
> > behavior of the functions as they are no longer always lazily evaluated.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >
> >> Using the 0.9 consumer, I would like to start consuming at the
> >> beginning or end, without specifying auto.offset.reset.
> >>
> >> This does not seem to be possible:
> >>
> >>     val kafkaParams = Map[String, Object](
> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
> >>       "key.deserializer" -> classOf[StringDeserializer],
> >>       "value.deserializer" -> classOf[StringDeserializer],
> >>       "group.id" -> "example",
> >>       "auto.offset.reset" -> "none"
> >>     ).asJava
> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
> >>     consumer.subscribe(topics)
> >>     consumer.seekToBeginning()
> >>     consumer.poll(0)
> >>
> >>
> >> Results in:
> >>
> >> Exception in thread "main"
> >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
> >> Undefined offset with no reset policy for partition: testtwo-4
> >>         at
> >>
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> >>         at
> >>
> org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> >>         at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> >>         at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> >>         at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> >>
> >>
> >> I'm assuming this is because, at the time seekToBeginning() is called,
> >> subscriptions.assignedPartitions isn't populated.  But polling in
> >> order to assign topicpartitions results in an error, which creates a
> >> chicken-or-the-egg situation.
> >>
> >> I don't want to set auto.offset.reset, because I want a hard error if
> >> the offsets are out of range at any other time during consumption.
> >>
> >
> >
> >
> > --
> > -- Guozhang
>

Reply via email to