I read through the KIP and have one comment: it seems the KIP is not strictly about cancellation but also implements rolling back to the original assignment. I think it would be much simpler to generate a reassignment JSON on cancellation that contains the original assignment and start a completely new partition reassignment. That way the reassignment algorithm (whatever it is) could be reused as a whole. Did you consider this, or are there any obstacles that prevent doing it?
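To make the idea concrete, here is a minimal sketch in Python, purely illustrative. It assumes the original assignment of every in-flight partition is still known at cancellation time; the helper name `cancellation_json` and its input shape are made up for this example and are not part of the KIP. The output follows the JSON layout that the existing reassignment tool accepts (a version field plus a list of topic/partition/replicas entries).

```python
import json

def cancellation_json(original_assignments):
    """Build a reassignment JSON that moves each partition back to its
    original replica list.

    original_assignments: dict mapping (topic, partition) -> list of broker ids.
    This input shape is an assumption of the sketch, not defined by the KIP.
    """
    partitions = [
        {"topic": topic, "partition": partition, "replicas": list(replicas)}
        for (topic, partition), replicas in sorted(original_assignments.items())
    ]
    return json.dumps({"version": 1, "partitions": partitions})

# Example: my-topic/42 was originally on brokers [1, 2, 3].
print(cancellation_json({("my-topic", 42): [1, 2, 3]}))
```

The resulting document could then be submitted as an ordinary new reassignment, so no separate rollback path would be needed.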
Regards,
Viktor

On Fri, Feb 22, 2019 at 2:24 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote:

> I've published the above mentioned KIP here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-435%3A+Incremental+Partition+Reassignment
> Will start a discussion about it soon.
>
> On Fri, Feb 22, 2019 at 12:45 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> I also have active work pending on incremental partition reassignment here: https://issues.apache.org/jira/browse/KAFKA-6794
>> I think it would be good to cooperate on this to make both pieces of work compatible with each other.
>>
>> I'll write up a KIP about this today so it'll be easier to see how to fit the two together. Basically, in my work I operate on the /admin/reassign_partitions node in a fully compatible way, meaning I won't change it, just calculate each increment based on it and on the current state of the ISR set for the partition in reassignment.
>> I hope we could collaborate on this.
>>
>> Viktor
>>
>> On Thu, Feb 21, 2019 at 9:04 PM Harsha <ka...@harsha.io> wrote:
>>
>>> Thanks George. LGTM.
>>> Jun & Tom, can you please take a look at the updated KIP?
>>> Thanks,
>>> Harsha
>>>
>>> On Wed, Feb 20, 2019, at 12:18 PM, George Li wrote:
>>> > Hi,
>>> >
>>> > After discussing with Tom, Harsha and I are picking up KIP-236 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment>. The work focuses on safely and cleanly cancelling / rolling back pending reassignments in a timely fashion. Pull Request #6296 <https://github.com/apache/kafka/pull/6296>. Still working on more integration/system tests.
>>> >
>>> > Please review and provide feedback/suggestions.
>>> >
>>> > Thanks,
>>> > George
>>> >
>>> > On Saturday, December 23, 2017, 0:51:13 GMT, Jun Rao <j...@confluent.io> wrote:
>>> >
>>> > Hi, Tom,
>>>
>>> Thanks for the reply.
>>>
>>> 10. That's a good thought.
Perhaps it's better to get rid of /admin/reassignment_requests too. The window when a controller is not available is small. So, we can just fail the admin client if the controller is not reachable after the timeout.
>>>
>>> 13. With the changes in 10, the old approach is handled through ZK callback and the new approach is through Kafka RPC. The ordering between the two is kind of arbitrary. Perhaps the ordering can just be based on the order that the reassignment is added to the controller request queue. From there, we can either do the overriding or the prevention.
>>>
>>> Jun
>>>
>>> On Fri, Dec 22, 2017 at 7:31 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > Thanks for responding, my replies are inline:
>>> >
>>> > > 10. Your explanation makes sense. My remaining concern is the additional ZK writes in the proposal. With the proposal, we will need to do the following writes in ZK.
>>> > >
>>> > > a. write new assignment in /admin/reassignment_requests
>>> > >
>>> > > b. write new assignment and additional metadata in /admin/reassignments/$topic/$partition
>>> > >
>>> > > c. write old + new assignment in /brokers/topics/[topic]
>>> > >
>>> > > d. write new assignment in /brokers/topics/[topic]
>>> > >
>>> > > e. delete /admin/reassignments/$topic/$partition
>>> > >
>>> > > So, there are quite a few ZK writes. I am wondering if it's better to consolidate the info in /admin/reassignments/$topic/$partition into /brokers/topics/[topic]. For example, we can just add some new JSON fields in /brokers/topics/[topic] to remember the new assignment and potentially the original replica count when doing step c. Those fields will then be removed in step d. That way, we can get rid of steps b and e, saving 2 ZK writes per partition.
>>> >
>>> > This seems like a great idea to me.
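The consolidation discussed in point 10 can be sketched roughly as follows. This is a minimal illustration in Python that operates on a plain dict standing in for the JSON stored at /brokers/topics/[topic]. The field names `target` and `original_replica_count` are hypothetical, since the thread does not name the new fields, and no ZooKeeper client calls are shown.

```python
import copy

def begin_reassignment(topic_state, partition, new_replicas):
    """Step c: store old + new replicas and the bookkeeping fields in one write."""
    state = copy.deepcopy(topic_state)
    old = state["partitions"][partition]
    # Keep the old replicas and append new ones that are not already assigned.
    state["partitions"][partition] = old + [r for r in new_replicas if r not in old]
    # Hypothetical extra fields replacing /admin/reassignments/$topic/$partition.
    state.setdefault("target", {})[partition] = list(new_replicas)
    state.setdefault("original_replica_count", {})[partition] = len(old)
    return state

def complete_reassignment(topic_state, partition):
    """Step d: write the new assignment and remove the bookkeeping fields."""
    state = copy.deepcopy(topic_state)
    state["partitions"][partition] = state["target"].pop(partition)
    state["original_replica_count"].pop(partition)
    for field in ("target", "original_replica_count"):
        if not state[field]:
            del state[field]
    return state

before = {"version": 1, "partitions": {"0": [0, 1, 2]}}
during = begin_reassignment(before, "0", [1, 2, 3])
after = complete_reassignment(during, "0")
print(during)
print(after)
```

With this shape, steps b and e disappear because the in-flight metadata lives and dies inside the topic znode itself.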
>>> > >>> > It might also be possible to get rid of the >>> /admin/reassignment_requests >>> > subtree too. I've not yet published the ideas I have for the >>> AdminClient >>> > API for reassigning partitions, but given the existence of such an >>> API, the >>> > route to starting a reassignment would be the AdminClient, and not >>> > zookeeper. In that case there is no need for >>> /admin/reassignment_requests >>> > at all. The only drawback that I can see is that while it's currently >>> > possible to trigger a reassignment even during a controller >>> > election/failover that would no longer be the case if all requests had >>> to >>> > go via the controller. >>> > >>> > >>> > > 11. What you described sounds good. We could potentially optimize the >>> > > dropped replicas a bit more. Suppose that assignment [0,1,2] is first >>> > > changed to [1,2,3] and then to [2,3,4]. When initiating the second >>> > > assignment, we may end up dropping replica 3 and only to restart it >>> > again. >>> > > In this case, we could only drop a replica if it's not going to be >>> added >>> > > back again. >>> > > >>> > >>> > I had missed that, thank you! I will update the proposed algorithm to >>> > prevent this. >>> > >>> > >>> > > 13. Since this is a corner case, we can either prevent or allow >>> > overriding >>> > > with old/new mechanisms. To me, it seems that allowing is simpler to >>> > > implement, the order in /admin/reassignment_requests determines the >>> > > ordering the of override, whether that's initiated by the new way or >>> the >>> > > old way. >>> > > >>> > >>> > That makes sense except for the corner case where: >>> > >>> > * There is no current controller and >>> > * Writes to both the new and old znodes happen >>> > >>> > On election of the new controller, for those partitions with both a >>> > reassignment_request and in /admin/reassign_partitions, we have to >>> decide >>> > which should win. 
You could use the modification time, though there are some very unlikely scenarios where that doesn't work properly, for example if both znodes have the same mtime, or the /admin/reassign_partitions was updated, but the assignment of the partition wasn't changed, like this:
>>> >
>>> > 0. /admin/reassign_partitions has my-topic/42 = [1,2,3]
>>> > 1. Controller stops watching.
>>> > 2. Create /admin/reassignment_requests/request_1234 to change the reassignment of partition my-topic/42 = [4,5,6]
>>> > 3. Update /admin/reassign_partitions to add your-topic/12=[7,8,9]
>>> > 4. New controller resumes
>>> >
>>> > > Thanks,
>>> > >
>>> > > Jun
>>> > >
>>> > > On Tue, Dec 19, 2017 at 2:43 AM, Tom Bentley <t.j.bent...@gmail.com> wrote:
>>> > >
>>> > > > Hi Jun,
>>> > > >
>>> > > > > 10. Another concern of mine is on consistency with the current pattern. The current pattern for change notification based on ZK is (1) we first write the actual value in the entity path and then write the change notification path, and (2) the change notification path only includes what entity has changed but not the actual changes. If we want to follow this pattern for consistency, /admin/reassignment_requests/request_xxx will only have the partitions whose reassignment have changed, but not the actual reassignment.
>>> > > >
>>> > > > Ah, I hadn't understood part (2). That means my concern about efficiency with the current pattern is misplaced. There are still some interesting differences in semantics, however:
>>> > > >
>>> > > > a) The mechanism currently proposed in KIP-236 means that the controller is the only writer to /admin/reassignments.
This means it can include information in these znodes that requesters might not know, or information that's necessary to perform the reassignment but not necessary to describe the request. While this could be handled using the current pattern it would rely on all writers to preserve any information added by the controller, which seems complicated and hence fragile.
>>> > > >
>>> > > > b) The current pattern for change notification doesn't cope with competing writers to the entity path: If two processes write to the entity path before the controller can read it (due to notification) then one set of updates will be lost.
>>> > > >
>>> > > > c) If a single writing process crashes after writing to the entity path, but before writing to the notification path then the write will be lost.
>>> > > >
>>> > > > I'm actually using point a) in my WIP (see below). Points b) and c) are obviously edge cases.
>>> > > >
>>> > > > > 11. Ok. I am not sure that I fully understand the description of that part. Does "assigned" refer to the current assignment? Could you also describe where the length of the original assignment is stored in ZK?
>>> > > >
>>> > > > Sorry if the description is not clear. Yes, "assigned" refers to the currently assigned replicas (taken from the ControllerContext.partitionReplicaAssignment). I would store the length of the original assignment in the /admin/reassignments/$topic/$partition znode (this is where the point (a) above is useful -- the requester shouldn't know that this information is used by the controller).
>>> > > >
>>> > > > I've updated the KIP to make these points clearer.
>>> > > >
>>> > > > > 13.
Hmm, I am not sure that the cancellation needs to be done for the whole batch. The reason that I brought this up is for consistency. The KIP allows override when using the new approach. It just seems that it's simpler to extend this model when resolving multiple changes between the old and the new approach.
>>> > > >
>>> > > > Ah, I think I've been unclear on this point too. Currently the ReassignPartitionsCommand enforces that you can't change reassignments, but this doesn't stop other ZK clients making changes to /admin/reassign_partitions directly and I believe some Kafka users do indeed change reassignments in-flight by writing to /admin/reassign_partitions. What I'm proposing doesn't break that at all. The semantic I've implemented is that the controller refuses a reassignment change only if there is already one in-flight (i.e. in /admin/reassignments/$topic/$partition) **via the other mechanism**. So if you're using /admin/reassign_partitions and you change or cancel part of it via /admin/reassign_partitions, that's OK. Likewise if you're using /admin/reassignment_request/request_xxx and you change or cancel part of it via another /admin/reassignment_request/request_xxx, that's OK. What you can't do is change a request that was started via /admin/reassign_partitions via /admin/reassignment_request/request_xxx, or vice versa.
>>> > > > >>> > > > What I was thinking of when I replied is the case where, on >>> controller >>> > > > failover, /admin/reassign_partitions has been changed and >>> > > > /admin/reassignment_request/request_xxx created (in the period when >>> > the >>> > > > new >>> > > > controller was being elected, for example) with a common >>> partition. In >>> > > this >>> > > > case we should apply a consistent rule to that used when the >>> > notification >>> > > > happen in real time. Your suggestion to use the modification time >>> of >>> > the >>> > > > znode would work here too (except in the edge case where ZK writes >>> to >>> > > both >>> > > > znodes happen within the same clock tick on the ZK server, so the >>> > mtimes >>> > > > are the same). >>> > > > >>> > > > Let me know if you think this is the right semantic and I'll try to >>> > > clarify >>> > > > the KIP. >>> > > > >>> > > > Many thanks, >>> > > > >>> > > > Tom >>> > > > >>> > > > On 18 December 2017 at 18:12, Jun Rao <j...@confluent.io> wrote: >>> > > > >>> > > > > Hi, Tom, >>> > > > > >>> > > > > Thanks for the reply. A few more followup comments below. >>> > > > > >>> > > > > 10. Another concern of mine is on consistency with the current >>> > pattern. >>> > > > The >>> > > > > current pattern for change notification based on ZK is (1) we >>> first >>> > > write >>> > > > > the actual value in the entity path and then write the change >>> > > > notification >>> > > > > path, and (2) the change notification path only includes what >>> entity >>> > > has >>> > > > > changed but not the actual changes. If we want to follow this >>> pattern >>> > > for >>> > > > > consistency, /admin/reassignment_requests/request_xxx will only >>> have >>> > > the >>> > > > > partitions whose reassignment have changed, but not the actual >>> > > > > reassignment. >>> > > > > >>> > > > > 11. Ok. I am not sure that I fully understand the description of >>> that >>> > > > part. 
>>> > > > > Does "assigned" refer to the current assignment? Could you also >>> > > describe >>> > > > > where the length of the original assignment is stored in ZK? >>> > > > > >>> > > > > 13. Hmm, I am not sure that the cancellation needs to be done >>> for the >>> > > > whole >>> > > > > batch. The reason that I brought this up is for consistency. The >>> KIP >>> > > > allows >>> > > > > override when using the new approach. It just seems that it's >>> simpler >>> > > to >>> > > > > extend this model when resolving multiple changes between the >>> old and >>> > > the >>> > > > > new approach. >>> > > > > >>> > > > > Jun >>> > > > > >>> > > > > >>> > > > > >>> > > > > On Mon, Dec 18, 2017 at 2:45 AM, Tom Bentley < >>> t.j.bent...@gmail.com> >>> > > > > wrote: >>> > > > > >>> > > > > > Hi Jun, >>> > > > > > >>> > > > > > Thanks for replying, some answers below: >>> > > > > > >>> > > > > > >>> > > > > > > 10. The proposal now stores the reassignment for all >>> partitions >>> > in >>> > > > > > > /admin/reassignment_requests/request_xxx. If the number of >>> > > > reassigned >>> > > > > > > partitions is larger, the ZK write may hit the default 1MB >>> limit >>> > > and >>> > > > > > fail. >>> > > > > > > An alternative approach is to have the reassignment requester >>> > first >>> > > > > write >>> > > > > > > the new assignment for each partition under >>> > > > > > > /admin/reassignments/$topic/$partition and then write >>> > > > > > > /admin/reassignment_requests/request_xxx with an empty value. >>> > The >>> > > > > > > controller can then read all values under >>> /admin/reassignments. >>> > > > > > > >>> > > > > > >>> > > > > > You're right that reassigning enough partitions would hit the >>> 1MB >>> > > > limit, >>> > > > > > but I don't think this would be a problem in practice because >>> it >>> > > would >>> > > > be >>> > > > > > trivial to split the partitions into several requests (i.e. >>> > mutleiple >>> > > > > > request_xxx). 
>>> > > > > > I don't think the non-atomicity this would imply is a problem. >>> By >>> > > > writing >>> > > > > > the partitions whose /admin/reassignments/$topic/$partition has >>> > been >>> > > > > > created or changed it makes it much more efficient to know >>> which of >>> > > > those >>> > > > > > znodes we need to read. If I understand your suggestion, you >>> would >>> > > have >>> > > > > to >>> > > > > > read every node under /admin/reassignments to figure out which >>> had >>> > > > > changed. >>> > > > > > >>> > > > > > >>> > > > > > > 11. The improvement you suggested in >>> onPartitionReassignment() >>> > > sounds >>> > > > > > good >>> > > > > > > at the high level. The computation of those dropped >>> partitions >>> > > seems >>> > > > a >>> > > > > > bit >>> > > > > > > complicated. Perhaps a simple approach is to drop the >>> replicas >>> > not >>> > > in >>> > > > > the >>> > > > > > > original assignment and newest reassignment? >>> > > > > > > >>> > > > > > >>> > > > > > This was what I came up with originally too, but when I looked >>> into >>> > > > > > implementing it I found a couple of things which made me >>> reconsider >>> > > it. >>> > > > > > Consider the reassignments [0,1] -> [2,3] -> [3,4]. In words: >>> we >>> > > start >>> > > > > > reassigning to [2,3], but then change our minds about 2 and >>> switch >>> > it >>> > > > to >>> > > > > 4 >>> > > > > > (maybe we've figured out a better overall balance). At that >>> point >>> > it >>> > > is >>> > > > > > perfectly possible that broker 2 is in-sync and broker 1 is not >>> > > > in-sync. >>> > > > > It >>> > > > > > seems silly to drop broker 2 in favour of broker 1: We're >>> > needlessly >>> > > > > giving >>> > > > > > the cluster more work to do. >>> > > > > > >>> > > > > > The second thing that made me reconsider was in that same >>> scenario >>> > > it's >>> > > > > > even possible that broker 2 is the leader of the partition. 
>>> > Obviously >>> > > > we >>> > > > > > can elect a new leader before dropping it, but not without >>> causing >>> > > > > > disruption to producers and consumers. >>> > > > > > >>> > > > > > By accepting a little more complexity in choosing which >>> brokers to >>> > > drop >>> > > > > we >>> > > > > > make the dropping simpler (no need for leader election) and >>> ensure >>> > > the >>> > > > > > cluster has less work to do. >>> > > > > > >>> > > > > > >>> > > > > > > 12. You brought up the need of remembering the original >>> > assignment. >>> > > > > This >>> > > > > > > will be lost if the assignment is changed multiple times if >>> we >>> > > follow >>> > > > > the >>> > > > > > > approach described in 10. One way is to store the original >>> > > assignment >>> > > > > in >>> > > > > > > /brokers/topics/[topic] as the following. When the final >>> > > reassignment >>> > > > > > > completes, we can remove the original field. >>> > > > > > > { >>> > > > > > > "version": 1, >>> > > > > > > "partitions": {"0": [0, 1, 3] }, >>> > > > > > > "originals": {"0": [0, 1, 2] } >>> > > > > > > } >>> > > > > > > >>> > > > > > >>> > > > > > While I was implementing my first version of >>> > > onPartitionReassignment(), >>> > > > > > where I preferred the originals, I was storing the originals >>> in the >>> > > > > > /admin/reassignments/$topic/$partition znodes. Since we will >>> > remove >>> > > > that >>> > > > > > znode at the end of reassignment anyway, I would suggest this >>> is a >>> > > > better >>> > > > > > place to store that data (if it's necessary to do so), so that >>> we >>> > can >>> > > > > avoid >>> > > > > > another ZK round trip. >>> > > > > > >>> > > > > > >>> > > > > > > 13. 
For resolving the conflict between /admin/reassign_partitions and /admin/reassignments/$topic/$partition, perhaps it's more natural to just let the assignment with a newer timestamp override the older one?
>>> > > > > >
>>> > > > > > That would work but with slightly different semantics to what I have: Since /admin/reassign_partitions contains multiple partitions, using the timestamp means the whole batch wins or loses. By tracking how each request was made we can be more fine-grained. I'm happy to use the modification time if such granularity is not required.
>>> > > > > >
>>> > > > > > > 14. Implementation wise, currently, we register a watcher of the isr path of each partition being reassigned. This has the potential issue of registering many listeners. An improvement could be just piggybacking on the existing IsrChangeNotificationHandler, which only watches a single ZK path and is triggered on a batch of isr changes. This is kind of orthogonal to the KIP. However, if we are touching the reassignment logic, it may be worth considering.
>>> > > > > >
>>> > > > > > Let me look into that.
>>> > > > > >
>>> > > > > > Thanks,
>>> > > > > >
>>> > > > > > Tom
>>> > > > > >
>>> > > > > > On 16 December 2017 at 02:19, Jun Rao <j...@confluent.io> wrote:
>>> > > > > >
>>> > > > > > > Hi, Tom,
>>> > > > > > >
>>> > > > > > > Thanks for the updated KIP. A few more comments below.
>>> > > > > > >
>>> > > > > > > 10.
The proposal now stores the reassignment for all >>> partitions >>> > in >>> > > > > > > /admin/reassignment_requests/request_xxx. If the number of >>> > > > reassigned >>> > > > > > > partitions is larger, the ZK write may hit the default 1MB >>> limit >>> > > and >>> > > > > > fail. >>> > > > > > > An alternative approach is to have the reassignment requester >>> > first >>> > > > > write >>> > > > > > > the new assignment for each partition under >>> > > > > > > /admin/reassignments/$topic/$partition and then write >>> > > > > > > /admin/reassignment_requests/request_xxx with an empty value. >>> > The >>> > > > > > > controller can then read all values under >>> /admin/reassignments. >>> > > > > > > >>> > > > > > > 11. The improvement you suggested in >>> onPartitionReassignment() >>> > > sounds >>> > > > > > good >>> > > > > > > at the high level. The computation of those dropped >>> partitions >>> > > seems >>> > > > a >>> > > > > > bit >>> > > > > > > complicated. Perhaps a simple approach is to drop the >>> replicas >>> > not >>> > > in >>> > > > > the >>> > > > > > > original assignment and newest reassignment? >>> > > > > > > >>> > > > > > > 12. You brought up the need of remembering the original >>> > assignment. >>> > > > > This >>> > > > > > > will be lost if the assignment is changed multiple times if >>> we >>> > > follow >>> > > > > the >>> > > > > > > approach described in 10. One way is to store the original >>> > > assignment >>> > > > > in >>> > > > > > > /brokers/topics/[topic] as the following. When the final >>> > > reassignment >>> > > > > > > completes, we can remove the original field. >>> > > > > > > { >>> > > > > > > "version": 1, >>> > > > > > > "partitions": {"0": [0, 1, 3] }, >>> > > > > > > "originals": {"0": [0, 1, 2] } >>> > > > > > > } >>> > > > > > > >>> > > > > > > 13. 
For resolving the conflict between >>> /admin/reassign_partitions >>> > > and >>> > > > > > > /admin/reassignments/$topic/$partition, perhaps it's more >>> > natural >>> > > to >>> > > > > > just >>> > > > > > > let the assignment with a newer timestamp to override the >>> older >>> > > one? >>> > > > > > > >>> > > > > > > 14. Implementation wise, currently, we register a watcher of >>> the >>> > > isr >>> > > > > path >>> > > > > > > of each partition being reassigned. This has the potential >>> issue >>> > of >>> > > > > > > registering many listeners. An improvement could be just >>> > > piggybacking >>> > > > > on >>> > > > > > > the existing IsrChangeNotificationHandler, which only >>> watches a >>> > > > single >>> > > > > ZK >>> > > > > > > path and is triggered on a batch of isr changes. This is >>> kind of >>> > > > > > orthogonal >>> > > > > > > to the KIP. However, if we are touching the reassignment >>> logic, >>> > it >>> > > > may >>> > > > > be >>> > > > > > > worth considering. >>> > > > > > > >>> > > > > > > Thanks, >>> > > > > > > >>> > > > > > > Jun >>> > > > > > > >>> > > > > > > On Fri, Dec 15, 2017 at 10:17 AM, Tom Bentley < >>> > > t.j.bent...@gmail.com >>> > > > > >>> > > > > > > wrote: >>> > > > > > > >>> > > > > > > > Just wanted to mention that I've started KIP-240, which >>> builds >>> > on >>> > > > top >>> > > > > > of >>> > > > > > > > this one to provide an AdminClient API for listing and >>> > describing >>> > > > > > > > reassignments. >>> > > > > > > > >>> > > > > > > > On 15 December 2017 at 14:34, Tom Bentley < >>> > t.j.bent...@gmail.com >>> > > > >>> > > > > > wrote: >>> > > > > > > > >>> > > > > > > > > > Should we seek to improve this algorithm in this KIP, >>> or >>> > > leave >>> > > > > that >>> > > > > > > as >>> > > > > > > > > a later optimisation? >>> > > > > > > > > >>> > > > > > > > > I've updated the KIP with a proposed algorithm. 
>>> > > > > > > > > >>> > > > > > > > > >>> > > > > > > > > >>> > > > > > > > > >>> > > > > > > > > >>> > > > > > > > > >>> > > > > > > > > On 14 December 2017 at 09:57, Tom Bentley < >>> > > t.j.bent...@gmail.com >>> > > > > >>> > > > > > > wrote: >>> > > > > > > > > >>> > > > > > > > >> Thanks Ted, now fixed. >>> > > > > > > > >> >>> > > > > > > > >> On 13 December 2017 at 18:38, Ted Yu < >>> yuzhih...@gmail.com> >>> > > > wrote: >>> > > > > > > > >> >>> > > > > > > > >>> Tom: >>> > > > > > > > >>> bq. create a znode >>> /admin/reassignments/$topic-$partition >>> > > > > > > > >>> >>> > > > > > > > >>> Looks like the tree structure above should be: >>> > > > > > > > >>> >>> > > > > > > > >>> /admin/reassignments/$topic/$partition >>> > > > > > > > >>> >>> > > > > > > > >>> bq. The controller removes /admin/reassignment/$topic/$ >>> > > > partition >>> > > > > > > > >>> >>> > > > > > > > >>> Note the lack of 's' for reassignment. It would be >>> good to >>> > > make >>> > > > > > > > zookeeper >>> > > > > > > > >>> paths consistent. >>> > > > > > > > >>> >>> > > > > > > > >>> Thanks >>> > > > > > > > >>> >>> > > > > > > > >>> On Wed, Dec 13, 2017 at 9:49 AM, Tom Bentley < >>> > > > > > t.j.bent...@gmail.com> >>> > > > > > > > >>> wrote: >>> > > > > > > > >>> >>> > > > > > > > >>> > Hi Jun and Ted, >>> > > > > > > > >>> > >>> > > > > > > > >>> > Jun, you're right that needing one watcher per >>> reassigned >>> > > > > > partition >>> > > > > > > > >>> > presents a scalability problem, and using a separate >>> > > > > notification >>> > > > > > > > path >>> > > > > > > > >>> > solves that. I also agree that it makes sense to >>> prevent >>> > > > users >>> > > > > > from >>> > > > > > > > >>> using >>> > > > > > > > >>> > both methods on the same reassignment. 
>>> > > > > > > > >>> > >>> > > > > > > > >>> > Ted, naming the reassignments like mytopic-42 was >>> simpler >>> > > > > while I >>> > > > > > > was >>> > > > > > > > >>> > proposing a watcher-per-reassignment (I'd have >>> needed a >>> > > child >>> > > > > > > watcher >>> > > > > > > > >>> on >>> > > > > > > > >>> > /admin/reassignments and also on >>> > > > /admin/reassignments/mytopic). >>> > > > > > > Using >>> > > > > > > > >>> the >>> > > > > > > > >>> > separate notification path means I don't need any >>> > watchers >>> > > in >>> > > > > the >>> > > > > > > > >>> > /admin/reassignments subtree, so switching to >>> > > > > > > > >>> /admin/reassignments/mytopic/ >>> > > > > > > > >>> > 42 >>> > > > > > > > >>> > would work, and avoid /admin/reassignments having a >>> very >>> > > > large >>> > > > > > > number >>> > > > > > > > >>> of >>> > > > > > > > >>> > child nodes. On the other hand it also means I have >>> to >>> > > create >>> > > > > and >>> > > > > > > > >>> delete >>> > > > > > > > >>> > the topic nodes (e.g. /admin/reassignments/mytopic), >>> > which >>> > > > > incurs >>> > > > > > > the >>> > > > > > > > >>> cost >>> > > > > > > > >>> > of extra round trips to zookeeper. I suppose that >>> since >>> > > > > > > reassignment >>> > > > > > > > is >>> > > > > > > > >>> > generally a slow process it makes little difference >>> if we >>> > > > > > increase >>> > > > > > > > the >>> > > > > > > > >>> > latency of the interactions with zookeeper. >>> > > > > > > > >>> > >>> > > > > > > > >>> > I have updated the KIP with these improvements, and a >>> > more >>> > > > > > detailed >>> > > > > > > > >>> > description of exactly how we would manage these >>> znodes. >>> > > > > > > > >>> > >>> > > > > > > > >>> > Reading the algorithm in KafkaController. >>> > > > > > > onPartitionReassignment(), >>> > > > > > > > it >>> > > > > > > > >>> > seems that it would be suboptimal for changing >>> > > reassignments >>> > > > > > > > in-flight. 
>>> > > > > > > > >>> > Consider an initial assignment of [1,2], reassigned >>> to >>> > > [2,3] >>> > > > > and >>> > > > > > > then >>> > > > > > > > >>> > changed to [2,4]. Broker 3 will remain in the >>> assigned >>> > > > replicas >>> > > > > > > until >>> > > > > > > > >>> > broker 4 is in sync, even though 3 wasn't actually >>> one of >>> > > the >>> > > > > > > > original >>> > > > > > > > >>> > assigned replicas and is no longer a new assigned >>> > replica. >>> > > I >>> > > > > > think >>> > > > > > > > this >>> > > > > > > > >>> > also affects the case where the reassignment is >>> cancelled >>> > > > > > > > >>> > ([1,2]->[2,3]->[1,2]): We again have to wait for 3 to >>> > catch >>> > > > up, >>> > > > > > > even >>> > > > > > > > >>> though >>> > > > > > > > >>> > its replica will then be deleted. >>> > > > > > > > >>> > >>> > > > > > > > >>> > Should we seek to improve this algorithm in this >>> KIP, or >>> > > > leave >>> > > > > > that >>> > > > > > > > as >>> > > > > > > > >>> a >>> > > > > > > > >>> > later optimisation? >>> > > > > > > > >>> > >>> > > > > > > > >>> > Cheers, >>> > > > > > > > >>> > >>> > > > > > > > >>> > Tom >>> > > > > > > > >>> > >>> > > > > > > > >>> > On 11 December 2017 at 21:31, Jun Rao < >>> j...@confluent.io> >>> > > > > wrote: >>> > > > > > > > >>> > >>> > > > > > > > >>> > > Another question is on the compatibility. Since now >>> > there >>> > > > > are 2 >>> > > > > > > > ways >>> > > > > > > > >>> of >>> > > > > > > > >>> > > specifying a partition reassignment, one under >>> > > > > > > > >>> /admin/reassign_partitions >>> > > > > > > > >>> > > and the other under /admin/reassignments, we >>> probably >>> > > want >>> > > > to >>> > > > > > > > >>> prevent the >>> > > > > > > > >>> > > same topic being reassigned under both paths at the >>> > same >>> > > > > time? 
>>> > > > > > > > >>> > > Thanks, >>> > > > > > > > >>> > > >>> > > > > > > > >>> > > Jun >>> > > > > > > > >>> > > >>> > > > > > > > >>> > > >>> > > > > > > > >>> > > >>> > > > > > > > >>> > > On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao < >>> > > j...@confluent.io> >>> > > > > > > wrote: >>> > > > > > > > >>> > > >>> > > > > > > > >>> > > > Hi, Tom, >>> > > > > > > > >>> > > > >>> > > > > > > > >>> > > > Thanks for the KIP. It definitely addresses one >>> of >>> > the >>> > > > pain >>> > > > > > > > points >>> > > > > > > > >>> in >>> > > > > > > > >>> > > > partition reassignment. Another issue that it >>> also >>> > > > > addresses >>> > > > > > is >>> > > > > > > > >>> the ZK >>> > > > > > > > >>> > > node >>> > > > > > > > >>> > > > size limit when writing the reassignment JSON. >>> > > > > > > > >>> > > > >>> > > > > > > > >>> > > > My only concern is that the KIP needs to create >>> one >>> > > > watcher >>> > > > > > per >>> > > > > > > > >>> > > reassigned >>> > > > > > > > >>> > > > partition. This could add overhead in ZK and >>> > complexity >>> > > > for >>> > > > > > > > >>> debugging >>> > > > > > > > >>> > > when >>> > > > > > > > >>> > > > lots of partitions are being reassigned >>> > simultaneously. >>> > > > We >>> > > > > > > could >>> > > > > > > > >>> > > > potentially improve this by introducing a >>> separate ZK >>> > > > path >>> > > > > > for >>> > > > > > > > >>> change >>> > > > > > > > >>> > > > notification as we do for configs. For example, >>> every >>> > > > time >>> > > > > we >>> > > > > > > > >>> change >>> > > > > > > > >>> > the >>> > > > > > > > >>> > > > assignment for a set of partitions, we could >>> further >>> > > > write >>> > > > > a >>> > > > > > > > >>> sequential >>> > > > > > > > >>> > > > node /admin/reassignment_changes/[change_x]. That >>> > way, >>> > > > the >>> > > > > > > > >>> controller >>> > > > > > > > >>> > > > only needs to watch the change path. 
Once a change is triggered, the controller can read everything under
/admin/reassignments/.

Jun

On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley <t.j.bent...@gmail.com> wrote:

Hi,

This is still very new, but I wanted some quick feedback on a
preliminary KIP which could, I think, help with providing an AdminClient
API for partition reassignment.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment

I wasn't sure whether to start fleshing out a whole AdminClient API in
this KIP (which would make it very big, and difficult to read), or
whether to break it down into smaller KIPs (which makes it easier to
read and implement in pieces, but harder to get a high-level picture of
the ultimate destination).
For now I've gone for a very small initial KIP, but I'm happy to sketch
the bigger picture here if people are interested.

Cheers,

Tom
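
The change-notification idea from Jun's Dec 8 message (a single watch on a change path instead of one watcher per reassigned partition) could look roughly like the toy model below. This is only a sketch under stated assumptions: FakeZk is a hypothetical in-memory stand-in, not a ZooKeeper client; the per-partition node layout /admin/reassignments/<topic>/<partition> and the change_ prefix are illustrative guesses; and the fake watch is persistent, whereas real ZooKeeper watches are one-shot and must be re-registered.

```python
from typing import Callable, Dict, List


class FakeZk:
    """Tiny in-memory stand-in for ZooKeeper (hypothetical, for illustration)."""

    def __init__(self) -> None:
        self.nodes: Dict[str, str] = {}
        self.child_watchers: Dict[str, List[Callable[[str], None]]] = {}
        self._next_seq = 0

    def set(self, path: str, data: str) -> None:
        """Create or overwrite a node; no watcher fires for plain writes here."""
        self.nodes[path] = data

    def watch_children(self, parent: str, callback: Callable[[str], None]) -> None:
        """Register a callback fired when a child is created under parent."""
        self.child_watchers.setdefault(parent, []).append(callback)

    def create_sequential(self, prefix: str, data: str = "") -> str:
        """Create prefix + zero-padded counter and notify the parent's watchers."""
        path = f"{prefix}{self._next_seq:010d}"
        self._next_seq += 1
        self.nodes[path] = data
        parent = prefix.rsplit("/", 1)[0]
        for callback in self.child_watchers.get(parent, []):
            callback(path)
        return path

    def read_under(self, parent: str) -> Dict[str, str]:
        """Return every node below parent as path -> data."""
        prefix = parent.rstrip("/") + "/"
        return {p: d for p, d in self.nodes.items() if p.startswith(prefix)}


class Controller:
    CHANGES = "/admin/reassignment_changes"
    REASSIGNMENTS = "/admin/reassignments"

    def __init__(self, zk: FakeZk) -> None:
        self.zk = zk
        self.pending: Dict[str, str] = {}
        # One watch in total, independent of how many partitions move.
        zk.watch_children(self.CHANGES, self.on_change)

    def on_change(self, change_path: str) -> None:
        # On any change notification, re-read everything under the
        # reassignments path instead of watching each partition node.
        self.pending = self.zk.read_under(self.REASSIGNMENTS)


if __name__ == "__main__":
    zk = FakeZk()
    controller = Controller(zk)
    zk.set("/admin/reassignments/foo/0", "[2,3]")
    zk.set("/admin/reassignments/foo/1", "[2,4]")
    zk.create_sequential("/admin/reassignment_changes/change_")
    print(controller.pending)
```

In this model the writer updates all partition nodes first and creates the sequential change node last, so the controller sees the whole batch on a single notification.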