Hi Bruno,

Thanks for the feedback. I think it makes sense. I've updated the KIP [1] and tried to omit implementation details around the algorithm.
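For reference, the user-facing surface in the updated KIP boils down to configuration. A Streams client would be set up roughly like the sketch below. The tag keys and values here are made up, and the config names follow what we discussed in this thread (the client.tag prefix and your suggested rename of the awareness config); the final names are whatever the updated KIP specifies:

    import java.util.Properties;

    final Properties props = new Properties();
    // Tag this Streams client; "cluster" and "zone" are arbitrary, user-chosen tag keys.
    props.put("client.tag.cluster", "K8s_Cluster1");
    props.put("client.tag.zone", "eu-central-1a");
    // Tell the assignor which client tags to consider for rack-aware standby distribution.
    props.put("task.assignment.rack.awareness", "cluster,zone");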
Please let me know if the latest version looks OK.

Regards,
Levani

[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 25. Feb 2021, at 17:59, Bruno Cadonna <br...@confluent.io> wrote:
> 
> Hi Levani,
> 
> I discussed your KIP with John the other day and we both think it is a really
> interesting KIP and you did a good job in writing it. However, we think that
> the KIP exposes too many implementation details. That makes future changes to
> the implementation of the distribution algorithm harder without a KIP. So, we
> would like to propose to just describe the config and the properties that any
> implementation of the distribution algorithm should have. We did something
> similar in KIP-441 for the task assignment algorithm [1].
> 
> Specifically, for your KIP, any possible implementation of the distribution
> algorithm should read the tags to be considered for rack awareness from the
> config, and if the cluster allows distributing each active task and its
> replicas to Streams clients with different values for each tag, the algorithm
> will do so. How the implementation behaves if a cluster does not allow
> distributing over all tag values can be left as an implementation detail. This
> would give us flexibility for future changes to the distribution algorithm.
> 
> Since there may be distribution algorithms that do not use the order of the
> tags, it would be better not to mention the order of the tags in the config
> doc. I would propose to omit the config doc from the KIP or formulate it
> really generically.
> 
> We would also like to rename standby.replicas.awareness to
> task.assignment.rack.awareness or something that does not contain standby
> and/or replica (sorry for requesting again to change this name). That way, we
> might be able to use this config also when we decide to make the active task
> assignment rack aware.
> 
> I hope all of this makes sense to you.
> 
> Thank you again for the interesting KIP!
> 
> Looking forward to your implementation!
> 
> Best,
> Bruno
> 
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> 
> On 22.02.21 17:42, Levani Kokhreidze wrote:
>> Hi Bruno,
>> Thanks for the quick reply.
>> 
>> 5. Sorry, maybe I am not making it clear.
>> What you have described is how it should work, yes. As stated in the KIP,
>> with the importance semantics in standby.replicas.awareness,
>> if we have an active task on Node-1 and the first standby task on Node-5,
>> the second standby should be on Node-3 or Node-6 (both of them are in a
>> different AZ compared to the active and the first standby task).
>> That flow is described in the Partially Preferred Distribution section [1].
>> Node-4 would have been a valid option only if standby.replicas.awareness did
>> not have the importance semantics, because Kafka Streams could have just
>> picked Node-4 in that case.
>> 
>> 7. Yup, will do.
>> 
>> 8. Good question. So far, the assumption I had was that the configuration is
>> the same across different Kafka Streams instances. Can we have an extra
>> validation check to make sure that is the case?
>> If not, what you mentioned in point 5 (always preferring the dimension that
>> has enough Kafka Streams instances) is very valid.
>> On the other hand, I thought that allowing users to specify the importance
>> of the various dimensions may give them extra flexibility over standby task
>> allocation.
>> 
>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-PartiallyPreferredStandbyTaskDistribution
>> 
>> Regards,
>> Levani
>> 
>>> On 22. Feb 2021, at 16:51, Bruno Cadonna <br...@confluent.io> wrote:
>>> 
>>> Hi Levani,
>>> 
>>> Thanks for the modifications!
>>> 
>>> I have some follow-up questions/comments:
>>> 
>>> 5. Something is not clear to me. If the active is on Node-1 and the first
>>> replica is on Node-5 (different cluster, different zone), why would the
>>> second replica go to Node-4, which is in a different cluster but the same
>>> zone as the active, instead of Node-6, which is in a different zone than
>>> Node-1? In general, wouldn't it be better to guarantee under Partially
>>> Preferred task distribution that active and standby replicas of the same
>>> task are distributed over the dimension that has at least as many values as
>>> the number of replicas + 1, and only then over the dimensions that have
>>> fewer values? That would then also be independent of the ordering of the
>>> tags.
>>> 
>>> 7. I agree with you. Could you add a sentence or two about this to the KIP?
>>> 
>>> New question:
>>> 
>>> 8. How would the assignor react to different numbers and different
>>> orderings of the tags in standby.replicas.awareness across Streams clients?
>>> 
>>> Best,
>>> Bruno
>>> 
>>> On 22.02.21 11:46, Levani Kokhreidze wrote:
>>>> Hi Bruno,
>>>> Thanks for the feedback. Please check my answers below:
>>>> 
>>>> 1. No objections; sounds good. Updated the KIP.
>>>> 2. No objections; sounds good. Updated the KIP.
>>>> 3. Thanks for the information; I can change the KIP to expose only the
>>>> prefix method instead of a constant, if that's the way forward.
>>>> 4. Done. Updated the KIP.
>>>> 5. Yes, the order in the standby.replicas.awareness config counts, as
>>>> stated in the STANDBY_REPLICA_AWARENESS_DOC.
>>>> Actually, it plays a role in the Partially Preferred distribution. In the
>>>> example presented in the KIP, while one of the standby tasks can be placed
>>>> in a different cluster and a different zone compared to the active task,
>>>> we have to choose either the same cluster or the same zone for the second
>>>> standby task. In the first example presented in the KIP, while Node-5 is
>>>> in the other cluster and the other zone compared to the active task, the
>>>> second standby task's preferred options are in different zones than Node-1
>>>> and Node-5, but in the same cluster as the active task or the first
>>>> standby task. Without the importance semantics in
>>>> standby.replicas.awareness, putting the second standby task on Node-4
>>>> (different cluster, same zone as the active task) would have been a valid
>>>> option.
>>>> I've updated the KIP to clarify this a bit more; I hope this helps.
>>>> 6. Thanks for pointing that out; it was a mistake. I've removed that
>>>> phrase from the KIP.
>>>> 7. It shouldn't affect HighAvailabilityTaskAssignor in a "breaking" way,
>>>> meaning that all the existing behavior should stay as is (e.g., when the
>>>> new configurations are not specified). Once the required configurations
>>>> are set, the main change should happen in
>>>> HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and
>>>> HighAvailabilityTaskAssignor#assignStandbyTaskMovements.
>>>> 
>>>> I hope this answers your questions.
>>>> 
>>>> Regards,
>>>> Levani
>>>> 
>>>>> On 18. Feb 2021, at 15:10, Bruno Cadonna <br...@confluent.io> wrote:
>>>>> 
>>>>> Hi Levani,
>>>>> 
>>>>> Thank you for the KIP.
>>>>> 
>>>>> Really interesting!
>>>>> 
>>>>> Here are my comments:
>>>>> 
>>>>> 1. To be consistent with the other configs that involve standbys, I
>>>>> would rename
>>>>> standby.task.assignment.awareness -> standby.replicas.awareness
>>>>> 
>>>>> 2. I would also rename the prefix
>>>>> instance.tag -> client.tag
>>>>> 
>>>>> 3. The following is a question about prefixes in general that maybe
>>>>> somebody else can answer. In the config it says, for other prefixes, that
>>>>> it is recommended to use the method *Prefix(final String prop) instead of
>>>>> the raw prefix string.
>>>>> Is the plan to make the raw prefix string private in a future release?
>>>>> Should we consider making only the prefix method for this KIP public?
>>>>> 
>>>>> 4. Could you provide a mathematical formula instead of Java code for
>>>>> absolute preferred standby task distribution and the other distribution
>>>>> properties? Could you also add an example for absolute preferred
>>>>> distribution for the computation of the formula, similar to what you did
>>>>> for the other properties?
>>>>> 
>>>>> 5. Does the order of the tags given for standby.task.assignment.awareness
>>>>> count? You mention it once, but then for the Partially Preferred standby
>>>>> task distribution property it does not seem to be important.
>>>>> 
>>>>> 6. In the section about least preferred standby task distribution, you
>>>>> state that "and one [zone] will be reserved for active task". What do you
>>>>> mean by that? All Streams clients will participate in the task assignment
>>>>> of active tasks irrespective of their tags, right? The statement also does
>>>>> not really fit with the example where active stateful task 0_0 is on
>>>>> Node-1, does it?
>>>>> 
>>>>> 7. Could you also say some words about how this KIP affects the current
>>>>> HighAvailabilityTaskAssignor?
>>>>> 
>>>>> Best,
>>>>> Bruno
>>>>> 
>>>>> On 09.02.21 15:54, Levani Kokhreidze wrote:
>>>>>> Hello all,
>>>>>> 
>>>>>> I've updated KIP-708 [1] to reflect the latest discussion outcomes.
>>>>>> I'm looking forward to your feedback.
>>>>>> 
>>>>>> Regards,
>>>>>> Levani
>>>>>> 
>>>>>> [1] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
>>>>>> 
>>>>>>> On 2. Feb 2021, at 22:03, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi John.
>>>>>>> 
>>>>>>> Thanks a lot for this detailed analysis!
>>>>>>> Yes, that is what I had in mind as well.
>>>>>>> I also like the idea of having a "task.assignment.awareness"
>>>>>>> configuration to tell which instance tags can be used for rack
>>>>>>> awareness. I may borrow it for this KIP if you don't mind :)
>>>>>>> 
>>>>>>> Thanks again, John, for this discussion; it's really valuable.
>>>>>>> 
>>>>>>> I'll update the proposal and share it once again in this discussion
>>>>>>> thread.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Levani
>>>>>>> 
>>>>>>>> On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org> wrote:
>>>>>>>> 
>>>>>>>> Hi Levani,
>>>>>>>> 
>>>>>>>> 1. Thanks for the details.
>>>>>>>> 
>>>>>>>> I figured it must be something like this two-dimensional definition of
>>>>>>>> "rack".
>>>>>>>> 
>>>>>>>> It does seem like, if we make the config take a list of tags, we can
>>>>>>>> define the semantics to be that the system will make a best effort to
>>>>>>>> distribute the standbys over each rack dimension.
>>>>>>>> 
>>>>>>>> In your example, there are two clusters and three AZs. The example
>>>>>>>> configs would be:
>>>>>>>> 
>>>>>>>> Node 1:
>>>>>>>> instance.tag.cluster: K8s_Cluster1
>>>>>>>> instance.tag.zone: eu-central-1a
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 2:
>>>>>>>> instance.tag.cluster: K8s_Cluster1
>>>>>>>> instance.tag.zone: eu-central-1b
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 3:
>>>>>>>> instance.tag.cluster: K8s_Cluster1
>>>>>>>> instance.tag.zone: eu-central-1c
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 4:
>>>>>>>> instance.tag.cluster: K8s_Cluster2
>>>>>>>> instance.tag.zone: eu-central-1a
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 5:
>>>>>>>> instance.tag.cluster: K8s_Cluster2
>>>>>>>> instance.tag.zone: eu-central-1b
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 6:
>>>>>>>> instance.tag.cluster: K8s_Cluster2
>>>>>>>> instance.tag.zone: eu-central-1c
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Now, if we have a task 0_0 with an active and two replicas,
>>>>>>>> there are three total copies of the task to distribute over:
>>>>>>>> * 6 instances
>>>>>>>> * 2 clusters
>>>>>>>> * 3 zones
>>>>>>>> 
>>>>>>>> There is a constraint that we _cannot_ assign two copies of a task
>>>>>>>> to a single instance, but it seems like the default rack awareness
>>>>>>>> would permit us to assign two copies of a task to a rack, if (and only
>>>>>>>> if) the number of copies is greater than the number of racks.
>>>>>>>> 
>>>>>>>> So, the assignment we would get is like this:
>>>>>>>> * assigned to three different instances
>>>>>>>> * one copy in each of zones a, b, and c
>>>>>>>> * two copies in one cluster and one in the other cluster
>>>>>>>> 
>>>>>>>> For example, we might have 0_0 assigned to:
>>>>>>>> * Node 1 (cluster 1, zone a)
>>>>>>>> * Node 5 (cluster 2, zone b)
>>>>>>>> * Node 3 (cluster 1, zone c)
>>>>>>>> 
>>>>>>>> Is that what you were also thinking?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>> 
>>>>>>>> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
>>>>>>>>> Hi John,
>>>>>>>>> 
>>>>>>>>> 1. The main reason was that it seemed an easier change compared to
>>>>>>>>> having multiple tags assigned to each host.
>>>>>>>>> 
>>>>>>>>> ---
>>>>>>>>> 
>>>>>>>>> Answering your question about what use-case I have in mind:
>>>>>>>>> Let's say we have two Kubernetes clusters running the same Kafka
>>>>>>>>> Streams application.
>>>>>>>>> And each Kubernetes cluster spans multiple AZs.
>>>>>>>>> So the setup overall looks something like this:
>>>>>>>>> 
>>>>>>>>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
>>>>>>>>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>>>>>>>>> 
>>>>>>>>> Now, if the Kafka Streams application is launched in K8s_Cluster1:
>>>>>>>>> eu-central-1a, ideally I would want the standby task to be created
>>>>>>>>> in a different K8s cluster and a different AZ.
>>>>>>>>> So in this example it can be K8s_Cluster2: [eu-central-1b,
>>>>>>>>> eu-central-1c].
>>>>>>>>> 
>>>>>>>>> But giving it a bit more thought, this can be implemented if we
>>>>>>>>> change the semantics of "tags" a bit.
>>>>>>>>> So instead of doing a full match on the tags, we can do iterative
>>>>>>>>> matching, and it should work.
>>>>>>>>> (If this is what you had in mind, apologies for the misunderstanding.)
>>>>>>>>> 
>>>>>>>>> If we consider the same example as mentioned above, for the active
>>>>>>>>> task we would have the following tags: [K8s_Cluster1, eu-central-1a].
>>>>>>>>> In order to distribute the standby task to a different K8s cluster,
>>>>>>>>> plus a different AZ, the standby task assignment algorithm can
>>>>>>>>> compare each tag by index. So the steps would be something like:
>>>>>>>>> 
>>>>>>>>> // this will result in selecting clients in a different K8s cluster
>>>>>>>>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
>>>>>>>>> // this will result in selecting the client in a different AZ
>>>>>>>>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != clientsInDifferentCluster[1])
>>>>>>>>> 
>>>>>>>>> WDYT?
>>>>>>>>> 
>>>>>>>>> If you agree with the use-case I've mentioned, the pluggable assignor
>>>>>>>>> can be deferred to another KIP, yes, as it won't be required for this
>>>>>>>>> KIP and the use-cases I had in mind to work.
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> Levani
>>>>>>>>> 
>>>>>>>>>> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hello Levani,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the reply.
>>>>>>>>>> 
>>>>>>>>>> 1. Interesting; why did you change your mind?
>>>>>>>>>> 
>>>>>>>>>> I have a gut feeling that we can achieve pretty much any rack
>>>>>>>>>> awareness need that people have by using purely config, which is
>>>>>>>>>> obviously much easier to use. But if you had a case in mind where
>>>>>>>>>> this wouldn't work, it would be good to know.
>>>>>>>>>> 
>>>>>>>>>> In fact, if that is true, then perhaps you could just defer the
>>>>>>>>>> whole idea of a pluggable interface (point 2) to a separate KIP. I
>>>>>>>>>> do think a pluggable assignor would be extremely valuable, but it
>>>>>>>>>> might be nice to cut the scope of KIP-708 if just a config will
>>>>>>>>>> suffice.
>>>>>>>>>> 
>>>>>>>>>> What do you think?
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> John
>>>>>>>>>> 
>>>>>>>>>> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
>>>>>>>>>>> Hi John,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks a lot for the thorough feedback; it's really valuable.
>>>>>>>>>>> 
>>>>>>>>>>> 1. Agree with this. I had the same idea initially.
>>>>>>>>>>> We can set some upper limit on the number of tags users can set,
>>>>>>>>>>> to make sure it's not overused. By default, we can create standby
>>>>>>>>>>> tasks where the tags are different from the active task's (full
>>>>>>>>>>> match). This should mimic the default rack awareness behaviour.
>>>>>>>>>>> 
>>>>>>>>>>> 2. I like the idea, and I'd be happy to work on refactoring
>>>>>>>>>>> TaskAssignor to accommodate the rack awareness use-case.
>>>>>>>>>>> When I was going through the code, it felt way more natural to use
>>>>>>>>>>> a pluggable TaskAssignor for achieving rack awareness instead of
>>>>>>>>>>> introducing a new interface and contract.
>>>>>>>>>>> But I thought the approach mentioned in the KIP was simpler, so I
>>>>>>>>>>> decided to move forward with it as an initial proposal :).
>>>>>>>>>>> But I agree with you; it will be much better if we can have
>>>>>>>>>>> TaskAssignor as a pluggable interface users can implement.
>>>>>>>>>>> One potential challenge I see with this is that, if we just let
>>>>>>>>>>> users implement TaskAssignor in its current form, we will be
>>>>>>>>>>> forcing users to implement functionality for active task
>>>>>>>>>>> assignment as well as standby task assignment. This doesn't feel
>>>>>>>>>>> like a very clear contract, because with just the TaskAssignor
>>>>>>>>>>> interface it's not really clear that one needs to allocate standby
>>>>>>>>>>> tasks as well. We can enforce it on some level with the return
>>>>>>>>>>> object you've mentioned TaskAssignor#assign has to return, but it
>>>>>>>>>>> still feels error-prone.
>>>>>>>>>>> In addition, I suspect that in most cases users would want to
>>>>>>>>>>> control standby task assignment and leave active task assignment
>>>>>>>>>>> as is.
>>>>>>>>>>> To make the implementation of standby task assignment easier for
>>>>>>>>>>> users, what if we decouple active and standby task assignment from
>>>>>>>>>>> the `TaskAssignor`? The idea I have in mind is to split
>>>>>>>>>>> TaskAssignor into ActiveTaskAssignor and StandbyTaskAssignor and
>>>>>>>>>>> let users plug in their own implementations for them separately
>>>>>>>>>>> via config.
>>>>>>>>>>> 
>>>>>>>>>>> If this approach sounds reasonable, I'll work on updating the KIP
>>>>>>>>>>> this week.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Levani
>>>>>>>>>>> 
>>>>>>>>>>>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks, Levani!
>>>>>>>>>>>> 
>>>>>>>>>>>> I was reflecting more on your KIP last night.
>>>>>>>>>>>> 
>>>>>>>>>>>> One thing I should mention is that I have previously used
>>>>>>>>>>>> the rack awareness feature of Elasticsearch and found it to
>>>>>>>>>>>> be pretty intuitive and also capable of what we needed in
>>>>>>>>>>>> our AWS clusters. As you look at related work, you might
>>>>>>>>>>>> take ES into consideration.
>>>>>>>>>>>> 
>>>>>>>>>>>> I also had some thoughts about your proposal.
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. I'm wondering if we instead allow people to add arbitrary
>>>>>>>>>>>> tags to each host, and then have a configuration to specify
>>>>>>>>>>>> a combination of tags to use for rack awareness. This seems
>>>>>>>>>>>> easier to manage than the use case you anticipate, where
>>>>>>>>>>>> people would concatenate rackId = (clusterId + AZ) and then
>>>>>>>>>>>> have to parse the rackId back out to compute the assignment.
>>>>>>>>>>>> 
>>>>>>>>>>>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm
>>>>>>>>>>>> wondering if we can change the level of abstraction a little
>>>>>>>>>>>> bit and capture even more value here. One thing we wanted to
>>>>>>>>>>>> do in KIP-441, but decided to cut from the scope, was to
>>>>>>>>>>>> define a public TaskAssignor interface so that people can
>>>>>>>>>>>> plug in the whole task assignment algorithm.
>>>>>>>>>>>> 
>>>>>>>>>>>> In fact, there is already an internal config and interface
>>>>>>>>>>>> for this (`internal.task.assignor.class`:
>>>>>>>>>>>> `org.apache.kafka.streams.processor.internals.assignment.TaskAssignor`).
>>>>>>>>>>>> 
>>>>>>>>>>>> We kept that interface and config internal because the
>>>>>>>>>>>> current TaskAssignor interface has a number of flaws, but if
>>>>>>>>>>>> we correct those flaws, we can offer a nice public interface
>>>>>>>>>>>> that people can use to control the standby allocation, but
>>>>>>>>>>>> also active task allocation, based on the tags I suggested
>>>>>>>>>>>> in (1).
>>>>>>>>>>>> 
>>>>>>>>>>>> I don't think we need too much work to refactor
>>>>>>>>>>>> TaskAssignor; the main problems are that the assign method
>>>>>>>>>>>> mutates its input and that it doesn't expose the full
>>>>>>>>>>>> metadata from the cluster members. Therefore, if you like
>>>>>>>>>>>> this idea, we should propose to refactor TaskAssignor with
>>>>>>>>>>>> (a rough sketch follows below):
>>>>>>>>>>>> * input: an immutable description of the cluster, including
>>>>>>>>>>>> current lags of all stateful tasks and current stateless
>>>>>>>>>>>> task assignments, as well as metadata for each host.
>>>>>>>>>>>> * output: an object describing the new assignment as well as
>>>>>>>>>>>> a flag on whether to schedule a follow-up probing rebalance.
>>>>>>>>>>>> 
>>>>>>>>>>>> An even more stretchy stretch goal would be to include
>>>>>>>>>>>> metadata of the brokers, which could be used to achieve
>>>>>>>>>>>> higher levels of rack awareness. For example, we could
>>>>>>>>>>>> co-locate tasks in the same "rack" (AZ) as the partition
>>>>>>>>>>>> leader for their input or output topics, to minimize
>>>>>>>>>>>> cross-AZ traffic. I'm not sure to what extent clients can
>>>>>>>>>>>> learn the relevant broker metadata, so this stretch might
>>>>>>>>>>>> not be currently feasible, but as long as we design the
>>>>>>>>>>>> TaskAssignor for extensibility, we can do something like
>>>>>>>>>>>> this in the future.
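>>>>>>>>>>>> 
>>>>>>>>>>>> To make the refactoring above concrete, here is a rough
>>>>>>>>>>>> sketch of the shape I have in mind; all of the names below
>>>>>>>>>>>> are illustrative only, not a proposal for the final API:
>>>>>>>>>>>> 
>>>>>>>>>>>> public interface TaskAssignor {
>>>>>>>>>>>>     // `cluster` would be an immutable description of the
>>>>>>>>>>>>     // group: per-task lags of all stateful tasks, the
>>>>>>>>>>>>     // current stateless task assignments, and the metadata
>>>>>>>>>>>>     // (e.g., the tags from point 1) of each host.
>>>>>>>>>>>>     TaskAssignment assign(ClusterDescription cluster);
>>>>>>>>>>>> }
>>>>>>>>>>>> 
>>>>>>>>>>>> // TaskAssignment would describe the new active/standby
>>>>>>>>>>>> // assignment and carry a flag indicating whether to
>>>>>>>>>>>> // schedule a follow-up probing rebalance.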
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks again for this proposal; I hope the above is more
>>>>>>>>>>>> inspiring than annoying :)
>>>>>>>>>>>> 
>>>>>>>>>>>> I really think your KIP is super high value in whatever form
>>>>>>>>>>>> you ultimately land on.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> John
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the feedback (and for the great work on KIP-441 :) ).
>>>>>>>>>>>>> Makes sense; I will add a section to the KIP explaining rack
>>>>>>>>>>>>> awareness at a high level and how it's implemented in different
>>>>>>>>>>>>> distributed systems.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Levani
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Levani,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for this KIP! I think this is really high value; it was
>>>>>>>>>>>>>> something I was disappointed I didn't get to do as part of
>>>>>>>>>>>>>> KIP-441.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Rack awareness is a feature provided by other distributed
>>>>>>>>>>>>>> systems as well. I wonder if your KIP could devote a section to
>>>>>>>>>>>>>> summarizing what rack awareness looks like in other distributed
>>>>>>>>>>>>>> systems, to help us put this design in context.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> John
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I'd like to start a discussion on KIP-708 [1], which aims to
>>>>>>>>>>>>>>> introduce rack-aware standby task distribution in Kafka Streams.
>>>>>>>>>>>>>>> In addition to the changes mentioned in the KIP, I'd like to get
>>>>>>>>>>>>>>> some ideas on an additional change I have in mind.
>>>>>>>>>>>>>>> Assuming the KIP moves forward, I was wondering if it makes
>>>>>>>>>>>>>>> sense to configure Kafka Streams consumer instances with the
>>>>>>>>>>>>>>> rack ID passed with the new StreamsConfig#RACK_ID_CONFIG
>>>>>>>>>>>>>>> property.
>>>>>>>>>>>>>>> In practice, that would mean that when "rack.id" is configured
>>>>>>>>>>>>>>> in Kafka Streams, it will automatically translate into the
>>>>>>>>>>>>>>> ConsumerConfig#CLIENT_RACK_CONFIG config for all the
>>>>>>>>>>>>>>> KafkaConsumer clients that are used by Kafka Streams internally.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
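>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> To illustrate the translation, a minimal sketch ("rack.id" is
>>>>>>>>>>>>>>> the new Streams-level key proposed here; "client.rack" is the
>>>>>>>>>>>>>>> existing consumer config behind
>>>>>>>>>>>>>>> ConsumerConfig#CLIENT_RACK_CONFIG):
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> final Properties props = new Properties();
>>>>>>>>>>>>>>> // hypothetical new Streams-level config
>>>>>>>>>>>>>>> props.put("rack.id", "eu-central-1a");
>>>>>>>>>>>>>>> // Streams would then internally do the equivalent of the
>>>>>>>>>>>>>>> // following for every embedded consumer:
>>>>>>>>>>>>>>> // consumerProps.put(ConsumerConfig.CLIENT_RACK_CONFIG, "eu-central-1a");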
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> P.S. I have a draft PR ready; if it helps move the discussion
>>>>>>>>>>>>>>> forward, I can provide the draft PR link in this thread.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Levani