Yeah, so I'd encourage you guys to consider fixing that while the
consumer is still in beta.

As I said, because the callback doesn't get a reference to the consumer,
it's awkward to serialize a consumer rebalance listener or give it a
zero-arg constructor, which is necessary in our case for restarting a
consumer job on behalf of a user.  It also opens the door to strange
behavior - what if the consumer the listener is holding is not the one
that sent it the rebalance event?
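
For concreteness, the shape a listener ends up taking today is roughly the
following (just a sketch, class and field names are mine): the consumer has
to be injected at construction time, which is exactly what gets in the way
of a zero-arg constructor or serialization.

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Sketch, not from the thread: the listener has to carry the consumer
    // as a field, since the callbacks don't receive it as an argument.
    public class SeekingListener implements ConsumerRebalanceListener {
        private final Consumer<?, ?> consumer;  // non-serializable reference

        public SeekingListener(Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Nothing ties this.consumer to the consumer that actually rebalanced.
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
        }
    }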

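Separately, for anyone skimming the thread below: Guozhang's fallback
suggestion further down (fetch the partition info up front, then assign and
seek, which only works when a single instance gets every partition) would
look roughly like this against the 0.9 API - just a sketch, assuming
"consumer" and "topics" are set up as in my original snippet:

    // Sketch, single-instance case only: skip subscribe()/rebalancing,
    // look up the partitions for each topic, assign them, and seek.
    List<TopicPartition> partitions = new ArrayList<>();
    for (String topic : topics) {
        for (PartitionInfo info : consumer.partitionsFor(topic)) {
            partitions.add(new TopicPartition(info.topic(), info.partition()));
        }
    }
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
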
On Wed, Mar 9, 2016 at 10:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> In order to do anything meaningful with the consumer itself in the rebalance
> callback (e.g. commit offsets), you would need to hold on to the consumer
> reference; admittedly it sounds a bit awkward, but by design we chose not to
> enforce it in the interface itself.
>
> Guozhang
>
> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> So what about my comments regarding the consumer rebalance listener
>> interface not providing access to a consumer?  I can probably work around
>> it, but it seems odd.
>> On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>>
>> > One thing proposed by Jason:
>> >
>> > If you want to only reset offsets upon initialization, and by
>> > initialization you mean "no committed offset", you can do something
>> > like the following in the rebalance callback.
>> >
>> >                 @Override
>> >                 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >                     for (TopicPartition partition : partitions)
>> >                         if (consumer.committed(partition) == null)
>> >                             consumer.seekToBeginning(partition);
>> >                 }
>> >
>> >
>> > Guozhang
>> >
>> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> >
>> > > Filed https://issues.apache.org/jira/browse/KAFKA-3370.
>> > >
>> > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org>
>> > > wrote:
>> > >
>> > >> That sounds like an interesting way of addressing the problem, can
>> > >> continue further discussions on the JIRA
>> > >>
>> > >>
>> > >>
>> > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com>
>> > >> wrote:
>> > >> > Cody:
>> > >> >
>> > >> > More specifically, you do not need the "listTopics" function if you
>> > >> > already know your subscribed topics; just using "partitionsFor" is
>> > >> > sufficient.
>> > >> >
>> > >> > About the fix, I'm thinking of adding two more options to
>> > >> > auto.offset.reset, namely "earliest-on-start" and "latest-on-start",
>> > >> > which set the reset position ONLY at startup. The reason is that
>> > >> > seekToXX was actually not designed for such initialization but for
>> > >> > calling during the lifetime of the consumer, and we'd better provide
>> > >> > the right solution to do so.
>> > >> >
>> > >> > I can file the JIRA right away and start further discussions there.
>> > >> > But let me know if you have any other ideas.
>> > >> >
>> > >> > Guozhang
>> > >> >
>> > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org>
>> > >> > wrote:
>> > >> >
>> > >> >> Yeah, I think I understood what you were saying.  What I'm saying is
>> > >> >> that if there were a way to just fetch metadata without doing the rest
>> > >> >> of the work poll() does, it wouldn't be necessary.  I guess I can do
>> > >> >> listTopics to get all metadata for all topics and then parse it.
>> > >> >>
>> > >> >> Regarding running a single instance, that is the case for what I'm
>> > >> >> talking about.
>> > >> >>
>> > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com>
>> > >> >> wrote:
>> > >> >> > Cody,
>> > >> >> >
>> > >> >> > What I meant for a special case of `seekToXX` is that, today, when the
>> > >> >> > function is called with no partition parameters, it will try to execute
>> > >> >> > the logic on all "assigned" partitions for the consumer. And once that
>> > >> >> > is done, the subsequent poll() will not throw the exception since it
>> > >> >> > knows those partitions need to reset offsets.
>> > >> >> >
>> > >> >> > However, in your case there are no assigned partitions yet, and hence
>> > >> >> > `seekToXX` will not take effect on any partitions. The assignment is
>> > >> >> > wrapped in the poll() call as you mentioned. One way to solve it is to
>> > >> >> > let `seekToXX()` with no parameters do the coordination and get the
>> > >> >> > assigned partitions if there are any subscribed topics, so that the
>> > >> >> > subsequent poll() will know those partitions need resetting offsets.
>> > >> >> > Does that make sense?
>> > >> >> >
>> > >> >> > For now, another way I can think of is to get the partition info
>> > >> >> > beforehand and call `seekToBeginning` on all partitions. But that only
>> > >> >> > works if the consumer knows it is going to get all the partitions
>> > >> >> > assigned to itself (i.e. you are only running a single instance).
>> > >> >> >
>> > >> >> > Guozhang
>> > >> >> >
>> > >> >> >
>> > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org>
>> > >> >> > wrote:
>> > >> >> >
>> > >> >> >> Another unfortunate thing about ConsumerRebalanceListener is that in
>> > >> >> >> order to do meaningful work in the callback, you need a reference to
>> > >> >> >> the consumer that called it.  But that reference isn't provided to the
>> > >> >> >> callback, which means the listener implementation needs to hold a
>> > >> >> >> reference to the consumer.  Seems like this makes it unnecessarily
>> > >> >> >> awkward to serialize or provide a 0-arg constructor for the listener.
>> > >> >> >>
>> > >> >> >> > On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org>
>> > >> >> >> > wrote:
>> > >> >> >> > I thought about ConsumerRebalanceListener, but seeking to the
>> > >> >> >> > beginning any time there's a rebalance for whatever reason is not
>> > >> >> >> > necessarily the same thing as seeking to the beginning before first
>> > >> >> >> > starting the consumer.
>> > >> >> >> >
>> > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com>
>> > >> >> >> > wrote:
>> > >> >> >> >> Cody,
>> > >> >> >> >>
>> > >> >> >> >> Use ConsumerRebalanceListener to achieve that,
>> > >> >> >> >>
>> > >> >> >> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>> > >> >> >> >>
>> > >> >> >> >>             @Override
>> > >> >> >> >>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>> > >> >> >> >>             }
>> > >> >> >> >>
>> > >> >> >> >>             @Override
>> > >> >> >> >>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> > >> >> >> >>                 consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>> > >> >> >> >>             }
>> > >> >> >> >>         };
>> > >> >> >> >>
>> > >> >> >> >> consumer.subscribe(topics, listener);
>> > >> >> >> >>
>> > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org>
>> > >> >> >> >> wrote:
>> > >> >> >> >>
>> > >> >> >> >>> That suggestion doesn't work, for pretty much the same reason - at
>> > >> >> >> >>> the time poll is first called, there is no reset policy and no
>> > >> >> >> >>> committed offset, so NoOffsetForPartitionException is thrown.
>> > >> >> >> >>>
>> > >> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd
>> > >> >> >> >>> needs special case behavior.  It's more that topic metadata
>> > >> >> >> >>> fetches, consumer position fetches, and message fetches are all
>> > >> >> >> >>> lumped together under a single poll() call, with no way to do them
>> > >> >> >> >>> individually if necessary.
>> > >> >> >> >>>
>> > >> >> >> >>> What does "work" in this situation is to just catch the exception
>> > >> >> >> >>> (which leaves the consumer in a state where topics are assigned)
>> > >> >> >> >>> and then seek.  But that is not exactly an elegant interface.
>> > >> >> >> >>>
>> > >> >> >> >>>     consumer.subscribe(topics)
>> > >> >> >> >>>     try {
>> > >> >> >> >>>       consumer.poll(0)
>> > >> >> >> >>>     } catch {
>> > >> >> >> >>>       case x: Throwable =>
>> > >> >> >> >>>     }
>> > >> >> >> >>>     consumer.seekToBeginning()
>> > >> >> >> >>>     consumer.poll(0)
>> > >> >> >> >>>
>> > >> >> >> >>>
>> > >> >> >> >>>
>> > >> >> >> >>>
>> > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com>
>> > >> >> >> >>> wrote:
>> > >> >> >> >>> > Hi Cody,
>> > >> >> >> >>> >
>> > >> >> >> >>> > The problem with that code is in `subscribe(topics)` followed by
>> > >> >> >> >>> > `seekToBeginning()`.
>> > >> >> >> >>> >
>> > >> >> >> >>> > Since the `subscribe` call is lazily evaluated, by the time
>> > >> >> >> >>> > `seekToBeginning()` is called no partition is assigned yet, and
>> > >> >> >> >>> > hence it is effectively a no-op.
>> > >> >> >> >>> >
>> > >> >> >> >>> > Try
>> > >> >> >> >>> >
>> > >> >> >> >>> >     consumer.subscribe(topics)
>> > >> >> >> >>> >     consumer.poll(0);  // get assigned partitions
>> > >> >> >> >>> >     consumer.seekToBeginning()
>> > >> >> >> >>> >     consumer.poll(0)
>> > >> >> >> >>> >
>> > >> >> >> >>> > to see if that works.
>> > >> >> >> >>> >
>> > >> >> >> >>> > I think it is a valid issue that can be fixed in the new consumer:
>> > >> >> >> >>> > upon calling seekToEnd/Beginning with no parameters, while no
>> > >> >> >> >>> > assignment has been done yet, do the coordination behind the
>> > >> >> >> >>> > scenes; it would, though, change the behavior of the functions,
>> > >> >> >> >>> > as they are no longer always lazily evaluated.
>> > >> >> >> >>> >
>> > >> >> >> >>> >
>> > >> >> >> >>> > Guozhang
>> > >> >> >> >>> >
>> > >> >> >> >>> >
>> > >> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org>
>> > >> >> >> >>> > wrote:
>> > >> >> >> >>> >
>> > >> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the
>> > >> >> >> >>> >> beginning or end, without specifying auto.offset.reset.
>> > >> >> >> >>> >>
>> > >> >> >> >>> >> This does not seem to be possible:
>> > >> >> >> >>> >>
>> > >> >> >> >>> >>     val kafkaParams = Map[String, Object](
>> > >> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>> > >> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
>> > >> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
>> > >> >> >> >>> >>       "group.id" -> "example",
>> > >> >> >> >>> >>       "auto.offset.reset" -> "none"
>> > >> >> >> >>> >>     ).asJava
>> > >> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> > >> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>> > >> >> >> >>> >>     consumer.subscribe(topics)
>> > >> >> >> >>> >>     consumer.seekToBeginning()
>> > >> >> >> >>> >>     consumer.poll(0)
>> > >> >> >> >>> >>
>> > >> >> >> >>> >>
>> > >> >> >> >>> >> Results in:
>> > >> >> >> >>> >>
>> > >> >> >> >>> >> Exception in thread "main"
>> > >> >> >> >>> >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>> > >> >> >> >>> >> Undefined offset with no reset policy for partition: testtwo-4
>> > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>> > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>> > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>> > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>> > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>> > >> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>> > >> >> >> >>> >>
>> > >> >> >> >>> >>
>> > >> >> >> >>> >> I'm assuming this is because, at the time seekToBeginning() is
>> > >> >> >> >>> >> called, subscriptions.assignedPartitions isn't populated.  But
>> > >> >> >> >>> >> polling in order to assign topicpartitions results in an error,
>> > >> >> >> >>> >> which creates a chicken-or-the-egg situation.
>> > >> >> >> >>> >>
>> > >> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard
>> > >> >> >> >>> >> error if the offsets are out of range at any other time during
>> > >> >> >> >>> >> consumption.
>> > >> >> >> >>> >>
>> > >> >> >> >>> >
>> > >> >> >> >>> >
>> > >> >> >> >>> >
>> > >> >> >> >>> > --
>> > >> >> >> >>> > -- Guozhang
>> > >> >> >> >>>
>> > >> >> >>
>> > >> >> >
>> > >> >> >
>> > >> >> >
>> > >> >> > --
>> > >> >> > -- Guozhang
>> > >> >>
>> > >> >
>> > >> >
>> > >> >
>> > >> > --
>> > >> > -- Guozhang
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -- Guozhang
