Hi Bruno, Thanks for the feedback. Please check my answers below:
1. No objections; sounds good. Updated KIP 2. No objections; sounds good. Updated KIP 3. Thanks for the information; I can change KIP only to expose prefix method instead of a constant if it’s the way forward. 4. Done. Updated KIP 5. Yes, order in standby.replicas.awareness config counts as stated in the STANDBY_REPLICA_AWARENESS_DOC. Actually, it plays a role in Partially Preferred distribution. In the example presented in the KIP, while one of the standby tasks can be placed in a different cluster and different zone compared to the active task, we have to choose either the same cluster or the same zone for the second standby task. In the first example presented in the KIP, while Node-5 is in the other cluster and other zone compared to the active task, the second standby task's preferred options are in different zones than Node-1 and Node-5, but in the same cluster as active task or the first standby task. Without importance semantics in standby.replicas.awareness, putting second standby task in Node-4 (different cluster, same zone as active task) would have been a valid option. I’ve updated KIP to clarify this a bit more, I hope this helps. 6. Thanks for pointing that out, it was a mistake. I’ve removed that phrase from the KIP. 7. It shouldn’t affect HighAvailabilityTaskAssignor in a “breaking way” meaning that all the existing behavior should stay as is (e.g., when new configurations are not specified). Once required configurations are set, the main change should happen in HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and HighAvailabilityTaskAssignor#assignStandbyTaskMovements I hope this answers your questions. Regards, Levani > On 18. Feb 2021, at 15:10, Bruno Cadonna <br...@confluent.io> wrote: > > Hi Levani, > > Thank you for the KIP. > > Really interesting! > > Here my comments: > > 1. To be consistent with the other configs that involve standbys , I would > rename > standby.task.assignment.awareness -> standby.replicas.awareness > > 2. I would also rename the prefix > instance.tag -> client.tag > > 3. The following is a question about prefixes in general that maybe somebody > else can answer. In the config it says for other prefixes that it is > recommended to use the method *Prefix(final String prop) instead of the raw > prefix string. > > Is the plan to make the raw prefix string private in a future release? > Should we consider making only the prefix method for this KIP public? > > 4. Could you provide a mathematical formula instead of Java code for absolute > preferred standby task distribution and the other distributtion properties? > Could you also add an example for absolute preffered distribution for the > computation of the formula similar to what you did for the other properties? > > 5. Does the order of the tags given for standby.task.assignment.awareness > count? You mention it once, but then for the Partially Preferred standby task > distribution property it does not seem to be important. > > 6. In the section about least preferred standby task distribution, you state > that "and one [zone] will be reserved for active task". What do you mean by > that? All Streams clients will participate in the task assignment of active > tasks irrespective of their tags, right? The statement does also not really > fit with the example where active stateful task 0_0 is on Node-1, does it? > > 7. Could you also say some words about how this KIP affects the current > HighAvailabilityTaskAssignor? > > > Best, > Bruno > > On 09.02.21 15:54, Levani Kokhreidze wrote: >> Hello all, >> I’ve updated KIP-708 [1] to reflect the latest discussion outcomes. >> I’m looking forward to your feedback. >> Regards, >> Levani >> [1] - >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams >>> On 2. Feb 2021, at 22:03, Levani Kokhreidze <levani.co...@gmail.com> wrote: >>> >>> Hi John. >>> >>> Thanks a lot for this detailed analysis! >>> Yes, that is what I had in mind as well. >>> I also like that idea of having “task.assignment.awareness” configuration >>> to tell which instance tags can be used for rack awareness. >>> I may borrow it for this KIP if you don’t mind :) >>> >>> Thanks again John for this discussion, it’s really valuable. >>> >>> I’ll update the proposal and share it once again in this discussion thread. >>> >>> Regards, >>> Levani >>> >>>> On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org >>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org >>>> <mailto:vvcep...@apache.org>>> wrote: >>>> >>>> Hi Levani, >>>> >>>> 1. Thanks for the details. >>>> >>>> I figured it must be something like this two-dimensional definition of >>>> "rack". >>>> >>>> It does seem like, if we make the config take a list of tags, we can define >>>> the semantics to be that the system will make a best effort to distribute >>>> the standbys over each rack dimension. >>>> >>>> In your example, there are two clusters and three AZs. The example >>>> configs would be: >>>> >>>> Node 1: >>>> instance.tag.cluster: K8s_Cluster1 >>>> instance.tag.zone: eu-central-1a >>>> task.assignment.awareness: cluster,zone >>>> >>>> Node 2: >>>> instance.tag.cluster: K8s_Cluster1 >>>> instance.tag.zone: eu-central-1b >>>> task.assignment.awareness: cluster,zone >>>> >>>> Node 3: >>>> instance.tag.cluster: K8s_Cluster1 >>>> instance.tag.zone: eu-central-1c >>>> task.assignment.awareness: cluster,zone >>>> >>>> Node 4: >>>> instance.tag.cluster: K8s_Cluster2 >>>> instance.tag.zone: eu-central-1a >>>> task.assignment.awareness: cluster,zone >>>> >>>> Node 5: >>>> instance.tag.cluster: K8s_Cluster2 >>>> instance.tag.zone: eu-central-1b >>>> task.assignment.awareness: cluster,zone >>>> >>>> Node 6: >>>> instance.tag.cluster: K8s_Cluster2 >>>> instance.tag.zone: eu-central-1c >>>> task.assignment.awareness: cluster,zone >>>> >>>> >>>> Now, if we have a task 0_0 with an active and two replicas, >>>> there are three total copies of the task to distribute over: >>>> * 6 instances >>>> * 2 clusters >>>> * 3 zones >>>> >>>> There is a constraint that we _cannot_ assign two copies of a task >>>> to a single instance, but it seems like the default rack awareness >>>> would permit us to assign two copies of a task to a rack, if (and only >>>> if) the number of copies is greater than the number of racks. >>>> >>>> So, the assignment we would get is like this: >>>> * assigned to three different instances >>>> * one copy in each of zone a, b, and c >>>> * two copies in one cluster and one in the other cluster >>>> >>>> For example, we might have 0_0 assigned to: >>>> * Node 1 (cluster 1, zone a) >>>> * Node 5 (cluster 2, zone b) >>>> * Node 3 (cluster 1, zone c) >>>> >>>> Is that what you were also thinking? >>>> >>>> Thanks, >>>> -John >>>> >>>> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote: >>>>> Hi John, >>>>> >>>>> 1. Main reason was that it seemed easier change compared to having >>>>> multiple tags assigned to each host. >>>>> >>>>> --- >>>>> >>>>> Answering your question what use-case I have in mind: >>>>> Lets say we have two Kubernetes clusters running the same Kafka Streams >>>>> application. >>>>> And each Kubernetes cluster is spanned across multiple AZ. >>>>> So the setup overall looks something like this: >>>>> >>>>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c] >>>>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c] >>>>> >>>>> Now, if Kafka Streams application is launched in K8s_Clister1: >>>>> eu-central-1a, >>>>> ideally I would want standby task to be created in the different K8s >>>>> cluster and region. >>>>> So in this example it can be K8s_Cluster2: [eu-central-1b, >>>>> eu-central-1c] >>>>> >>>>> But giving it a bit more thought, this can be implemented if we change >>>>> semantics of “tags” a bit. >>>>> So instead of doing full match with tags, we can do iterative matching >>>>> and it should work. >>>>> (If this is what you had in mind, apologies for the misunderstanding). >>>>> >>>>> If we consider the same example as mentioned above, for the active task >>>>> we would >>>>> have following tags: [K8s_Cluster1, eu-central-1a]. In order to >>>>> distribute standby task >>>>> in the different K8s cluster, plus in the different AWS region, standby >>>>> task assignment >>>>> algorithm can compare each tag by index. So steps would be something >>>>> like: >>>>> >>>>> // this will result in selecting client in the different K8s cluster >>>>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0]) >>>>> // this will result in selecting the client in different AWS region >>>>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != >>>>> clientsInDifferentCluster[1] ) >>>>> >>>>> WDYT? >>>>> >>>>> If you agree with the use-case I’ve mentioned, the pluggable assignor >>>>> can be differed to another KIP, yes. >>>>> As it won’t be required for this KIP and use-cases I had in mind to >>>>> work. >>>>> >>>>> Regards, >>>>> Levani >>>>> >>>>> >>>>>> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org >>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org >>>>>> <mailto:vvcep...@apache.org>>> wrote: >>>>>> >>>>>> Hello Levani, >>>>>> >>>>>> Thanks for the reply. >>>>>> >>>>>> 1. Interesting; why did you change your mind? >>>>>> >>>>>> I have a gut feeling that we can achieve pretty much any rack awareness >>>>>> need that people have by using purely config, which is obviously much >>>>>> easier to use. But if you had a case in mind where this wouldn’t work, >>>>>> it would be good to know. >>>>>> >>>>>> In fact, if that is true, then perhaps you could just defer the whole >>>>>> idea of a pluggable interface (point 2) to a separate KIP. I do think a >>>>>> pluggable assignor would be extremely valuable, but it might be nice to >>>>>> cut the scope of KIP-708 if just a config will suffice. >>>>>> >>>>>> What do you think? >>>>>> Thanks, >>>>>> John >>>>>> >>>>>> >>>>>> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote: >>>>>>> Hi John, >>>>>>> >>>>>>> Thanks a lot for thorough feedback, it’s really valuable. >>>>>>> >>>>>>> 1. Agree with this. Had the same idea initially. >>>>>>> We can set some upper limit in terms of what’s >>>>>>> the max number of tags users can set to make >>>>>>> sure it’s not overused. By default, we can create >>>>>>> standby tasks where tags are different from active task (full match). >>>>>>> This should mimic default rack awareness behaviour. >>>>>>> >>>>>>> 2. I like the idea and I’d be happy to work on >>>>>>> refactoring TaskAssignor to accommodate rack awareness use-case. >>>>>>> When I was going through the code, it felt way more natural >>>>>>> to use pluggable TaskAssignor for achieving rack awareness >>>>>>> instead of introducing new interface and contract. >>>>>>> But I thought approach mentioned in the KIP is simpler so >>>>>>> decided to move forward with it as an initial proposal :). >>>>>>> But I agree with you, it will be much better if we can have >>>>>>> TaskAssignor as pluggable interface users can use. >>>>>>> One potential challenge I see with this is that, if we just let >>>>>>> users implement TaskAssignor in its current form, we will be forcing >>>>>>> users to implement functionality for active task assignment, as well as >>>>>>> standby task assignment. This feels like not very clear contract, >>>>>>> because with >>>>>>> just TaskAssignor interface it’s not really clear they one needs to >>>>>>> allocate >>>>>>> standby tasks as well. We can enforce it on some level with the return >>>>>>> object >>>>>>> You’ve mentioned TaskAssignor#assign has to return, but still feels >>>>>>> error prone. >>>>>>> In addition, I suspect in most of the cases users would want >>>>>>> to control standby task assignment and leave active task assignment as >>>>>>> is. >>>>>>> To make implementation of standby task assignment easier for users, >>>>>>> what if >>>>>>> we decouple active and standby task assignment from the `TaskAssignor`? >>>>>>> Idea I have in mind is to split TaskAssignor into ActiveTaskAssignor >>>>>>> and StandbyTaskAssignor >>>>>>> and let users add their own implementation for them separately if they >>>>>>> like via config. >>>>>>> >>>>>>> If this approach sounds reasonable, I’ll work on updating KIP this week. >>>>>>> >>>>>>> Thanks, >>>>>>> Levani >>>>>>> >>>>>>>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org >>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org >>>>>>>> <mailto:vvcep...@apache.org>>> wrote: >>>>>>>> >>>>>>>> Thanks, Levani! >>>>>>>> >>>>>>>> I was reflecting more on your KIP last night. >>>>>>>> >>>>>>>> One thing I should mention is that I have previously used >>>>>>>> the rack awareness feature of Elasticsearch, and found it to >>>>>>>> be pretty intuitive and also capable of what we needed in >>>>>>>> our AWS clusters. As you look at related work, you might >>>>>>>> take ES into consideration. >>>>>>>> >>>>>>>> I was also had some thoughts about your proposal. >>>>>>>> >>>>>>>> 1. I'm wondering if we instead allow people to add arbitrary >>>>>>>> tags to each host, and then have a configuration to specify >>>>>>>> a combination of tags to use for rack awareness. This seems >>>>>>>> easier to manage than for the use case you anticipate where >>>>>>>> people would concatenate rackId = (clusterId + AZ), and then >>>>>>>> have to parse the rackId back out to compute the assignment. >>>>>>>> >>>>>>>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm >>>>>>>> wondering if we can change the level of abstraction a little >>>>>>>> bit and capture even more value here. One thing we wanted to >>>>>>>> do in KIP-441, but decided to cut from the scope, was to >>>>>>>> define a public TaskAssignor interface so that people can >>>>>>>> plug in the whole task assignment algorithm. >>>>>>>> >>>>>>>> In fact, there is already an internal config and interface >>>>>>>> for this (`internal.task.assignor.class`: >>>>>>>> `org.apache.kafka.streams.processor.internals.assignment.Tas >>>>>>>> kAssignor`). >>>>>>>> >>>>>>>> We kept that interface and config internal because the >>>>>>>> current TaskAssignor interface has a number of flaws, but if >>>>>>>> we correct those flaws, we can offer a nice public interface >>>>>>>> that people can use to control the standby allocation, but >>>>>>>> also active task allocation, based on the tags I suggested >>>>>>>> in (1). >>>>>>>> >>>>>>>> I don't think we need too much work to refactor >>>>>>>> TaskAssignor, the main problems are that the assign method >>>>>>>> mutates its input and that it doesn't expose the full >>>>>>>> metadata from the cluster members. Therefore, if you like >>>>>>>> this idea, we should propose to refactor TaskAssignor with: >>>>>>>> * input: an immutable description of the cluster, including >>>>>>>> current lags of all stateful tasks and current stateless >>>>>>>> task assignments, as well as metadata for each host. >>>>>>>> * output: an object describing the new assignment as well as >>>>>>>> a flag on whether to schedule a followup probing rebalance. >>>>>>>> >>>>>>>> An even more stretchy stretch goal would be to include >>>>>>>> metadata of the brokers, which could be used to achieve >>>>>>>> higher levels of rack awareness. For example, we could co- >>>>>>>> locate tasks in the same "rack" (AZ) as the partition leader >>>>>>>> for their input or output topics, to minimize cross-AZ >>>>>>>> traffic. I'm not sure to what extent clients can learn the >>>>>>>> relevant broker metadata, so this stretch might not be >>>>>>>> currently feasible, but as long as we design the >>>>>>>> TaskAssignor for extensibility, we can do something like >>>>>>>> this in the future. >>>>>>>> >>>>>>>> Thanks again for this proposal, I hope the above is more >>>>>>>> inspiring than annoying :) >>>>>>>> >>>>>>>> I really think your KIP is super high value in whatever form >>>>>>>> you ultimately land on. >>>>>>>> >>>>>>>> >>>>>>>> Thanks, >>>>>>>> John >>>>>>>> >>>>>>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote: >>>>>>>>> Hi John >>>>>>>>> >>>>>>>>> Thanks for the feedback (and for the great work on KIP441 :) ). >>>>>>>>> Makes sense, will add a section in the KIP explaining rack >>>>>>>>> awarenesses on high level and how it’s implemented in the different >>>>>>>>> distributed systems. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Levani >>>>>>>>> >>>>>>>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org >>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org >>>>>>>>>> <mailto:vvcep...@apache.org>>> wrote: >>>>>>>>>> >>>>>>>>>> Hi Levani, >>>>>>>>>> >>>>>>>>>> Thanks for this KIP! I think this is really high value; it was >>>>>>>>>> something I was disappointed I didn’t get to do as part of KIP-441. >>>>>>>>>> >>>>>>>>>> Rack awareness is a feature provided by other distributed systems as >>>>>>>>>> well. I wonder if your KIP could devote a section to summarizing >>>>>>>>>> what rack awareness looks like in other distributed systems, to help >>>>>>>>>> us put this design in context. >>>>>>>>>> >>>>>>>>>> Thanks! >>>>>>>>>> John >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote: >>>>>>>>>>> Hello all, >>>>>>>>>>> >>>>>>>>>>> I’d like to start discussion on KIP-708 [1] that aims to introduce >>>>>>>>>>> rack >>>>>>>>>>> aware standby task distribution in Kafka Streams. >>>>>>>>>>> In addition to changes mentioned in the KIP, I’d like to get some >>>>>>>>>>> ideas >>>>>>>>>>> on additional change I have in mind. >>>>>>>>>>> Assuming KIP moves forward, I was wondering if it makes sense to >>>>>>>>>>> configure Kafka Streams consumer instances with the rack ID passed >>>>>>>>>>> with >>>>>>>>>>> the new StreamsConfig#RACK_ID_CONFIG property. >>>>>>>>>>> In practice, that would mean that when “rack.id <http://rack.id/> >>>>>>>>>>> <http://rack.id/ <http://rack.id/>> <http://rack.id/ >>>>>>>>>>> <http://rack.id/> <http://rack.id/ <http://rack.id/>>>” is >>>>>>>>>>> configured in Kafka Streams, it will automatically translate into >>>>>>>>>>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer >>>>>>>>>>> clients >>>>>>>>>>> that is used by Kafka Streams internally. >>>>>>>>>>> >>>>>>>>>>> [1] >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor >>>>>>>>>>> >>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor> >>>>>>>>>>> >>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor >>>>>>>>>>> >>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>> >>>>>>>>>>> >>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor >>>>>>>>>>> >>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor> >>>>>>>>>>> >>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor >>>>>>>>>>> >>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>>> >>>>>>>>>>> >>>>>>>>>>> P.S >>>>>>>>>>> I have draft PR ready, if it helps the discussion moving forward, I >>>>>>>>>>> can >>>>>>>>>>> provide the draft PR link in this thread. >>>>>>>>>>> >>>>>>>>>>> Regards, >>>>>>>>>>> Levani