Hi Folks,

I've updated the KIP with batching that works on both the replica and the partition level. To explain it briefly: if the replica level is set to 2 and the partition level is set to 3, then 2x3=6 replica reassignments would be in progress at the same time. When reassigning a single partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9), we would form the batches (0, 1) → (5, 6); (2, 3) → (7, 8) and 4 → 9, and would execute the reassignment in this order.
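The batching arithmetic can be illustrated with a rough Python sketch. It is only an illustration under stated assumptions, not the code from the KIP: the function names `replica_batches` and `partition_groups` are hypothetical, replicas present in both the old and new list are assumed to stay in place, and grouping partitions into fixed chunks in submission order is a simplification.

```python
from typing import List, Sequence, Tuple

# One step of a partition's reassignment: (replicas dropped, replicas added).
Step = Tuple[Tuple[int, ...], Tuple[int, ...]]


def replica_batches(current: Sequence[int], target: Sequence[int],
                    replica_batch: int) -> List[Step]:
    """Split one partition's reassignment into steps of at most
    `replica_batch` replica moves each (hypothetical helper)."""
    if replica_batch < 1:
        raise ValueError("replica_batch must be >= 1")
    # Assumption: replicas that appear in both lists do not need to move.
    drop = [r for r in current if r not in target]
    add = [r for r in target if r not in current]
    steps: List[Step] = []
    for i in range(0, max(len(drop), len(add)), replica_batch):
        steps.append((tuple(drop[i:i + replica_batch]),
                      tuple(add[i:i + replica_batch])))
    return steps


def partition_groups(partitions: Sequence[str],
                     partition_batch: int) -> List[List[str]]:
    """Group partitions in submission order so that at most
    `partition_batch` of them are reassigned together (hypothetical helper)."""
    if partition_batch < 1:
        raise ValueError("partition_batch must be >= 1")
    return [list(partitions[i:i + partition_batch])
            for i in range(0, len(partitions), partition_batch)]


# The single-partition example from this email, with replica level 2.
for dropped, added in replica_batches([0, 1, 2, 3, 4], [5, 6, 7, 8, 9], 2):
    print(dropped, "->", added)

# Upper bound on concurrent replica moves: replica level x partition level.
print(2 * 3)
```

With replica level 2 the example partition is moved in three steps, and with partition level 3 at most three such partitions would be stepping at once, which gives the 2x3=6 bound mentioned above.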
Let me know what you think.

Best,
Viktor

On Mon, Apr 15, 2019 at 7:01 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote:

> A follow up on the batching topic to clarify my points above.
>
> Generally I think that batching should be a core feature as Colin said the
> controller should possess all information that are related.
> Also Cruise Control (or really any 3rd party admin system) might build
> upon this to give more holistic approach to balance brokers. We may cater
> them with APIs that act like building blocks to make their life easier like
> incrementalization, batching, cancellation and rollback but I think the
> more advanced we go we'll need more advanced control surface and Kafka's
> basic tooling might not be suitable for that.
>
> Best,
> Viktor
>
>
> On Mon, 15 Apr 2019, 18:22 Viktor Somogyi-Vass, <viktorsomo...@gmail.com>
> wrote:
>
>> Hey Guys,
>>
>> I'll reply to you all in this email:
>>
>> @Jun:
>> 1. yes, it'd be a good idea to add this feature, I'll write this into the
>> KIP. I was actually thinking about introducing a dynamic config called
>> reassignment.parallel.partition.count and
>> reassignment.parallel.replica.count. The first property would control how
>> many partition reassignment can we do concurrently. The second would go one
>> level in granularity and would control how many replicas do we want to move
>> for a given partition. Also one more thing that'd be useful to fix is that
>> a given list of partition -> replica list would be executed in the same
>> order (from first to last) so it's overall predictable and the user would
>> have some control over the order of reassignments should be specified as
>> the JSON is still assembled by the user.
>> 2. the /kafka/brokers/topics/{topic} znode to be specific. I'll update
>> the KIP to contain this.
>>
>> @Jason:
>> I think building this functionality into Kafka would definitely benefit
>> all the users and that CC as well as it'd simplify their software as you
>> said.
>> As I understand the main advantage of CC and other similar softwares
>> are to give high level features for automatic load balancing. Reliability,
>> stability and predictability of the reassignment should be a core feature
>> of Kafka. I think the incrementalization feature would make it more stable.
>> I would consider cancellation too as a core feature and we can leave the
>> gate open for external tools to feed in their reassignment json as they
>> want. I was also thinking about what are the set of features we can provide
>> for Kafka but I think the more advanced we go the more need there is for an
>> administrative UI component.
>> Regarding KIP-352: Thanks for pointing this out, I didn't see this
>> although lately I was also thinking about the throttling aspect of it.
>> Would be a nice add-on to Kafka since though the above configs provide some
>> level of control, it'd be nice to put an upper cap on the bandwidth and
>> make it monitorable.
>>
>> Viktor
>>
>> On Wed, Apr 10, 2019 at 2:57 AM Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>>> Hi Colin,
>>>
>>> > On a related note, what do you think about the idea of storing the
>>> > reassigning replicas in
>>> > /brokers/topics/[topic]/partitions/[partitionId]/state, rather than in
>>> > the reassignment znode? I don't think this requires a major change to the
>>> > proposal-- when the controller becomes aware that it should do a
>>> > reassignment, the controller could make the changes. This also helps keep
>>> > the reassignment znode from getting larger, which has been a problem.
>>>
>>>
>>> Yeah, I think it's a good idea to store the reassignment state at a finer
>>> level. I'm not sure the LeaderAndIsr znode is the right one though. Another
>>> option is /brokers/topics/{topic}. That is where we currently store the
>>> replica assignment. I think we basically want to represent both the current
>>> state and the desired state.
>>> This would also open the door to a cleaner way
>>> to update a reassignment while it is still in progress.
>>>
>>> -Jason
>>>
>>>
>>> On Mon, Apr 8, 2019 at 11:14 PM George Li <sql_consult...@yahoo.com.invalid>
>>> wrote:
>>>
>>> > Hi Colin / Jason,
>>> >
>>> > Reassignment should really be doing a batches. I am not too worried about
>>> > reassignment znode getting larger. In a real production environment, too
>>> > many concurrent reassignment and too frequent submission of reassignments
>>> > seemed to cause latency spikes of kafka cluster. So
>>> > batching/staggering/throttling of submitting reassignments is recommended.
>>> >
>>> > In KIP-236, The "originalReplicas" are only kept for the current
>>> > reassigning partitions (small #), and kept in memory of the controller
>>> > context partitionsBeingReassigned as well as in the znode
>>> > /admin/reassign_partitions, I think below "setting in the RPC like null =
>>> > no replicas are reassigning" is a good idea.
>>> >
>>> > There seems to be some issues with the Mail archive server of this mailing
>>> > list? I didn't receive email after April 7th, and the archive for April
>>> > 2019 has only 50 messages (
>>> > http://mail-archives.apache.org/mod_mbox/kafka-dev/201904.mbox/thread) ?
>>> >
>>> > Thanks,
>>> > George
>>> >
>>> > on, 08 Apr 2019 17:54:48 GMT Colin McCabe wrote:
>>> >
>>> > Yeah, I think adding this information to LeaderAndIsr makes sense. It
>>> > would be better to track "reassigningReplicas" than "originalReplicas",
>>> > I think. Tracking "originalReplicas" is going to involve sending a lot
>>> > more data, since most replicas in the system are not reassigning at any
>>> > given point. Or we would need a hack in the RPC like null = no
>>> > replicas are reassigning.
>>> >
>>> > On a related note, what do you think about the idea of storing the
>>> > reassigning replicas in
>>> > /brokers/topics/[topic]/partitions/[partitionId]/state, rather than in
>>> > the reassignment znode?
>>> > I don't think this requires a major change to the proposal-- when the
>>> > controller becomes aware that it should do a reassignment, the controller
>>> > could make the changes. This also helps keep the reassignment znode from
>>> > getting larger, which has been a problem.
>>> >
>>> > best,
>>> > Colin
>>> >
>>> >
>>> > On Mon, Apr 8, 2019, at 09:29, Jason Gustafson wrote:
>>> > > Hey George,
>>> > >
>>> > > > For the URP during a reassignment, if the "original_replicas" is kept for
>>> > > > the current pending reassignment. I think it will be very easy to compare
>>> > > > that with the topic/partition's ISR. If all "original_replicas" are in
>>> > > > ISR, then URP should be 0 for that topic/partition.
>>> > >
>>> > >
>>> > > Yeah, that makes sense. But I guess we would need "original_replicas" to be
>>> > > propagated to partition leaders in the LeaderAndIsr request since leaders
>>> > > are the ones that are computing URPs. That is basically what KIP-352 had
>>> > > proposed, but we also need the changes to the reassignment path. Perhaps it
>>> > > makes more sense to address this problem in KIP-236 since that is where you
>>> > > have already introduced "original_replicas"? I'm also happy to do KIP-352
>>> > > as a follow-up to KIP-236.
>>> > >
>>> > > Best,
>>> > > Jason
>>> > >
>>> > >
>>> > > On Sun, Apr 7, 2019 at 5:09 PM Ismael Juma <isma...@gmail.com> wrote:
>>> > >
>>> > > > Good discussion about where we should do batching. I think if there is a
>>> > > > clear great way to batch, then it makes a lot of sense to just do it once.
>>> > > > However, if we think there is scope for experimenting with different
>>> > > > approaches, then an API that tools can use makes a lot of sense. They can
>>> > > > experiment and innovate. Eventually, we can integrate something into Kafka
>>> > > > if it makes sense.
>>> > > >
>>> > > > Ismael
>>> > > >
>>> > > > On Sun, Apr 7, 2019, 11:03 PM Colin McCabe <cmcc...@apache.org> wrote:
>>> > > >
>>> > > > > Hi George,
>>> > > > >
>>> > > > > As Jason was saying, it seems like there are two directions we could go
>>> > > > > here: an external system handling batching, and the controller handling
>>> > > > > batching. I think the controller handling batching would be better,
>>> > > > > since the controller has more information about the state of the system.
>>> > > > > If the controller handles batching, then the controller could also handle
>>> > > > > things like setting up replication quotas for individual partitions. The
>>> > > > > controller could do things like throttle replication down if the cluster
>>> > > > > was having problems.
>>> > > > >
>>> > > > > We kind of need to figure out which way we're going to go on this one
>>> > > > > before we set up big new APIs, I think. If we want an external system to
>>> > > > > handle batching, then we can keep the idea that there is only one
>>> > > > > reassignment in progress at once. If we want the controller to handle
>>> > > > > batching, we will need to get away from that idea. Instead, we should
>>> > > > > just have a bunch of "ideal assignments" that we tell the controller
>>> > > > > about, and let it decide how to do the batching.
>>> > > > > These ideal assignments could change
>>> > > > > continuously over time, so from the admin's point of view, there would be
>>> > > > > no start/stop/cancel, but just individual partition reassignments that we
>>> > > > > submit, perhaps over a long period of time. And then cancellation might
>>> > > > > just mean cancelling just that individual partition reassignment, not all
>>> > > > > partition reassignments.
>>> > > > >
>>> > > > > best,
>>> > > > > Colin
>>> > > > >
>>> > > > > On Fri, Apr 5, 2019, at 19:34, George Li wrote:
>>> > > > > > Hi Jason / Viktor,
>>> > > > > >
>>> > > > > > For the URP during a reassignment, if the "original_replicas" is kept
>>> > > > > > for the current pending reassignment. I think it will be very easy to
>>> > > > > > compare that with the topic/partition's ISR. If all
>>> > > > > > "original_replicas" are in ISR, then URP should be 0 for that
>>> > > > > > topic/partition.
>>> > > > > >
>>> > > > > > It would be also nice to separate the metrics MaxLag/TotalLag for
>>> > > > > > Reassignments. I think that will also require "original_replicas" (the
>>> > > > > > topic/partition's replicas just before reassignment when the AR
>>> > > > > > (Assigned Replicas) is set to Set(original_replicas) +
>>> > > > > > Set(new_replicas_in_reassign_partitions) ).
>>> > > > > >
>>> > > > > > Thanks,
>>> > > > > > George
>>> > > > > >
>>> > > > > > On Friday, April 5, 2019, 6:29:55 PM PDT, Jason Gustafson
>>> > > > > > <ja...@confluent.io> wrote:
>>> > > > > >
>>> > > > > > Hi Viktor,
>>> > > > > >
>>> > > > > > Thanks for writing this up. As far as questions about overlap with
>>> > > > > > KIP-236, I agree it seems mostly orthogonal. I think KIP-236 may have
>>> > > > > > had a larger initial scope, but now it focuses on cancellation and
>>> > > > > > batching is left for future work.
>>> > > > > >
>>> > > > > > With that said, I think we may not actually need a KIP for the current
>>> > > > > > proposal since it doesn't change any APIs. To make it more generally
>>> > > > > > useful, however, it would be nice to handle batching at the partition
>>> > > > > > level as well as Jun suggests. The basic question is at what level
>>> > > > > > should the batching be determined. You could rely on external processes
>>> > > > > > (e.g. cruise control) or it could be built into the controller. There
>>> > > > > > are tradeoffs either way, but I think it simplifies such tools if it is
>>> > > > > > handled internally. Then it would be much safer to submit a larger
>>> > > > > > reassignment even just using the simple tools that come with Kafka.
>>> > > > > >
>>> > > > > > By the way, since you are looking into some of the reassignment logic,
>>> > > > > > another problem that we might want to address is the misleading way we
>>> > > > > > report URPs during a reassignment. I had a naive proposal for this
>>> > > > > > previously, but it didn't really work
>>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment
>>> > > > > > .
>>> > > > > > Potentially fixing that could fall under this work as well if you think
>>> > > > > > it makes sense.
>>> > > > > >
>>> > > > > > Best,
>>> > > > > > Jason
>>> > > > > >
>>> > > > > > On Thu, Apr 4, 2019 at 4:49 PM Jun Rao <j...@confluent.io> wrote:
>>> > > > > >
>>> > > > > > > Hi, Viktor,
>>> > > > > > >
>>> > > > > > > Thanks for the KIP. A couple of comments below.
>>> > > > > > >
>>> > > > > > > 1. Another potential thing to do reassignment incrementally is to move a
>>> > > > > > > batch of partitions at a time, instead of all partitions.
>>> > > > > > > This may lead to
>>> > > > > > > less data replication since by the time the first batch of partitions have
>>> > > > > > > been completely moved, some data of the next batch may have been deleted
>>> > > > > > > due to retention and doesn't need to be replicated.
>>> > > > > > >
>>> > > > > > > 2. "Update CR in Zookeeper with TR for the given partition". Which ZK path
>>> > > > > > > is this for?
>>> > > > > > >
>>> > > > > > > Jun
>>> > > > > > >
>>> > > > > > > On Sat, Feb 23, 2019 at 2:12 AM Viktor Somogyi-Vass <
>>> > > > > > > viktorsomo...@gmail.com>
>>> > > > > > > wrote:
>>> > > > > > >
>>> > > > > > > > Hi Harsha,
>>> > > > > > > >
>>> > > > > > > > As far as I understand KIP-236 it's about enabling reassignment
>>> > > > > > > > cancellation and as a future plan providing a queue of replica
>>> > > > > > > > reassignment steps to allow manual reassignment chains. While I agree
>>> > > > > > > > that the reassignment chain has a specific use case that allows fine
>>> > > > > > > > grain control over reassignment process, My proposal on the other hand
>>> > > > > > > > doesn't talk about cancellation but it only provides an automatic way to
>>> > > > > > > > incrementalize an arbitrary reassignment which I think fits the general
>>> > > > > > > > use case where users don't want that level of control but still would
>>> > > > > > > > like a balanced way of reassignments. Therefore I think it's still
>>> > > > > > > > relevant as an improvement of the current algorithm.
>>> > > > > > > > Nevertheless I'm happy to add my ideas to KIP-236 as I think it would be
>>> > > > > > > > a great improvement to Kafka.
>>> > > > > > > >
>>> > > > > > > > Cheers,
>>> > > > > > > > Viktor
>>> > > > > > > >
>>> > > > > > > > On Fri, Feb 22, 2019 at 5:05 PM Harsha <ka...@harsha.io> wrote:
>>> > > > > > > >
>>> > > > > > > > > Hi Viktor,
>>> > > > > > > > > There is already KIP-236 for the same feature and George made
>>> > > > > > > > > a PR for this as well.
>>> > > > > > > > > Lets consolidate these two discussions. If you have any cases that are
>>> > > > > > > > > not being solved by KIP-236 can you please mention them in that
>>> > > > > > > > > thread. We can address as part of KIP-236.
>>> > > > > > > > >
>>> > > > > > > > > Thanks,
>>> > > > > > > > > Harsha
>>> > > > > > > > >
>>> > > > > > > > > On Fri, Feb 22, 2019, at 5:44 AM, Viktor Somogyi-Vass wrote:
>>> > > > > > > > > > Hi Folks,
>>> > > > > > > > > >
>>> > > > > > > > > > I've created a KIP about an improvement of the reassignment algorithm
>>> > > > > > > > > > we have. It aims to enable partition-wise incremental reassignment. The
>>> > > > > > > > > > motivation for this is to avoid excess load that the current
>>> > > > > > > > > > replication algorithm implicitly carries as in that case there are
>>> > > > > > > > > > points in the algorithm where both the new and old replica set could be
>>> > > > > > > > > > online and replicating which puts double (or almost double) pressure on
>>> > > > > > > > > > the brokers which could cause problems.
>>> > > > > > > > > > Instead my proposal would slice this up into several steps where each
>>> > > > > > > > > > step is calculated based on the final target replicas and the current
>>> > > > > > > > > > replica assignment taking into account scenarios where brokers could be
>>> > > > > > > > > > offline and when there are not enough replicas to fulfil the
>>> > > > > > > > > > min.insync.replica requirement.
>>> > > > > > > > > >
>>> > > > > > > > > > The link to the KIP:
>>> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-435%3A+Incremental+Partition+Reassignment
>>> > > > > > > > > >
>>> > > > > > > > > > I'd be happy to receive any feedback.
>>> > > > > > > > > >
>>> > > > > > > > > > An important note is that this KIP and another one, KIP-236 that is
>>> > > > > > > > > > about interruptible reassignment (
>>> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
>>> > > > > > > > > > ) should be compatible.
>>> > > > > > > > > >
>>> > > > > > > > > > Thanks,
>>> > > > > > > > > > Viktor