Hi Stanislav & Viktor,

Kafka has historically delegated partition reassignment to external tools.  For 
example, the choice of which partitions to reassign is determined by an 
external tool.  The choice of what throttle to use is also set externally. 
 So I think that putting this into the controller is not very consistent with 
how Kafka currently works.  It seems better to continue to let batching be 
handled by an external tool, rather than handled by the controller.

The big advantage of having reassignment done by an external tool is that users 
are free to set their own reassignment policies without hacking Kafka itself.  
So, for example, you can build a reassignment tool that moves partitions only 
when they're smaller than 1 GB without writing a KIP for a new 
reassign.only.if.smaller.than.1g configuration.

The same is true of batching, I think.  Do you want to set up your batch so 
that you're always moving a given number of megabytes per second?  Or maybe you 
want to move 3 partitions at all times?  Do you want to move the biggest 
partitions first?  Etc. etc.  These things are all policy, and it's a lot 
easier to iterate on the policy when it's not hard-coded in the controller.  
Even changing these configurations at runtime will be very difficult if they 
are controller configs.

Of course, there are disadvantages to having reassignment done by an external 
tool.  The external tool is another thing you have to deploy and run.  The tool 
also needs to collect statistics about the cluster which the controller already 
mostly has, so there is some duplication there.

Having some reassignment policies set by the controller and some set by an 
external tool gets us the worst of both worlds.  Users would need to deploy and 
run external reassignment tools, but they would also need to worry about 
correctly configuring the controller so that it could execute their preferred 
policies.

We just got done making a major change to the reassignment API, and that change 
is still percolating through the environment.  Most external tools haven't been 
updated yet to use the new API.  I think we should see how the new API works 
out before making more major changes to reassignment.

In practice, external reassignment tools already do handle batching.  For 
example, Cruise Control decides what to move, but also divides up the work into 
batches.  The main functionality that we lack is a way to do this with just the 
code shipped as part of Apache Kafka itself.

So, if we want to give users a better out-of-the-box experience, why not add a 
mode to kafka-reassign-partitions.sh that lets it keep running for a while and 
breaks down the requested partition moves into batches?  I think this is much 
more consistent with how the system currently works, and much less likely to 
create technical debt for the future, than moving functionality into the 
controller.

best,
Colin


