Honestly, the fact that everything is hidden inside poll() has been
confusing people since last year, e.g.

https://issues.apache.org/jira/browse/KAFKA-2359

I can try to formulate a KIP for this, but it seems clear that I'm not
the only one giving this feedback, and I may not understand all the
other use cases that have been brought up.

On Sun, Mar 13, 2016 at 3:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Cody,
>
> We do not have an umbrella JIRA for this, but rather a case-by-case JIRA
> ticket / KIP for API changes in consumer.
>
> If you feel strongly about some specific change to the consumer API, please
> feel free to create a new KIP with the detailed motivation and proposed
> modifications.
>
> Guozhang
>
> On Fri, Mar 11, 2016 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Is there a KIP or JIRA related to "working on improving these cases
>> with improved APIs"?
>>
>> I saw that there was some discussion of it in KIP-41, but that seemed
>> to have been resolved in favor of keeping everything inside of poll().
>>
>> On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> > Cody, Mansi:
>> >
>> > All good points! Let me try to answer them one-by-one.
>> >
>> > About this specific issue, as I suggested in the JIRA, we can separate,
>> > in the auto.offset.reset config, the case of resetting the offset upon
>> > initializing a partition to fetch from the case of fetching an
>> > out-of-range offset. These two scenarios are indeed quite different, and
>> > it's reasonable to treat them differently.
>> >
>> > About passing a consumer context to the rebalance callback's constructor,
>> > we left it to the user for flexibility: if you want to use Kafka to commit
>> > offsets, for example, you pass the consumer reference to the callback;
>> > if you use an external service to store offsets, you can pass a JDBC
>> > connection, for example, to the callback; for some data mirroring you can
>> > even pass another producer client into it. Always enforcing the consumer
>> > context could be convenient for some use cases (i.e. you would not need to
>> > pass the argument to the constructor yourself), but not necessarily all.
>> >
>> > About wrapping the coordination protocols (partition assignment, heartbeat)
>> > inside "poll()" behind the scenes: we implemented the APIs this way in
>> > order to abstract the underlying details away from users, and also to
>> > provide a simple "single-thread poll loop" design pattern in the new
>> > Consumer. We realized that it does make some of the use cases more
>> > awkward, and are working on improving these cases with improved APIs as
>> > well. Let us know if you have any suggestions about this.
>> >
>> > Guozhang
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Mar 10, 2016 at 7:53 AM, Mansi Shah <mansis...@maprtech.com> wrote:
>> >
>> >> I second the need for having a consumer context passed to the rebalance
>> >> callback. I have run into issues several times because of that.
>> >>
>> >> About subscribe vs. assign: I have not read through your Spark code yet
>> >> (will do by EOD), so I am not sure what you mean (other than that I do
>> >> agree new partitions should be consumed automatically). I guess we can
>> >> continue this discussion on the Spark list then :-)
>> >>
>> >> Thanks
>> >> Mansi.
>> >>
>> >> On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
>> >> wrote:
>> >>
>> >> > Mansi, I'd agree that the fact that everything is tied up in poll
>> >> > seems like the source of the awkward behavior.
>> >> >
>> >> > Regarding assign vs. subscribe, most people using the Spark integration
>> >> > are just going to want to provide a topic name, not go figure out a
>> >> > bunch of partitions. They're also going to be surprised if things
>> >> > suddenly blow up once a partition is added, or if that partition doesn't
>> >> > start being consumed (we already have that second issue today).
>> >> >
>> >> > That's why separating the behavior of auto offset reset seems like the
>> >> > best idea I've heard so far.
>> >> >
>> >> > Consumer rebalance listeners are still probably going to be necessary
>> >> > for people who are storing offsets externally.
>> >> >
>> >> > On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <mansis...@maprtech.com>
>> >> > wrote:
>> >> > > Guozhang
>> >> > >
>> >> > > Sorry for joining the party a little late. I have been thinking about
>> >> > > this whole awkward behavior of having to call poll(0) to actually make
>> >> > > the underlying subscriptions take effect. Is the core reason for this
>> >> > > design the fact that poll is also the actual heartbeat, and you want to
>> >> > > make the listener group assignments through poll so that timeouts and
>> >> > > reassignments can all go through poll? So I think coupling liveness
>> >> > > with poll (which in effect couples consumer group assignment, and hence
>> >> > > metadata fetch, with poll) is the real cause of this design. Were there
>> >> > > issues where you were seeing active consumers not calling poll that led
>> >> > > to this design choice? I tried to look for a relevant JIRA but could not
>> >> > > find one; can you please point me to something if you have it handy?
>> >> > >
>> >> > > Btw, this would also mean that your proposal to do the actual
>> >> > > assignments through seek might not be ideal, since there can still be
>> >> > > an indefinite time between seek and poll (just like between subscribe
>> >> > > and poll), and the consumer could time out even before the first poll
>> >> > > is called?
>> >> > >
>> >> > >
>> >> > > @Cody in your case, if you really have only one consumer and it is
>> >> > > going to get all the partitions of the topic anyway, then you might as
>> >> > > well subscribe using the "assign" call instead of the "subscribe" call.
>> >> > > That will at least make your code cleaner, and I do not think you are
>> >> > > gaining anything from the listener group functionality anyway?
>> >> > >
>> >> > > - Mansi.
>> >> > >
>> >> > >
>> >> > >
>> >> > > On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > >
>> >> > >> In order to do anything meaningful with the consumer itself in the
>> >> > >> rebalance callback (e.g. commit offsets), you would need to hold on to
>> >> > >> the consumer reference; admittedly it sounds a bit awkward, but by
>> >> > >> design we chose not to enforce it in the interface itself.
>> >> > >>
>> >> > >> Guozhang
>> >> > >>
>> >> > >> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> > >>
>> >> > >> > So what about my comments regarding the consumer rebalance listener
>> >> > >> > interface not providing access to a consumer? I can probably work
>> >> > >> > around it, but it seems odd.
>> >> > >> > On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>> >> > >> >
>> >> > >> > > One thing proposed by Jason:
>> >> > >> > >
>> >> > >> > > If you want to reset the offset only upon initialization, and by
>> >> > >> > > initialization you mean "no committed offset", you can do something
>> >> > >> > > like the following in the rebalance callback:
>> >> > >> > >
>> >> > >> > >     @Override
>> >> > >> > >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >> > >> > >         for (TopicPartition partition : partitions) {
>> >> > >> > >             // No committed offset for this group yet: start from the beginning
>> >> > >> > >             if (consumer.committed(partition) == null)
>> >> > >> > >                 consumer.seekToBeginning(partition);
>> >> > >> > >         }
>> >> > >> > >     }
>> >> > >> > >
>> >> > >> > >
>> >> > >> > > Guozhang
>> >> > >> > >
>> >> > >> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > >> > >
>> >> > >> > > > Filed https://issues.apache.org/jira/browse/KAFKA-3370.
>> >> > >> > > >
>> >> > >> > > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> > >> > > >
>> >> > >> > > >> That sounds like an interesting way of addressing the problem; we
>> >> > >> > > >> can continue further discussions on the JIRA.
>> >> > >> > > >>
>> >> > >> > > >>
>> >> > >> > > >>
>> >> > >> > > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > >> > > >> > Cody:
>> >> > >> > > >> >
>> >> > >> > > >> > More specifically, you do not need the "listTopics" function if
>> >> > >> > > >> > you already know your subscribed topics; just using
>> >> > >> > > >> > "partitionsFor" is sufficient.
>> >> > >> > > >> >
>> >> > >> > > >> > About the fix, I'm thinking of adding two more options to
>> >> > >> > > >> > auto.offset.reset, namely "earliest-on-start" and
>> >> > >> > > >> > "latest-on-start", which set the reset position ONLY at start-up.
>> >> > >> > > >> > The reason is that seekToXX was actually not designed for such
>> >> > >> > > >> > initialization but for calling during the lifetime of the
>> >> > >> > > >> > consumer, and we'd better provide the right solution for that.
>> >> > >> > > >> >
>> >> > >> > > >> > I can file the JIRA right away and start further discussions
>> >> > >> > > >> > there. But let me know if you have any other ideas.
>> >> > >> > > >> >
>> >> > >> > > >> > Guozhang
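Guozhang's proposal separates two situations that auto.offset.reset currently treats the same way: a partition with no committed offset being initialized for fetching, and a fetch that hits an out-of-range offset during consumption. The sketch below models, in plain Java, the decision each policy would make. It illustrates the proposal as described here and is not Kafka code: `ResetPolicy`, `ResetCause`, `ResetAction` and `ResetDecider` are hypothetical names, and reading "at start-up" as "the partition has no committed offset" is an assumption.

```java
/**
 * Offset-reset policies. EARLIEST, LATEST and NONE mirror the existing
 * auto.offset.reset values; the *_ON_START values are hypothetical and model
 * the proposed "earliest-on-start" / "latest-on-start" options.
 */
enum ResetPolicy { EARLIEST, LATEST, NONE, EARLIEST_ON_START, LATEST_ON_START }

/** Why the consumer has no usable fetch position for a partition. */
enum ResetCause {
    NO_COMMITTED_OFFSET, // partition is being initialized for fetching
    OFFSET_OUT_OF_RANGE  // a fetch during consumption hit an invalid offset
}

/** What the consumer does about it. */
enum ResetAction { SEEK_EARLIEST, SEEK_LATEST, FAIL }

final class ResetDecider {
    private ResetDecider() {}

    static ResetAction decide(ResetPolicy policy, ResetCause cause) {
        boolean initializing = cause == ResetCause.NO_COMMITTED_OFFSET;
        switch (policy) {
            case EARLIEST:
                // Existing behavior: reset in both situations.
                return ResetAction.SEEK_EARLIEST;
            case LATEST:
                return ResetAction.SEEK_LATEST;
            case EARLIEST_ON_START:
                // Reset only when initializing; out-of-range stays a hard error.
                return initializing ? ResetAction.SEEK_EARLIEST : ResetAction.FAIL;
            case LATEST_ON_START:
                return initializing ? ResetAction.SEEK_LATEST : ResetAction.FAIL;
            case NONE:
            default:
                // Existing "none": both situations surface as exceptions.
                return ResetAction.FAIL;
        }
    }
}
```

Under this model, the behavior asked for at the bottom of the thread (start at the beginning, but fail hard on out-of-range offsets later) corresponds to `EARLIEST_ON_START`; neither `EARLIEST` nor `NONE` provides it on its own.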
>> >> > >> > > >> >
>> >> > >> > > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> > >> > > >> >
>> >> > >> > > >> >> Yeah, I think I understood what you were saying. What I'm saying
>> >> > >> > > >> >> is that if there were a way to just fetch metadata without doing
>> >> > >> > > >> >> the rest of the work poll() does, it wouldn't be necessary. I
>> >> > >> > > >> >> guess I can do listTopics to get all metadata for all topics and
>> >> > >> > > >> >> then parse it.
>> >> > >> > > >> >>
>> >> > >> > > >> >> Regarding running a single instance, that is the case for what
>> >> > >> > > >> >> I'm talking about.
>> >> > >> > > >> >>
>> >> > >> > > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > >> > > >> >> > Cody,
>> >> > >> > > >> >> >
>> >> > >> > > >> >> > What I meant by a special case of `seekToXX` is this: today, when
>> >> > >> > > >> >> > the function is called with no partition parameters, it will try
>> >> > >> > > >> >> > to execute the logic on all "assigned" partitions of the
>> >> > >> > > >> >> > consumer. Once that is done, the subsequent poll() will not throw
>> >> > >> > > >> >> > the exception, since it knows those partitions need to reset
>> >> > >> > > >> >> > offsets.
>> >> > >> > > >> >> >
>> >> > >> > > >> >> > However, in your case there are no assigned partitions yet, and
>> >> > >> > > >> >> > hence `seekToXX` will not take effect on any partitions. The
>> >> > >> > > >> >> > assignment is wrapped in the poll() call, as you mentioned. One
>> >> > >> > > >> >> > way to solve it is to let `seekToXX()` with no parameters do the
>> >> > >> > > >> >> > coordination and get the assigned partitions if there are any
>> >> > >> > > >> >> > subscribed topics, so that the subsequent poll() will know those
>> >> > >> > > >> >> > partitions need their offsets reset. Does that make sense?
>> >> > >> > > >> >> >
>> >> > >> > > >> >> > For now, another way I can think of is to get the partition info
>> >> > >> > > >> >> > beforehand and call `seekToBeginning` on all partitions. But that
>> >> > >> > > >> >> > only works if the consumer knows it is going to get all the
>> >> > >> > > >> >> > partitions assigned to itself (i.e. you are only running a single
>> >> > >> > > >> >> > instance).
>> >> > >> > > >> >> >
>> >> > >> > > >> >> > Guozhang
>> >> > >> > > >> >> >
>> >> > >> > > >> >> >
>> >> > >> > > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> > >> > > >> >> >
>> >> > >> > > >> >> >> Another unfortunate thing about ConsumerRebalanceListener is that
>> >> > >> > > >> >> >> in order to do meaningful work in the callback, you need a
>> >> > >> > > >> >> >> reference to the consumer that called it. But that reference
>> >> > >> > > >> >> >> isn't provided to the callback, which means the listener
>> >> > >> > > >> >> >> implementation needs to hold a reference to the consumer. Seems
>> >> > >> > > >> >> >> like this makes it unnecessarily awkward to serialize or provide
>> >> > >> > > >> >> >> a 0-arg constructor for the listener.
>> >> > >> > > >> >> >>
>> >> > >> > > >> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> > >> > > >> >> >> > I thought about ConsumerRebalanceListener, but seeking to the
>> >> > >> > > >> >> >> > beginning any time there's a rebalance, for whatever reason, is
>> >> > >> > > >> >> >> > not necessarily the same thing as seeking to the beginning
>> >> > >> > > >> >> >> > before first starting the consumer.
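Jason's listener (further up the thread) checks for a committed offset on every rebalance. To get closer to "seek only before first starting the consumer", a listener could also remember which partitions this instance has already handled, so that later rebalances never rewind them. Below is a minimal sketch of that bookkeeping in plain Java, so it runs without a broker. `InitialResetTracker` and `partitionsToRewind` are hypothetical names, not Kafka APIs. In a real `onPartitionsAssigned`, `P` would be `TopicPartition` and the predicate could be something like `tp -> consumer.committed(tp) != null`, which assumes the listener holds a consumer reference as discussed in this thread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical helper (not a Kafka API): decides which assigned partitions
 * should be rewound. A partition is rewound only the first time this instance
 * is assigned it, and only if the group has no committed offset for it.
 *
 * @param <P> partition type; TopicPartition when used with the real client
 */
final class InitialResetTracker<P> {
    // Partitions this instance has been assigned at least once.
    private final Set<P> seen = new HashSet<>();

    List<P> partitionsToRewind(Collection<P> assigned, Predicate<P> hasCommittedOffset) {
        List<P> toRewind = new ArrayList<>();
        for (P partition : assigned) {
            // Set.add returns false for partitions handled in an earlier rebalance.
            boolean firstAssignment = seen.add(partition);
            // Short-circuit: the committed-offset lookup only runs for new partitions.
            if (firstAssignment && !hasCommittedOffset.test(partition)) {
                toRewind.add(partition);
            }
        }
        return toRewind;
    }
}
```

For example, if the first assignment is `t-0` and `t-1` and only `t-1` has a committed offset, only `t-0` is returned; a later rebalance that brings back `t-0` alongside a new `t-2` returns only `t-2`. One caveat of this design: the state lives in a single consumer instance, so a partition that is revoked before any offset was committed for it will not be rewound when it returns, and with `auto.offset.reset=none` the next poll() would throw NoOffsetForPartitionException. Committing soon after the initial seek narrows that window.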
>> >> > >> > > >> >> >> >
>> >> > >> > > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
>> >> > >> > > >> >> >> >> Cody,
>> >> > >> > > >> >> >> >>
>> >> > >> > > >> >> >> >> Use ConsumerRebalanceListener to achieve that:
>> >> > >> > > >> >> >> >>
>> >> > >> > > >> >> >> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>> >> > >> > > >> >> >> >>
>> >> > >> > > >> >> >> >>     @Override
>> >> > >> > > >> >> >> >>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>> >> > >> > > >> >> >> >>     }
>> >> > >> > > >> >> >> >>
>> >> > >> > > >> >> >> >>     @Override
>> >> > >> > > >> >> >> >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >> > >> > > >> >> >> >>         // Rewinds every assigned partition, on every rebalance
>> >> > >> > > >> >> >> >>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>> >> > >> > > >> >> >> >>     }
>> >> > >> > > >> >> >> >> };
>> >> > >> > > >> >> >> >>
>> >> > >> > > >> >> >> >> consumer.subscribe(topics, listener);
>> >> > >> > > >> >> >> >>
>> >> > >> > > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> > >> > > >> >> >> >>
>> >> > >> > > >> >> >> >>> That suggestion doesn't work, for pretty much the same reason:
>> >> > >> > > >> >> >> >>> at the time poll is first called, there is no reset policy and
>> >> > >> > > >> >> >> >>> no committed offset, so NoOffsetForPartitionException is thrown.
>> >> > >> > > >> >> >> >>>
>> >> > >> > > >> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd
>> >> > >> > > >> >> >> >>> needs special-case behavior. It's more that topic metadata
>> >> > >> > > >> >> >> >>> fetches, consumer position fetches, and message fetches are all
>> >> > >> > > >> >> >> >>> lumped together under a single poll() call, with no way to do
>> >> > >> > > >> >> >> >>> them individually if necessary.
>> >> > >> > > >> >> >> >>>
>> >> > >> > > >> >> >> >>> What does "work" in this situation is to just catch the
>> >> > >> > > >> >> >> >>> exception (which leaves the consumer in a state where partitions
>> >> > >> > > >> >> >> >>> are assigned) and then seek. But that is not exactly an elegant
>> >> > >> > > >> >> >> >>> interface.
>> >> > >> > > >> >> >> >>>
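>> >> > >> > > >> >> >> >>>     import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
>> >> > >> > > >> >> >> >>>
>> >> > >> > > >> >> >> >>>     consumer.subscribe(topics)
>> >> > >> > > >> >> >> >>>     try {
>> >> > >> > > >> >> >> >>>       consumer.poll(0) // joins the group and gets partitions assigned
>> >> > >> > > >> >> >> >>>     } catch {
>> >> > >> > > >> >> >> >>>       // Expected here: no committed offset and auto.offset.reset=none
>> >> > >> > > >> >> >> >>>       case _: NoOffsetForPartitionException =>
>> >> > >> > > >> >> >> >>>     }
>> >> > >> > > >> >> >> >>>     consumer.seekToBeginning() // now applies to the assigned partitions
>> >> > >> > > >> >> >> >>>     consumer.poll(0)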
>> >> > >> > > >> >> >> >>>
>> >> > >> > > >> >> >> >>>
>> >> > >> > > >> >> >> >>>
>> >> > >> > > >> >> >> >>>
>> >> > >> > > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > >> > > >> >> >> >>> > Hi Cody,
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> > The problem with that code is calling `seekToBeginning()` right
>> >> > >> > > >> >> >> >>> > after `subscribe(topics)`.
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> > Since the `subscribe` call is lazily evaluated, by the time
>> >> > >> > > >> >> >> >>> > `seekToBeginning()` is called no partition is assigned yet, and
>> >> > >> > > >> >> >> >>> > hence it is effectively a no-op.
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> > Try
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> >     consumer.subscribe(topics)
>> >> > >> > > >> >> >> >>> >     consumer.poll(0)  // get assigned partitions
>> >> > >> > > >> >> >> >>> >     consumer.seekToBeginning()
>> >> > >> > > >> >> >> >>> >     consumer.poll(0)
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> > to see if that works.
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> > I think it is a valid issue that can be fixed in the new
>> >> > >> > > >> >> >> >>> > consumer: upon calling seekToEnd/Beginning with no parameter
>> >> > >> > > >> >> >> >>> > while no assignment has been done yet, do the coordination
>> >> > >> > > >> >> >> >>> > behind the scenes. It will, though, change the behavior of the
>> >> > >> > > >> >> >> >>> > functions, as they are no longer always lazily evaluated.
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> > Guozhang
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the
>> >> > >> > > >> >> >> >>> >> beginning or end, without specifying auto.offset.reset.
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >> This does not seem to be possible:
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >>     import scala.collection.JavaConverters._
>> >> > >> > > >> >> >> >>> >>     import org.apache.kafka.clients.consumer.KafkaConsumer
>> >> > >> > > >> >> >> >>> >>     import org.apache.kafka.common.serialization.StringDeserializer
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >>     val kafkaParams = Map[String, Object](
>> >> > >> > > >> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>> >> > >> > > >> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
>> >> > >> > > >> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
>> >> > >> > > >> >> >> >>> >>       "group.id" -> "example",
>> >> > >> > > >> >> >> >>> >>       "auto.offset.reset" -> "none"
>> >> > >> > > >> >> >> >>> >>     ).asJava
>> >> > >> > > >> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> >> > >> > > >> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>> >> > >> > > >> >> >> >>> >>     consumer.subscribe(topics)
>> >> > >> > > >> >> >> >>> >>     consumer.seekToBeginning()
>> >> > >> > > >> >> >> >>> >>     consumer.poll(0)
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >> Results in:
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
>> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>> >> > >> > > >> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >> I'm assuming this is because, at the time seekToBeginning() is
>> >> > >> > > >> >> >> >>> >> called, subscriptions.assignedPartitions isn't populated. But
>> >> > >> > > >> >> >> >>> >> polling in order to assign topic partitions results in an error,
>> >> > >> > > >> >> >> >>> >> which creates a chicken-or-the-egg situation.
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard
>> >> > >> > > >> >> >> >>> >> error if the offsets are out of range at any other time during
>> >> > >> > > >> >> >> >>> >> consumption.
>> >> > >> > > >> >> >> >>> >>
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> >
>> >> > >> > > >> >> >> >>> > --
>> >> > >> > > >> >> >> >>> > -- Guozhang
