Hey Boyang,

Thanks for the KIP. I think my main question is in the same vein as James'.
The problem is that the coordinator needs to be able to identify which
instance of a particular memberId is the active one. For EOS, each
transactionalId gets an epoch. When a new producer is started, it bumps the
epoch which allows the transaction coordinator to fence off any zombie
instances which may try to continue doing work with the old epoch. It seems
like we need a similar protection for consumer members.

Suppose for example that we distinguish between a registration id which is
provided by the user and a member id which is assigned uniquely by the
coordinator. In the JoinGroup request, both the registration id and the
member id are provided. When a consumer is first started, it doesn't know
the memberId, so it it provides only the registration id. The coordinator
can then assign a new memberId and invalidate the previous one that was
associated with the registration id. This would then fence off the previous
instance which was still trying to use the member id.

Taking a little bit of a step back, I think the main observation in this
KIP is that applications with heavy local state need to have a strong bias
toward being able to reuse that state. It is a bit like Kafka itself in the
sense that a replica is not moved just because the broker is shutdown as
the cost of moving the log is extremely high. I'm wondering if we need to
think about streams applications in a similar way. Should there be a static
notion of the members of the group so that streams can make rebalancing
decisions more easily without depending so heavily on transient membership?
I feel the hacks we've put in place in some cases to avoid rebalances are a
bit brittle. Delaying group joining for example is an example of this. If
you knew ahead of time who the stable members of the group were, then this
would not be needed. Anyway, just a thought.

Thanks,
Jason



On Fri, Jul 27, 2018 at 1:58 PM, James Cheng <wushuja...@gmail.com> wrote:

> When you say that it will "break", what does this breakage look like? Will
> the consumer-group be non-functional? Will just those instances be
> non-functional? Or will the group be functional, but the rebalancing be
> non-optimal and require more round-trips/data-transfer? (similar to the
> current algorithm)
>
> I'm trying to assess the potential for user-error and the impact of
> user-error.
>
> -James
>
> > On Jul 27, 2018, at 11:25 AM, Boyang Chen <bche...@outlook.com> wrote:
> >
> > Hey James,
> >
> >
> > the algorithm is relying on client side to provide unique consumer
> member id. It will break unless we enforce some sort of validation (host +
> port) on the server side. To simplify the first version, we do not plan to
> enforce validation. A good comparison would be the EOS producer which is in
> charge of generating unique transaction id sequence. IMO for broker logic,
> the tolerance of client side error is not unlimited.
> >
> >
> > Thank you!
> >
> >
> > ________________________________
> > From: James Cheng <wushuja...@gmail.com>
> > Sent: Saturday, July 28, 2018 1:26 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
> >
> >
> >> On Jul 26, 2018, at 11:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> Hi Boyang,
> >>
> >> Thanks for the proposed KIP. I made a pass over the wiki and here are
> some
> >> comments / questions:
> >>
> >> 1. In order to preserve broker compatibility, we need to make sure the
> >> broker version discovery logic can be integrated with this new logic.
> I.e.
> >> if a newer versioned consumer is talking to an older versioned broker
> who
> >> does not recognize V4, the client needs to downgrade its
> JoinGroupRequest
> >> version to V3 and not setting the member-id specifically. You can take a
> >> look at the ApiVersionsRequest and see how to work with it.
> >>
> >> 2. There may exist some manners to validate that two different clients
> do
> >> not send with the same member id, for example if we pass along the
> >> host:port information from KafkaApis to the GroupCoordinator interface.
> But
> >> I think this is overly complicated the logic and may not worthwhile than
> >> relying on users to specify unique member ids.
> >
> > Boyang,
> >
> > Thanks for the KIP! How will the algorithm behave if multiple consumers
> provide the same member id?
> >
> > -James
> >
> >> 3. Minor: you would need to bumping up the version of JoinGroupResponse
> to
> >> V4 as well.
> >>
> >> 4. Minor: in the wiki page, you need to specify the actual string value
> for
> >> `MEMBER_ID`, for example "member.id".
> >>
> >> 5. When this additional config it specified by users, we should consider
> >> setting the default of internal `LEAVE_GROUP_ON_CLOSE_CONFIG` to false,
> >> since otherwise its effectiveness would be less.
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >>> On Thu, Jul 26, 2018 at 9:20 PM, Boyang Chen <bche...@outlook.com>
> wrote:
> >>>
> >>> Hey friends,
> >>>
> >>>
> >>> I would like to open a discussion thread on KIP-345:
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A
> >>> +Reduce+multiple+consumer+rebalances+by+specifying+member+id
> >>>
> >>>
> >>> This KIP is trying to resolve multiple rebalances by maintaining the
> >>> consumer member id across rebalance generations. I have verified the
> theory
> >>> on our internal Stream application, and it could reduce rebalance time
> to a
> >>> few seconds when service is rolling restart.
> >>>
> >>>
> >>> Let me know your thoughts, thank you!
> >>>
> >>>
> >>> Best,
> >>>
> >>> Boyang
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
>
>

Reply via email to