Cody,

What I meant by a special case of `seekToXX` is this: today, when the
function is called with no partition parameters, it applies its logic to
all partitions currently "assigned" to the consumer. Once that is done,
the subsequent poll() will not throw the exception, since it knows those
partitions need their offsets reset.

In your case, however, no partitions are assigned yet, so `seekToXX`
does not take effect on any partition; as you mentioned, the assignment
happens inside the poll() call. One way to solve this is to let
`seekToXX()` with no parameters do the coordination and fetch the
assigned partitions if there are any subscribed topics, so that the
subsequent poll() knows those partitions need their offsets reset. Does
that make sense?

For now, another way I can think of is to fetch the partition info
beforehand and call `seekToBeginning` on all partitions. But that only
works if the consumer knows it is going to get all the partitions assigned
to itself (i.e. you are only running a single instance).
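A rough sketch of that alternative against the 0.9 Java client, assuming manual assignment via `assign()` (so no group rebalancing) and a single consumer instance. The class name, the broker address and the topic name `testtwo` are placeholders, not anything from your setup:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical class name; assumes kafka-clients 0.9.x on the classpath.
public class SeekToBeginningWithAssign {

    // Convert the metadata returned by partitionsFor() into TopicPartitions.
    // partitionsFor() may return null for an unknown topic, so treat that as empty.
    static List<TopicPartition> toTopicPartitions(List<PartitionInfo> infos) {
        List<TopicPartition> result = new ArrayList<>();
        if (infos == null) {
            return result;
        }
        for (PartitionInfo info : infos) {
            result.add(new TopicPartition(info.topic(), info.partition()));
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Keep "none" so out-of-range offsets later on are still a hard error.
        props.put("auto.offset.reset", "none");

        List<String> topics = Arrays.asList("testtwo"); // placeholder topic list

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 1. Fetch partition metadata up front; this does not join the group.
            List<TopicPartition> partitions = new ArrayList<>();
            for (String topic : topics) {
                partitions.addAll(toTopicPartitions(consumer.partitionsFor(topic)));
            }
            // 2. Assign every partition to this single instance.
            consumer.assign(partitions);
            // 3. The partitions are already assigned, so the seek applies to them.
            //    In the 0.9 client seekToBeginning takes varargs.
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
            // 4. The first poll should reset positions to the earliest offsets
            //    instead of throwing NoOffsetForPartitionException.
            ConsumerRecords<String, String> records = consumer.poll(100);
            System.out.println("fetched " + records.count() + " records");
        }
    }
}
```

If you move to a newer client, check the signature: I believe `seekToBeginning` later takes a `Collection<TopicPartition>` instead of varargs. And since this bypasses group management entirely, it is only safe when this one instance really should own every partition.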

Guozhang


On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Another unfortunate thing about ConsumerRebalanceListener is that, in
> order to do meaningful work in the callback, you need a reference to
> the consumer that called it.  But that reference isn't provided to the
> callback, which means the listener implementation has to hold its own
> reference to the consumer.  That seems to make it unnecessarily
> awkward to serialize the listener or to give it a zero-arg constructor.
>
> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
> > I thought about ConsumerRebalanceListener, but seeking to the
> > beginning any time there's a rebalance for whatever reason is not
> > necessarily the same thing as seeking to the beginning before first
> > starting the consumer.
> >
> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
> >> Cody,
> >>
> >> Use a ConsumerRebalanceListener to achieve that:
> >>
> >> // `consumer` must be in scope (a field, or a final / effectively final local).
> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
> >>
> >>     @Override
> >>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> >>     }
> >>
> >>     @Override
> >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >>         // Called on every rebalance, not only on the first assignment.
> >>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
> >>     }
> >> };
> >>
> >> consumer.subscribe(topics, listener);
> >>
> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >>> That suggestion doesn't work, for pretty much the same reason: at the
> >>> time poll is first called, there is no reset policy and no committed
> >>> offset, so NoOffsetForPartitionException is thrown.
> >>>
> >>> I feel like the underlying problem isn't so much that seekToEnd needs
> >>> special-case behavior.  It's more that topic metadata fetches,
> >>> consumer position fetches, and message fetches are all lumped together
> >>> under a single poll() call, with no way to do them individually if
> >>> necessary.
> >>>
> >>> What does "work" in this situation is to just catch the exception
> >>> (which leaves the consumer in a state where topics are assigned) and
> >>> then seek.  But that is not exactly an elegant interface.
> >>>
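> >>>     import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
> >>>
> >>>     consumer.subscribe(topics)
> >>>     try {
> >>>       // The first poll assigns partitions, then throws because there is
> >>>       // no committed offset and no reset policy.
> >>>       consumer.poll(0)
> >>>     } catch {
> >>>       case _: NoOffsetForPartitionException => // expected here; ignore
> >>>     }
> >>>     // Partitions are now assigned, so the seek takes effect.
> >>>     consumer.seekToBeginning()
> >>>     consumer.poll(0)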
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>> > Hi Cody,
> >>> >
> >>> > The problem with that code is `subscribe(topics)` followed immediately
> >>> > by `seekToBeginning()`.
> >>> >
> >>> > Since the `subscribe` call is lazily evaluated, no partition has been
> >>> > assigned yet by the time `seekToBeginning()` is called, so it is
> >>> > effectively a no-op.
> >>> >
> >>> > Try
> >>> >
> >>> >     consumer.subscribe(topics)
> >>> >     consumer.poll(0);  // get assigned partitions
> >>> >     consumer.seekToBeginning()
> >>> >     consumer.poll(0)
> >>> >
> >>> > to see if that works.
> >>> >
> >>> > I think this is a valid issue that could be fixed in the new consumer:
> >>> > when seekToEnd/Beginning is called with no parameters before any
> >>> > assignment has been done, it could do the coordination behind the
> >>> > scenes. That would, however, change the behavior of these functions,
> >>> > as they would no longer always be lazily evaluated.
> >>> >
> >>> >
> >>> > Guozhang
> >>> >
> >>> >
> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>> >
> >>> >> Using the 0.9 consumer, I would like to start consuming at the
> >>> >> beginning or end, without specifying auto.offset.reset.
> >>> >>
> >>> >> This does not seem to be possible:
> >>> >>
> >>> >>     val kafkaParams = Map[String, Object](
> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
> >>> >>       "group.id" -> "example",
> >>> >>       "auto.offset.reset" -> "none"
> >>> >>     ).asJava
> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
> >>> >>     consumer.subscribe(topics)
> >>> >>     consumer.seekToBeginning()
> >>> >>     consumer.poll(0)
> >>> >>
> >>> >>
> >>> >> Results in:
> >>> >>
> >>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> >>> >>
> >>> >>
> >>> >> I'm assuming this is because, at the time seekToBeginning() is called,
> >>> >> subscriptions.assignedPartitions isn't populated yet.  But polling in
> >>> >> order to assign topic partitions results in an error, which creates a
> >>> >> chicken-and-egg situation.
> >>> >>
> >>> >> I don't want to set auto.offset.reset, because I want a hard error if
> >>> >> the offsets are out of range at any other time during consumption.
> >>> >>
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > -- Guozhang
> >>>
>



-- 
-- Guozhang
