Thanks Levani for the explanation. I think I understand. Is "rack" still a useful term in this context? I think my concept of "rack" made it hard for me to wrap my head around the multiple tags approach. For example, how can a node be in different racks at the same time? And why would multiple dimensions be needed to describe a rack? I see from your example that this is not the intention, but the terminology made me think it was.
Could just be me.

Ryanne

On Sun, Feb 28, 2021, 6:18 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:

> Hello Ryanne,
>
> Thanks for the question.
> The tag approach gives more flexibility, which would otherwise be possible only with pluggable custom logic that the Kafka Streams user must provide (this is briefly described in the "Rejected Alternatives" section).
> For instance, if we append multiple tags to form a single rack, the user may not get the desired distribution when the infrastructure topology is more complex.
> Consider the following example, which appends multiple tags to form a single rack.
>
> Node-1:
> rack.id: K8s_Cluster1-eu-central-1a
> num.standby.replicas: 1
>
> Node-2:
> rack.id: K8s_Cluster1-eu-central-1b
> num.standby.replicas: 1
>
> Node-3:
> rack.id: K8s_Cluster1-eu-central-1c
> num.standby.replicas: 1
>
> Node-4:
> rack.id: K8s_Cluster2-eu-central-1a
> num.standby.replicas: 1
>
> Node-5:
> rack.id: K8s_Cluster2-eu-central-1b
> num.standby.replicas: 1
>
> Node-6:
> rack.id: K8s_Cluster2-eu-central-1c
> num.standby.replicas: 1
>
> In the example above, we have three AZs and two Kubernetes clusters. Our use case is to place the standby task in a different Kubernetes cluster and a different availability zone from the active task.
> For instance, if the active task is on Node-1 (K8s_Cluster1-eu-central-1a), the corresponding standby task should be on either Node-5 (K8s_Cluster2-eu-central-1b) or Node-6 (K8s_Cluster2-eu-central-1c).
> Unfortunately, without custom logic provided by the user, this would be very hard to achieve with a single "rack.id" configuration, because without any input from the user, Kafka Streams might just as well allocate the standby task for the active task either:
> * in the same Kubernetes cluster and a different AZ (Node-2, Node-3), or
> * in a different Kubernetes cluster but the same AZ (Node-4).
> On the other hand, with the combination of the new "client.tag.*" and "task.assignment.rack.awareness" configurations, the standby task distribution algorithm will be able to figure out the best distribution by balancing the standby tasks over each client.tag dimension individually. This can be achieved simply by providing the necessary configurations to Kafka Streams.
> The flow was described in more detail in previous versions of the KIP, but I omitted the algorithm implementation details based on the feedback received. I acknowledge that this information can be put in the KIP for better clarity. I took the liberty of updating the KIP with the example mentioned above [1].
> I hope this answers your question.
>
> Regards,
> Levani
>
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-Benefitsoftagsvssinglerack.idconfiguration
>
> On 28. Feb 2021, at 01:37, Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> > I guess I don't understand how multiple tags work together to achieve rack awareness. I realize I could go look at how Elasticsearch works, but ideally this would be more plain in the KIP.
> >
> > In particular I'm not sure how the tag approach is different than appending multiple tags together, e.g. how is cluster=foo, zone=bar different than rack=foo-bar?
> >
> > Ryanne
> >
> > On Sat, Feb 27, 2021, 5:00 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >
> >> Hi Bruno,
> >>
> >> Thanks for the feedback. I think it makes sense.
> >> I’ve updated the KIP [1] and tried to omit implementation details around the algorithm.
> >>
> >> Please let me know if the latest version looks OK.
> >>
> >> Regards,
> >> Levani
> >>
> >> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
> >>
> >>> On 25. Feb 2021, at 17:59, Bruno Cadonna <br...@confluent.io> wrote:
> >>>
> >>> Hi Levani,
> >>>
> >>> I discussed your KIP with John the other day and we both think it is a really interesting KIP and that you did a good job writing it. However, we think that the KIP exposes too many implementation details. That makes future changes to the implementation of the distribution algorithm harder without a KIP. So, we would like to propose just describing the config and the properties that any implementation of the distribution algorithm should have. We did something similar in KIP-441 for the task assignment algorithm [1].
> >>>
> >>> Specifically, for your KIP, any possible implementation of the distribution algorithm should read the tags to be considered for rack awareness from the config and, if the cluster allows each active task and its replicas to be distributed to Streams clients with different values for each tag, the algorithm will do so. How the implementation behaves if a cluster does not allow distribution over all tag values can be left as an implementation detail. This would give us flexibility for future changes to the distribution algorithm.
> >>>
> >>> Since there may be distribution algorithms that do not use the order of the tags, it would be better not to mention the order of the tags in the config doc. I would propose omitting the config doc from the KIP or formulating it very generically.
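As a rough illustration of the property described in the message above (each active task and its replicas placed on Streams clients with different values for each tag, when the cluster allows it), the following is a minimal Java sketch. The class and method names (`TagSpreadCheck`, `isSpreadOverAllTags`) and the `cluster` and `zone` tag keys are hypothetical and not part of Kafka Streams; the node values are borrowed from the six-node example used elsewhere in this thread.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not a Kafka Streams API.
final class TagSpreadCheck {

    /**
     * Returns true if, for every tag in awarenessTags, the clients hosting
     * the copies of one task all have pairwise different values for that tag.
     */
    static boolean isSpreadOverAllTags(List<Map<String, String>> clientTags,
                                       List<String> awarenessTags) {
        for (String tag : awarenessTags) {
            Set<String> seen = new HashSet<>();
            for (Map<String, String> tags : clientTags) {
                // add() returns false when this tag value was already seen
                if (!seen.add(tags.get(tag))) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> node1 = Map.of("cluster", "K8s_Cluster1", "zone", "eu-central-1a");
        Map<String, String> node4 = Map.of("cluster", "K8s_Cluster2", "zone", "eu-central-1a");
        Map<String, String> node5 = Map.of("cluster", "K8s_Cluster2", "zone", "eu-central-1b");
        List<String> tags = List.of("cluster", "zone");

        // Active on Node-1, standby on Node-5: differs in cluster and zone.
        System.out.println(isSpreadOverAllTags(List.of(node1, node5), tags)); // true
        // Active on Node-1, standby on Node-4: same zone, so not fully spread.
        System.out.println(isSpreadOverAllTags(List.of(node1, node4), tags)); // false
    }
}
```

The check says nothing about what an implementation does when full spread is impossible, which the message above explicitly leaves as an implementation detail.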
> >>>
> >>> We would also like to rename standby.replicas.awareness to task.assignment.rack.awareness or something that does not contain standby and/or replica (sorry for requesting again to change this name). That way, we might be able to use this config also when we decide to make the active task assignment rack aware.
> >>>
> >>> I hope all of this makes sense to you.
> >>>
> >>> Thank you again for the interesting KIP!
> >>>
> >>> Looking forward to your implementation!
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> >>>
> >>> On 22.02.21 17:42, Levani Kokhreidze wrote:
> >>>> Hi Bruno,
> >>>> Thanks for the quick reply.
> >>>> 5. Sorry, maybe I am not making it clear.
> >>>> What you have described is how it should work, yes. As stated in the KIP, with the importance semantics in standby.replicas.awareness, if we have an active task on Node-1 and the first standby task on Node-5, the third standby should be on Node-3 or Node-6 (both of them are in a different AZ compared to the active and the first standby task).
> >>>> That flow is described in the Partially Preferred Distribution section [1].
> >>>> Node-4 could have been a valid option IF standby.replicas.awareness didn’t have the importance semantics, because Kafka Streams could have just picked Node-4 in that case.
> >>>> 7. Yup, will do.
> >>>> 8. Good question. So far, my assumption was that the configuration is the same across the different Kafka Streams instances. Can we have an extra validation check to make sure that is the case?
> >>>> If not, what you have mentioned in point 5, always preferring the dimension where there are enough KS instances, is very valid.
> >>>> On the other hand, I thought that allowing users to specify the importance of the various dimensions may give them extra flexibility over standby task allocation.
> >>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-PartiallyPreferredStandbyTaskDistribution
> >>>> Regards,
> >>>> Levani
> >>>>> On 22. Feb 2021, at 16:51, Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>
> >>>>> Hi Levani,
> >>>>>
> >>>>> Thanks for the modifications!
> >>>>>
> >>>>> I have some follow-up questions/comments:
> >>>>>
> >>>>> 5. Something is not clear to me. If the active is on Node-1 and the first replica is on Node-5 (different cluster, different zone), why would the second replica go to Node-4, which is in a different cluster than the active but in the same zone, instead of Node-6, which is in a different zone than Node-1? In general, wouldn't it be better to guarantee, under Partially Preferred task distribution, that the active and standby replicas of the same task are distributed first over the dimension that has at least as many values as the number of replicas + 1 and then over the dimensions that have fewer values? That would then also be independent of the ordering of the tags.
> >>>>>
> >>>>> 7. I agree with you. Could you add a sentence or two about this to the KIP?
> >>>>>
> >>>>> New question:
> >>>>>
> >>>>> 8. How would the assignor react to different numbers and different orderings of the tags in standby.replicas.awareness across Streams clients?
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>> On 22.02.21 11:46, Levani Kokhreidze wrote:
> >>>>>> Hi Bruno,
> >>>>>> Thanks for the feedback. Please check my answers below:
> >>>>>> 1. No objections; sounds good. Updated KIP
> >>>>>> 2. No objections; sounds good. Updated KIP
> >>>>>> 3. Thanks for the information; I can change KIP only to expose prefix method instead of a constant if it’s the way forward.
> >>>>>> 4. Done. Updated KIP
> >>>>>> 5. Yes, order in standby.replicas.awareness config counts as stated in the STANDBY_REPLICA_AWARENESS_DOC.
> >>>>>> Actually, it plays a role in Partially Preferred distribution. In the example presented in the KIP, while one of the standby tasks can be placed in a different cluster and different zone compared to the active task, we have to choose either the same cluster or the same zone for the second standby task. In the first example presented in the KIP, while Node-5 is in the other cluster and other zone compared to the active task, the second standby task's preferred options are in different zones than Node-1 and Node-5, but in the same cluster as active task or the first standby task. Without importance semantics in standby.replicas.awareness, putting second standby task in Node-4 (different cluster, same zone as active task) would have been a valid option.
> >>>>>> I’ve updated KIP to clarify this a bit more, I hope this helps.
> >>>>>> 6. Thanks for pointing that out, it was a mistake. I’ve removed that phrase from the KIP.
> >>>>>> 7. It shouldn’t affect HighAvailabilityTaskAssignor in a “breaking way” meaning that all the existing behavior should stay as is (e.g., when new configurations are not specified).
> >>>>>> Once required configurations are set, the main change should happen in HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and HighAvailabilityTaskAssignor#assignStandbyTaskMovements.
> >>>>>> I hope this answers your questions.
> >>>>>> Regards,
> >>>>>> Levani
> >>>>>>> On 18. Feb 2021, at 15:10, Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>
> >>>>>>> Hi Levani,
> >>>>>>>
> >>>>>>> Thank you for the KIP.
> >>>>>>>
> >>>>>>> Really interesting!
> >>>>>>>
> >>>>>>> Here are my comments:
> >>>>>>>
> >>>>>>> 1. To be consistent with the other configs that involve standbys, I would rename
> >>>>>>> standby.task.assignment.awareness -> standby.replicas.awareness
> >>>>>>>
> >>>>>>> 2. I would also rename the prefix
> >>>>>>> instance.tag -> client.tag
> >>>>>>>
> >>>>>>> 3. The following is a question about prefixes in general that maybe somebody else can answer. In the config it says for other prefixes that it is recommended to use the method *Prefix(final String prop) instead of the raw prefix string.
> >>>>>>>
> >>>>>>> Is the plan to make the raw prefix string private in a future release?
> >>>>>>> Should we consider making only the prefix method for this KIP public?
> >>>>>>>
> >>>>>>> 4. Could you provide a mathematical formula instead of Java code for absolute preferred standby task distribution and the other distribution properties? Could you also add an example for absolute preferred distribution for the computation of the formula, similar to what you did for the other properties?
> >>>>>>>
> >>>>>>> 5. Does the order of the tags given for standby.task.assignment.awareness count? You mention it once, but then for the Partially Preferred standby task distribution property it does not seem to be important.
> >>>>>>>
> >>>>>>> 6. In the section about least preferred standby task distribution, you state that "and one [zone] will be reserved for active task".
> >>>>>>> What do you mean by that? All Streams clients will participate in the task assignment of active tasks irrespective of their tags, right? The statement does also not really fit with the example where active stateful task 0_0 is on Node-1, does it?
> >>>>>>>
> >>>>>>> 7. Could you also say some words about how this KIP affects the current HighAvailabilityTaskAssignor?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On 09.02.21 15:54, Levani Kokhreidze wrote:
> >>>>>>>> Hello all,
> >>>>>>>> I’ve updated KIP-708 [1] to reflect the latest discussion outcomes.
> >>>>>>>> I’m looking forward to your feedback.
> >>>>>>>> Regards,
> >>>>>>>> Levani
> >>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
> >>>>>>>>> On 2. Feb 2021, at 22:03, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi John.
> >>>>>>>>>
> >>>>>>>>> Thanks a lot for this detailed analysis!
> >>>>>>>>> Yes, that is what I had in mind as well.
> >>>>>>>>> I also like that idea of having “task.assignment.awareness” configuration to tell which instance tags can be used for rack awareness.
> >>>>>>>>> I may borrow it for this KIP if you don’t mind :)
> >>>>>>>>>
> >>>>>>>>> Thanks again John for this discussion, it’s really valuable.
> >>>>>>>>>
> >>>>>>>>> I’ll update the proposal and share it once again in this discussion thread.
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Levani
> >>>>>>>>>
> >>>>>>>>>> On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Levani,
> >>>>>>>>>>
> >>>>>>>>>> 1. Thanks for the details.
> >>>>>>>>>>
> >>>>>>>>>> I figured it must be something like this two-dimensional definition of "rack".
> >>>>>>>>>>
> >>>>>>>>>> It does seem like, if we make the config take a list of tags, we can define the semantics to be that the system will make a best effort to distribute the standbys over each rack dimension.
> >>>>>>>>>>
> >>>>>>>>>> In your example, there are two clusters and three AZs.
> >>>>>>>>>> The example configs would be:
> >>>>>>>>>>
> >>>>>>>>>> Node 1:
> >>>>>>>>>> instance.tag.cluster: K8s_Cluster1
> >>>>>>>>>> instance.tag.zone: eu-central-1a
> >>>>>>>>>> task.assignment.awareness: cluster,zone
> >>>>>>>>>>
> >>>>>>>>>> Node 2:
> >>>>>>>>>> instance.tag.cluster: K8s_Cluster1
> >>>>>>>>>> instance.tag.zone: eu-central-1b
> >>>>>>>>>> task.assignment.awareness: cluster,zone
> >>>>>>>>>>
> >>>>>>>>>> Node 3:
> >>>>>>>>>> instance.tag.cluster: K8s_Cluster1
> >>>>>>>>>> instance.tag.zone: eu-central-1c
> >>>>>>>>>> task.assignment.awareness: cluster,zone
> >>>>>>>>>>
> >>>>>>>>>> Node 4:
> >>>>>>>>>> instance.tag.cluster: K8s_Cluster2
> >>>>>>>>>> instance.tag.zone: eu-central-1a
> >>>>>>>>>> task.assignment.awareness: cluster,zone
> >>>>>>>>>>
> >>>>>>>>>> Node 5:
> >>>>>>>>>> instance.tag.cluster: K8s_Cluster2
> >>>>>>>>>> instance.tag.zone: eu-central-1b
> >>>>>>>>>> task.assignment.awareness: cluster,zone
> >>>>>>>>>>
> >>>>>>>>>> Node 6:
> >>>>>>>>>> instance.tag.cluster: K8s_Cluster2
> >>>>>>>>>> instance.tag.zone: eu-central-1c
> >>>>>>>>>> task.assignment.awareness: cluster,zone
> >>>>>>>>>>
> >>>>>>>>>> Now, if we have a task 0_0 with an active and two replicas, there are three total copies of the task to distribute over:
> >>>>>>>>>> * 6 instances
> >>>>>>>>>> * 2 clusters
> >>>>>>>>>> * 3 zones
> >>>>>>>>>>
> >>>>>>>>>> There is a constraint that we _cannot_ assign two copies of a task to a single instance, but it seems like the default rack awareness would permit us to assign two copies of a task to a rack, if (and only if) the number of copies is greater than the number of racks.
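To make the best-effort idea in the message above concrete, here is a small Java sketch of one possible greedy placement over the six example nodes. It is only an illustration under stated assumptions: the class `TagAwarePlacementSketch`, its `Client` type, and the `overlap` scoring are hypothetical, are not taken from the KIP or from Kafka Streams, and a greedy pass like this is not guaranteed to find the best spread in every topology.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch; none of these types exist in Kafka Streams.
final class TagAwarePlacementSketch {

    static final class Client {
        final String name;
        final Map<String, String> tags;

        Client(String name, String cluster, String zone) {
            this.name = name;
            this.tags = Map.of("cluster", cluster, "zone", zone);
        }
    }

    // Counts, over all awareness tags, how many already-placed copies share
    // the candidate's value for that tag (lower means better spread).
    static int overlap(Client candidate, List<Client> placed, List<String> awarenessTags) {
        int score = 0;
        for (String tag : awarenessTags) {
            for (Client c : placed) {
                if (Objects.equals(c.tags.get(tag), candidate.tags.get(tag))) {
                    score++;
                }
            }
        }
        return score;
    }

    // Greedy best effort: the active client comes first, then each standby goes
    // to the unused client with the lowest overlap (ties: first in list order).
    static List<Client> place(Client active, List<Client> clients,
                              List<String> awarenessTags, int numStandbys) {
        List<Client> placed = new ArrayList<>();
        placed.add(active);
        for (int i = 0; i < numStandbys; i++) {
            Client best = null;
            int bestScore = Integer.MAX_VALUE;
            for (Client candidate : clients) {
                if (placed.contains(candidate)) {
                    continue; // never two copies of a task on one instance
                }
                int score = overlap(candidate, placed, awarenessTags);
                if (score < bestScore) {
                    best = candidate;
                    bestScore = score;
                }
            }
            if (best == null) {
                break; // fewer clients than copies
            }
            placed.add(best);
        }
        return placed;
    }

    public static void main(String[] args) {
        List<Client> clients = List.of(
            new Client("Node 1", "K8s_Cluster1", "eu-central-1a"),
            new Client("Node 2", "K8s_Cluster1", "eu-central-1b"),
            new Client("Node 3", "K8s_Cluster1", "eu-central-1c"),
            new Client("Node 4", "K8s_Cluster2", "eu-central-1a"),
            new Client("Node 5", "K8s_Cluster2", "eu-central-1b"),
            new Client("Node 6", "K8s_Cluster2", "eu-central-1c"));
        // Task 0_0: active on Node 1 plus two standby replicas.
        for (Client c : place(clients.get(0), clients, List.of("cluster", "zone"), 2)) {
            System.out.println(c.name + " " + c.tags.get("cluster") + " " + c.tags.get("zone"));
        }
    }
}
```

With the active copy on Node 1 and this tie-breaking, the sketch picks Node 5 and then Node 3: three different instances, one copy per zone, and two copies in one cluster because there are more copies than clusters.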
> >>>>>>>>>>
> >>>>>>>>>> So, the assignment we would get is like this:
> >>>>>>>>>> * assigned to three different instances
> >>>>>>>>>> * one copy in each of zone a, b, and c
> >>>>>>>>>> * two copies in one cluster and one in the other cluster
> >>>>>>>>>>
> >>>>>>>>>> For example, we might have 0_0 assigned to:
> >>>>>>>>>> * Node 1 (cluster 1, zone a)
> >>>>>>>>>> * Node 5 (cluster 2, zone b)
> >>>>>>>>>> * Node 3 (cluster 1, zone c)
> >>>>>>>>>>
> >>>>>>>>>> Is that what you were also thinking?
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> -John
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
> >>>>>>>>>>> Hi John,
> >>>>>>>>>>>
> >>>>>>>>>>> 1. The main reason was that it seemed an easier change compared to having multiple tags assigned to each host.
> >>>>>>>>>>>
> >>>>>>>>>>> ---
> >>>>>>>>>>>
> >>>>>>>>>>> Answering your question about what use case I have in mind:
> >>>>>>>>>>> Let's say we have two Kubernetes clusters running the same Kafka Streams application.
> >>>>>>>>>>> And each Kubernetes cluster spans multiple AZs.
> >>>>>>>>>>> So the setup overall looks something like this:
> >>>>>>>>>>>
> >>>>>>>>>>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
> >>>>>>>>>>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
> >>>>>>>>>>>
> >>>>>>>>>>> Now, if the Kafka Streams application is launched in K8s_Cluster1: eu-central-1a, ideally I would want the standby task to be created in a different K8s cluster and region.
> >>>>>>>>>>> So in this example it can be K8s_Cluster2: [eu-central-1b, eu-central-1c]
> >>>>>>>>>>>
> >>>>>>>>>>> But giving it a bit more thought, this can be implemented if we change the semantics of “tags” a bit.
> >>>>>>>>>>> So instead of doing a full match with tags, we can do iterative matching and it should work.
> >>>>>>>>>>> (If this is what you had in mind, apologies for the misunderstanding).
> >>>>>>>>>>>
> >>>>>>>>>>> If we consider the same example as mentioned above, for the active task we would have the following tags: [K8s_Cluster1, eu-central-1a]. In order to distribute the standby task in a different K8s cluster, plus in a different AWS region, the standby task assignment algorithm can compare each tag by index. So the steps would be something like:
> >>>>>>>>>>>
> >>>>>>>>>>> // this will result in selecting client in the different K8s cluster
> >>>>>>>>>>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
> >>>>>>>>>>> // this will result in selecting the client in different AWS region
> >>>>>>>>>>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != clientsInDifferentCluster[1] )
> >>>>>>>>>>>
> >>>>>>>>>>> WDYT?
> >>>>>>>>>>>
> >>>>>>>>>>> If you agree with the use case I’ve mentioned, the pluggable assignor can be deferred to another KIP, yes.
> >>>>>>>>>>> It won’t be required for this KIP and the use cases I had in mind to work.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Levani
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hello Levani,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Interesting; why did you change your mind?
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have a gut feeling that we can achieve pretty much any rack awareness need that people have by using purely config, which is obviously much easier to use. But if you had a case in mind where this wouldn’t work, it would be good to know.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In fact, if that is true, then perhaps you could just defer the whole idea of a pluggable interface (point 2) to a separate KIP. I do think a pluggable assignor would be extremely valuable, but it might be nice to cut the scope of KIP-708 if just a config will suffice.
> >>>>>>>>>>>>
> >>>>>>>>>>>> What do you think?
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> John
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
> >>>>>>>>>>>>> Hi John,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks a lot for the thorough feedback, it’s really valuable.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. Agree with this. Had the same idea initially.
> >>>>>>>>>>>>> We can set some upper limit on the max number of tags users can set to make sure it’s not overused. By default, we can create standby tasks where tags are different from the active task (full match).
> >>>>>>>>>>>>> This should mimic default rack awareness behaviour.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. I like the idea and I’d be happy to work on refactoring TaskAssignor to accommodate the rack awareness use case.
> >>>>>>>>>>>>> When I was going through the code, it felt way more natural to use a pluggable TaskAssignor for achieving rack awareness instead of introducing a new interface and contract.
> >>>>>>>>>>>>> But I thought the approach mentioned in the KIP is simpler, so I decided to move forward with it as an initial proposal :).
> >>>>>>>>>>>>> But I agree with you, it will be much better if we can have TaskAssignor as a pluggable interface users can use.
> >>>>>>>>>>>>> One potential challenge I see with this is that, if we just let users implement TaskAssignor in its current form, we will be forcing users to implement functionality for active task assignment, as well as standby task assignment. This does not feel like a very clear contract, because with just the TaskAssignor interface it’s not really clear that one needs to allocate standby tasks as well. We can enforce it on some level with the return object you’ve mentioned TaskAssignor#assign has to return, but it still feels error prone.
> >>>>>>>>>>>>> In addition, I suspect in most of the cases users would want to control standby task assignment and leave active task assignment as is.
> >>>>>>>>>>>>> To make implementation of standby task assignment easier for users, what if we decouple active and standby task assignment from the `TaskAssignor`?
> >>>>>>>>>>>>> Idea I have in mind is to split TaskAssignor into ActiveTaskAssignor and StandbyTaskAssignor and let users add their own implementation for them separately if they like via config.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If this approach sounds reasonable, I’ll work on updating KIP this week.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks, Levani!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I was reflecting more on your KIP last night.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> One thing I should mention is that I have previously used the rack awareness feature of Elasticsearch, and found it to be pretty intuitive and also capable of what we needed in our AWS clusters. As you look at related work, you might take ES into consideration.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I also had some thoughts about your proposal.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. I'm wondering if we could instead allow people to add arbitrary tags to each host, and then have a configuration to specify a combination of tags to use for rack awareness. This seems easier to manage than the use case you anticipate, where people would concatenate rackId = (clusterId + AZ), and then have to parse the rackId back out to compute the assignment.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm wondering if we can change the level of abstraction a little bit and capture even more value here. One thing we wanted to do in KIP-441, but decided to cut from the scope, was to define a public TaskAssignor interface so that people can plug in the whole task assignment algorithm.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In fact, there is already an internal config and interface for this (`internal.task.assignor.class`: `org.apache.kafka.streams.processor.internals.assignment.TaskAssignor`).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We kept that interface and config internal because the current TaskAssignor interface has a number of flaws, but if we correct those flaws, we can offer a nice public interface that people can use to control the standby allocation, but also active task allocation, based on the tags I suggested in (1).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I don't think we need too much work to refactor TaskAssignor; the main problems are that the assign method mutates its input and that it doesn't expose the full metadata from the cluster members.
> >>>>>>>>>>>>>> Therefore, if you like this idea, we should propose to refactor TaskAssignor with:
> >>>>>>>>>>>>>> * input: an immutable description of the cluster, including current lags of all stateful tasks and current stateless task assignments, as well as metadata for each host.
> >>>>>>>>>>>>>> * output: an object describing the new assignment as well as a flag on whether to schedule a followup probing rebalance.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> An even more stretchy stretch goal would be to include metadata of the brokers, which could be used to achieve higher levels of rack awareness. For example, we could co-locate tasks in the same "rack" (AZ) as the partition leader for their input or output topics, to minimize cross-AZ traffic. I'm not sure to what extent clients can learn the relevant broker metadata, so this stretch might not be currently feasible, but as long as we design the TaskAssignor for extensibility, we can do something like this in the future.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks again for this proposal, I hope the above is more inspiring than annoying :)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I really think your KIP is super high value in whatever form you ultimately land on.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> John
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
> >>>>>>>>>>>>>>> Hi John
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the feedback (and for the great work on KIP-441 :) ).
> >>>>>>>>>>>>>>> Makes sense, will add a section in the KIP explaining rack awareness at a high level and how it’s implemented in different distributed systems.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Levani,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for this KIP! I think this is really high value; it was something I was disappointed I didn’t get to do as part of KIP-441.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Rack awareness is a feature provided by other distributed systems as well. I wonder if your KIP could devote a section to summarizing what rack awareness looks like in other distributed systems, to help us put this design in context.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>> John
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
> >>>>>>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I’d like to start discussion on KIP-708 [1] that aims to introduce rack aware standby task distribution in Kafka Streams.
> >>>>>>>>>>>>>>>>> In addition to changes mentioned in the KIP, I’d like to get some ideas > >>>>>>>>>>>>>>>>> on an additional change I have in mind. > >>>>>>>>>>>>>>>>> Assuming the KIP moves forward, I was wondering if it makes sense to > >>>>>>>>>>>>>>>>> configure Kafka Streams consumer instances with the rack ID passed with > >>>>>>>>>>>>>>>>> the new StreamsConfig#RACK_ID_CONFIG property. > >>>>>>>>>>>>>>>>> In practice, that would mean that when “rack.id” is > >>>>>>>>>>>>>>>>> configured in Kafka Streams, it will automatically translate into > >>>>>>>>>>>>>>>>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer clients > >>>>>>>>>>>>>>>>> that are used by Kafka Streams internally.
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> [1] > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> P.S > >>>>>>>>>>>>>>>>> I have draft PR ready, if it helps the discussion moving forward, I can > >>>>>>>>>>>>>>>>> provide the draft PR link in this thread. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>> Levani > >
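For readers following the rack ID pass-through idea in the original message of this thread, here is a rough Java sketch of what that translation might look like. It uses plain maps rather than the Kafka client classes. `rack.id` is the property proposed in the thread, `client.rack` is the consumer's rack setting, and the class name, the helper name `consumerConfigs`, and the choice to let an explicitly configured `client.rack` win are all assumptions made for illustration, not what the KIP or Kafka Streams actually does.

```java
import java.util.HashMap;
import java.util.Map;

final class RackIdPassThrough {
    // Property name proposed in this thread (assumption: not an existing Streams config key).
    static final String STREAMS_RACK_ID = "rack.id";
    // Key of the consumer's rack setting.
    static final String CONSUMER_CLIENT_RACK = "client.rack";

    // Hypothetical helper: derives the config map handed to the internal consumers
    // from the config map the user gave to Kafka Streams.
    static Map<String, Object> consumerConfigs(Map<String, Object> streamsConfigs) {
        Map<String, Object> consumer = new HashMap<>(streamsConfigs);
        // The Streams-level key is not a consumer config, so do not forward it as-is.
        consumer.remove(STREAMS_RACK_ID);
        Object rackId = streamsConfigs.get(STREAMS_RACK_ID);
        if (rackId != null) {
            // Assumption: an explicitly configured client.rack takes precedence.
            consumer.putIfAbsent(CONSUMER_CLIENT_RACK, rackId);
        }
        return consumer;
    }

    public static void main(String[] args) {
        Map<String, Object> streams = new HashMap<>();
        streams.put("application.id", "demo-app");
        streams.put(STREAMS_RACK_ID, "eu-central-1a");
        // Print only the derived consumer rack setting.
        System.out.println(consumerConfigs(streams).get(CONSUMER_CLIENT_RACK));
    }
}
```

Whether an explicit `client.rack` should override the Streams-level rack ID, or the other way round, is exactly the kind of detail the thread would need to settle; the sketch picks one option only to make the behaviour concrete.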