Ah, that makes more sense. I have no idea about the limitations of your use
case, but maybe you could expose a different interface to users.

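import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

interface RebalanceListener<K, V> {
  void onPartitionsAssigned(Consumer<K, V> consumer, Collection<TopicPartition> partitions);
  void onPartitionsRevoked(Consumer<K, V> consumer, Collection<TopicPartition> partitions);
}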

Then you can adapt it internally in the call to subscribe(). Would that
work?
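To make the adapter idea concrete, here is a minimal, self-contained sketch. It deliberately does not use the Kafka client: `Partition`, `SimpleConsumer`, `StandardListener` and `ConsumerAwareListener` are hypothetical stand-ins for `TopicPartition`, `KafkaConsumer`, `ConsumerRebalanceListener` and the interface proposed above, so the example compiles with only the JDK. The integration's own `subscribe()` wraps the user's listener in an anonymous standard listener that closes over the consumer it already owns, so the user's class needs no consumer field and can keep a 0-arg constructor.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class ListenerAdapterSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public static final class Partition {
        final String topic;
        final int partition;

        public Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Hypothetical stand-in for ConsumerRebalanceListener: no consumer argument.
    public interface StandardListener {
        void onPartitionsAssigned(Collection<Partition> partitions);

        void onPartitionsRevoked(Collection<Partition> partitions);
    }

    // Hypothetical stand-in for KafkaConsumer: keeps the listener, records seeks.
    public static class SimpleConsumer {
        StandardListener listener;
        final List<String> seeks = new ArrayList<>();

        public void subscribe(List<String> topics, StandardListener listener) {
            this.listener = listener;
        }

        public void seekToBeginning(Collection<Partition> partitions) {
            for (Partition p : partitions) {
                seeks.add(p.toString());
            }
        }
    }

    // The interface exposed to users: the consumer is passed on every call.
    public interface ConsumerAwareListener {
        void onPartitionsAssigned(SimpleConsumer consumer, Collection<Partition> partitions);

        void onPartitionsRevoked(SimpleConsumer consumer, Collection<Partition> partitions);
    }

    // A user listener with a 0-arg constructor and no consumer field.
    public static class SeekToBeginningListener implements ConsumerAwareListener {
        @Override
        public void onPartitionsAssigned(SimpleConsumer consumer, Collection<Partition> partitions) {
            consumer.seekToBeginning(partitions);
        }

        @Override
        public void onPartitionsRevoked(SimpleConsumer consumer, Collection<Partition> partitions) {
        }
    }

    // Integration-side subscribe(): adapts the user's listener by closing over
    // the consumer that the integration already owns.
    public static void subscribe(SimpleConsumer consumer, List<String> topics,
                                 ConsumerAwareListener userListener) {
        consumer.subscribe(topics, new StandardListener() {
            @Override
            public void onPartitionsAssigned(Collection<Partition> partitions) {
                userListener.onPartitionsAssigned(consumer, partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<Partition> partitions) {
                userListener.onPartitionsRevoked(consumer, partitions);
            }
        });
    }

    public static void main(String[] args) {
        SimpleConsumer consumer = new SimpleConsumer();
        subscribe(consumer, List.of("testtwo"), new SeekToBeginningListener());
        // Simulate the assignment that poll() would trigger after a rebalance.
        consumer.listener.onPartitionsAssigned(List.of(new Partition("testtwo", 4)));
        System.out.println(consumer.seeks); // [testtwo-4]
    }
}
```

With the real client, the wrapper would implement `ConsumerRebalanceListener` and be handed to `KafkaConsumer.subscribe(topics, listener)`; only the user-facing interface changes.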

Also, we already have partitionsFor() to fetch topic metadata. And you can
look at the pause/resume API which lets you call poll() without consuming
messages. We'd probably want to understand why those are insufficient
before considering new APIs.

-Jason

On Mon, Mar 14, 2016 at 12:17 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Regarding the rebalance listener, in the case of the spark
> integration, it is possible a job can fail and be restarted from
> checkpoint in a new jvm.  That means that you need to be able to
> reconstruct objects.  Any reasonable rebalance listener can't have a
> 0-arg constructor, because it needs a reference to the consumer in
> order to do anything useful.  I can get around this by making the user
> provide a 0-arg function to return a fully configured + subscribed
> Kafka consumer, so that they get to do the dance of constructing a
> consumer before constructing the listener.  I don't think the current
> rebalance listener is a dealbreaker, I just think it is an
> unnecessarily awkward API.
>
> Regarding poll(), I honestly don't see what the problem is with
> keeping poll() behavior as is, but also having public methods to e.g.
> fetch metadata, for people that need that functionality without
> actually consuming messages.
>
> On Mon, Mar 14, 2016 at 1:01 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > Late arrival to this discussion. I'm not really sure I see the problem with
> > accessing the consumer in the rebalance listener. Before, we passed the
> > consumer instance as a separate argument, but that was only because the
> > rebalance listener had to be passed by class name before a reference to
> > the consumer was available. After we changed the API to pass it in
> > subscribe() instead, getting a reference to the consumer shouldn't be a
> > problem. Maybe I'm missing something?
> >
> > As for poll() doing everything... It's definitely caused some confusion,
> > but I'm a little doubtful that trying to split out the functionality is
> > going to help as much as people think. A heartbeat() API has come up
> > several times, for example, but you have to figure out how it's going to
> > affect rebalancing. And rebalancing affects which partitions need to be
> > fetched or can be committed. They're all so intertwined that I'm not sure
> > you can divide them up without creating a bigger problem than the one
> > you're trying to solve. At this point, with 0.10 shortly on the way, it
> > seems unlikely that incompatible changes to the API will be accepted.
> > However, if someone can propose a compatible solution which addresses some
> > of the concerns mentioned, we'd love to hear about it!
> >
> > -Jason
> >
> > On Mon, Mar 14, 2016 at 9:21 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >
> >> Honestly the fact that everything is hidden inside poll() has been
> >> confusing people since last year, e.g.
> >>
> >> https://issues.apache.org/jira/browse/KAFKA-2359
> >>
> >> I can try to formulate a KIP for this, but it seems clear that I'm not
> >> the only one giving this feedback, and I may not understand all the
> >> other use cases that have been brought up.
> >>
> >> On Sun, Mar 13, 2016 at 3:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > Cody,
> >> >
> >> > We do not have an umbrella JIRA for this, but rather case-by-case JIRA
> >> > tickets / KIPs for API changes in the consumer.
> >> >
> >> > If you feel strongly about some specific change to the consumer API,
> >> > please feel free to create a new KIP with the detailed motivation and
> >> > proposed modifications.
> >> >
> >> > Guozhang
> >> >
> >> > On Fri, Mar 11, 2016 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >
> >> >> Is there a KIP or JIRA related to "working on improving these cases
> >> >> with improved APIs"?
> >> >>
> >> >> I saw that there was some discussion of it in KIP-41, but that seemed
> >> >> to have been resolved in favor of keeping everything inside of poll().
> >> >>
> >> >> On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> > Cody, Mansi:
> >> >> >
> >> >> > All good points! Let me try to answer them one by one.
> >> >> >
> >> >> > About this specific issue, as I suggested in the JIRA, we can separate,
> >> >> > within the auto.offset.reset config, the case of resetting the offset
> >> >> > upon initializing a partition to fetch from the case of a fetch offset
> >> >> > being out of range. These two scenarios are indeed quite different, and
> >> >> > it's reasonable to treat them differently.
> >> >> >
> >> >> > About passing a consumer context to the rebalance callback's
> >> >> > constructor, we left it for the user's flexibility: if you want to use
> >> >> > Kafka to commit offsets, for example, then you pass the consumer
> >> >> > reference to the callback; if you use an external service to store
> >> >> > offsets, you can pass a JDBC connection, for example, to the callback;
> >> >> > for data mirroring you can even pass another producer client into it.
> >> >> > Always enforcing the consumer context could be convenient for some use
> >> >> > cases (i.e. you do not need to pass the argument to the constructor
> >> >> > yourself), but not necessarily for all.
> >> >> >
> >> >> > About wrapping the coordination protocols (partition assignment,
> >> >> > heartbeat) inside poll() behind the scenes, we implemented the APIs
> >> >> > this way in order to abstract the underlying details from users, and
> >> >> > also to provide a simple "single-thread poll loop" design pattern in
> >> >> > the new consumer. We realize that it does make some use cases more
> >> >> > awkward, and we are working on improving these cases with improved
> >> >> > APIs as well. Let us know if you have any suggestions about this.
> >> >> >
> >> >> > Guozhang
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Thu, Mar 10, 2016 at 7:53 AM, Mansi Shah <mansis...@maprtech.com> wrote:
> >> >> >
> >> >> >> I second the need for having a consumer context passed to the
> >> >> >> rebalance callback. I have run into issues several times because of
> >> >> >> that.
> >> >> >>
> >> >> >> About subscribe vs. assign: I have not read through your Spark code
> >> >> >> yet (will do by EOD), so I am not sure what you mean (other than that
> >> >> >> I do agree new partitions should be consumed automatically). I guess
> >> >> >> we can continue this discussion on the Spark list then :-)
> >> >> >>
> >> >> >> Thanks
> >> >> >> Mansi.
> >> >> >>
> >> >> >> On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >>
> >> >> >> > Mansi, I'd agree that the fact that everything is tied up in poll()
> >> >> >> > seems like the source of the awkward behavior.
> >> >> >> >
> >> >> >> > Regarding assign vs. subscribe, most people using the Spark
> >> >> >> > integration are just going to want to provide a topic name, not go
> >> >> >> > figure out a bunch of partitions. They're also going to be surprised
> >> >> >> > if things suddenly blow up once a partition is added, or if that
> >> >> >> > partition doesn't start being consumed (we already have that second
> >> >> >> > issue today).
> >> >> >> >
> >> >> >> > That's why separating the behavior of auto offset reset seems like
> >> >> >> > the best idea I've heard so far.
> >> >> >> >
> >> >> >> > Consumer rebalance listeners are still probably going to be
> >> >> >> > necessary for people who are storing offsets externally.
> >> >> >> >
> >> >> >> > On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <mansis...@maprtech.com> wrote:
> >> >> >> > > Guozhang,
> >> >> >> > >
> >> >> >> > > Sorry for joining the party a little late. I have been thinking
> >> >> >> > > about this whole awkward behavior of having to call poll(0) to
> >> >> >> > > actually make the underlying subscriptions take effect. Is the core
> >> >> >> > > reason for this design the fact that poll is also the actual
> >> >> >> > > heartbeat, and you want to make the consumer group assignments
> >> >> >> > > through poll so that timeouts and reassignments can all go through
> >> >> >> > > poll? So I think coupling liveness with poll (which in effect couples
> >> >> >> > > consumer group assignments, and hence metadata fetches, with poll)
> >> >> >> > > is the real cause of this design. Were there issues where you were
> >> >> >> > > seeing active consumers not calling poll that led to this design
> >> >> >> > > choice? I tried to look for a relevant JIRA but could not find one.
> >> >> >> > > Can you please point me to something if you have it handy?
> >> >> >> > >
> >> >> >> > > By the way, this would also mean that your proposal to do the
> >> >> >> > > actual assignments through seek might not be ideal, since there can
> >> >> >> > > still be an indefinite amount of time between seek and poll (just
> >> >> >> > > like between subscribe and poll), and the consumer could time out
> >> >> >> > > even before the first poll is called?
> >> >> >> > >
> >> >> >> > >
> >> >> >> > > @Cody, in your case, if you really have only one consumer and it is
> >> >> >> > > going to get all the partitions of the topic anyway, then you might
> >> >> >> > > as well subscribe using the "assign" call instead of the "subscribe"
> >> >> >> > > call. That will at least make your code cleaner, and I do not think
> >> >> >> > > you are gaining anything from the consumer group functionality
> >> >> >> > > anyway?
> >> >> >> > >
> >> >> >> > > - Mansi.
> >> >> >> > >
> >> >> >> > >
> >> >> >> > >
> >> >> >> > > On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> >> > >
> >> >> >> > >> In order to do anything meaningful with the consumer itself in the
> >> >> >> > >> rebalance callback (e.g. commit offsets), you would need to hold on
> >> >> >> > >> to the consumer reference; admittedly it sounds a bit awkward, but by
> >> >> >> > >> design we chose not to enforce it in the interface itself.
> >> >> >> > >>
> >> >> >> > >> Guozhang
> >> >> >> > >>
> >> >> >> > >> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >> > >>
> >> >> >> > >> > So what about my comments regarding the consumer rebalance listener
> >> >> >> > >> > interface not providing access to a consumer? I can probably work
> >> >> >> > >> > around it, but it seems odd.
> >> >> >> > >> > On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >> >> >> > >> >
> >> >> >> > >> > > One thing proposed by Jason:
> >> >> >> > >> > >
> >> >> >> > >> > > If you want to reset the offset only upon initialization, and by
> >> >> >> > >> > > initialization you mean "no committed offset", you can do
> >> >> >> > >> > > something like the following in the rebalance callback:
> >> >> >> > >> > >
> >> >> >> > >> > >     @Override
> >> >> >> > >> > >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >> >> >> > >> > >         for (TopicPartition partition : partitions) {
> >> >> >> > >> > >             // No committed offset: the group has never consumed this
> >> >> >> > >> > >             // partition, so start from the beginning. Otherwise the
> >> >> >> > >> > >             // consumer resumes from the committed offset as usual.
> >> >> >> > >> > >             if (consumer.committed(partition) == null)
> >> >> >> > >> > >                 consumer.seekToBeginning(partition);
> >> >> >> > >> > >         }
> >> >> >> > >> > >     }
> >> >> >> > >> > >
> >> >> >> > >> > >
> >> >> >> > >> > > Guozhang
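A JDK-only toy model of the difference between the callback above and seeking unconditionally on every assignment (as in Kamal's suggestion further down the thread). Partitions are plain strings, positions and committed offsets are `HashMap`s, and every log is assumed to start at offset 0; none of these names are Kafka API. It illustrates Cody's point that a rebalance rewinds partitions with committed progress only under the unconditional approach.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResetOnlyUncommittedSketch {
    // Assumption for the toy model: every partition's log starts at offset 0.
    static final long BEGINNING = 0L;

    // Toy consumer state, keyed by partition name such as "testtwo-0".
    final Map<String, Long> committed = new HashMap<>();
    final Map<String, Long> position = new HashMap<>();

    // Like the callback above: rewind only partitions with no committed offset.
    // Partitions with a committed offset resume from it (the real consumer does
    // this on its own when no seek is made).
    void onAssignedSeekIfUncommitted(Collection<String> partitions) {
        for (String p : partitions) {
            Long c = committed.get(p);
            position.put(p, c == null ? BEGINNING : c);
        }
    }

    // Like seeking unconditionally on every assignment.
    void onAssignedAlwaysSeek(Collection<String> partitions) {
        for (String p : partitions) {
            position.put(p, BEGINNING);
        }
    }

    public static void main(String[] args) {
        List<String> assigned = List.of("testtwo-0", "testtwo-1");

        ResetOnlyUncommittedSketch conditional = new ResetOnlyUncommittedSketch();
        ResetOnlyUncommittedSketch unconditional = new ResetOnlyUncommittedSketch();
        // Offset 42 was committed for testtwo-0 before a later rebalance.
        conditional.committed.put("testtwo-0", 42L);
        unconditional.committed.put("testtwo-0", 42L);

        conditional.onAssignedSeekIfUncommitted(assigned);
        unconditional.onAssignedAlwaysSeek(assigned);

        System.out.println(conditional.position.get("testtwo-0"));   // 42
        System.out.println(unconditional.position.get("testtwo-0")); // 0
        System.out.println(conditional.position.get("testtwo-1"));   // 0
    }
}
```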
> >> >> >> > >> > >
> >> >> >> > >> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> >> > >> > >
> >> >> >> > >> > > > Filed https://issues.apache.org/jira/browse/KAFKA-3370.
> >> >> >> > >> > > >
> >> >> >> > >> > > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >> > >> > > >
> >> >> >> > >> > > >> That sounds like an interesting way of addressing the problem; we
> >> >> >> > >> > > >> can continue further discussion on the JIRA.
> >> >> >> > >> > > >>
> >> >> >> > >> > > >>
> >> >> >> > >> > > >>
> >> >> >> > >> > > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> >> > >> > > >> > Cody:
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >> > More specifically, you do not need the listTopics() function if you
> >> >> >> > >> > > >> > already know your subscribed topics; just using partitionsFor() is
> >> >> >> > >> > > >> > sufficient.
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >> > About the fix, I'm thinking of adding two more options to
> >> >> >> > >> > > >> > auto.offset.reset, namely "earliest-on-start" and
> >> >> >> > >> > > >> > "latest-on-start", which set the reset position ONLY at startup.
> >> >> >> > >> > > >> > The reason is that seekToXX was actually not designed for such
> >> >> >> > >> > > >> > initialization but for calling during the lifetime of the consumer,
> >> >> >> > >> > > >> > and we'd better provide the right solution for that.
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >> > I can file the JIRA right away and start further discussion there.
> >> >> >> > >> > > >> > But let me know if you have any other ideas.
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >> > Guozhang
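The distinction behind the proposed "earliest-on-start" / "latest-on-start" options can be sketched as a small decision function. This is a hypothetical model, not Kafka code: `Reason`, `Policy` and `resolve()` are illustrative names, the `*_ON_START` values stand for the options proposed in this message, and the log offsets in `main` are made-up inputs. A missing offset at startup is handled separately from an out-of-range offset during consumption, which still fails hard, matching what Cody asked for.

```java
public class ResetPolicySketch {
    // Why the consumer has no usable position for a partition.
    public enum Reason { NO_INITIAL_OFFSET, OUT_OF_RANGE }

    // EARLIEST, LATEST and NONE mirror the existing auto.offset.reset values;
    // the *_ON_START values model the options proposed above (hypothetical).
    public enum Policy { EARLIEST, LATEST, NONE, EARLIEST_ON_START, LATEST_ON_START }

    // Returns the offset to reset to, or throws if no reset is allowed.
    public static long resolve(Policy policy, Reason reason, long logStart, long logEnd, String partition) {
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            case EARLIEST_ON_START:
                if (reason == Reason.NO_INITIAL_OFFSET) return logStart;
                break;
            case LATEST_ON_START:
                if (reason == Reason.NO_INITIAL_OFFSET) return logEnd;
                break;
            case NONE:
                break;
        }
        // Out-of-range offsets under *_ON_START (or any reset under NONE) stay a hard error.
        throw new IllegalStateException("No reset allowed for " + partition + " (" + reason + ")");
    }

    public static void main(String[] args) {
        // First start, nothing committed: begin at the start of the log.
        System.out.println(resolve(Policy.EARLIEST_ON_START, Reason.NO_INITIAL_OFFSET, 100, 500, "testtwo-4")); // 100
        // Later, the fetch offset falls out of range: fail instead of silently resetting.
        try {
            resolve(Policy.EARLIEST_ON_START, Reason.OUT_OF_RANGE, 100, 500, "testtwo-4");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // No reset allowed for testtwo-4 (OUT_OF_RANGE)
        }
    }
}
```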
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >> >> Yeah, I think I understood what you were saying. What I'm saying is
> >> >> >> > >> > > >> >> that if there were a way to just fetch metadata without doing the
> >> >> >> > >> > > >> >> rest of the work poll() does, it wouldn't be necessary. I guess I
> >> >> >> > >> > > >> >> can call listTopics to get metadata for all topics and then parse
> >> >> >> > >> > > >> >> it.
> >> >> >> > >> > > >> >>
> >> >> >> > >> > > >> >> Regarding running a single instance, that is the case for what I'm
> >> >> >> > >> > > >> >> talking about.
> >> >> >> > >> > > >> >>
> >> >> >> > >> > > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> >> > >> > > >> >> > Cody,
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> > What I meant by a special case of `seekToXX` is this: today, when
> >> >> >> > >> > > >> >> > the function is called with no partition parameters, it will try to
> >> >> >> > >> > > >> >> > execute the logic on all "assigned" partitions of the consumer. Once
> >> >> >> > >> > > >> >> > that is done, the subsequent poll() will not throw the exception,
> >> >> >> > >> > > >> >> > since it knows those partitions need their offsets reset.
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> > However, in your case there are no assigned partitions yet, and hence
> >> >> >> > >> > > >> >> > `seekToXX` will not take effect on any partitions. The assignment is
> >> >> >> > >> > > >> >> > wrapped in the poll() call, as you mentioned. One way to solve it is
> >> >> >> > >> > > >> >> > to let `seekToXX()` with no parameters do the coordination and get
> >> >> >> > >> > > >> >> > the assigned partitions if there are any subscribed topics, so that
> >> >> >> > >> > > >> >> > the subsequent poll() will know those partitions need their offsets
> >> >> >> > >> > > >> >> > reset. Does that make sense?
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> > For now, another way I can think of is to get the partition info
> >> >> >> > >> > > >> >> > beforehand and call `seekToBeginning` on all partitions. But that
> >> >> >> > >> > > >> >> > only works if the consumer knows it is going to get all the
> >> >> >> > >> > > >> >> > partitions assigned to itself (i.e. you are only running a single
> >> >> >> > >> > > >> >> > instance).
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> > Guozhang
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> >> Another unfortunate thing about ConsumerRebalanceListener is that in
> >> >> >> > >> > > >> >> >> order to do meaningful work in the callback, you need a reference to
> >> >> >> > >> > > >> >> >> the consumer that called it. But that reference isn't provided to the
> >> >> >> > >> > > >> >> >> callback, which means the listener implementation needs to hold a
> >> >> >> > >> > > >> >> >> reference to the consumer. This seems to make it unnecessarily
> >> >> >> > >> > > >> >> >> awkward to serialize the listener or provide a 0-arg constructor for
> >> >> >> > >> > > >> >> >> it.
> >> >> >> > >> > > >> >> >>
> >> >> >> > >> > > >> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >> > >> > > >> >> >> > I thought about ConsumerRebalanceListener, but seeking to the
> >> >> >> > >> > > >> >> >> > beginning any time there's a rebalance, for whatever reason, is not
> >> >> >> > >> > > >> >> >> > necessarily the same thing as seeking to the beginning before first
> >> >> >> > >> > > >> >> >> > starting the consumer.
> >> >> >> > >> > > >> >> >> >
> >> >> >> > >> > > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
> >> >> >> > >> > > >> >> >> >> Cody,
> >> >> >> > >> > > >> >> >> >>
> >> >> >> > >> > > >> >> >> >> Use ConsumerRebalanceListener to achieve that:
> >> >> >> > >> > > >> >> >> >>
> >> >> >> > >> > > >> >> >> >>     ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
> >> >> >> > >> > > >> >> >> >>
> >> >> >> > >> > > >> >> >> >>         @Override
> >> >> >> > >> > > >> >> >> >>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> >> >> >> > >> > > >> >> >> >>         }
> >> >> >> > >> > > >> >> >> >>
> >> >> >> > >> > > >> >> >> >>         @Override
> >> >> >> > >> > > >> >> >> >>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >> >> >> > >> > > >> >> >> >>             // Runs after every rebalance, not only when the consumer first starts.
> >> >> >> > >> > > >> >> >> >>             consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
> >> >> >> > >> > > >> >> >> >>         }
> >> >> >> > >> > > >> >> >> >>     };
> >> >> >> > >> > > >> >> >> >>
> >> >> >> > >> > > >> >> >> >>     consumer.subscribe(topics, listener);
> >> >> >> > >> > > >> >> >> >>
> >> >> >> > >> > > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >> > >> > > >> >> >> >>
> >> >> >> > >> > > >> >> >> >>> That suggestion doesn't work, for pretty much the same reason: at
> >> >> >> > >> > > >> >> >> >>> the time poll is first called, there is no reset policy and no
> >> >> >> > >> > > >> >> >> >>> committed offset, so NoOffsetForPartitionException is thrown.
> >> >> >> > >> > > >> >> >> >>>
> >> >> >> > >> > > >> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd
> >> >> >> > >> > > >> >> >> >>> needs special-case behavior. It's more that topic metadata fetches,
> >> >> >> > >> > > >> >> >> >>> consumer position fetches, and message fetches are all lumped
> >> >> >> > >> > > >> >> >> >>> together under a single poll() call, with no way to do them
> >> >> >> > >> > > >> >> >> >>> individually if necessary.
> >> >> >> > >> > > >> >> >> >>>
> >> >> >> > >> > > >> >> >> >>> What does "work" in this situation is to just catch the exception
> >> >> >> > >> > > >> >> >> >>> (which leaves the consumer in a state where partitions are
> >> >> >> > >> > > >> >> >> >>> assigned) and then seek. But that is not exactly an elegant
> >> >> >> > >> > > >> >> >> >>> interface.
> >> >> >> > >> > > >> >> >> >>>
> >> >> >> > >> > > >> >> >> >>>     import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
> >> >> >> > >> > > >> >> >> >>>
> >> >> >> > >> > > >> >> >> >>>     consumer.subscribe(topics)
> >> >> >> > >> > > >> >> >> >>>     try {
> >> >> >> > >> > > >> >> >> >>>       consumer.poll(0)
> >> >> >> > >> > > >> >> >> >>>     } catch {
> >> >> >> > >> > > >> >> >> >>>       // The first poll assigns partitions, then fails for lack of an offset.
> >> >> >> > >> > > >> >> >> >>>       case _: NoOffsetForPartitionException =>
> >> >> >> > >> > > >> >> >> >>>     }
> >> >> >> > >> > > >> >> >> >>>     consumer.seekToBeginning()
> >> >> >> > >> > > >> >> >> >>>     consumer.poll(0)
> >> >> >> > >> > > >> >> >> >>>
> >> >> >> > >> > > >> >> >> >>>
> >> >> >> > >> > > >> >> >> >>>
> >> >> >> > >> > > >> >> >> >>>
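A toy, JDK-only model of the ordering problem discussed in these messages, assuming the 0.9 behavior described in this thread: `subscribe()` is lazy, a no-argument `seekToBeginning()` only affects partitions that are already assigned, and `poll()` performs the assignment and then fails when a partition has neither a position nor a pending reset. `ToyConsumer` and `NoOffsetException` are hypothetical stand-ins, not `KafkaConsumer` and `NoOffsetForPartitionException`, and for simplicity the toy subscribes to partition names directly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LazyAssignmentSketch {
    // Hypothetical stand-in for NoOffsetForPartitionException.
    public static class NoOffsetException extends RuntimeException {
        NoOffsetException(String partition) {
            super("Undefined offset with no reset policy for partition: " + partition);
        }
    }

    // Toy consumer with auto.offset.reset=none and no committed offsets.
    public static class ToyConsumer {
        private final List<String> subscribedPartitions = new ArrayList<>();
        private final Set<String> assigned = new HashSet<>();
        private final Set<String> resetToBeginning = new HashSet<>();
        final Map<String, Long> positions = new HashMap<>();

        // Lazy, like subscribe(): nothing is assigned until poll().
        public void subscribe(List<String> partitions) {
            subscribedPartitions.addAll(partitions);
        }

        // No-arg seek only marks partitions that are already assigned.
        public void seekToBeginning() {
            resetToBeginning.addAll(assigned);
        }

        public void poll() {
            assigned.addAll(subscribedPartitions); // "group coordination" happens here
            for (String p : assigned) {
                if (positions.containsKey(p)) continue;
                if (!resetToBeginning.remove(p)) throw new NoOffsetException(p);
                positions.put(p, 0L); // assumption: every log starts at offset 0
            }
        }
    }

    public static void main(String[] args) {
        ToyConsumer naive = new ToyConsumer();
        naive.subscribe(List.of("testtwo-4"));
        naive.seekToBeginning(); // no-op: nothing assigned yet
        try {
            naive.poll();
        } catch (NoOffsetException e) {
            System.out.println("naive: " + e.getMessage());
        }

        ToyConsumer workaround = new ToyConsumer();
        workaround.subscribe(List.of("testtwo-4"));
        try {
            workaround.poll(); // assigns partitions, then throws
        } catch (NoOffsetException ignored) {
            // partitions are now assigned, so the seek below has an effect
        }
        workaround.seekToBeginning();
        workaround.poll();
        System.out.println("workaround: " + workaround.positions); // workaround: {testtwo-4=0}
    }
}
```

The first sequence fails the same way as the original report; the second shows why catching the exception and then seeking works, since the failed poll still leaves the partitions assigned.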
> >> >> >> > >> > > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> >> > >> > > >> >> >> >>> > Hi Cody,
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> > The problem with that code is `seekToBeginning()` being called right
> >> >> >> > >> > > >> >> >> >>> > after `subscribe(topics)`.
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> > Since the `subscribe` call is lazily evaluated, by the time
> >> >> >> > >> > > >> >> >> >>> > `seekToBeginning()` is called no partition is assigned yet, and hence
> >> >> >> > >> > > >> >> >> >>> > it is effectively a no-op.
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> > Try
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> >     consumer.subscribe(topics)
> >> >> >> > >> > > >> >> >> >>> >     consumer.poll(0)  // get assigned partitions
> >> >> >> > >> > > >> >> >> >>> >     consumer.seekToBeginning()
> >> >> >> > >> > > >> >> >> >>> >     consumer.poll(0)
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> > to see if that works.
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> > I think it is a valid issue that can be fixed in the new consumer:
> >> >> >> > >> > > >> >> >> >>> > upon calling seekToEnd/seekToBeginning with no parameters while no
> >> >> >> > >> > > >> >> >> >>> > assignment has been done yet, do the coordination behind the scenes.
> >> >> >> > >> > > >> >> >> >>> > It will, however, change the behavior of these functions, as they
> >> >> >> > >> > > >> >> >> >>> > will no longer always be lazily evaluated.
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> > Guozhang
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the
> >> >> >> > >> > > >> >> >> >>> >> beginning or end, without specifying auto.offset.reset.
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >> This does not seem to be possible:
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >>     import org.apache.kafka.clients.consumer.KafkaConsumer
> >> >> >> > >> > > >> >> >> >>> >>     import org.apache.kafka.common.serialization.StringDeserializer
> >> >> >> > >> > > >> >> >> >>> >>     import scala.collection.JavaConverters._
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >>     val kafkaParams = Map[String, Object](
> >> >> >> > >> > > >> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
> >> >> >> > >> > > >> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
> >> >> >> > >> > > >> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
> >> >> >> > >> > > >> >> >> >>> >>       "group.id" -> "example",
> >> >> >> > >> > > >> >> >> >>> >>       "auto.offset.reset" -> "none"
> >> >> >> > >> > > >> >> >> >>> >>     ).asJava
> >> >> >> > >> > > >> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
> >> >> >> > >> > > >> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
> >> >> >> > >> > > >> >> >> >>> >>     consumer.subscribe(topics)
> >> >> >> > >> > > >> >> >> >>> >>     consumer.seekToBeginning()
> >> >> >> > >> > > >> >> >> >>> >>     consumer.poll(0)
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >> Results in:
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> >> >> >> > >> > > >> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >> I'm assuming this is because, at the time seekToBeginning() is
> >> >> >> > >> > > >> >> >> >>> >> called, subscriptions.assignedPartitions isn't populated. But polling
> >> >> >> > >> > > >> >> >> >>> >> in order to assign topic partitions results in an error, which
> >> >> >> > >> > > >> >> >> >>> >> creates a chicken-or-the-egg situation.
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard error if
> >> >> >> > >> > > >> >> >> >>> >> the offsets are out of range at any other time during consumption.
> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> > >> > > >> >> >> >>> > --
> >> >> >> > >> > > >> >> >> >>> > -- Guozhang
> >> >> >> > >> > > >> >> >> >>>
> >> >> >> > >> > > >> >> >>
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >> >
> >> >> >> > >> > > >> >>
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >> >
> >> >> >> > >> > > >>
> >> >> >> > >> > > >
> >> >> >> > >> > > >
> >> >> >> > >> > > >
> >> >> >> > >> > > >
> >> >> >> > >> > >
> >> >> >> > >> > >
> >> >> >> > >> > >
> >> >> >> > >> > >
> >> >> >> > >> >
> >> >> >> > >>
> >> >> >> > >>
> >> >> >> > >>
> >> >> >> > >>
> >> >> >> >
> >> >> >>
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >> >
> >> >
> >> >
> >>
>
