Yeah, my expectation was that the use of flatbuffers in v1 was giving us another option to evolve the protocol, so I was hoping it would be some time before we needed v2. Maybe we could delay the introduction of `connect.protocol` until we're convinced it's necessary? The cost of supporting both v0 and v1 seems small in any case. Always easier to add configs than remove them ;)
-Jason

On Fri, Jan 25, 2019 at 11:04 AM Ewen Cheslack-Postava <e...@confluent.io> wrote:

> I was going to make a related comment about connect.protocol. Even if we have the config, we should default it to compatible given v0 state is small and we believe v1 is better and people should migrate to it.

> While I like getting rid of configs, not sure whether we should remove it here. If we think the protocol might continue to evolve, just setting us up with a method to do this cleanly without just defaulting to including all versions would be better. That said, it took 3+ years to get to v1 and I'm not sure we're aware of any blatant deficiencies in this version, so maybe we won't ever get to v2 anyway...

> -Ewen

> On Fri, Jan 25, 2019 at 9:38 AM Jason Gustafson <ja...@confluent.io> wrote:

> > Hey Konstantine,

> > Thanks for the reply. Just one response below:

> > > In 'compatible' mode, the worker sends both protocols to the broker coordinator during the Join request. The field is already a list of ProtocolMetadata. The broker, after gathering all the requests, follows a process of selecting the first choice that is common among all the workers.

> > Discussed briefly offline. The point I was trying to make is that the consumer doesn't know when a rebalance begins whether the coordinator will decide to use the "eager" or "cooperative" protocol. The question is how that affects cooperative semantics. In other words, is it safe to allow tasks to continue executing when a rebalance begins even if the coordinator ultimately decides to use the "eager" protocol? If it is, then I think the value of the `connect.protocol` configuration is diminished. We can just implement the "compatible" behavior and we won't need the two rolling restart upgrade.
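The selection step Konstantine describes can be sketched in a few lines of Python, which also makes Jason's point concrete: a worker only learns which protocol the group uses after the coordinator has seen every member's list, and one worker that advertises only version 0 pulls the whole group back to it. This is a minimal sketch under stated assumptions: `select_protocol` and the labels `v0`/`v1` are hypothetical, and the per-member vote used to pick among the common protocols is an assumption about how "the first choice that is common among all the workers" gets resolved, not a copy of the broker's code.

```python
from collections import Counter
from typing import Dict, List, Optional


def select_protocol(member_protocols: Dict[str, List[str]]) -> Optional[str]:
    """Pick the group protocol from each member's preference-ordered list.

    Hypothetical sketch: names such as "v1" and "v0" are labels for
    illustration, not the identifiers Connect puts on the wire.
    """
    if not member_protocols:
        return None
    # Only protocols advertised by every member are candidates.
    common = set.intersection(*(set(p) for p in member_protocols.values()))
    if not common:
        return None  # no shared protocol: the group cannot form
    # Each member votes for its most preferred candidate.
    votes = Counter(
        next(p for p in prefs if p in common)
        for prefs in member_protocols.values()
    )
    # Most votes wins; sorting first makes ties resolve by name (an assumption).
    return max(sorted(votes), key=lambda p: votes[p])


if __name__ == "__main__":
    compatible = ["v1", "v0"]  # a worker that can speak both versions
    eager = ["v0"]             # a worker that only speaks version 0
    print(select_protocol({"W1": compatible, "W2": compatible}))               # v1
    print(select_protocol({"W1": compatible, "W2": compatible, "W3": eager}))  # v0
```

Because the outcome depends on the whole membership, a worker that sends `["v1", "v0"]` cannot assume at the start of a rebalance that cooperative semantics will apply.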
> > The size of the join state in v0 is small, so I don't see much downside to retaining compatibility and our users will thank us for it.

> > Best,
> > Jason

> > On Thu, Jan 24, 2019 at 7:04 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

> > > Thank you for the questions Cyrus! Replies inline:

> > > On Thu, Jan 24, 2019 at 6:03 PM Cyrus Vafadari <cy...@confluent.io> wrote:

> > > > "scheduled.rebalance.max.delay.ms" -- "...the actual delay used by the leader to hold off redistribution of connectors and tasks and maintain imbalance may be less or equal to this value." How is the actual delay determined and specified in the AssignmentFlatBuffer? I see this defaults to a max of 5 minutes -- how will a 5 minute delay for rebalancing affect current users who don't explicitly set this config?

> > > Current users that don't choose to enable the new protocol are not affected. They keep running Connect with the current behavior.
> > > The leader of the group sets the actual delay. It's a hint to the other workers not to join for rebalance before the delay expires.
> > > But it's not a hard requirement. A new worker might join in the meantime, or another worker might leave the group while a delay is in effect.

> > > > If both this KIP and "Static Membership" KIP are merged, is there a case where any user would use static membership over incremental rebalancing (which has less configuration/user-input)? If yes, can you elaborate why a user would use both side-by-side, or if there is a situation where one is an obvious "best" choice? I raise the concern with usability/config-sprawl in mind, that if this KIP deprecates a config property, we should note that explicitly.
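One way to picture how the leader could arrive at the actual delay Konstantine mentions is sketched below: the leader starts a deadline of `scheduled.rebalance.max.delay.ms` when it first sees a departure, and in later rounds sends only the time remaining (what Boyang's quoted `Leader bounces` example calls d'), so the value never exceeds the configured maximum. The function `compute_delay`, its arguments, and the explicit deadline are hypothetical; only the 5-minute default comes from the thread.

```python
from typing import Optional, Tuple

# Default maximum mentioned in the thread: 5 minutes, in milliseconds.
SCHEDULED_REBALANCE_MAX_DELAY_MS = 5 * 60 * 1000


def compute_delay(
    now_ms: int,
    departure_detected: bool,
    deadline_ms: Optional[int],
    max_delay_ms: int = SCHEDULED_REBALANCE_MAX_DELAY_MS,
) -> Tuple[int, Optional[int]]:
    """Return (delay to put in the assignment, deadline to remember).

    Hypothetical leader-side bookkeeping; the returned delay is always
    between 0 and max_delay_ms.
    """
    if deadline_ms is None:
        if not departure_detected:
            return 0, None  # nothing to wait for
        deadline_ms = now_ms + max_delay_ms  # first rebalance after a departure
    # Assumption: a further departure during an active delay does not restart the clock.
    remaining = deadline_ms - now_ms
    if remaining <= 0:
        return 0, None  # delay expired: redistribute now and forget the deadline
    return min(remaining, max_delay_ms), deadline_ms


if __name__ == "__main__":
    delay, deadline = compute_delay(0, True, None)   # W2 leaves at t=0
    print(delay)                                     # 300000
    print(compute_delay(120_000, False, deadline))   # (180000, 300000)
    print(compute_delay(300_000, False, deadline))   # (0, None)
```

Since the delay is only a hint, a worker joining or leaving before the deadline still triggers a rebalance; the leader would then simply resend the shorter remaining delay.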
> > > As I mentioned above, KIP-345 and KIP-415 conceptually share some common goals. They avoid unnecessary re-assignments of resources to the members of the group.
> > > In their current form, they can't be combined immediately, even after the code is merged, because KIP-345 is targeting Consumer groups and KIP-415 will be implemented for Connect worker groups. KIP-345 is more precise in re-assignment by requiring a unique id for each group member. KIP-415 tolerates temporary imbalances and is broader in the sense that it addresses group resizes as well. Static membership could be used along with incremental cooperative rebalancing in the future to improve assignment of resources (e.g. topic-partitions or connectors and tasks). Therefore, even if they are combined for the same group at some point, I don't foresee an immediate need to deprecate one over the other.

> > > Cheers,
> > > Konstantine

> > > > On Tue, Jan 22, 2019 at 6:26 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

> > > > > Hi Randall, thanks for your comments! Replying inline below:

> > > > > On Sat, Jan 19, 2019 at 4:26 PM Randall Hauch <rha...@gmail.com> wrote:

> > > > > > Thanks for all this work, Konstantine.

> > > > > > I have a question about when a member leaves.
> > > > > > Here's the partial scenario, repeated from the KIP:

> > > > > > Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
> > > > > > Config topic contains: AC0, AT1, AT2, BC0, BT1
> > > > > > W2 leaves
> > > > > > Rebalance is triggered
> > > > > > W1 joins with assignment: [AC0, AT1]
> > > > > > W3 joins with assignment: [BT1]
> > > > > > W1 becomes leader
> > > > > > W1 computes and sends assignments:
> > > > > > W1(delay: d, assigned: [AC0, AT1], revoked: [])
> > > > > > W3(delay: d, assigned: [BT1], revoked: [])
> > > > > > After delay d:
> > > > > > W1 joins with assignment: [AC0, AT1]
> > > > > > W3 joins with assignment: [BT1]
> > > > > > Rebalance is triggered
> > > > > > ...

> > > > > > How does the leader/group know that all of the still-alive members have joined and that it's safe to proceed by triggering the rebalance? Is it because this time is capped by the heartbeat interval, or is it because the leader sees a join from all of the workers to which it sent assignments?

> > > > > A rebalance is triggered by the broker coordinator after a new worker joins or a current worker leaves the group. The other members are notified via heartbeats.
> > > > > What the KIP suggests is that after the initial rebalance is triggered, the leader will choose an assignment and possibly a delay. After that delay, the workers will attempt another rebalance to resolve any potential imbalances left pending during the previous round.

> > > > > > Also, would it be helpful for the description of the "scheduled.rebalance.max.delay.ms" property to denote how well this might tolerate workers that leave and rejoin?
> > > > > > Without that context it might be difficult for users to know what the value really means.

> > > > > That is a good point. The description would benefit from further explanation. Here's a suggestion I'm adding to the KIP.

> > > > > "This is a delay that the leader may set to tolerate departures of workers from the group by allowing a transient imbalance in connector and task assignments. During this delay a worker has the opportunity to return to the group and get reassigned the same or similar amount of work as before. This property corresponds to the maximum delay that the leader may set in a single assignment. The actual delay used by the leader to hold off redistribution of connectors and tasks and maintain imbalance may be less than or equal to this value."

> > > > > > The KIP does a nice job showing how this change will better handle evolution of the embedded protocol for Connect, especially with the flatbuffers. How might the values of the "connect.protocol" property evolve with those potential changes? Do the current literals lock us in more than we'd like?

> > > > > With respect to naming here, I'm definitely open to suggestions. I chose more descriptive, enum-like names for the options of the 'connect.protocol' property that correspond to the current and proposed status. I anticipate that in the future, when we introduce policies that will require a different value here, we should be able to find appropriate names to map to the additional values.
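To illustrate the proposed description, here is a simplified, hypothetical leader step for the scenario Randall quoted (W2 leaves while holding AT2 and BC0). While the delay is active the leader tolerates the imbalance and leaves the lost work unassigned, unless some worker rejoins with an empty assignment, in which case the lost work is spread over the least-loaded workers starting with the empty one; once the delay expires, the lost work goes to the least-loaded remaining workers. This heuristic was written for this thread and is not the assignor the KIP specifies; `assign_lost_work` and its arguments are made up.

```python
from typing import Dict, List


def assign_lost_work(
    current: Dict[str, List[str]],
    lost: List[str],
    delay_expired: bool,
) -> Dict[str, List[str]]:
    """Hypothetical leader step for connectors/tasks whose owner left.

    current: what each member present in this round reports it is running.
    lost: items in the config topic that no present member owns.
    """
    result = {worker: list(items) for worker, items in current.items()}
    if not result:
        return result  # no members to assign to
    # A member with nothing assigned (e.g. the departed worker coming back)
    # lets the leader hand out the lost work before the delay expires.
    has_empty_member = any(not items for items in result.values())
    if not delay_expired and not has_empty_member:
        return result  # tolerate the transient imbalance for now
    for item in sorted(lost):
        # Least-loaded member first; ties broken by worker name.
        target = min(result, key=lambda w: (len(result[w]), w))
        result[target].append(item)
    return result


if __name__ == "__main__":
    # Scenario from the thread: W2 held AT2 and BC0 and has left the group.
    survivors = {"W1": ["AC0", "AT1"], "W3": ["BT1"]}
    print(assign_lost_work(survivors, ["AT2", "BC0"], delay_expired=False))
    print(assign_lost_work(survivors, ["AT2", "BC0"], delay_expired=True))
    returned = {"W1": ["AC0", "AT1"], "W2": [], "W3": ["BT1"]}
    print(assign_lost_work(returned, ["AT2", "BC0"], delay_expired=False))
```

With the survivors above, the sketch keeps the assignment unchanged before the delay expires and afterwards gives AT2 to W3 and BC0 to W1; if W2 rejoins empty during the delay, it gets both AT2 and BC0 back.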
> > > > > > Finally, it's good to see the "Compatibility, Deprecation, and Migration Plan" section discuss a plan for deprecating the current (v0) embedded protocol in:

> > > > > > "Connect protocol version 0 will be marked deprecated in the next major release of Apache Kafka (currently 3.0.0). After adding a deprecation notice on this release, support of version 0 of the Connect protocol will be removed on the subsequent major release of Apache Kafka (currently 4.0.0)."

> > > > > > One concern I have with this wording is that it leaves no room for proving and validating the new cooperative protocol in real world use. Do we need to instead suggest that the v0 protocol will be deprecated in a future release after the cooperative protocol has been proven at least as good as the V0 protocol, and removed in the first major release after that?

> > > > > My impression is that accuracy in deprecation plans is appreciated in KIPs. However, I definitely don't consider these suggestions set in stone, and I'm happy to adjust or keep the plan open if we think this is preferable. Just to note here that my initial suggestion is for deprecation to be triggered in the next major release, and to discontinue support of the old format in the major release after the next one. Again, I don't have strong objections about extending the plan even further.

> > > > > Thanks for the comments!
> > > > > Best,
> > > > > Konstantine

> > > > > > Once again, very nice work, Konstantine.
> > > > > > Best regards,
> > > > > > Randall

> > > > > > On Fri, Jan 18, 2019 at 2:00 PM Boyang Chen <bche...@outlook.com> wrote:

> > > > > > > Thanks a lot for the detailed explanation here, Konstantine! I strongly agree that a rolling start of Kafka brokers is not the optimal solution when we have the alternative of just upgrading the client. Also, I fully understood your explanation of the minimal impact of task shuffling in the workers scenario, because local storage usage is very limited.

> > > > > > > Focusing on the current KIP, a few more suggestions are:

> > > > > > > 1. I copy-pasted a partial scenario from the `Leader bounces` section:
> > > > > > > d' is the remaining delay
> > > > > > > W1, which is the leader, leaves
> > > > > > > Rebalance is triggered
> > > > > > > W2 joins with assignment: []
> > > > > > > W3 joins with assignment: [BT1]
> > > > > > > W3 becomes leader.
> > > > > > > There's an active delay in progress.
> > > > > > > W3 computes and sends assignments:
> > > > > > > W2(delay: d'', assigned: [], revoked: [])
> > > > > > > W3(delay: d'', assigned: [BT1, AT1], revoked: [])
> > > > > > > after we start d' round of delayed rebalance. Why does W3 send assignment [BT1, AT1] instead of just [BT1] here? I guess we won't do the actual rebalance until the original scheduled delay d is reached, right?

> > > > > > > 2. We are basically relying on the leader subscription to persist the group assignment across the generation and leader rejoin to trigger necessary rebalance.
> > > > > > > This assumption could potentially be broken with future upgrades of the broker, as we are discussing in https://issues.apache.org/jira/browse/KAFKA-7728. This JIRA will be converted to a ready KIP by Mayuresh pretty soon, and our goal here is to avoid unnecessary rebalances due to leader bounces by specifying a field called JoinReason for the broker to interpret. With that change in mind, I think it's worth mentioning this potential dependency within KIP-415 so that we don't forget to make a corresponding change to adapt to the KAFKA-7728 broker upgrade in case the JoinReason change happens before KIP-415. Is my explanation of the situation clear?

> > > > > > > 3. cooperative cmeans -> means that only Incremental Cooperative Connect protocol is enabled (version 1 or higher).

> > > > > > > 4. For the compatibility change, I'm wondering whether we could just use 2 connect protocols instead of 3. Because the user knows when all the workers have been upgraded to version 1, we could just use `compatible` for the first rolling bounce and `cooperative` for the second bounce. Could you explain a bit why we need to start from the `eager` stage?

> > > > > > > cc Mayuresh on this thread.
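Boyang's fourth question is easiest to see by checking, for each intermediate state of a rolling bounce, whether the group still shares a protocol. The mapping below is an assumption based on the thread (`compatible` advertises both versions, `cooperative` only version 1 or higher, and `eager` is taken to mean version 0 only), and `v0`/`v1`, `ADVERTISED`, and `rolling_bounce_is_safe` are illustrative names rather than anything Connect defines.

```python
from typing import List, Set

# Assumed mapping from connect.protocol values to advertised protocol versions,
# most preferred first. "v0"/"v1" are illustrative labels.
ADVERTISED = {
    "eager": ["v0"],
    "compatible": ["v1", "v0"],
    "cooperative": ["v1"],
}


def common_protocols(modes: List[str]) -> Set[str]:
    """Protocols that every worker in the group advertises (needs >= 1 worker)."""
    return set.intersection(*(set(ADVERTISED[m]) for m in modes))


def rolling_bounce_is_safe(start: str, target: str, workers: int) -> bool:
    """Restart workers one at a time from `start` to `target` mode (workers >= 1)
    and check that every intermediate group still shares a protocol."""
    for upgraded in range(workers + 1):
        modes = [target] * upgraded + [start] * (workers - upgraded)
        if not common_protocols(modes):
            return False
    return True


if __name__ == "__main__":
    print(rolling_bounce_is_safe("eager", "compatible", 3))        # True
    print(rolling_bounce_is_safe("compatible", "cooperative", 3))  # True
    print(rolling_bounce_is_safe("eager", "cooperative", 3))       # False
```

Under these assumptions, bouncing straight from `eager` to `cooperative` leaves a window with no common protocol, while passing through `compatible` does not; that is the property Jason and Ewen rely on when suggesting `compatible` as the default.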
> > > > > > > Thanks,
> > > > > > > Boyang

> > > > > > > ________________________________
> > > > > > > From: Konstantine Karantasis <konstant...@confluent.io>
> > > > > > > Sent: Friday, January 18, 2019 8:32 AM
> > > > > > > To: dev@kafka.apache.org
> > > > > > > Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

> > > > > > > Hi Stanislav and Boyang. Thanks for your comments.

> > > > > > > You are both asking how KIP-345 affects this KIP, so, first, I'll address this point.

> > > > > > > KIP-345 has recently introduced an option that will allow Kafka consumer applications to avoid rebalances due to the departure and subsequent return of a member in the group. That way KIP-345 offers a remedy for the cases of rolling bounces and replacement of nodes due to failures that can happen quickly.

> > > > > > > Without ruling out that policies of Incremental Cooperative Rebalancing may use static membership eventually in order to better address such use cases, next I'll mention a few reasons why I believe KIP-415, which is the proposal of Incremental Cooperative Rebalancing in Connect, should proceed independently at first:

> > > > > > > * KIP-345 requires an upgrade to both Kafka brokers and Connect workers.
> > > > > > >   This requirement is very strict for a big group of Connect users that anticipate a solution to the stop-the-world effect in Connect in order to grow their Connect clusters, but cannot afford to also have to upgrade their Kafka brokers in order to enjoy the suggested improvements.
> > > > > > > * Connect clusters are traditionally locally stateless and lightweight, in the sense that they don't store state outside Kafka and that this state is easy to process during startup. Overall, based on their configuration and deployment requirements, Connect Workers are very suitable to run in containers. With respect to the resources that Connect Workers are rebalancing, connectors and tasks are (and honestly should be) easy to spin up and redistribute in a Connect cluster. This is true because connectors depend on Kafka and the systems they connect in order to save their progress. They don't use the Workers' local instances. Given this reality, the configuration of a unique id, per KIP-345's requirement, doesn't seem necessary for Connect to introduce yet. The upgrade path is made even easier without having to define unique ids, and in practice the heuristics of Incremental Cooperative Rebalancing have the potential to cover (immediately or eventually) most of the scenarios that make rebalancing and stop-the-world problematic in Connect today.
> > > > > > > * Static membership has not been merged yet.
> > > > > > >   Given that KIP-415 also addresses scale-up and scale-down scenarios and the important side-effect that the submission of a new connector has on other connectors in the worker's group, it seems to me that introducing an interdependency between the two proposals is not necessary. Again, this doesn't prevent reconsidering integration in the future.
> > > > > > > * Finally, it's not immediately obvious to me that integration between the two proposals also means significantly simpler implementation in Connect. That's because Connect Workers will have to handle a delay one way or the other. Plus, the group management and resource assignment code is mostly separate between Connect and the Consumer.

> > > > > > > With respect to your other comments, Stanislav, glad you found the examples easy to read. I'll change the KIP to show who's leader at the beginning as well.
> > > > > > > Boyang, I'll add a paragraph to highlight why local state is not the most pressing issue in Connect.

> > > > > > > Thank you both for your initial comments.
> > > > > > > Konstantine

> > > > > > > On Mon, Jan 14, 2019 at 9:24 AM Boyang Chen <bche...@outlook.com> wrote:

> > > > > > > > Hey Konstantine,

> > > > > > > > Great work making this happen! Incremental rebalancing is super important for avoiding unnecessary resource shuffles and improving the overall stability of the Connect framework.
> > > > > > > > After the first pass, two questions came to mind:

> > > > > > > > 1. For my understanding, the general rebalancing case could be covered by configuring the workers as static members, so that we don't need to worry about the case of a worker temporarily leaving the group. Basically, KIP-345 could help with avoiding unexpected rebalances during a cluster rolling bounce, and, like Stanislav, I feel that parts of the KIP-415 logic could be simplified. It would be great if we could look at these two initiatives holistically to help reduce the common workload.
> > > > > > > > 2. Since I have never used Connect before, I do hope you could enlighten me on the potential effort involved in task transfer between workers. The reason I ask is to estimate how much burden we will introduce by starting a task on a brand new worker. Is there any local state to be replayed? It would be good to also provide this background in the KIP motivation so that people could better understand the symptom and build constructive feedback.

> > > > > > > > Thanks a lot!
> > > > > > > > Boyang
> > > > > > > > ________________________________
> > > > > > > > From: Stanislav Kozlovski <stanis...@confluent.io>
> > > > > > > > Sent: Monday, January 14, 2019 3:15 PM
> > > > > > > > To: dev@kafka.apache.org
> > > > > > > > Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

> > > > > > > > Hey Konstantine,

> > > > > > > > This is a very exciting KIP that brings a fundamental improvement; thanks a lot for working on it!

> > > > > > > > Have you seen KIP-345 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-345>? I was wondering whether Connect would support the static group membership, potentially limiting the need to handle "node bounce" cases through a rebalance (even though there wouldn't be downtime). I find it somewhat related to the `scheduled.rebalance.max.delay.ms` config described in KIP-415. The main difference, I think, is that the rebalance delay in KIP-345 is configurable through `session.timeout.ms`, which is tied to the liveness heartbeat, whereas here we have a separate config.

> > > > > > > > The original design document suggested:
> > > > > > > > > Assignment response includes usual assignment information. Start processing any new partitions.
> > > > > > > > > (Since we expect sticky assignment, we could also optimize this and omit the assignment when it is just repeating a previous assignment)

> > > > > > > > Have we decided whether we would make use of the optimization so as not to send the assignment that the worker already knows about?

> > > > > > > > I enjoyed reading the rebalancing examples. As a small readability improvement, could I suggest we clarify which Worker (W1, W2, W3) is the leader in the "Initial group and assignment" part? For example, in `Leader bounces` I was left wondering whether the leaving W2 was the initial leader or not.

> > > > > > > > Thanks,
> > > > > > > > Stanislav

> > > > > > > > On Sat, Jan 12, 2019 at 1:44 AM Konstantine Karantasis <konstant...@confluent.io> wrote:

> > > > > > > > > Hi all,

> > > > > > > > > I just published KIP-415: Incremental Cooperative Rebalancing in Kafka Connect on the wiki here:
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> > > > > > > > > This is the first KIP to suggest an implementation of incremental and cooperative rebalancing in the context of Kafka Connect.
> > > > > > > > > It aims to provide an adequate solution to the stop-the-world effect that occurs in a Connect cluster whenever a new connector configuration is submitted or a Connect Worker is added to or removed from the cluster.

> > > > > > > > > Looking forward to your insightful feedback!

> > > > > > > > > Regards,
> > > > > > > > > Konstantine

> > > > > > > > --
> > > > > > > > Best,
> > > > > > > > Stanislav
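Stanislav's question about not resending an assignment the worker already knows can be sketched by expressing each new assignment as a difference from the previous one. This sketch assumes a delta encoding (newly assigned items plus revoked items) and returns `None` when nothing changed; the KIP's examples list the full set under `assigned`, so treat this only as one possible encoding. `incremental_assignment` is a hypothetical name.

```python
from typing import List, Optional, Tuple


def incremental_assignment(
    previous: List[str], target: List[str]
) -> Optional[Tuple[List[str], List[str]]]:
    """Describe the move from `previous` to `target` as (assigned, revoked).

    Returns None when nothing changed, i.e. the leader can skip resending an
    assignment the worker already knows. Hypothetical delta encoding.
    """
    prev, tgt = set(previous), set(target)
    assigned = sorted(tgt - prev)  # newly added connectors/tasks
    revoked = sorted(prev - tgt)   # connectors/tasks to stop
    if not assigned and not revoked:
        return None
    return assigned, revoked


if __name__ == "__main__":
    print(incremental_assignment(["AC0", "AT1"], ["AC0", "AT1"]))  # None
    print(incremental_assignment(["BT1"], ["BT1", "AT2"]))         # (['AT2'], [])
    print(incremental_assignment(["AC0", "AT1"], ["AC0"]))         # ([], ['AT1'])
```

A delta keeps the message small when assignments are sticky, at the cost of requiring the worker to remember what it already runs.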