On Wed, Dec 4, 2019, at 06:11, Stanislav Kozlovski wrote:
> Hey all,
> 
> I spoke offline with Viktor regarding this KIP and intentions on starting a
> vote soon, so I made another pass on it. I have some comments:
> 
> From our past discussion:
> 
> > I reiterated on the current algorithm and indeed it would change the order
> of replicas in ZK but wouldn't do a leader election, so one would need to
> run the preferred replica election tool. However still in the light of this
> I'm not sure I'd keep the existing behavior as users wouldn't win anything
> with it.
> 
> I think we should mention the change in the KIP at the very least such that
> we can discuss it with other reviewers.
> 
> > If I understand correctly, you're suggesting that we count the
> "reassignment.parallel.leader.movements" config against such preferred
> elections. I think that makes sense. If we are to do that we should
> explicitly mention that this KIP touches the preferred election logic as
> well. Would that config also bound the number of leader movements the auto
> rebalance inside the broker would do as well?
> > ... It might be easier to limit the scope of this KIP and not place a
> bound on preferred leader election.
> 
> What are your thoughts on this?
> 
> And now comments on the KIP:
> 
> 1.
> 
>    1. > Update CR in Zookeeper
>    (/brokers/topics/[topic]/partitions/[partitionId]/state) with TR for the
>    given partitions.
> 
> 
> I believe the leader for the partition is the "owner" of that znode. Having
> the controller update it may complicate things.
> Has this been updated with the latest KIP-455? We keep reassignment state
> in the /brokers/topics/[topic] znode
> 
> 2.
> 
> How does reassignment cancellation work with these new incremental steps?
> In your example, what happens if I cancel the current reassignment while
> the replica set is (5, 6, 2, 3, 4)? Where and how would we keep the state
> to revert back to (0, 1, 2, 3, 4)?
> 
> 
> 3.
> 
> > The only exception is the first step where (if needed) we add that many
> replicas that is enough to fulfil the min.insync.replicas requirement set
> on the broker, even if it exceeds the limit on parallel replica
> reassignments. For instance if there are 4 brokers, min.insync.replicas set
> to 3 but there are only 1 in-sync replica, then we immediately add 2 other
> in one step, so the producers are able to continue.
> 
> Wouldn't that be adding extra replication traffic and pressure on an
> already problematic partition? I may be missing something but I actually
> think that this wouldn't help in common circumstances.
> 
> 4.
> 
> > Calculate the replicas to be dropped (DR):
> 
>    1. Calculate n = max(reassignment.parallel.replica.count, size(FTR) -
>    size(CR))
> 
> 
> Do you mean min() here?
> 
> 5.
> 
>    1.
>       1. Take the first *reassignment.parallel.replica.count* replicas of
>       ER, that will be the set of dropped replicas
> 
> 
> Do you mean `N replicas of ER` here?
> 
> 6.
> 
> > Calculate the new replicas (NR) to be added
> 
>    1. Calculate that if the partition has enough online replicas to fulfil
>    the min.insync.replicas config so the producers are able to continue.
>    2. If the preferred leader is different in FTR and it is not yet
>    reassigned, then add it to the NR
>    3. If size(NR) < min.insync.replicas then take min(min.insync.replicas,
>    reassignment.parallel.replica.count) - size(NR) replicas from FTR
>    4. Otherwise take as many replica as
>    *reassignment.parallel.replica.count* allows
> 
> 
> Not sure I understand this. Wouldn't we always take as many new replicas
> (NR) as the ones we are dropping (DR)?
> 
> 7.
> 
> This new configuration would tell how many replicas of a single partition
> can be moved at once.
> 
> I believe this description is outdated
> 
> 8.
> 
> I don't see it as a rejected alternative - have we considered delegating
> this logic out to the client and making Kafka accept a series of
> reassignment "steps" to run?
> 
> 
> Thanks!
> Stanislav
> 
> On Wed, Aug 21, 2019 at 12:33 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
> 
> > Hey Folks,
> >
> > I think I'll open a vote early next week about this if there are no more
> > feedback.
> >
> > Thanks,
> > Viktor
> >
> > On Fri, Aug 9, 2019 at 5:25 PM Viktor Somogyi-Vass <
> > viktorsomo...@gmail.com>
> > wrote:
> >
> > > Hey Stanislav,
> > >
> > > I reiterated on the current algorithm and indeed it would change the
> > order
> > > of replicas in ZK but wouldn't do a leader election, so one would need to
> > > run the preferred replica election tool. However still in the light of
> > this
> > > I'm not sure I'd keep the existing behavior as users wouldn't win
> > anything
> > > with it. Changing leadership automatically would result in a simpler,
> > > easier and more responsive reassign algorithm which is especially
> > important
> > > if the reassignment is done to free up the broker from under heavy load.
> > > Automated tools (Cruise Control) would also have simpler life.
> > > Let me know what you think.
> > >
> > > Viktor
> > >
> > > On Thu, Jul 11, 2019 at 7:16 PM Viktor Somogyi-Vass <
> > > viktorsomo...@gmail.com> wrote:
> > >
> > >> Hi Stan,
> > >>
> > >> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3]
> > >> on
> > >> a lot of partitions and the user runs a massive rebalance to change them
> > >> all to [3, 2, 1]. In the old behavior, I think that this would not do
> > >> anything but simply change the replica set in ZK.
> > >> Then, the user could run kafka-preferred-replica-election.sh on a given
> > >> set
> > >> of partitions to make sure the new leader 3 gets elected.
> > >>
> > >> I thought the old algorithm would elect 3 as the leader in this case
> > >> right away at the end but I have to double check this. In any case I
> > think
> > >> it would make sense in the new algorithm if we elected the new preferred
> > >> leader right away, regardless of the new leader is chosen from the
> > existing
> > >> replicas or not. If the whole reassignment is in fact just changing the
> > >> replica order then either way it is a simple (trivial) operation and
> > doing
> > >> it batched wouldn't slow it down much as there is no data movement
> > >> involved. If the reassignment is mixed, meaning it contains reordering
> > and
> > >> real movement as well then in fact it would help to even out the load
> > >> faster as data movements can get long. For instance in case of a
> > >> reassignment batch of two partitions concurrently: P1: (1,2,3) ->
> > (3,2,1)
> > >> and P2: (4,5,6) -> (7,8,9) the P2 reassignment would elect a new leader
> > but
> > >> P1 wouldn't and it wouldn't help the goal of normalizing traffic on
> > broker
> > >> 1 that much.
> > >> Again, I'll have to check how the current algorithm works and if it has
> > >> any unknown drawbacks to implement what I sketched up above.
> > >>
> > >> As for generic preferred leader election when called from the admin api
> > >> or the auto leader balance feature I think you're right that we should
> > >> leave it as it is. It doesn't involve any data movement so it's fairly
> > fast
> > >> and it normalizes the cluster state quickly.
> > >>
> > >> Viktor
> > >>
> > >> On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <
> > >> stanis...@confluent.io> wrote:
> > >>
> > >>> Hey Viktor,
> > >>>
> > >>>  I think it is intuitive if there are on a global level...If we applied
> > >>> > them on every batch then we
> > >>> > couldn't really guarantee any limits as the user would be able to get
> > >>> > around it with submitting lots of reassignments.
> > >>>
> > >>>
> > >>> Agreed. Could we mention this explicitly in the KIP?
> > >>>
> > >>> Also if I understand correctly, AlterPartitionAssignmentsRequest would
> > >>> be a
> > >>> > partition level batching, isn't it? So if you submit 10 partitions at
> > >>> once
> > >>> > then they'll all be started by the controller immediately as per my
> > >>> > understanding.
> > >>>
> > >>>
> > >>> Yep, absolutely
> > >>>
> > >>> I've raised the ordering problem on the discussion of KIP-455 in a bit
> > >>> > different form and as I remember the verdict there was that we
> > >>> shouldn't
> > >>> > expose ordering as an API. It might not be easy as you say and there
> > >>> might
> > >>> > be much better strategies to follow (like disk or network utilization
> > >>> > goals). Therefore I'll remove this section from the KIP.
> > >>>
> > >>>
> > >>> Sounds good to me.
> > >>>
> > >>> I'm not sure I get this scenario. So are you saying that after they
> > >>> > submitted a reassignment they also submit a preferred leader change?
> > >>> > In my mind what I would do is:
> > >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> > limit
> > >>> as
> > >>> > this way it will be easier to calculate the reassignment batches.
> > >>> >
> > >>>
> > >>> Sorry, this is my fault -- I should have been more clear.
> > >>> First, I didn't think through this well enough at the time, I don't
> > >>> think.
> > >>> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6], it is
> > >>> obvious that a leader shift will happen. Your KIP proposes we shift
> > >>> replicas 1 and 4 first.
> > >>>
> > >>> I meant the following (maybe rare) scenario - we have replicas [1, 2,
> > 3]
> > >>> on
> > >>> a lot of partitions and the user runs a massive rebalance to change
> > them
> > >>> all to [3, 2, 1]. In the old behavior, I think that this would not do
> > >>> anything but simply change the replica set in ZK.
> > >>> Then, the user could run kafka-preferred-replica-election.sh on a given
> > >>> set
> > >>> of partitions to make sure the new leader 3 gets elected.
> > >>>
> > >>> ii) the user could submit preferred leader election but it's basically
> > a
> > >>> > form of reassignment so it would fall under the batching criterias.
> > If
> > >>> the
> > >>> > batch they submit is smaller than the internal, then it'd be executed
> > >>> > immediately but otherwise it would be split into more batches. It
> > >>> might be
> > >>> > a different behavior as it may not be executed it in one batch but I
> > >>> think
> > >>> > it isn't a problem because we'll default to Int.MAX with the batches
> > >>> and
> > >>> > otherwise because since it's a form of reassignment I think it makes
> > >>> sense
> > >>> > to apply the same logic.
> > >>>
> > >>>
> > >>> The AdminAPI for that is
> > >>> "electPreferredLeaders(Collection<TopicPartition>
> > >>> partitions)" and the old zNode is "{"partitions": [{"topic": "a",
> > >>> "partition": 0}]}" so it is a bit less explicit than our other
> > >>> reassignment
> > >>> API, but the functionality is the same.
> > >>> You're 100% right that it is a form of reassignment, but I hadn't
> > thought
> > >>> of it like that and I some other people wouldn't have either.
> > >>> If I understand correctly, you're suggesting that we count the
> > >>> "reassignment.parallel.leader.movements" config against such preferred
> > >>> elections. I think that makes sense. If we are to do that we should
> > >>> explicitly mention that this KIP touches the preferred election logic
> > as
> > >>> well. Would that config also bound the number of leader movements the
> > >>> auto
> > >>> rebalance inside the broker would do as well?
> > >>>
> > >>> Then again, the "reassignment.parallel.leader.movements" config has a
> > bit
> > >>> of a different meaning when used in the context of a real reassignment
> > >>> (where data moves from the leader to N more replicas) versus in the
> > >>> preferred election switch (where all we need is two lightweight
> > >>> LeaderAndIsr requests), so I am not entirely convinced we may want to
> > use
> > >>> the same config for that.
> > >>> It might be easier to limit the scope of this KIP and not place a bound
> > >>> on
> > >>> preferred leader election.
> > >>>
> > >>> Thanks,
> > >>> Stanislav
> > >>>
> > >>>
> > >>> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
> > >>> viktorsomo...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Hey Stanislav,
> > >>> >
> > >>> > Thanks for the thorough look at the KIP! :)
> > >>> >
> > >>> > > Let me first explicitly confirm my understanding of the configs and
> > >>> the
> > >>> > > algorithm:
> > >>> > > * reassignment.parallel.replica.count - the maximum total number of
> > >>> > > replicas that we can move at once, *per partition*
> > >>> > > * reassignment.parallel.partition.count - the maximum number of
> > >>> > partitions
> > >>> > > we can move at once
> > >>> > > * reassignment.parallel.leader.movements - the maximum number of
> > >>> leader
> > >>> > > movements we can have at once
> > >>> >
> > >>> > Yes.
> > >>> >
> > >>> > > As far as I currently understand it, your proposed algorithm will
> > >>> > naturally
> > >>> > > prioritize leader movement first. e.g if
> > >>> > > reassignment.parallel.replica.count=1 and
> > >>> > >
> > >>> >
> > >>> >
> > >>>
> > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > >>> > > we will always move one, the first possible, replica in the replica
> > >>> set
> > >>> > > (which will be the leader if part of the excess replica set (ER)).
> > >>> > > Am I correct in saying that?
> > >>> >
> > >>> > Yes.
> > >>> >
> > >>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
> > >>> >
> > >>> > If you imply that it's not always an exact number (because the last
> > >>> batch
> > >>> > could be smaller) than I think it's a good idea. I don't mind this
> > >>> naming
> > >>> > or having max in it either.
> > >>> >
> > >>> > > 2. How does this KIP play along with KIP-455's notion of multiple
> > >>> > > rebalances - do the configs apply on a
> > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > >>> >
> > >>> > I think it is intuitive if there are on a global level and the user
> > >>> would
> > >>> > have the option to set KIP-435's settings to Int.MAX and thus
> > >>> eliminate any
> > >>> > batching and submit their own batches through
> > >>> > AlterPartitionAssignmentsRequest. If we applied them on every batch
> > >>> then we
> > >>> > couldn't really guarantee any limits as the user would be able to get
> > >>> > around it with submitting lots of reassignments. By default though we
> > >>> set
> > >>> > the batching limits to Int.MAX so effectively they're disabled.
> > >>> > Also if I understand correctly, AlterPartitionAssignmentsRequest
> > would
> > >>> be a
> > >>> > partition level batching, isn't it? So if you submit 10 partitions at
> > >>> once
> > >>> > then they'll all be started by the controller immediately as per my
> > >>> > understanding.
> > >>> >
> > >>> > > 3. Unless I've missed it, the algorithm does not take into account
> > >>> > > `reassignment.parallel.leader.movements`
> > >>> >
> > >>> > Yea the reassignment step calculation doesn't contain that because it
> > >>> > describes only one partition's reassignment step calculation. We
> > simply
> > >>> > fill our reassignment batch with as many leader movements as we can
> > and
> > >>> > then fill the rest with reassignments which don't require leader
> > >>> movement
> > >>> > if we have space. I'll create a pseudo code block on the KIP for
> > this.
> > >>> >
> > >>> > > 4. The KIP says that the order of the input has some control over
> > >>> how the
> > >>> > > batches are created - i.e it's deterministic. What would the
> > batches
> > >>> of
> > >>> > the
> > >>> > > following reassignment look like:
> > >>> > > reassignment.parallel.replica.count=1
> > >>> > > reassignment.parallel.partition.count=MAX_INT
> > >>> > > reassignment.parallel.leader.movements=1
> > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> > >>> C(1->4).
> > >>> > Is
> > >>> > > that correct? Would the second step then continue with B(0->3)?
> > >>> > >
> > >>> > > If the configurations are global, I can imagine we will have a bit
> > >>> more
> > >>> > > trouble in preserving the expected ordering, especially across
> > >>> controller
> > >>> > > failovers -- but I'll avoid speculating until you confirm the scope
> > >>> of
> > >>> > the
> > >>> > > config.
> > >>> >
> > >>> > I've raised the ordering problem on the discussion of KIP-455 in a
> > bit
> > >>> > different form and as I remember the verdict there was that we
> > >>> shouldn't
> > >>> > expose ordering as an API. It might not be easy as you say and there
> > >>> might
> > >>> > be much better strategies to follow (like disk or network utilization
> > >>> > goals). Therefore I'll remove this section from the KIP. (Otherwise
> > >>> yes, I
> > >>> > would have thought of the same behavior what you described.)
> > >>> > Since #3 also was about the pseudo code, it would be something like
> > >>> this:
> > >>> > Config:
> > >>> > reassignment.parallel.replica.count=R
> > >>> > reassignment.parallel.partition.count=P
> > >>> > reassignment.parallel.leader.movements=L
> > >>> > Code:
> > >>> > val batchSize = max(L,P)
> > >>> > // split the individual partition reassignments whether they involve
> > >>> leader
> > >>> > movement or not
> > >>> > val partitionMovements =
> > >>> >
> > >>> >
> > >>>
> > calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
> > >>> > // fill the batch with as much leader movements as possible and take
> > >>> the
> > >>> > rest from other reassignments
> > >>> > val currentBatch = if (partitionMovements.leaderMovements.size <
> > >>> batchSize)
> > >>> >   partitionMovements.leaderMovements ++
> > >>> > partitionsToReassign.otherPartitionMovements.take(batchSize -
> > >>> > partitionMovements.leaderMovements.size)
> > >>> > else
> > >>> >  partitionMovements.leaderMovements.take(batchSize)
> > >>> > executeReassignmentOnBatch(currentBatch)
> > >>> >
> > >>> > > 5. Regarding the new behavior of electing the new preferred leader
> > >>> in the
> > >>> > > "first step" of the reassignment - does this obey the
> > >>> > > `auto.leader.rebalance.enable` config?
> > >>> > > If not, I have concerns regarding how backwards compatible this
> > >>> might be
> > >>> > -
> > >>> > > e.g imagine a user does a huge reassignment (as they have always
> > >>> done)
> > >>> > and
> > >>> > > suddenly a huge leader shift happens, whereas the user expected to
> > >>> > manually
> > >>> > > shift preferred leaders at a slower rate via
> > >>> > > the kafka-preferred-replica-election.sh tool.
> > >>> >
> > >>> > I'm not sure I get this scenario. So are you saying that after they
> > >>> > submitted a reassignment they also submit a preferred leader change?
> > >>> > In my mind what I would do is:
> > >>> > i) make auto.leader.rebalance.enable to obey the leader movement
> > limit
> > >>> as
> > >>> > this way it will be easier to calculate the reassignment batches.
> > >>> > ii) the user could submit preferred leader election but it's
> > basically
> > >>> a
> > >>> > form of reassignment so it would fall under the batching criterias.
> > If
> > >>> the
> > >>> > batch they submit is smaller than the internal, then it'd be executed
> > >>> > immediately but otherwise it would be split into more batches. It
> > >>> might be
> > >>> > a different behavior as it may not be executed it in one batch but I
> > >>> think
> > >>> > it isn't a problem because we'll default to Int.MAX with the batches
> > >>> and
> > >>> > otherwise because since it's a form of reassignment I think it makes
> > >>> sense
> > >>> > to apply the same logic.
> > >>> >
> > >>> > > 6. What is the expected behavior if we dynamically change one of
> > the
> > >>> > > configs to a lower value while a reassignment is happening. Would
> > we
> > >>> > cancel
> > >>> > > some of the currently reassigned partitions or would we account for
> > >>> the
> > >>> > new
> > >>> > > values on the next reassignment? I assume the latter but it's good
> > >>> to be
> > >>> > > explicit
> > >>> >
> > >>> > Yep, it would be the latter. I'll make this explicit in the KIP too.
> > >>> >
> > >>> > About the nits:
> > >>> > Noted, will update the KIP to reflect your suggestions :)
> > >>> >
> > >>> > Viktor
> > >>> >
> > >>> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
> > >>> > stanis...@confluent.io>
> > >>> > wrote:
> > >>> > >
> > >>> > > Hey there Viktor,
> > >>> > >
> > >>> > > Thanks for working on this KIP! I agree with the notion that
> > >>> reliability,
> > >>> > > stability and predictability of a reassignment should be a core
> > >>> feature
> > >>> > of
> > >>> > > Kafka.
> > >>> > >
> > >>> > > Let me first explicitly confirm my understanding of the configs and
> > >>> the
> > >>> > > algorithm:
> > >>> > > * reassignment.parallel.replica.count - the maximum total number of
> > >>> > > replicas that we can move at once, *per partition*
> > >>> > > * reassignment.parallel.partition.count - the maximum number of
> > >>> > partitions
> > >>> > > we can move at once
> > >>> > > * reassignment.parallel.leader.movements - the maximum number of
> > >>> leader
> > >>> > > movements we can have at once
> > >>> > >
> > >>> > > As far as I currently understand it, your proposed algorithm will
> > >>> > naturally
> > >>> > > prioritize leader movement first. e.g if
> > >>> > > reassignment.parallel.replica.count=1 and
> > >>> > >
> > >>> >
> > >>> >
> > >>>
> > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > >>> > > we will always move one, the first possible, replica in the replica
> > >>> set
> > >>> > > (which will be the leader if part of the excess replica set (ER)).
> > >>> > > Am I correct in saying that?
> > >>> > >
> > >>> > > Regarding the KIP, I've got a couple of comments/questions::
> > >>> > >
> > >>> > > 1. Does it make sense to add `max` somewhere in the configs' names?
> > >>> > >
> > >>> > > 2. How does this KIP play along with KIP-455's notion of multiple
> > >>> > > rebalances - do the configs apply on a
> > >>> > > single AlterPartitionAssignmentsRequest or are they global?
> > >>> > >
> > >>> > > 3. Unless I've missed it, the algorithm does not take into account
> > >>> > > `reassignment.parallel.leader.movements`
> > >>> > >
> > >>> > > 4. The KIP says that the order of the input has some control over
> > >>> how the
> > >>> > > batches are created - i.e it's deterministic. What would the
> > batches
> > >>> of
> > >>> > the
> > >>> > > following reassignment look like:
> > >>> > > reassignment.parallel.replica.count=1
> > >>> > > reassignment.parallel.partition.count=MAX_INT
> > >>> > > reassignment.parallel.leader.movements=1
> > >>> > > partitionA - (0,1,2) -> (3, 4, 5)
> > >>> > > partitionB - (0,1,2) -> (3,4,5)
> > >>> > > partitionC - (0,1,2) -> (3, 4, 5)
> > >>> > > From my understanding, we would start with A(0->3), B(1->4) and
> > >>> C(1->4).
> > >>> > Is
> > >>> > > that correct? Would the second step then continue with B(0->3)?
> > >>> > >
> > >>> > > If the configurations are global, I can imagine we will have a bit
> > >>> more
> > >>> > > trouble in preserving the expected ordering, especially across
> > >>> controller
> > >>> > > failovers -- but I'll avoid speculating until you confirm the scope
> > >>> of
> > >>> > the
> > >>> > > config.
> > >>> > >
> > >>> > > 5. Regarding the new behavior of electing the new preferred leader
> > >>> in the
> > >>> > > "first step" of the reassignment - does this obey the
> > >>> > > `auto.leader.rebalance.enable` config?
> > >>> > > If not, I have concerns regarding how backwards compatible this
> > >>> might be
> > >>> > -
> > >>> > > e.g imagine a user does a huge reassignment (as they have always
> > >>> done)
> > >>> > and
> > >>> > > suddenly a huge leader shift happens, whereas the user expected to
> > >>> > manually
> > >>> > > shift preferred leaders at a slower rate via
> > >>> > > the kafka-preferred-replica-election.sh tool.
> > >>> > >
> > >>> > > 6. What is the expected behavior if we dynamically change one of
> > the
> > >>> > > configs to a lower value while a reassignment is happening. Would
> > we
> > >>> > cancel
> > >>> > > some of the currently reassigned partitions or would we account for
> > >>> the
> > >>> > new
> > >>> > > values on the next reassignment? I assume the latter but it's good
> > >>> to be
> > >>> > > explicit
> > >>> > >
> > >>> > > As some small nits:
> > >>> > > - could we have a section in the KIP where we explicitly define
> > what
> > >>> each
> > >>> > > config does? This can be inferred from the KIP as is but requires
> > >>> careful
> > >>> > > reading, whereas some developers might want to skim through the
> > >>> change to
> > >>> > > get a quick sense. It also improves readability but that's my
> > >>> personal
> > >>> > > opinion.
> > >>> > > - could you better clarify how a reassignment step is different
> > from
> > >>> the
> > >>> > > currently existing algorithm? maybe laying both algorithms out in
> > >>> the KIP
> > >>> > > would be most clear
> > >>> > > - the names for the OngoingPartitionReassignment and
> > >>> > > CurrentPartitionReassignment fields in the
> > >>> > > ListPartitionReassignmentsResponse are a bit confusing to me.
> > >>> > > Unfortunately, I don't have a better suggestion, but maybe somebody
> > >>> else
> > >>> > in
> > >>> > > the community has.
> > >>> > >
> > >>> > > Thanks,
> > >>> > > Stanislav
> > >>> > >
> > >>> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> > >>> > viktorsomo...@gmail.com>
> > >>> > > wrote:
> > >>> > >
> > >>> > > > Hi All,
> > >>> > > >
> > >>> > > > I've renamed my KIP as its name was a bit confusing so we'll
> > >>> continue
> > >>> > it in
> > >>> > > > this thread.
> > >>> > > > The previous thread for record:
> > >>> > > >
> > >>> > > >
> > >>> >
> > >>> >
> > >>>
> > https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> > >>> > > >
> > >>> > > > A short summary of the KIP:
> > >>> > > > In case of a vast partition reassignment (thousands of partitions
> > >>> at
> > >>> > once)
> > >>> > > > Kafka can collapse under the increased replication traffic. This
> > >>> KIP
> > >>> > will
> > >>> > > > mitigate it by introducing internal batching done by the
> > >>> controller.
> > >>> > > > Besides putting a bandwidth limit on the replication it is useful
> > >>> to
> > >>> > batch
> > >>> > > > partition movements as fewer number of partitions will use the
> > >>> > available
> > >>> > > > bandwidth for reassignment and they finish faster.
> > >>> > > > The main control handles are:
> > >>> > > > - the number of parallel leader movements,
> > >>> > > > - the number of parallel partition movements
> > >>> > > > - and the number of parallel replica movements.
> > >>> > > >
> > >>> > > > Thank you for the feedback and ideas so far in the previous
> > thread
> > >>> and
> > >>> > I'm
> > >>> > > > happy to receive more.
> > >>> > > >
> > >>> > > > Regards,
> > >>> > > > Viktor
> > >>> > > >
> > >>> > >
> > >>> > >
> > >>> > > --
> > >>> > > Best,
> > >>> > > Stanislav
> > >>> >
> > >>>
> > >>>
> > >>> --
> > >>> Best,
> > >>> Stanislav
> > >>>
> > >>
> >
> 
> 
> -- 
> Best,
> Stanislav
>

Reply via email to