Filed https://issues.apache.org/jira/browse/KAFKA-3370.

On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:

> That sounds like an interesting way of addressing the problem; we can
> continue further discussions on the JIRA.
>
>
>
> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Cody:
> >
> > More specifically, you do not need the "listTopics" function if you already
> > know your subscribed topics; calling "partitionsFor" is sufficient.
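> >
> > As a rough Java sketch of that difference (the "consumer" and "topic" names
> > below are placeholders, not code from this thread): partitionsFor only touches
> > the one topic, while listTopics pulls metadata for every topic and then has to
> > be filtered.
> >
> >     // Metadata for a single known topic:
> >     List<PartitionInfo> infos = consumer.partitionsFor(topic);
> >     for (PartitionInfo info : infos) {
> >         System.out.println(info.topic() + "-" + info.partition());
> >     }
> >
> >     // Versus metadata for all topics, filtered down afterwards:
> >     Map<String, List<PartitionInfo>> all = consumer.listTopics();
> >     List<PartitionInfo> same = all.get(topic);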
> >
> > About the fix, I'm thinking of adding two more options to
> > auto.offset.reset, namely "earliest-on-start" and "latest-on-start", which
> > set the reset position ONLY at startup. The reason is that seekToXX was not
> > designed for such initialization but for calls during the lifetime of the
> > consumer, so we should provide a proper way to do this.
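> >
> > Purely as a hypothetical sketch of how that proposal might look to a user
> > (the "earliest-on-start" value below is only what is being proposed in this
> > thread, not an existing auto.offset.reset setting):
> >
> >     Properties props = new Properties();
> >     props.put("bootstrap.servers", "localhost:9092");
> >     props.put("group.id", "example");
> >     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> >     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> >     // Proposed semantics: reset the position only at startup, and still fail
> >     // hard on out-of-range offsets later in the consumer's lifetime.
> >     props.put("auto.offset.reset", "earliest-on-start");
> >     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);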
> >
> > I can file the JIRA right away and start further discussions there. But let
> > me know if you have any other ideas.
> >
> > Guozhang
> >
> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >
> >> Yeah, I think I understood what you were saying.  What I'm saying is
> >> that if there were a way to just fetch metadata without doing the rest
> >> of the work poll() does, it wouldn't be necessary.  I guess I can do
> >> listTopics to get all metadata for all topics and then parse it.
> >>
> >> Regarding running a single instance, that is the case for what I'm
> >> talking about.
> >>
> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > Cody,
> >> >
> >> > What I meant by a special case of `seekToXX` is that, today, when the
> >> > function is called with no partition parameters, it will try to execute the
> >> > logic on all "assigned" partitions for the consumer. And once that is done,
> >> > the subsequent poll() will not throw the exception, since it knows those
> >> > partitions need their offsets reset.
> >> >
> >> > However, in your case there are no assigned partitions yet, and hence
> >> > `seekToXX` will not take effect on any partitions. The assignment is wrapped
> >> > in the poll() call, as you mentioned. One way to solve this is to let
> >> > `seekToXX()` with no parameters do the coordination and get the assigned
> >> > partitions if there are any subscribed topics, so that the subsequent poll()
> >> > will know those partitions need their offsets reset. Does that make sense?
> >> >
> >> > For now, another way I can think of is to get the partition info beforehand
> >> > and call `seekToBeginning` on all partitions. But that only works if the
> >> > consumer knows it is going to get all the partitions assigned to itself
> >> > (i.e. you are only running a single instance).
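> >> >
> >> > A minimal Java sketch of that single-instance workaround, using manual
> >> > assignment so that all partitions belong to this one consumer ("consumer"
> >> > and "topic" are placeholder names; this is only an illustration, not code
> >> > from this thread):
> >> >
> >> >     // Build the full partition list up front, assign it manually, then
> >> >     // seek before the first poll().
> >> >     List<TopicPartition> partitions = new ArrayList<>();
> >> >     for (PartitionInfo info : consumer.partitionsFor(topic)) {
> >> >         partitions.add(new TopicPartition(info.topic(), info.partition()));
> >> >     }
> >> >     consumer.assign(partitions);
> >> >     consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
> >> >     consumer.poll(0);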
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >
> >> >> Another unfortunate thing about ConsumerRebalanceListener is that in
> >> >> order to do meaningful work in the callback, you need a reference to the
> >> >> consumer that called it.  But that reference isn't provided to the
> >> >> callback, which means the listener implementation needs to hold a
> >> >> reference to the consumer.  Seems like this makes it unnecessarily
> >> >> awkward to serialize or provide a 0-arg constructor for the listener.
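> >> >>
> >> >> A short sketch of the coupling being described (names are made up for
> >> >> illustration): the listener has to be handed the consumer it will later
> >> >> seek on, so it cannot have a 0-arg constructor.
> >> >>
> >> >>     public class SeekToBeginningListener implements ConsumerRebalanceListener {
> >> >>         private final KafkaConsumer<String, String> consumer;
> >> >>
> >> >>         // The consumer reference must be injected at construction time.
> >> >>         public SeekToBeginningListener(KafkaConsumer<String, String> consumer) {
> >> >>             this.consumer = consumer;
> >> >>         }
> >> >>
> >> >>         @Override
> >> >>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
> >> >>
> >> >>         @Override
> >> >>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >> >>             consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
> >> >>         }
> >> >>     }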
> >> >>
> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> > I thought about ConsumerRebalanceListener, but seeking to the
> >> >> > beginning any time there's a rebalance for whatever reason is not
> >> >> > necessarily the same thing as seeking to the beginning before first
> >> >> > starting the consumer.
> >> >> >
> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
> >> >> >> Cody,
> >> >> >>
> >> >> >> Use ConsumerRebalanceListener to achieve that,
> >> >> >>
> >> >> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
> >> >> >>
> >> >> >>     @Override
> >> >> >>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> >> >> >>     }
> >> >> >>
> >> >> >>     @Override
> >> >> >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >> >> >>         // Seek to the beginning of each partition as soon as it is assigned.
> >> >> >>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
> >> >> >>     }
> >> >> >> };
> >> >> >>
> >> >> >> consumer.subscribe(topics, listener);
> >> >> >>
> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >>
> >> >> >>> That suggestion doesn't work, for pretty much the same reason: at the
> >> >> >>> time poll is first called, there is no reset policy and no committed
> >> >> >>> offset, so NoOffsetForPartitionException is thrown.
> >> >> >>>
> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd needs
> >> >> >>> special-case behavior.  It's more that topic metadata fetches, consumer
> >> >> >>> position fetches, and message fetches are all lumped together under a
> >> >> >>> single poll() call, with no way to do them individually if necessary.
> >> >> >>>
> >> >> >>> What does "work" in this situation is to just catch the exception
> >> >> >>> (which leaves the consumer in a state where topics are assigned) and
> >> >> >>> then seek.  But that is not exactly an elegant interface.
> >> >> >>>
> >> >> >>>     consumer.subscribe(topics)
> >> >> >>>     try {
> >> >> >>>       consumer.poll(0)
> >> >> >>>     } catch {
> >> >> >>>       case x: Throwable =>
> >> >> >>>     }
> >> >> >>>     consumer.seekToBeginning()
> >> >> >>>     consumer.poll(0)
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> >>> > Hi Cody,
> >> >> >>> >
> >> >> >>> > The problem with that code is the `seekToBeginning()` that follows
> >> >> >>> > `subscribe(topics)`.
> >> >> >>> >
> >> >> >>> > Since the `subscribe` call is lazily evaluated, by the time
> >> >> >>> > `seekToBeginning()` is called no partition has been assigned yet, and
> >> >> >>> > hence it is effectively a no-op.
> >> >> >>> >
> >> >> >>> > Try
> >> >> >>> >
> >> >> >>> >     consumer.subscribe(topics)
> >> >> >>> >     consumer.poll(0);  // get assigned partitions
> >> >> >>> >     consumer.seekToBeginning()
> >> >> >>> >     consumer.poll(0)
> >> >> >>> >
> >> >> >>> > to see if that works.
> >> >> >>> >
> >> >> >>> > I think it is a valid issue that can be fixed in the new consumer:
> >> >> >>> > upon calling seekToEnd/Beginning with no parameters, while no
> >> >> >>> > assignment has been done yet, do the coordination behind the scenes.
> >> >> >>> > It would, though, change the behavior of these functions, as they
> >> >> >>> > would no longer always be lazily evaluated.
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > Guozhang
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >>> >
> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the
> >> >> >>> >> beginning or end, without specifying auto.offset.reset.
> >> >> >>> >>
> >> >> >>> >> This does not seem to be possible:
> >> >> >>> >>
> >> >> >>> >>     val kafkaParams = Map[String, Object](
> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
> >> >> >>> >>       "group.id" -> "example",
> >> >> >>> >>       "auto.offset.reset" -> "none"
> >> >> >>> >>     ).asJava
> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
> >> >> >>> >>     consumer.subscribe(topics)
> >> >> >>> >>     consumer.seekToBeginning()
> >> >> >>> >>     consumer.poll(0)
> >> >> >>> >>
> >> >> >>> >>
> >> >> >>> >> Results in:
> >> >> >>> >>
> >> >> >>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> >> >> >>> >>
> >> >> >>> >>
> >> >> >>> >> I'm assuming this is because, at the time seekToBeginning() is called,
> >> >> >>> >> subscriptions.assignedPartitions isn't populated.  But polling in order
> >> >> >>> >> to assign topicpartitions results in an error, which creates a
> >> >> >>> >> chicken-or-the-egg situation.
> >> >> >>> >>
> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard error if
> >> >> >>> >> the offsets are out of range at any other time during consumption.
> >> >> >>> >>
> >> >> >>> >
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > --
> >> >> >>> > -- Guozhang
> >> >> >>>
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang
