Hello Konstantine,

Thanks for the marvelous effort writing up this KIP! I've made a pass over
it and here are some comments:

1) To help other readers better understand the gist of this proposal, I'd
suggest we add the following context before the "Changes to Connect's
Rebalancing Process" section:

"
The core of a group rebalance protocol is a synchronization barrier that
every member of the group coordinates on: until every member has reached
the barrier, no state is changed at all. In the current rebalance protocol,
this synchronization barrier is the reception of the JoinGroup requests:
the coordinator does not send a response to any member until it determines
that all JoinGroup requests have been received. Because the new assignment
is made right after this barrier, and the assigned partitions may not be
re-assigned to the same member (i.e. consumer) of the group, today we have
to be conservative and have all members revoke all the resources they
currently own before proceeding to the synchronization barrier.

This KIP's key idea is to postpone the synchronization barrier to the
second rebalance's JoinGroup reception. Since we know that NO new
assignment will be executed in the first rebalance, members do not need to
revoke anything before joining the group. In other words, we pay for more
rebalances than the naive solution (at least two rebalances are required),
but each rebalance can now be much lighter.
"

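To make the two-round idea concrete, here is a minimal Python sketch. It is
not taken from the KIP: the function names `first_round` and `second_round`,
the resource names `t1`..`t4`, and the simple quota-based balancing are all
assumptions chosen for illustration only.

```python
from typing import Dict, List, Set

Assignment = Dict[str, Set[str]]


def first_round(owned: Assignment, members: List[str]) -> Assignment:
    """Round 1: nobody gains anything; surplus resources are only marked for revocation."""
    total = sum(len(resources) for resources in owned.values())
    quota = -(-total // len(members))  # ceiling division: max resources per member
    # Hypothetical policy: each member keeps its first `quota` resources (sorted by name).
    return {m: set(sorted(owned.get(m, set()))[quota:]) for m in members}


def second_round(owned: Assignment, revoked: Assignment, members: List[str]) -> Assignment:
    """Round 2: the postponed barrier; resources released after round 1 are handed out."""
    assignment = {m: set(owned.get(m, set())) - revoked[m] for m in members}
    free = sorted(set().union(*revoked.values()))
    for resource in free:
        # Give each free resource to the currently least-loaded member.
        target = min(members, key=lambda m: (len(assignment[m]), m))
        assignment[target].add(resource)
    return assignment


# W1 owns everything; W2 has just joined with nothing.
owned = {"W1": {"t1", "t2", "t3", "t4"}, "W2": set()}
members = ["W1", "W2"]

revoked = first_round(owned, members)          # W1 is asked to give up two resources
final = second_round(owned, revoked, members)  # the released resources go to W2
print(revoked)
print(final)
```

Note that W1 keeps running `t1` and `t2` throughout both rounds; only the
resources that actually move are ever stopped.
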
2) With this idea, the leader needs to be able to distinguish the "first"
rebalance, in which no new assignment is executed and only revocations are
indicated, from the "second" rebalance, in which some partitions (either
revoked or left behind by a leaving member) are to be assigned. What is not
clear to me is how the leader distinguishes these two cases, and how it
decides when to inject the delay (i.e. it is the first rebalance) vs. not.

Compare the "*Non-first new member joins*" and "*Worker bounces*"
scenarios. In the former case, the leader decides it is the first rebalance
and lets W1 revoke part of its assignment, WITHOUT delay. In the latter
case, when W2 rejoins (in this case it rejoins as a new member, so from the
coordinator's and the leader's point of view there should be no difference
compared to "W2 joins as a new member"), the leader assigns AT2 and BC0 to
W2.

The KIP also does not illustrate consumer failures in between rebalances.
As another example, suppose that in "*Non-first new member joins*" W1 fails
after revoking some partitions but before triggering another rebalance.
When the coordinator then triggers another join based on failure detection,
how would the leader assign partitions? Would it assign all five partitions
immediately to W2 and W3, would it inject a delay and assign nothing to W2
and W3, or would it assign only the ones indicated for revocation to W2 and
W3?

Could you provide some pseudocode for the leader logic that shows, given
the list of subscriptions, how the leader decides:

2.a) whether to add a delay;
2.b) whether to assign new resources to some members;
2.c) whether to revoke resources from some members.
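
To make the question concrete, here is the rough shape of answer I am
looking for, as a small Python sketch. It is purely hypothetical and is NOT
the KIP's algorithm: `Decision`, `decide`, the `previous_members` input and
the "delay whenever a known member is missing" rule are all assumptions made
up for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class Decision:
    delay: bool = False                                        # 2.a
    assign: Dict[str, Set[str]] = field(default_factory=dict)  # 2.b
    revoke: Dict[str, Set[str]] = field(default_factory=dict)  # 2.c


def decide(all_resources: Set[str],
           owned: Dict[str, Set[str]],
           previous_members: Set[str]) -> Decision:
    """Hypothetical leader logic; `owned` maps each joined member to what it reports owning."""
    members = sorted(owned)
    quota = -(-len(all_resources) // len(members))  # ceiling division
    decision = Decision(assign={m: set() for m in members},
                        revoke={m: set() for m in members})

    # 2.c) Revoke whatever a member holds beyond its quota.
    for m in members:
        decision.revoke[m] = set(sorted(owned[m])[quota:])

    # Resources that no joined member currently claims.
    unowned = all_resources - set().union(*owned.values())

    # 2.a) Assumed rule: if a previously known member is missing and resources
    # are unclaimed, wait in case that member bounces back.
    if (previous_members - set(members)) and unowned:
        decision.delay = True
        return decision

    # 2.b) Otherwise hand unclaimed resources to the least-loaded members.
    load = {m: len(owned[m]) - len(decision.revoke[m]) for m in members}
    for resource in sorted(unowned):
        target = min(members, key=lambda m: (load[m], m))
        decision.assign[target].add(resource)
        load[target] += 1
    return decision


resources = {"t1", "t2", "t3", "t4"}
# New member W2 joins: revocations only, no delay, nothing assigned yet.
d1 = decide(resources, {"W1": set(resources), "W2": set()}, {"W1"})
# Follow-up rebalance: the revoked resources are now unclaimed and go to W2.
d2 = decide(resources, {"W1": {"t1", "t2"}, "W2": set()}, {"W1", "W2"})
# W2 disappears: its resources are unclaimed, so this sketch injects a delay.
d3 = decide(resources, {"W1": {"t1", "t2"}}, {"W1", "W2"})
```

Even this toy version cannot tell a bounced worker from a brand-new one
without extra state such as `previous_members`, which is the ambiguity I
would like the KIP to spell out.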

3) About compatibility, I'm also wondering how a downgrade would be
executed here. Suppose that, after upgrading the Connect jar and migrating
to `cooperative` mode, users discover a bug and hence need to downgrade to
an older version that does not support `cooperative`. How would that work?

4) This is somewhat orthogonal to this KIP, but I'm also thinking about
code sharing with the future Streams incremental rebalance protocol. One
difference for Kafka Streams is that, because of state maintenance,
migrating tasks is heavier, and hence we should consider bootstrapping the
assigned task before revoking it from the old client. So far it seems the
Streams incremental rebalance protocol would be a bit different from the
Connect protocol proposed in KIP-415 here. What they may have in common are
a) flatbuffer utils for encoding metadata bytes, and b) consumer members
actively triggering another rebalance by sending a JoinGroup request. So
I'm wondering if we can push these two pieces of logic into the
AbstractCoordinator so that they can be shared?

Guozhang



On Wed, Feb 6, 2019 at 9:58 PM Boyang Chen <bche...@outlook.com> wrote:

> Thanks Konstantine for the great summary! +1 for having a separate KIP
> discussing the trade-offs of using a new serialization format for the
> protocol encoding. We could probably discuss a wider set of options and
> benchmark their performance before reaching a final decision.
>
> Best,
> Boyang
> ________________________________
> From: Konstantine Karantasis <konstant...@confluent.io>
> Sent: Tuesday, February 5, 2019 4:23 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in
> Kafka Connect
>
> Hi all,
>
> Thank you for your comments so far.
> Now that KIP freeze and feature freeze are behind us for version 2.2, I'd
> like to bring this thread back to the top of the email stack, with the
> following suggestion:
>
> I'll be changing KIP-415's description to include a serialization format
> that extends the current scheme and is based on Kafka structs.
>
> The initial suggestion to transition to an alternative serialization
> format (e.g. flatbuffers) was made just in case we saw that it had good
> potential and we could arrive at a quick consensus on the matter. I
> believe the arguments for such a transition make sense, but the pros are
> probably not enough to outweigh the introduction of a dependency at this
> point and justify changes in every client that will potentially use
> incremental cooperative rebalancing in the future. The changes in the
> rebalancing protocol have not been very frequent so far.
>
> Admittedly, even more important is the fact that the discussion around the
> serialization format of the new protocol is only tangentially related to
> the core of KIP-415. Thus, in order to keep the discussion focused on the
> essential changes required by KIP-415, which are expected to have
> significant impact in addressing the stop-the-world effect, I'd like to
> punt any optimizations to the serialization format and change the KIP to
> describe a schema that depends on Kafka structs as the current (V0) version
> does.
>
> I hope this will allow us to make progress more easily and bring the
> changes of this new rebalancing protocol to Kafka clients, beginning with
> Kafka Connect, in a more applicable and less disruptive way.
>
> I'll change the schema descriptions by end of day.
>
> Looking forward to your next comments!
>
> Konstantine
>
> On Mon, Jan 28, 2019 at 5:22 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> >
> > Hi Ismael,
> > thanks for bringing up serialization in the discussion!
> >
> > Indeed, JSON was considered given it's the prevalent text-based
> > serialization option.
> >
> > In comparison to flatbuffers, most generic pros and cons are valid in
> > this context too: higher performance during serde, small size, optional
> > fields, strong typing and others.
> >
> > Specifically, for Connect's use case, flatbuffers serialization, although
> > it introduces a single dependency, appears more appealing for the
> > following reasons:
> >
> > * The protocol is evolving from a binary format to, again, a binary one.
> > * Although new fields, nested or not, are expected to be introduced (as
> > in KIP-415) or old fields may get deprecated, the protocol schemas are
> > expected to be simple, mostly flat and manageable. We won't need to
> > process arbitrarily nested structures during runtime, for which JSON
> > would be a better fit. The current proposal aims to make the current
> > append-only format a bit more flexible.
> > * It's good to keep performance tight because the loop that includes
> > subprotocol serde will need to accommodate resource release and
> > assignment. Also, rebalancing in its incremental cooperative form,
> > which is expected to be lighter, has the potential to start happening
> > more frequently. Parsing JSON with Jackson has been a hotspot on certain
> > occasions in the past, if I remember correctly.
> > * Evolution will be facilitated by handling or ignoring optional fields
> > easily. The protocol may evolve with fewer hard version bumps like the
> > one proposed here from V0 to V1.
> > * Optional fields are omitted, not just compressed.
> > * Unpacking of fields does not require deserialization of the whole
> > message, making transition between versions or flavors of the protocol
> > easy and performant.
> > * Flatbuffers' specification is simple and can be implemented, even in
> > the absence of appropriate clients.
> >
> > I hope the above highlights why flatbuffers is a good candidate for this
> > use case and, thus, worth adding as a dependency.
> > Strictly speaking, yes, they introduce a new compile-time dependency. But
> > during runtime, such a dependency seems equivalent to introducing a JSON
> > parser (such as Jackson that is already being used in AK).
> >
> > Your question is very valid. It's probably worth adding an item under
> > rejected alternatives, once we agree how we want to move forward.
> >
> > Best,
> > Konstantine
> >
> >
> >
> > On Fri, Jan 25, 2019 at 11:13 PM Ismael Juma <isma...@gmail.com> wrote:
> >
> >> Thanks for the KIP Konstantine. Quick question: introducing a new
> >> serialization format (ie flatbuffers) has major implications. Have we
> >> considered json? If so, why did we reject it?
> >>
> >> Ismael
> >>
> >> On Fri, Jan 11, 2019, 3:44 PM Konstantine Karantasis <
> >> konstant...@confluent.io wrote:
> >>
> >> > Hi all,
> >> >
> >> > I just published KIP-415: Incremental Cooperative Rebalancing in Kafka
> >> > Connect on the wiki here:
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >> >
> >> > This is the first KIP to suggest an implementation of incremental and
> >> > cooperative rebalancing in the context of Kafka Connect. It aims to
> >> > provide an adequate solution to the stop-the-world effect that occurs in
> >> > a Connect cluster whenever a new connector configuration is submitted or
> >> > a Connect Worker is added or removed from the cluster.
> >> >
> >> > Looking forward to your insightful feedback!
> >> >
> >> > Regards,
> >> > Konstantine
> >> >
> >>
> >
>


-- 
-- Guozhang
