Hey Guozhang, thanks for the thorough reply!

I definitely went back and forth on whether to make it a blocking call,
and ultimately went with blocking just to leave it open to potential future
use cases (in particular non-Streams apps). But on second (or third)
thought, I agree that any such use case would be covered just as well by
calling poll() immediately after enforceRebalance(). It seems best to
leave all rebalancing action within the scope of poll alone and avoid
introducing unnecessary complexity, so I'm happy to revert this.

I think that ends up addressing most of your other concerns, although
there's one I would push back on: this method should still explicitly
call out whether a rebalance is already in progress and the call is thus
a no-op. If throwing a RebalanceInProgressException seems too heavy,
maybe we can just return a boolean indicating whether or not a new
rebalance was triggered.

The snippet you included does work around this by checking the
condition again in the rebalance listener. But I would argue that a) many
applications don't use a rebalance listener, and shouldn't be forced to
implement one to fully use this new API (admittedly a minor point, since
you can probably use the assignor's onAssignment method to achieve the
same thing). More importantly, b) it adds unnecessary complexity, and as
we've seen in Streams, the interactions between the rebalance callbacks
and the main consumer can already get quite ugly.

For simplicity's sake, then, I propose to just return a boolean instead of
throwing the exception, and to change the signature to:

/**
 * @return Whether a new rebalance was triggered (false if a rebalance was
 *         already in progress)
 * @throws java.lang.IllegalStateException if the consumer does not use
 *         group subscription
 */
boolean enforceRebalance();
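
To make the proposed contract concrete, here is a rough, self-contained sketch in plain Java. It does not touch the real KafkaConsumer: ToyConsumer, its two-poll "rebalance", and the field names are made up for illustration. The only part taken from this proposal is the contract of boolean enforceRebalance(): it just sets a flag (like seek), returns false if a rebalance is already requested or in progress, throws IllegalStateException without a group subscription, and leaves the actual work to a later poll().

```java
import java.util.ArrayList;
import java.util.List;

public class EnforceRebalanceSketch {

    /** Hypothetical toy consumer modelling the proposed contract; NOT the real KafkaConsumer. */
    static final class ToyConsumer {
        private boolean subscribed = false;
        private boolean rebalanceRequested = false;   // the "bit" set by enforceRebalance()
        private boolean rebalanceInProgress = false;  // true between the two polls of a toy rebalance
        final List<String> events = new ArrayList<>();

        void subscribe() {
            subscribed = true;
        }

        /**
         * @return whether a new rebalance was triggered (false if one was already
         *         requested or in progress)
         * @throws IllegalStateException if the consumer does not use group subscription
         */
        boolean enforceRebalance() {
            if (!subscribed) {
                throw new IllegalStateException("consumer does not use group subscription");
            }
            if (rebalanceRequested || rebalanceInProgress) {
                return false;              // no-op: a rebalance is already underway
            }
            rebalanceRequested = true;     // non-blocking: the work happens in a later poll()
            return true;
        }

        /** One toy poll(): a requested rebalance starts here and completes on the next poll. */
        void poll() {
            if (rebalanceInProgress) {
                rebalanceInProgress = false;
                events.add("rebalance completed");
            } else if (rebalanceRequested) {
                rebalanceRequested = false;
                rebalanceInProgress = true;
                events.add("rebalance started");
            }
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.subscribe();

        boolean first = consumer.enforceRebalance();   // new rebalance requested
        boolean second = consumer.enforceRebalance();  // already requested: no-op
        consumer.poll();                               // toy rebalance starts
        boolean third = consumer.enforceRebalance();   // still in progress: no-op
        consumer.poll();                               // toy rebalance completes
        boolean fourth = consumer.enforceRebalance();  // nothing in flight: triggers again

        System.out.println(first + " " + second + " " + third + " " + fourth);
        // prints "true false false true"
    }
}
```

With the return value in hand, an application without a rebalance listener can keep its own flag and clear it only once a new rebalance was actually triggered, e.g. `if (needsRebalance && consumer.enforceRebalance()) needsRebalance = false;` in the main loop; a false return just means "try again next iteration".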

Thoughts?

On Tue, Feb 11, 2020 at 5:29 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Sophie, thanks for bringing up this KIP, and for the great write-up
> summarizing the motivations of the proposal. Here are some comments:
>
> Minor:
>
> 1. If we want to make it a blocking call (I have some thoughts about this
> below :), then to be consistent we need to consider having two overloaded
> functions, one of which takes no timeout and instead relies on
> `DEFAULT_API_TIMEOUT_MS_CONFIG`.
>
> 2. Also I'd suggest that, again for API consistency, we a) throw
> TimeoutException if the operation cannot be completed within the timeout
> value, and b) return false immediately if we cannot trigger a rebalance,
> e.g. because the coordinator is unknown.
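
For reference, points 1 and 2 together describe the usual shape of a blocking consumer call. Below is a rough sketch under stated assumptions: BlockingRebalancer is a hypothetical class (not part of any Kafka client), coordinator discovery and rebalance progress are stubbed out as BooleanSupplier parameters, and a local unchecked TimeoutException stands in for Kafka's org.apache.kafka.common.errors.TimeoutException. The no-arg overload simply delegates using a configured default, the way default.api.timeout.ms backs the other blocking consumer calls.

```java
import java.time.Duration;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

public class TimeoutOverloadSketch {

    /** Local unchecked stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static final class TimeoutException extends RuntimeException {
        TimeoutException(String message) {
            super(message);
        }
    }

    /** Hypothetical blocking variant of enforceRebalance; not part of any Kafka client. */
    static final class BlockingRebalancer {
        private final Duration defaultApiTimeout;          // plays the role of default.api.timeout.ms
        private final BooleanSupplier coordinatorKnown;    // stub for coordinator discovery
        private final BooleanSupplier rebalanceCompleted;  // stub for rebalance progress

        BlockingRebalancer(Duration defaultApiTimeout,
                           BooleanSupplier coordinatorKnown,
                           BooleanSupplier rebalanceCompleted) {
            this.defaultApiTimeout = defaultApiTimeout;
            this.coordinatorKnown = coordinatorKnown;
            this.rebalanceCompleted = rebalanceCompleted;
        }

        /** Point 1: the overload without a timeout falls back to the default API timeout. */
        boolean enforceRebalance() {
            return enforceRebalance(defaultApiTimeout);
        }

        /** Point 2: false if no rebalance can be triggered, TimeoutException if it can't finish in time. */
        boolean enforceRebalance(Duration timeout) {
            if (!coordinatorKnown.getAsBoolean()) {
                return false;                              // 2b: e.g. the coordinator is unknown
            }
            long deadline = System.nanoTime() + timeout.toNanos();
            while (!rebalanceCompleted.getAsBoolean()) {
                if (System.nanoTime() - deadline >= 0) {   // 2a: out of time
                    throw new TimeoutException("rebalance not completed within " + timeout);
                }
                LockSupport.parkNanos(1_000_000L);         // back off ~1 ms before re-checking
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Duration defaultTimeout = Duration.ofSeconds(60);

        BlockingRebalancer noCoordinator =
                new BlockingRebalancer(defaultTimeout, () -> false, () -> false);
        System.out.println(noCoordinator.enforceRebalance());  // prints "false" immediately

        BlockingRebalancer stuck =
                new BlockingRebalancer(defaultTimeout, () -> true, () -> false);
        try {
            stuck.enforceRebalance(Duration.ofMillis(20));
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Note that in this shape a zero timeout throws TimeoutException unless the rebalance has already completed.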
>
> Meta:
>
> 3. I'm not sure we have a concrete scenario in KIP-441 / 268 where we want
> to wait until the rebalance is completed, rather than calling
> "consumer.enforceRebalance(); consumer.poll()" consecutively and letting
> the rebalance execute within the poll call. If there's no valid motivation,
> I'm still a bit inclined to make it non-blocking (i.e. just setting a bit
> and then executing the process in the later poll call), similar to our
> `seek` functions. By doing this we can also make this function simpler, as
> it would never throw RebalanceInProgress, Timeout, or even KafkaExceptions.
>
> 4. Re: the case "when a rebalance is already in progress", this may be
> related to 3) above. I think we can simplify this case as well by just not
> triggering a new rebalance and letting the caller handle it. For example,
> in KIP-441, in each iteration of the stream thread we can check if a
> standby task is ready, and if so call `enforceRebalance`. If there's
> already a rebalance in progress (either with the new subscription metadata
> or not), this call would be a no-op, and in the next iteration we would
> just call that function again. Eventually we would trigger the rebalance
> with the new subscription metadata, and since the previous calls were
> no-ops they cost nothing anyway. I feel this would be simpler than
> requiring the caller to catch RebalanceInProgressException:
>
>
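> void mainProcessingLoop() {
>     if (needsRebalance) {
>         consumer.enforceRebalance();  // no-op if a rebalance is already in progress
>     }
>
>     ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
>     // ... do some processing
> }
>
> class RebalanceListener implements ConsumerRebalanceListener {
>
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         // stop requesting rebalances once the assignment reflects the new metadata
>         if (rebalanceGoalAchieved()) {
>             needsRebalance = false;
>         }
>     }
> }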
>
>
> WDYT?
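
The loop-plus-listener pattern above can be exercised without a broker. This is a toy model only: ToyConsumer, AssignmentListener, startUnrelatedRebalance() and the integer metadata version are hypothetical; a rebalance starts on one poll() and completes on the next, and it carries whatever metadata was current when it started. As in point 4, enforceRebalance() returns nothing and is a no-op while a rebalance is in flight, and the listener's used == latest check plays the role of rebalanceGoalAchieved().

```java
public class ListenerRetrySketch {

    /** Hypothetical stand-in for ConsumerRebalanceListener#onPartitionsAssigned. */
    @FunctionalInterface
    interface AssignmentListener {
        void onPartitionsAssigned(int usedMetadataVersion, int latestMetadataVersion);
    }

    /** Toy consumer: a rebalance starts on one poll() and completes on the next. NOT KafkaConsumer. */
    static final class ToyConsumer {
        int metadataVersion = 0;                 // bumped when e.g. a standby task becomes ready
        int rebalancesCompleted = 0;
        private boolean requested = false;       // the "bit" set by enforceRebalance()
        private Integer inFlightVersion = null;  // metadata captured when the rebalance started
        private final AssignmentListener listener;

        ToyConsumer(AssignmentListener listener) {
            this.listener = listener;
        }

        /** Non-blocking, no return value: a no-op while a rebalance is in flight. */
        void enforceRebalance() {
            if (inFlightVersion == null) {
                requested = true;
            }
        }

        /** Simulates a rebalance triggered by something else, e.g. a member joining. */
        void startUnrelatedRebalance() {
            inFlightVersion = metadataVersion;
        }

        void poll() {
            if (inFlightVersion != null) {
                int used = inFlightVersion;
                inFlightVersion = null;
                rebalancesCompleted++;
                listener.onPartitionsAssigned(used, metadataVersion);
            } else if (requested) {
                requested = false;
                inFlightVersion = metadataVersion;  // the join carries the *current* metadata
            }
        }
    }

    static boolean needsRebalance = false;

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer((used, latest) -> {
            if (used == latest) {            // rebalanceGoalAchieved()
                needsRebalance = false;
            }
        });

        consumer.startUnrelatedRebalance();  // a rebalance with old metadata is already in flight
        consumer.metadataVersion = 1;        // new subscription metadata becomes available
        needsRebalance = true;

        for (int i = 0; i < 5; i++) {        // iterations of mainProcessingLoop()
            if (needsRebalance) {
                consumer.enforceRebalance();
            }
            consumer.poll();
            // ... do some processing
        }
        System.out.println("needsRebalance=" + needsRebalance
                + ", rebalancesCompleted=" + consumer.rebalancesCompleted);
    }
}
```

In main, the rebalance already in flight completes with stale metadata and leaves needsRebalance set, the next call actually triggers a rebalance, and the listener clears the flag once that one completes; the calls made while a rebalance was in flight cost nothing, so the program prints needsRebalance=false, rebalancesCompleted=2.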
>
>
>
>
> On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
> > Hey Boyang,
> >
> > Originally I had it as a nonblocking call, but decided to change it to
> > blocking with a timeout parameter. I'm not sure a future makes sense to
> > return here, because the rebalance either does or does not complete
> > within the timeout: if it does not, you will have to call poll again to
> > complete it (as is the case with any other rebalance). I'll call this
> > out in the javadocs as well.
> >
> > I also added an example demonstrating how/when to use this new API.
> >
> > Thanks!
> >
> > On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > Hey Sophie,
> > >
> > > Is `enforceRebalance` a blocking call? Could we add a code sample to
> > > the KIP showing how this API should be used?
> > >
> > > Returning a future instead of a boolean might be easier, as we are
> > > allowing the consumer to make progress during a rebalance after
> > > KIP-429, IMHO.
> > >
> > > Boyang
> > >
> > >
> > > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > > Thanks for the quick turnaround Sophie. My points have been addressed.
> > > > I think the intended use is quite clear now.
> > > >
> > > > Best,
> > > > Konstantine
> > > >
> > > >
> > > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > >
> > > > > Konstantine,
> > > > > Thanks for the feedback! I've updated the sections with your
> > > > > suggestions. I agree in particular that it's really important to
> > > > > make sure users don't call this unnecessarily, or for the wrong
> > > > > reasons: to that end I also extended the javadocs to specify that
> > > > > this API is for when changes to the subscription userdata occur.
> > > > > Hopefully that should make its intended usage quite clear.
> > > > >
> > > > > Bill,
> > > > > The rebalance triggered by this new API will be a "normal"
> > > > > rebalance, and will therefore follow the existing listener
> > > > > semantics. For example, a cooperative rebalance will always call
> > > > > onPartitionsAssigned, even if no partitions are actually moved.
> > > > > An eager rebalance will still revoke all partitions first anyway.
> > > > >
> > > > > Thanks for the feedback!
> > > > > Sophie
> > > > >
> > > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bbej...@gmail.com> wrote:
> > > > >
> > > > > > Hi Sophie,
> > > > > >
> > > > > > Thanks for the KIP, makes sense to me.
> > > > > >
> > > > > > One quick question, I'm not sure if it's relevant or not.
> > > > > >
> > > > > > If a user provides a `ConsumerRebalanceListener` and a rebalance is
> > > > > > triggered from an `enforceRebalance` call, it seems possible the
> > > > > > listener won't get called since partition assignments might not
> > > > > > change. If that is the case, do we want to possibly consider adding
> > > > > > a method to the `ConsumerRebalanceListener` for callbacks on
> > > > > > `enforceRebalance` actions?
> > > > > >
> > > > > > Thanks,
> > > > > > Bill
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > > > konstant...@confluent.io> wrote:
> > > > > >
> > > > > > > Hi Sophie.
> > > > > > >
> > > > > > > Thanks for the KIP. I liked how focused the proposal is. Also, its
> > > > > > > motivation is clear after carefully reading the KIP and its
> > > > > > > references.
> > > > > > >
> > > > > > > Yet, I think it'd be a good idea to call out explicitly in the
> > > > > > > Rejected Alternatives section that an automatic and periodic
> > > > > > > triggering of rebalances, which would not require exposing this
> > > > > > > capability through the Consumer interface, does not cover your
> > > > > > > specific use cases and is therefore not chosen as a desired
> > > > > > > approach. Maybe even consider mentioning again here that this
> > > > > > > method is expected to be used to respond to system changes
> > > > > > > external to the consumer and its membership logic, and is not
> > > > > > > proposed as a way to resolve temporary imbalances due to
> > > > > > > membership changes, which should inherently be resolved by the
> > > > > > > assignor logic itself with one or more consecutive rebalances.
> > > > > > >
> > > > > > > Also, in your javadoc I'd add some context similar to what someone
> > > > > > > can read in the KIP. Specifically, where you say "for example if
> > > > > > > some condition has changed that has implications for the partition
> > > > > > > assignment", I'd rather add something like "for example, if some
> > > > > > > condition external and invisible to the Consumer and its group
> > > > > > > membership has changed in ways that would justify a new partition
> > > > > > > assignment". That's just an example, feel free to reword, but I
> > > > > > > believe that saying explicitly that this condition is not visible
> > > > > > > to the consumer is useful to understand that this is not necessary
> > > > > > > under normal circumstances.
> > > > > > >
> > > > > > > In the Compatibility, Deprecation, and Migration Plan section, I
> > > > > > > think it's worth mentioning that this is a new feature that
> > > > > > > affects new implementations of the Consumer interface, and any
> > > > > > > such new implementation should override the new method.
> > > > > > > Implementations that wish to upgrade to a newer version should be
> > > > > > > extended and recompiled, since no default implementation will be
> > > > > > > provided.
> > > > > > >
> > > > > > > Naming is hard here, if someone wants to emphasize the ad hoc and
> > > > > > > irregular nature of this call. After some thought I'm fine with
> > > > > > > 'enforceRebalance', even if it could potentially be confused with
> > > > > > > a method that is supposed to be called to remediate one or more
> > > > > > > previously unsuccessful rebalances (which is partly what
> > > > > > > StreamThread#enforceRebalance is used for). The best I could think
> > > > > > > of was 'onRequestRebalance', but that's not perfect either.
> > > > > > >
> > > > > > > Best,
> > > > > > > Konstantine
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Thanks John. I took out the KafkaConsumer method and moved the
> > > > > > > > javadocs to Consumer#enforceRebalance in the KIP, hope you're
> > > > > > > > happy :P
> > > > > > > >
> > > > > > > > Also, I wanted to point out one minor change to the current
> > > > > > > > proposal: make this a blocking call, which accepts a timeout and
> > > > > > > > returns whether the rebalance completed within the timeout. It
> > > > > > > > will still reduce to a nonblocking call if a "zero" timeout is
> > > > > > > > supplied. I've updated the KIP accordingly.
> > > > > > > >
> > > > > > > > Let me know if there are any further concerns, else I'll call for
> > > > > > > > a vote.
> > > > > > > >
> > > > > > > > Cheers!
> > > > > > > > Sophie
> > > > > > > >
> > > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Thanks Sophie,
> > > > > > > > >
> > > > > > > > > Sorry I didn't respond. I think your new method name sounds good.
> > > > > > > > >
> > > > > > > > > Regarding the interface vs implementation, I agree it's
> > > > > > > > > confusing. It's always bothered me that the interface redirects
> > > > > > > > > you to the implementation's JavaDocs, but never enough for me
> > > > > > > > > to stop what I'm doing to fix it. It's not a big deal either
> > > > > > > > > way, I just thought it was strange to propose a "public
> > > > > > > > > interface" change, but not in terms of the actual interface
> > > > > > > > > class.
> > > > > > > > >
> > > > > > > > > It _is_ true that KafkaConsumer is also part of the public API,
> > > > > > > > > but only really for the constructor. Any proposal to define a
> > > > > > > > > new "consumer client" API should be on the Consumer interface
> > > > > > > > > (which you said you plan to do anyway). I guess I brought it up
> > > > > > > > > because proposing an addition to Consumer implies it would be
> > > > > > > > > added to KafkaConsumer, but proposing an addition to
> > > > > > > > > KafkaConsumer does not necessarily imply it would also be added
> > > > > > > > > to Consumer. Does that make sense?
> > > > > > > > >
> > > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > -John
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > > > > > > > Since this doesn't seem too controversial, I'll probably call
> > > > > > > > > > for a vote by end of day. If there are any further
> > > > > > > > > > comments/questions/concerns, please let me know!
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Sophie
> > > > > > > > > >
> > > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the feedback! That's a good point about trying to
> > > > > > > > > > > prevent users from thinking they should use this API during
> > > > > > > > > > > normal processing, and about clarifying when/why you might
> > > > > > > > > > > need it. Regardless of the method name, we should explicitly
> > > > > > > > > > > call this out in the javadocs.
> > > > > > > > > > >
> > > > > > > > > > > As for the method name, on reflection I agree that
> > > > > > > > > > > "rejoinGroup" does not seem appropriate. Of course that's
> > > > > > > > > > > what the consumer will actually be doing, but that's just an
> > > > > > > > > > > implementation detail: the name should reflect what the API
> > > > > > > > > > > is doing, not how it does it (which can always change).
> > > > > > > > > > >
> > > > > > > > > > > How about "enforceRebalance"? This is stolen from the
> > > > > > > > > > > StreamThread method that does exactly this (by
> > > > > > > > > > > unsubscribing), so it seems to fit. I'll update the KIP with
> > > > > > > > > > > this unless anyone has another suggestion.
> > > > > > > > > > >
> > > > > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I included
> > > > > > > > > > > the KafkaConsumer method because that's where all the
> > > > > > > > > > > javadocs in the Consumer interface redirect to. Also, FWIW,
> > > > > > > > > > > I'm pretty sure KafkaConsumer is also part of the public
> > > > > > > > > > > API; we would be adding a new method to both.
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Hi all,
> > > > > > > > > > >>
> > > > > > > > > > >> Thanks for the well-motivated KIP, Sophie. I had some
> > > > > > > > > > >> alternatives in mind, which I won't even bother to relate
> > > > > > > > > > >> because I feel like the motivation made a compelling
> > > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > > >>
> > > > > > > > > > >> One very minor point you might as well fix is that the API
> > > > > > > > > > >> change is targeted at KafkaConsumer (the implementation),
> > > > > > > > > > >> but should be targeted at Consumer (the interface).
> > > > > > > > > > >>
> > > > > > > > > > >> I agree with your discomfort about the name. Adding a
> > > > > > > > > > >> "rejoin" method seems strange since there's no "join"
> > > > > > > > > > >> method. Instead, the way you join the group the first time
> > > > > > > > > > >> is just by calling "subscribe". But "resubscribe" seems too
> > > > > > > > > > >> indirect from what we're really trying to do, which is to
> > > > > > > > > > >> trigger a rebalance by sending a new JoinGroup request.
> > > > > > > > > > >>
> > > > > > > > > > >> Another angle is that we don't want the method to sound
> > > > > > > > > > >> like something you should be calling in normal
> > > > > > > > > > >> circumstances, or people will be "tricked" into calling it
> > > > > > > > > > >> unnecessarily.
> > > > > > > > > > >>
> > > > > > > > > > >> So, I think "rejoinGroup" is fine, although a person
> > > > > > > > > > >> _might_ be forgiven for thinking they need to call it
> > > > > > > > > > >> periodically or something. Did you consider
> > > > > > > > > > >> "triggerRebalance", which sounds pretty advanced-ish, and
> > > > > > > > > > >> accurately describes what happens when you call it?
> > > > > > > > > > >>
> > > > > > > > > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > > > > > > > > >>
> > > > > > > > > > >> Thanks,
> > > > > > > > > > >> -John
> > > > > > > > > > >>
> > > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > > > > > > >> > This situation was discussed at length after a recent
> > > > > > > > > > >> > talk I gave. This KIP would be a great step towards
> > > > > > > > > > >> > increasing availability and facilitating lightweight
> > > > > > > > > > >> > rebalances.
> > > > > > > > > > >> >
> > > > > > > > > > >> > anna
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > > > > > >> >
> > > > > > > > > > >> > > Hi all,
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > In light of some recent and upcoming rebalancing and
> > > > > > > > > > >> > > availability improvements, it seems we have a need for
> > > > > > > > > > >> > > explicitly triggering a consumer group rebalance.
> > > > > > > > > > >> > > Therefore I'd like to propose adding a new rejoinGroup()
> > > > > > > > > > >> > > method to the Consumer client (better method name
> > > > > > > > > > >> > > suggestions are very welcome).
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Please take a look at the KIP and let me know what you
> > > > > > > > > > >> > > think!
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > KIP document:
> > > > > > > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Cheers,
> > > > > > > > > > >> > > Sophie
> > > > > > > > > > >> > >
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
