I can't get initial subscriptions without poll.  As far as I can tell,
I won't get updated subscriptions (because a partition was added or
another topic matching the pattern was added) without poll either,
right?
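
For concreteness, the kind of poll-driven loop I mean (just a rough sketch;
kafkaParams, rebalanceListener and the topic pattern are placeholders):

    import java.util.regex.Pattern
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val consumer = new KafkaConsumer[String, String](kafkaParams)
    consumer.subscribe(Pattern.compile("mytopic.*"), rebalanceListener)

    while (true) {
      // new topics matching the pattern, or newly added partitions, only show
      // up in the assignment after poll() drives another metadata refresh and
      // round of group coordination
      val records = consumer.poll(500)
      val assigned = consumer.assignment().asScala
      // ... process records, compare assigned against what we had before
    }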

I'll take a look at those jiras.

On Mon, Mar 14, 2016 at 4:56 PM, Jason Gustafson <ja...@confluent.io> wrote:
> The offset API is definitely a gap at the moment. I think there were some
> problems with the old consumer's API and we wanted to make sure we didn't
> make the same mistakes. Unfortunately, I'm not sure anyone has had the time
> to give this the attention it needs. Here are a couple of JIRAs if you want to
> have a look:
>
> https://issues.apache.org/jira/browse/KAFKA-2076
> https://issues.apache.org/jira/browse/KAFKA-2500
>
> For now, the workaround above should work. However, you should be able to
> skip the call to poll() after the seek, since position() will block to get
> the new offset.
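>
> Something like this, in other words (untested sketch, assuming the consumer
> already has an assignment from an earlier poll and JavaConverters is imported):
>
>     consumer.seekToEnd()            // no args: applies to all currently assigned partitions
>     val logEnd = consumer.assignment().asScala
>       .map(tp => tp -> consumer.position(tp))   // position() blocks to fetch the reset offset
>       .toMap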
>
> -Jason
>
> On Mon, Mar 14, 2016 at 2:37 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Sorry, by metadata I also meant the equivalent of the old
>> OffsetRequest api, which partitionsFor doesn't give you.  I understand
>> why you didn't want to expose the broken "offsets before a certain
>> time" api, but I don't understand why equivalent functionality for
>> first or last offset isn't available in the new consumer.
>>
>> For instance, I need to know the log end offset, preferably without
>> consuming messages.
>>
>> So it sounds like you're suggesting something like this:
>>
>> val kc = KafkaConsumer(...)
>> kc.subscribe(somePattern)
>>
>> while(...) {
>>    kc.assignment.foreach(tp => kc.pause(tp))
>>    kc.seekToEnd()
>>    kc.poll(0)
>>    val logEnd = kc.assignment.map(tp => kc.position(tp))
>>    ... do something with logEnd
>> }
>>
>> Seems like that will work.
>>
>> Once timestamp indexing is implemented and an actual getOffsetsBefore
>> api could work, are people still going to have to mutate consumer
>> position just to get information?  Is this a case of just wanting to
>> hide as much as possible, even if that means making some things harder
>> than they were with the old simple consumer?
>>
>>
>>
>>
>>
>> On Mon, Mar 14, 2016 at 2:35 PM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>> > Ah, that makes more sense. I have no idea about the limitations of your use
>> > case, but maybe you could expose a different interface to users.
>> >
>> > interface RebalanceListener {
>> >   void onPartitionsAssigned(Consumer<K, V> consumer, Collection<TopicPartition> partitions);
>> >   void onPartitionsRevoked(Consumer<K, V> consumer, Collection<TopicPartition> partitions);
>> > }
>> >
>> > Then you can adapt it internally in the call to subscribe(). Would that
>> > work?
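>> >
>> > For example, roughly (untested sketch; userListener is whatever the user
>> > supplies that implements the interface above):
>> >
>> > consumer.subscribe(topics, new ConsumerRebalanceListener {
>> >   // adapt Kafka's callback to the richer interface above
>> >   override def onPartitionsAssigned(parts: java.util.Collection[TopicPartition]): Unit =
>> >     userListener.onPartitionsAssigned(consumer, parts)
>> >   override def onPartitionsRevoked(parts: java.util.Collection[TopicPartition]): Unit =
>> >     userListener.onPartitionsRevoked(consumer, parts)
>> > })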
>> >
>> > Also, we already have partitionsFor() to fetch topic metadata. And you can
>> > look at the pause/resume API which lets you call poll() without consuming
>> > messages. We'd probably want to understand why those are insufficient
>> > before considering new APIs.
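>> >
>> > e.g. to fetch metadata without consuming anything (rough sketch; "sometopic"
>> > is a placeholder and JavaConverters is assumed):
>> >
>> > val assigned = consumer.assignment().asScala.toSeq
>> > consumer.pause(assigned: _*)        // poll() returns no records while paused
>> > consumer.poll(0)                    // still drives heartbeats / rebalancing
>> > val info = consumer.partitionsFor("sometopic")   // topic metadata
>> > consumer.resume(assigned: _*)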
>> >
>> > -Jason
>> >
>> > On Mon, Mar 14, 2016 at 12:17 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>> >
>> >> Regarding the rebalance listener, in the case of the spark
>> >> integration, it is possible a job can fail and be restarted from
>> >> checkpoint in a new jvm.  That means that you need to be able to
>> >> reconstruct objects.  Any reasonable rebalance listener can't have a
>> >> 0-arg constructor, because it needs a reference to the consumer in
>> >> order to do anything useful.  I can get around this by making the user
>> >> provide a 0-arg function to return a fully configured + subscribed
>> >> Kafka consumer, so that they get to do the dance of constructing a
>> >> consumer before constructing the listener.  I don't think the current
>> >> rebalance listener is a dealbreaker, I just think it is an
>> >> unnecessarily awkward API.
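>> >>
>> >> Roughly like this (sketch only; the class and parameter names here are made
>> >> up, not actual Spark API):
>> >>
>> >> // The user supplies a 0-arg factory instead of a consumer instance, so an
>> >> // identical consumer can be rebuilt after restarting from checkpoint; the
>> >> // factory does the construct-consumer-then-listener-then-subscribe dance.
>> >> class DirectStream(consumerFactory: () => KafkaConsumer[String, String])
>> >>     extends Serializable {
>> >>   @transient private var consumer: KafkaConsumer[String, String] = null
>> >>   private def getConsumer(): KafkaConsumer[String, String] = {
>> >>     if (consumer == null) consumer = consumerFactory()
>> >>     consumer
>> >>   }
>> >> }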
>> >>
>> >> Regarding poll(), I honestly don't see what the problem is with
>> >> keeping poll() behavior as is, but also having public methods to e.g.
>> >> fetch metadata, for people that need that functionality without
>> >> actually consuming messages.
>> >>
>> >> On Mon, Mar 14, 2016 at 1:01 PM, Jason Gustafson <ja...@confluent.io>
>> >> wrote:
>> >> > Late arrival to this discussion. I'm not really sure I see the problem with
>> >> > accessing the consumer in the rebalance listener. Before we passed the
>> >> > consumer instance as a separate argument, but that was only because the
>> >> > rebalance listener had to be passed by classname before a reference to the
>> >> > consumer was available. After we changed the API to pass it in subscribe()
>> >> > instead, getting a reference to the consumer shouldn't be a problem. Maybe
>> >> > I'm missing something?
>> >> >
>> >> > As for poll() doing everything... It's definitely caused some confusion,
>> >> > but I'm a little doubtful that trying to split out the functionality is
>> >> > going to help as much as people think. A heartbeat() API has come up
>> >> > several times, for example, but you have to figure out how it's going to
>> >> > affect rebalancing. And rebalancing affects which partitions need to be
>> >> > fetched or can be committed. They're all so intertwined that I'm not sure
>> >> > you can divide them up without creating a bigger problem than the one
>> >> > you're trying to solve. At this point, with 0.10 shortly on the way, it
>> >> > seems unlikely that incompatible changes to the API will be accepted.
>> >> > However, if someone can propose a compatible solution which addresses some
>> >> > of the concerns mentioned, we'd love to hear about it!
>> >> >
>> >> > -Jason
>> >> >
>> >> > On Mon, Mar 14, 2016 at 9:21 AM, Cody Koeninger <c...@koeninger.org>
>> >> wrote:
>> >> >
>> >> >> Honestly the fact that everything is hidden inside poll() has been
>> >> >> confusing people since last year, e.g.
>> >> >>
>> >> >> https://issues.apache.org/jira/browse/KAFKA-2359
>> >> >>
>> >> >> I can try to formulate a KIP for this, but it seems clear that I'm not
>> >> >> the only one giving this feedback, and I may not understand all the
>> >> >> other use cases that have been brought up.
>> >> >>
>> >> >> On Sun, Mar 13, 2016 at 3:09 PM, Guozhang Wang <wangg...@gmail.com>
>> >> wrote:
>> >> >> > Cody,
>> >> >> >
>> >> >> > We do not have an umbrella JIRA for this, but rather a case-by-case JIRA
>> >> >> > ticket / KIP for API changes in consumer.
>> >> >> >
>> >> >> > If you feel strongly about some specific change on the consumer API, please
>> >> >> > feel free to create a new KIP with the detailed motivation and proposed
>> >> >> > modifications.
>> >> >> >
>> >> >> > Guozhang
>> >> >> >
>> >> >> > On Fri, Mar 11, 2016 at 12:28 PM, Cody Koeninger <
>> c...@koeninger.org>
>> >> >> wrote:
>> >> >> >
>> >> >> >> Is there a KIP or Jira related to "working on improving these cases
>> >> >> >> with improved APIs"?
>> >> >> >>
>> >> >> >> I saw that there was some discussion of it in KIP-41, but that seemed
>> >> >> >> to have been resolved in favor of keeping everything inside of poll()
>> >> >> >>
>> >> >> >> On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang <
>> wangg...@gmail.com>
>> >> >> >> wrote:
>> >> >> >> > Cody, Mansi:
>> >> >> >> >
>> >> >> >> > All good points! Let me try to answer them one-by-one.
>> >> >> >> >
>> >> >> >> > About this specific issue, as I suggested in the JIRA we can separate the
>> >> >> >> > case about resetting offset upon initializing a partition to fetch, from
>> >> >> >> > the case that fetching offset out-of-range in the auto.offset.reset config.
>> >> >> >> > These two scenarios are indeed quite different and it's reasonable treating
>> >> >> >> > them differently.
>> >> >> >> >
>> >> >> >> > About passing a consumer context to the rebalance callback's constructor,
>> >> >> >> > we left it for user's flexibility: if you want to use Kafka to commit
>> >> >> >> > offsets, for example, then you pass the consumer reference to the callback;
>> >> >> >> > if you use an external service to store offsets, you can pass a JDBC
>> >> >> >> > connector, for example, to the callback; for some data mirroring you can
>> >> >> >> > even pass in another producer client into it. Always enforcing the consumer
>> >> >> >> > context could be convenient (i.e. you do not need to pass in the argument
>> >> >> >> > to the constructor yourself) for some use cases, but not necessarily all.
>> >> >> >> >
>> >> >> >> > About wrapping coordination protocols (partition assignment, heartbeat)
>> >> >> >> > inside "poll()" behind the scene, we implemented the APIs in this way in
>> >> >> >> > order to abstract the underlying details from the users, and also to
>> >> >> >> > provide a simple "single-thread-poll-loop" design pattern in the new
>> >> >> >> > Consumer. We realized that it does actually make some of the use cases more
>> >> >> >> > awkward, and are working on improving these cases with improved APIs as
>> >> >> >> > well. Let us know if you have any suggestions about this.
>> >> >> >> >
>> >> >> >> > Guozhang
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > On Thu, Mar 10, 2016 at 7:53 AM, Mansi Shah <
>> >> mansis...@maprtech.com>
>> >> >> >> wrote:
>> >> >> >> >
>> >> >> >> >> I second the need for having a consumer context passed to rebalance
>> >> >> >> >> callback. I have run into issues several times because of that.
>> >> >> >> >>
>> >> >> >> >> About - subscribe vs assign - I have not read through your spark code yet
>> >> >> >> >> (will do by eod), so I am not sure what you mean (other than I do agree
>> >> >> >> >> that new partitions should be consumed automatically). I guess we can
>> >> >> >> >> continue this discussion on the spark list then :-)
>> >> >> >> >>
>> >> >> >> >> Thanks
>> >> >> >> >> Mansi.
>> >> >> >> >>
>> >> >> >> >> On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <
>> >> c...@koeninger.org>
>> >> >> >> >> wrote:
>> >> >> >> >>
>> >> >> >> >> > Mansi, I'd agree that the fact that everything is tied up in poll
>> >> >> >> >> > seems like the source of the awkward behavior.
>> >> >> >> >> >
>> >> >> >> >> > Regarding assign vs subscribe, most people using the spark integration
>> >> >> >> >> > are just going to want to provide a topic name, not go figure out a
>> >> >> >> >> > bunch of partitions.  They're also going to be surprised if things
>> >> >> >> >> > suddenly blow up once a partition is added, or that partition doesn't
>> >> >> >> >> > start being consumed (we already have that second issue today).
>> >> >> >> >> >
>> >> >> >> >> > That's why separating the behavior of auto offset reset seems like the
>> >> >> >> >> > best idea I've heard so far.
>> >> >> >> >> >
>> >> >> >> >> > Consumer rebalance listeners are still probably going to be necessary
>> >> >> >> >> > for people who are storing offsets externally.
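>> >> >> >> >> >
>> >> >> >> >> > e.g. something like this (sketch; an in-memory map stands in for the
>> >> >> >> >> > external store, JavaConverters assumed):
>> >> >> >> >> >
>> >> >> >> >> > val store = scala.collection.mutable.Map[TopicPartition, Long]()
>> >> >> >> >> > val listener = new ConsumerRebalanceListener {
>> >> >> >> >> >   override def onPartitionsRevoked(parts: java.util.Collection[TopicPartition]): Unit =
>> >> >> >> >> >     parts.asScala.foreach(tp => store(tp) = consumer.position(tp))  // save before they're taken away
>> >> >> >> >> >   override def onPartitionsAssigned(parts: java.util.Collection[TopicPartition]): Unit =
>> >> >> >> >> >     parts.asScala.foreach(tp => consumer.seek(tp, store.getOrElse(tp, 0L)))  // restore
>> >> >> >> >> > }
>> >> >> >> >> > consumer.subscribe(topics, listener)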
>> >> >> >> >> >
>> >> >> >> >> > On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <
>> >> >> mansis...@maprtech.com>
>> >> >> >> >> > wrote:
>> >> >> >> >> > > Guozhang
>> >> >> >> >> > >
>> >> >> >> >> > > Sorry for joining the party a little late. I have been thinking about this
>> >> >> >> >> > > whole awkward behavior of having to call poll(0) to actually make the
>> >> >> >> >> > > underlying subscriptions take effect. Is the core reason for this design
>> >> >> >> >> > > the fact that poll is also the actual heartbeat and you want to make the
>> >> >> >> >> > > listener group assignments through poll - so that timeouts and
>> >> >> >> >> > > reassignments can all go through poll? So I think clubbing liveness with
>> >> >> >> >> > > poll (which in effect clubs consumer group assignments and hence metadata
>> >> >> >> >> > > fetch with poll) is the real cause of this design. Were there issues where
>> >> >> >> >> > > you were seeing active consumers not calling poll that led to this design
>> >> >> >> >> > > choice? I tried to look for a relevant JIRA but could not find one - can
>> >> >> >> >> > > you please point me to something if you have it handy?
>> >> >> >> >> > >
>> >> >> >> >> > > Btw this would also mean that your proposal to do the actual assignments
>> >> >> >> >> > > through seek might not be ideal since there can still be indefinite time
>> >> >> >> >> > > between seek and poll (just like between subscribe and poll) and the
>> >> >> >> >> > > consumer could time out even before the first poll is called?
>> >> >> >> >> > >
>> >> >> >> >> > > @Cody in your case if you really have only one consumer and it is going to
>> >> >> >> >> > > get all the partitions of the topic anyway - then you might as well
>> >> >> >> >> > > subscribe using "assign" call instead of "subscribe" call. That will make
>> >> >> >> >> > > at least your code cleaner and I do not think you are gaining anything with
>> >> >> >> >> > > the listener group functionality anyway?
>> >> >> >> >> > >
>> >> >> >> >> > > - Mansi.
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > > On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <
>> >> >> wangg...@gmail.com>
>> >> >> >> >> > wrote:
>> >> >> >> >> > >
>> >> >> >> >> > >> In order to do anything meaningful with the consumer itself in the
>> >> >> >> >> > >> rebalance callback (e.g. commit offset), you would need to hold on to the
>> >> >> >> >> > >> consumer reference; admittedly it sounds a bit awkward, but by design we
>> >> >> >> >> > >> choose to not enforce it in the interface itself.
>> >> >> >> >> > >>
>> >> >> >> >> > >> Guozhang
>> >> >> >> >> > >>
>> >> >> >> >> > >> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <
>> >> >> c...@koeninger.org
>> >> >> >> >
>> >> >> >> >> > wrote:
>> >> >> >> >> > >>
>> >> >> >> >> > >> > So what about my comments regarding the consumer rebalance listener
>> >> >> >> >> > >> > interface not providing access to a consumer?  I can probably work around
>> >> >> >> >> > >> > it, but it seems odd.
>> >> >> >> >> > >> > On Mar 9, 2016 5:04 PM, "Guozhang Wang" <
>> >> wangg...@gmail.com>
>> >> >> >> wrote:
>> >> >> >> >> > >> >
>> >> >> >> >> > >> > > One thing proposed by Jason:
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > > If you want to only reset offset upon initialization, and by initialization
>> >> >> >> >> > >> > > you mean "no committed offset", you can do sth. like the following in
>> >> >> >> >> > >> > > rebalance callback.
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > >                 @Override
>> >> >> >> >> > >> > >                 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >> >> >> >> > >> > >                     for (TopicPartition partition : partitions)
>> >> >> >> >> > >> > >                         if (consumer.committed(partition) == null)
>> >> >> >> >> > >> > >                             consumer.seekToBeginning(partition);
>> >> >> >> >> > >> > >                 }
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > > Guozhang
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <
>> >> >> >> wangg...@gmail.com
>> >> >> >> >> >
>> >> >> >> >> > >> > wrote:
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > > > Filed https://issues.apache.org/jira/browse/KAFKA-3370 .
>> >> >> >> >> > >> > > >
>> >> >> >> >> > >> > > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <
>> >> >> >> >> > c...@koeninger.org>
>> >> >> >> >> > >> > > wrote:
>> >> >> >> >> > >> > > >
>> >> >> >> >> > >> > > >> That sounds like an interesting way of addressing the problem, can
>> >> >> >> >> > >> > > >> continue further discussions on the JIRA
>> >> >> >> >> > >> > > >>
>> >> >> >> >> > >> > > >>
>> >> >> >> >> > >> > > >>
>> >> >> >> >> > >> > > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <
>> >> >> >> >> > wangg...@gmail.com>
>> >> >> >> >> > >> > > wrote:
>> >> >> >> >> > >> > > >> > Cody:
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> > More specifically, you do not need the "listTopics" function if you
>> >> >> >> >> > >> > > >> > already know your subscribed topics; just using "partitionsFor" is
>> >> >> >> >> > >> > > >> > sufficient.
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> > About the fix, I'm thinking of adding two more options in the
>> >> >> >> >> > >> > > >> > auto.offset.reset, say namely "earliest-on-start" and "latest-on-start",
>> >> >> >> >> > >> > > >> > which set the reset position ONLY at starting up. The reason is that the
>> >> >> >> >> > >> > > >> > seekToXX was actually not designed to do such initialization but for
>> >> >> >> >> > >> > > >> > calling during the lifetime of the consumer, and we'd better provide the
>> >> >> >> >> > >> > > >> > right solution to do so.
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> > I can file the JIRA right away and start further discussions there. But
>> >> >> >> >> > >> > > >> > let me know if you have any other ideas.
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> > Guozhang
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <
>> >> >> >> >> > >> c...@koeninger.org
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > > >> wrote:
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> >> Yeah, I think I understood what you were saying.  What I'm saying is
>> >> >> >> >> > >> > > >> >> that if there were a way to just fetch metadata without doing the rest
>> >> >> >> >> > >> > > >> >> of the work poll() does, it wouldn't be necessary.  I guess I can do
>> >> >> >> >> > >> > > >> >> listTopics to get all metadata for all topics and then parse it.
>> >> >> >> >> > >> > > >> >>
>> >> >> >> >> > >> > > >> >> Regarding running a single instance, that is the case for what I'm
>> >> >> >> >> > >> > > >> >> talking about.
>> >> >> >> >> > >> > > >> >>
>> >> >> >> >> > >> > > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <
>> >> >> >> >> > >> wangg...@gmail.com>
>> >> >> >> >> > >> > > >> wrote:
>> >> >> >> >> > >> > > >> >> > Cody,
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> > What I meant for a special case of `seekToXX` is that, today when the
>> >> >> >> >> > >> > > >> >> > function is called with no partition parameters, it will try to execute the
>> >> >> >> >> > >> > > >> >> > logic on all "assigned" partitions for the consumer. And once that is done,
>> >> >> >> >> > >> > > >> >> > the subsequent poll() will not throw the exception since it knows those
>> >> >> >> >> > >> > > >> >> > partitions need to reset offsets.
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> > However for your case, there are no assigned partitions yet, and hence
>> >> >> >> >> > >> > > >> >> > `seekToXX` will not take effect on any partitions. The assignment is
>> >> >> >> >> > >> > > >> >> > wrapped in the poll() call as you mentioned. And one way to solve it is to
>> >> >> >> >> > >> > > >> >> > let the `seekToXX()` with no parameters do the coordination and get the
>> >> >> >> >> > >> > > >> >> > assigned partitions if there are any subscribed topics, so that the
>> >> >> >> >> > >> > > >> >> > subsequent poll() will know those partitions need resetting offsets. Does
>> >> >> >> >> > >> > > >> >> > that make sense?
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> > As for now another way I can think of is to get the partition info
>> >> >> >> >> > >> > > >> >> > beforehand and call `seekToBeginning` on all partitions. But that only
>> >> >> >> >> > >> > > >> >> > works if the consumer knows it is going to get all the partitions assigned
>> >> >> >> >> > >> > > >> >> > to itself (i.e. you are only running a single instance).
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> > Guozhang
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody
>> Koeninger <
>> >> >> >> >> > >> > c...@koeninger.org
>> >> >> >> >> > >> > > >
>> >> >> >> >> > >> > > >> >> wrote:
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> >> Another unfortunate thing about ConsumerRebalanceListener is that in
>> >> >> >> >> > >> > > >> >> >> order to do meaningful work in the callback, you need a reference to
>> >> >> >> >> > >> > > >> >> >> the consumer that called it.  But that reference isn't provided to the
>> >> >> >> >> > >> > > >> >> >> callback, which means the listener implementation needs to hold a
>> >> >> >> >> > >> > > >> >> >> reference to the consumer.  Seems like this makes it unnecessarily
>> >> >> >> >> > >> > > >> >> >> awkward to serialize or provide a 0 arg constructor for the listener.
>> >> >> >> >> > >> > > >> >> >>
>> >> >> >> >> > >> > > >> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody
>> Koeninger <
>> >> >> >> >> > >> > > c...@koeninger.org>
>> >> >> >> >> > >> > > >> >> wrote:
>> >> >> >> >> > >> > > >> >> >> > I thought about ConsumerRebalanceListener, but seeking to the beginning
>> >> >> >> >> > >> > > >> >> >> > any time there's a rebalance for whatever reason is not necessarily the
>> >> >> >> >> > >> > > >> >> >> > same thing as seeking to the beginning before first starting the consumer.
>> >> >> >> >> > >> > > >> >> >> >
>> >> >> >> >> > >> > > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <
>> >> >> >> >> > >> > kamaltar...@gmail.com>
>> >> >> >> >> > >> > > >> >> wrote:
>> >> >> >> >> > >> > > >> >> >> >> Cody,
>> >> >> >> >> > >> > > >> >> >> >>
>> >> >> >> >> > >> > > >> >> >> >> Use ConsumerRebalanceListener to achieve that,
>> >> >> >> >> > >> > > >> >> >> >>
>> >> >> >> >> > >> > > >> >> >> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>> >> >> >> >> > >> > > >> >> >> >>
>> >> >> >> >> > >> > > >> >> >> >>             @Override
>> >> >> >> >> > >> > > >> >> >> >>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>> >> >> >> >> > >> > > >> >> >> >>             }
>> >> >> >> >> > >> > > >> >> >> >>
>> >> >> >> >> > >> > > >> >> >> >>             @Override
>> >> >> >> >> > >> > > >> >> >> >>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >> >> >> >> > >> > > >> >> >> >>                 consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>> >> >> >> >> > >> > > >> >> >> >>             }
>> >> >> >> >> > >> > > >> >> >> >>         };
>> >> >> >> >> > >> > > >> >> >> >>
>> >> >> >> >> > >> > > >> >> >> >> consumer.subscribe(topics, listener);
>> >> >> >> >> > >> > > >> >> >> >>
>> >> >> >> >> > >> > > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody
>> >> Koeninger
>> >> >> <
>> >> >> >> >> > >> > > >> c...@koeninger.org>
>> >> >> >> >> > >> > > >> >> >> wrote:
>> >> >> >> >> > >> > > >> >> >> >>
>> >> >> >> >> > >> > > >> >> >> >>> That suggestion doesn't work, for pretty much the same reason - at the
>> >> >> >> >> > >> > > >> >> >> >>> time poll is first called, there is no reset policy and no committed
>> >> >> >> >> > >> > > >> >> >> >>> offset, so NoOffsetForPartitionException is thrown
>> >> >> >> >> > >> > > >> >> >> >>>
>> >> >> >> >> > >> > > >> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd needs
>> >> >> >> >> > >> > > >> >> >> >>> special case behavior.  It's more that topic metadata fetches,
>> >> >> >> >> > >> > > >> >> >> >>> consumer position fetches, and message fetches are all lumped together
>> >> >> >> >> > >> > > >> >> >> >>> under a single poll() call, with no way to do them individually if
>> >> >> >> >> > >> > > >> >> >> >>> necessary.
>> >> >> >> >> > >> > > >> >> >> >>>
>> >> >> >> >> > >> > > >> >> >> >>> What does "work" in this situation is to just catch the exception
>> >> >> >> >> > >> > > >> >> >> >>> (which leaves the consumer in a state where topics are assigned) and
>> >> >> >> >> > >> > > >> >> >> >>> then seek.  But that is not exactly an elegant interface.
>> >> >> >> >> > >> > > >> >> >> >>>
>> >> >> >> >> > >> > > >> >> >> >>>     consumer.subscribe(topics)
>> >> >> >> >> > >> > > >> >> >> >>>     try {
>> >> >> >> >> > >> > > >> >> >> >>>       consumer.poll(0)
>> >> >> >> >> > >> > > >> >> >> >>>     } catch {
>> >> >> >> >> > >> > > >> >> >> >>>       case x: Throwable =>
>> >> >> >> >> > >> > > >> >> >> >>>     }
>> >> >> >> >> > >> > > >> >> >> >>>     consumer.seekToBeginning()
>> >> >> >> >> > >> > > >> >> >> >>>     consumer.poll(0)
>> >> >> >> >> > >> > > >> >> >> >>>
>> >> >> >> >> > >> > > >> >> >> >>>
>> >> >> >> >> > >> > > >> >> >> >>>
>> >> >> >> >> > >> > > >> >> >> >>>
>> >> >> >> >> > >> > > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang
>> >> Wang
>> >> >> <
>> >> >> >> >> > >> > > >> wangg...@gmail.com>
>> >> >> >> >> > >> > > >> >> >> wrote:
>> >> >> >> >> > >> > > >> >> >> >>> > Hi Cody,
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> > The problem with that code is in `seekToBeginning()` followed by
>> >> >> >> >> > >> > > >> >> >> >>> > `subscribe(topic)`.
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> > Since `subscribe` call is lazy evaluated, by the time `seekToBeginning()`
>> >> >> >> >> > >> > > >> >> >> >>> > is called no partition is assigned yet, and hence it is effectively a
>> >> >> >> >> > >> > > >> >> >> >>> > no-op.
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> > Try
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> >     consumer.subscribe(topics)
>> >> >> >> >> > >> > > >> >> >> >>> >     consumer.poll(0);  // get assigned partitions
>> >> >> >> >> > >> > > >> >> >> >>> >     consumer.seekToBeginning()
>> >> >> >> >> > >> > > >> >> >> >>> >     consumer.poll(0)
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> > to see if that works.
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> > I think it is a valid issue that can be fixed in the new consumer that,
>> >> >> >> >> > >> > > >> >> >> >>> > upon calling seekToEnd/Beginning with no parameter, while no assignment is
>> >> >> >> >> > >> > > >> >> >> >>> > done yet, do the coordination behind the scene; it will though change the
>> >> >> >> >> > >> > > >> >> >> >>> > behavior of the functions as they are no longer always lazily evaluated.
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> > Guozhang
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody
>> >> >> Koeninger <
>> >> >> >> >> > >> > > >> >> c...@koeninger.org>
>> >> >> >> >> > >> > > >> >> >> >>> wrote:
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the
>> >> >> >> >> > >> > > >> >> >> >>> >> beginning or end, without specifying auto.offset.reset.
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >> This does not seem to be possible:
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >>     val kafkaParams = Map[String, Object](
>> >> >> >> >> > >> > > >> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>> >> >> >> >> > >> > > >> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
>> >> >> >> >> > >> > > >> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
>> >> >> >> >> > >> > > >> >> >> >>> >>       "group.id" -> "example",
>> >> >> >> >> > >> > > >> >> >> >>> >>       "auto.offset.reset" -> "none"
>> >> >> >> >> > >> > > >> >> >> >>> >>     ).asJava
>> >> >> >> >> > >> > > >> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> >> >> >> >> > >> > > >> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>> >> >> >> >> > >> > > >> >> >> >>> >>     consumer.subscribe(topics)
>> >> >> >> >> > >> > > >> >> >> >>> >>     consumer.seekToBeginning()
>> >> >> >> >> > >> > > >> >> >> >>> >>     consumer.poll(0)
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >> Results in:
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >> Exception in thread "main"
>> >> >> >> >> > >> > > >> >> >> >>> >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>> >> >> >> >> > >> > > >> >> >> >>> >> Undefined offset with no reset policy for partition: testtwo-4
>> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>> >> >> >> >> > >> > > >> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >> I'm assuming this is because, at the time seekToBeginning() is
>> >> >> >> >> > >> > > >> >> >> >>> >> called, subscriptions.assignedPartitions isn't populated.  But
>> >> >> >> >> > >> > > >> >> >> >>> >> polling in order to assign topicpartitions results in an error,
>> >> >> >> >> > >> > > >> >> >> >>> >> which creates a chicken-or-the-egg situation.
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard error
>> >> >> >> >> > >> > > >> >> >> >>> >> if the offsets are out of range at any other time during consumption.
>> >> >> >> >> > >> > > >> >> >> >>> >>
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> >
>> >> >> >> >> > >> > > >> >> >> >>> > --
>> >> >> >> >> > >> > > >> >> >> >>> > -- Guozhang
>> >> >> >> >> > >> > > >> >> >> >>>
>> >> >> >> >> > >> > > >> >> >>
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> >
>> >> >> >> >> > >> > > >> >> > --
>> >> >> >> >> > >> > > >> >> > -- Guozhang
>> >> >> >> >> > >> > > >> >>
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> >
>> >> >> >> >> > >> > > >> > --
>> >> >> >> >> > >> > > >> > -- Guozhang
>> >> >> >> >> > >> > > >>
>> >> >> >> >> > >> > > >
>> >> >> >> >> > >> > > >
>> >> >> >> >> > >> > > >
>> >> >> >> >> > >> > > > --
>> >> >> >> >> > >> > > > -- Guozhang
>> >> >> >> >> > >> > > >
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> > > --
>> >> >> >> >> > >> > > -- Guozhang
>> >> >> >> >> > >> > >
>> >> >> >> >> > >> >
>> >> >> >> >> > >>
>> >> >> >> >> > >>
>> >> >> >> >> > >>
>> >> >> >> >> > >> --
>> >> >> >> >> > >> -- Guozhang
>> >> >> >> >> > >>
>> >> >> >> >> >
>> >> >> >> >>
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > --
>> >> >> >> > -- Guozhang
>> >> >> >>
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > --
>> >> >> > -- Guozhang
>> >> >>
>> >>
>>
>>
