Hi Bruno,

Thanks for the feedback. I think it makes sense.
I’ve updated the KIP [1] and tried to omit implementation details around the 
algorithm.

Please let me know if the latest version looks OK.

Regards,
Levani


[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 25. Feb 2021, at 17:59, Bruno Cadonna <br...@confluent.io> wrote:
> 
> Hi Levani,
> 
> I discussed your KIP with John the other day and we both think it is a really 
> interesting KIP and you did a good job in writing it. However, we think that 
> the KIP exposes to many implementation details. That makes future changes to 
> the implementation of the distribution algorithm harder without a KIP. So, we 
> would like to propose to just describe the config and the properties that any 
> implementation of the distribution algorithm should have. We did something 
> similar in KIP-441 for the task assignment algorithm [1].
> 
> Specifically, for your KIP, any possible implementation of the distribution 
> algorithm should read the tags to be considered for rack awareness from the 
> config and if the cluster allows to distribute each active task and its 
> replicas to Streams clients with different values for each tag, the algorithm 
> will do so. How the implementation behaves, if a cluster does not allow to 
> distribute over all tag values can be left as an implementation detail. This 
> would give us flexibility for future changes to the distribution algorithm.
> 
> Since there may be distribution algorithms that do not use the order of the 
> tags, it would be better to not mention the order of the tags in the config 
> doc. I would propose to omit the config doc from the KIP or formulate it 
> really generic.
> 
> We would also like to rename standby.replicas.awareness to 
> task.assignment.rack.awareness or something that does not contain standby 
> and/or replica (sorry for requesting again to change this name). That way, we 
> might be able to use this config also when we decide to make the active task 
> assignment rack aware.
> 
> I hope all of this makes sense to you.
> 
> Thank you again for the interesting KIP!
> 
> Looking forward to your implementation!
> 
> Best,
> Bruno
> 
> 
> [1] 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
>  
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm>
> 
> On 22.02.21 17:42, Levani Kokhreidze wrote:
>> Hi Bruno,
>> Thanks for the quick reply
>> 5.Sorry, maybe I am not making it clear.
>> What you have described is how it should work, yes. As it is stated in KIP, 
>> with the importance semantics in standby.replicas.awareness,
>> if we have an active task on Node-1 and the first standby task on Node-5, 
>> the third standby should be on Node-3 or Node-6 (both of them are in the 
>> different AZ compared to the active and the first standby task).
>> That flow is described in Partially Preferred Distribution section [1].
>> Node-4 could have been a valid option IF standby.replicas.awareness didn’t 
>> have the importance semantics because Kafka Streams could have just picked 
>> Node-4 in that case.
>> 7. Yup, will do.
>> 8. Good question, So far assumption I had was that the configuration between 
>> different Kafka Streams instances is the same. Can we have an extra 
>> validation check to make sure that is the case?
>> If not, what you have mentioned in point 5 always preferring the dimension 
>> where there’re enough KS instances is very valid.
>> On the other hand, I thought allowing to specify the importance of the 
>> various dimensions may give users extra flexibility over standby task 
>> allocation.
>> [1] 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-PartiallyPreferredStandbyTaskDistribution
>> Regards,
>> Levani
>>> On 22. Feb 2021, at 16:51, Bruno Cadonna <br...@confluent.io> wrote:
>>> 
>>> Hi Levani,
>>> 
>>> Thanks for the modifications!
>>> 
>>> I have some follow up questions/comments:
>>> 
>>> 5. Something is not clear to me. If the active is on Node-1 and the first 
>>> replica is on Node-5 (different cluster, different zone), why would the 
>>> second replica go to Node-4 that has a different cluster than but the same 
>>> zone as the active instead of Node-6 which has a different zone of Node-1? 
>>> In general wouldn't it be better to guarantee under Partially Preferred 
>>> task distribution to distribute active and standby replicas of the same 
>>> task over the dimension that has at least as many values as the number of 
>>> replicas + 1 and then over the dimensions that have less values? That would 
>>> then also be independent on the ordering of the tags.
>>> 
>>> 7. I agree with you. Could you add a sentence or two about this to the KIP?
>>> 
>>> New question:
>>> 
>>> 8. How would the assignor react on different numbers and different 
>>> orderings of the tags in standby.replicas.awareness across Streams clients?
>>> 
>>> Best,
>>> Bruno
>>> 
>>> 
>>> On 22.02.21 11:46, Levani Kokhreidze wrote:
>>>> Hi Bruno,
>>>> Thanks for the feedback. Please check my answers below:
>>>> 1. No objections; sounds good. Updated KIP
>>>> 2. No objections; sounds good. Updated KIP
>>>> 3. Thanks for the information; I can change KIP only to expose prefix 
>>>> method instead of a constant if it’s the way forward.
>>>> 4. Done. Updated KIP
>>>> 5. Yes, order in standby.replicas.awareness config counts as stated in the 
>>>> STANDBY_REPLICA_AWARENESS_DOC.
>>>> Actually, it plays a role in Partially Preferred distribution. In the 
>>>> example presented in the KIP, while one of the standby tasks can be placed 
>>>> in a different cluster and different zone compared to the active task, we 
>>>> have to choose either the same cluster or the same zone for the second 
>>>> standby task. In the first example presented in the KIP, while Node-5 is 
>>>> in the other cluster and other zone compared to the active task, the 
>>>> second standby task's preferred options are in different zones than Node-1 
>>>> and Node-5, but in the same cluster as active task or the first standby 
>>>> task. Without importance semantics in standby.replicas.awareness, putting 
>>>> second standby task in Node-4 (different cluster, same zone as active 
>>>> task) would have been a valid option.
>>>> I’ve updated KIP to clarify this a bit more, I hope this helps.
>>>> 6. Thanks for pointing that out, it was a mistake. I’ve removed that 
>>>> phrase from the KIP.
>>>> 7. It shouldn’t affect HighAvailabilityTaskAssignor in a “breaking way” 
>>>> meaning that all the existing behavior should stay as is (e.g., when new 
>>>> configurations are not specified). Once required configurations are set, 
>>>> the main change should happen in 
>>>> HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and 
>>>> HighAvailabilityTaskAssignor#assignStandbyTaskMovements
>>>> I hope this answers your questions.
>>>> Regards,
>>>> Levani
>>>>> On 18. Feb 2021, at 15:10, Bruno Cadonna <br...@confluent.io> wrote:
>>>>> 
>>>>> Hi Levani,
>>>>> 
>>>>> Thank you for the KIP.
>>>>> 
>>>>> Really interesting!
>>>>> 
>>>>> Here my comments:
>>>>> 
>>>>> 1. To be consistent with the other configs that involve standbys , I 
>>>>> would rename
>>>>> standby.task.assignment.awareness -> standby.replicas.awareness
>>>>> 
>>>>> 2. I would also rename the prefix
>>>>> instance.tag -> client.tag
>>>>> 
>>>>> 3. The following is a question about prefixes in general that maybe 
>>>>> somebody else can answer. In the config it says for other prefixes that 
>>>>> it is recommended to use the method *Prefix(final String prop) instead of 
>>>>> the raw prefix string.
>>>>> 
>>>>> Is the plan to make the raw prefix string private in a future release?
>>>>> Should we consider making only the prefix method for this KIP public?
>>>>> 
>>>>> 4. Could you provide a mathematical formula instead of Java code for 
>>>>> absolute preferred standby task distribution and the other distributtion 
>>>>> properties? Could you also add an example for absolute preffered 
>>>>> distribution for the computation of the formula similar to what you did 
>>>>> for the other properties?
>>>>> 
>>>>> 5. Does the order of the tags given for standby.task.assignment.awareness 
>>>>> count? You mention it once, but then for the Partially Preferred standby 
>>>>> task distribution property it does not seem to be important.
>>>>> 
>>>>> 6. In the section about least preferred standby task distribution, you 
>>>>> state that "and one [zone] will be reserved for active task". What do you 
>>>>> mean by that? All Streams clients will participate in the task assignment 
>>>>> of active tasks irrespective of their tags, right? The statement does 
>>>>> also not really fit with the example where active stateful task 0_0 is on 
>>>>> Node-1, does it?
>>>>> 
>>>>> 7. Could you also say some words about how this KIP affects the current 
>>>>> HighAvailabilityTaskAssignor?
>>>>> 
>>>>> 
>>>>> Best,
>>>>> Bruno
>>>>> 
>>>>> On 09.02.21 15:54, Levani Kokhreidze wrote:
>>>>>> Hello all,
>>>>>> I’ve updated KIP-708 [1] to reflect the latest discussion outcomes.
>>>>>> I’m looking forward to your feedback.
>>>>>> Regards,
>>>>>> Levani
>>>>>> [1] - 
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
>>>>>>> On 2. Feb 2021, at 22:03, Levani Kokhreidze <levani.co...@gmail.com> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>> Hi John.
>>>>>>> 
>>>>>>> Thanks a lot for this detailed analysis!
>>>>>>> Yes, that is what I had in mind as well.
>>>>>>> I also like that idea of having “task.assignment.awareness” 
>>>>>>> configuration
>>>>>>> to tell which instance tags can be used for rack awareness.
>>>>>>> I may borrow it for this KIP if you don’t mind :)
>>>>>>> 
>>>>>>> Thanks again John for this discussion, it’s really valuable.
>>>>>>> 
>>>>>>> I’ll update the proposal and share it once again in this discussion 
>>>>>>> thread.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Levani
>>>>>>> 
>>>>>>>> On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org>> <mailto:vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org>>> <mailto:vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org>> <mailto:vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org>>>>> wrote:
>>>>>>>> 
>>>>>>>> Hi Levani,
>>>>>>>> 
>>>>>>>> 1. Thanks for the details.
>>>>>>>> 
>>>>>>>> I figured it must be something like this two-dimensional definition of 
>>>>>>>> "rack".
>>>>>>>> 
>>>>>>>> It does seem like, if we make the config take a list of tags, we can 
>>>>>>>> define
>>>>>>>> the semantics to be that the system will make a best effort to 
>>>>>>>> distribute
>>>>>>>> the standbys over each rack dimension.
>>>>>>>> 
>>>>>>>> In your example, there are two clusters and three AZs. The example
>>>>>>>> configs would be:
>>>>>>>> 
>>>>>>>> Node 1:
>>>>>>>> instance.tag.cluster: K8s_Cluster1
>>>>>>>> instance.tag.zone: eu-central-1a
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 2:
>>>>>>>> instance.tag.cluster: K8s_Cluster1
>>>>>>>> instance.tag.zone: eu-central-1b
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 3:
>>>>>>>> instance.tag.cluster: K8s_Cluster1
>>>>>>>> instance.tag.zone: eu-central-1c
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 4:
>>>>>>>> instance.tag.cluster: K8s_Cluster2
>>>>>>>> instance.tag.zone: eu-central-1a
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 5:
>>>>>>>> instance.tag.cluster: K8s_Cluster2
>>>>>>>> instance.tag.zone: eu-central-1b
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> Node 6:
>>>>>>>> instance.tag.cluster: K8s_Cluster2
>>>>>>>> instance.tag.zone: eu-central-1c
>>>>>>>> task.assignment.awareness: cluster,zone
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Now, if we have a task 0_0 with an active and two replicas,
>>>>>>>> there are three total copies of the task to distribute over:
>>>>>>>> * 6 instances
>>>>>>>> * 2 clusters
>>>>>>>> * 3 zones
>>>>>>>> 
>>>>>>>> There is a constraint that we _cannot_ assign two copies of a task
>>>>>>>> to a single instance, but it seems like the default rack awareness
>>>>>>>> would permit us to assign two copies of a task to a rack, if (and only
>>>>>>>> if) the number of copies is greater than the number of racks.
>>>>>>>> 
>>>>>>>> So, the assignment we would get is like this:
>>>>>>>> * assigned to three different instances
>>>>>>>> * one copy in each of zone a, b, and c
>>>>>>>> * two copies in one cluster and one in the other cluster
>>>>>>>> 
>>>>>>>> For example, we might have 0_0 assigned to:
>>>>>>>> * Node 1 (cluster 1, zone a)
>>>>>>>> * Node 5 (cluster 2, zone b)
>>>>>>>> * Node 3 (cluster 1, zone c)
>>>>>>>> 
>>>>>>>> Is that what you were also thinking?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>> 
>>>>>>>> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
>>>>>>>>> Hi John,
>>>>>>>>> 
>>>>>>>>> 1. Main reason was that it seemed easier change compared to having
>>>>>>>>> multiple tags assigned to each host.
>>>>>>>>> 
>>>>>>>>> ---
>>>>>>>>> 
>>>>>>>>> Answering your question what use-case I have in mind:
>>>>>>>>> Lets say we have two Kubernetes clusters running the same Kafka 
>>>>>>>>> Streams
>>>>>>>>> application.
>>>>>>>>> And each Kubernetes cluster is spanned across multiple AZ.
>>>>>>>>> So the setup overall looks something like this:
>>>>>>>>> 
>>>>>>>>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
>>>>>>>>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>>>>>>>>> 
>>>>>>>>> Now, if Kafka Streams application is launched in K8s_Clister1:
>>>>>>>>> eu-central-1a,
>>>>>>>>> ideally I would want standby task to be created in the different K8s
>>>>>>>>> cluster and region.
>>>>>>>>> So in this example it can be K8s_Cluster2: [eu-central-1b,
>>>>>>>>> eu-central-1c]
>>>>>>>>> 
>>>>>>>>> But giving it a bit more thought, this can be implemented if we change
>>>>>>>>> semantics of “tags” a bit.
>>>>>>>>> So instead of doing full match with tags, we can do iterative matching
>>>>>>>>> and it should work.
>>>>>>>>> (If this is what you had in mind, apologies for the misunderstanding).
>>>>>>>>> 
>>>>>>>>> If we consider the same example as mentioned above, for the active 
>>>>>>>>> task
>>>>>>>>> we would
>>>>>>>>> have following tags: [K8s_Cluster1, eu-central-1a]. In order to
>>>>>>>>> distribute standby task
>>>>>>>>> in the different K8s cluster, plus in the different AWS region, 
>>>>>>>>> standby
>>>>>>>>> task assignment
>>>>>>>>> algorithm can compare each tag by index. So steps would be something
>>>>>>>>> like:
>>>>>>>>> 
>>>>>>>>> // this will result in selecting client in the different K8s cluster
>>>>>>>>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != 
>>>>>>>>> allClientTags[0])
>>>>>>>>> // this will result in selecting the client in different AWS region
>>>>>>>>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] !=
>>>>>>>>> clientsInDifferentCluster[1] )
>>>>>>>>> 
>>>>>>>>> WDYT?
>>>>>>>>> 
>>>>>>>>> If you agree with the use-case I’ve mentioned, the pluggable assignor
>>>>>>>>> can be differed to another KIP, yes.
>>>>>>>>> As it won’t be required for this KIP and use-cases I had in mind to
>>>>>>>>> work.
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> Levani
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org 
>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>> <mailto:vvcep...@apache.org>> <mailto:vvcep...@apache.org 
>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>> <mailto:vvcep...@apache.org>>> <mailto:vvcep...@apache.org 
>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>> <mailto:vvcep...@apache.org>> <mailto:vvcep...@apache.org 
>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>> <mailto:vvcep...@apache.org>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hello Levani,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the reply.
>>>>>>>>>> 
>>>>>>>>>> 1. Interesting; why did you change your mind?
>>>>>>>>>> 
>>>>>>>>>> I have a gut feeling that we can achieve pretty much any rack 
>>>>>>>>>> awareness need that people have by using purely config, which is 
>>>>>>>>>> obviously much easier to use. But if you had a case in mind where 
>>>>>>>>>> this wouldn’t work, it would be good to know.
>>>>>>>>>> 
>>>>>>>>>> In fact, if that is true, then perhaps you could just defer the 
>>>>>>>>>> whole idea of a pluggable interface (point 2) to a separate KIP. I 
>>>>>>>>>> do think a pluggable assignor would be extremely valuable, but it 
>>>>>>>>>> might be nice to cut the scope of KIP-708 if just a config will 
>>>>>>>>>> suffice.
>>>>>>>>>> 
>>>>>>>>>> What do you think?
>>>>>>>>>> Thanks,
>>>>>>>>>> John
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
>>>>>>>>>>> Hi John,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks a lot for thorough feedback, it’s really valuable.
>>>>>>>>>>> 
>>>>>>>>>>> 1. Agree with this. Had the same idea initially.
>>>>>>>>>>> We can set some upper limit in terms of what’s
>>>>>>>>>>> the max number of tags users can set to make
>>>>>>>>>>> sure it’s not overused. By default, we can create
>>>>>>>>>>> standby tasks where tags are different from active task (full 
>>>>>>>>>>> match).
>>>>>>>>>>> This should mimic default rack awareness behaviour.
>>>>>>>>>>> 
>>>>>>>>>>> 2. I like the idea and I’d be happy to work on
>>>>>>>>>>> refactoring TaskAssignor to accommodate rack awareness use-case.
>>>>>>>>>>> When I was going through the code, it felt way more natural
>>>>>>>>>>> to use pluggable TaskAssignor for achieving rack awareness
>>>>>>>>>>> instead of introducing new interface and contract.
>>>>>>>>>>> But I thought approach mentioned in the KIP is simpler so
>>>>>>>>>>> decided to move forward with it as an initial proposal :).
>>>>>>>>>>> But I agree with you, it will be much better if we can have
>>>>>>>>>>> TaskAssignor as pluggable interface users can use.
>>>>>>>>>>> One potential challenge I see with this is that, if we just let
>>>>>>>>>>> users implement TaskAssignor in its current form, we will be forcing
>>>>>>>>>>> users to implement functionality for active task assignment, as 
>>>>>>>>>>> well as
>>>>>>>>>>> standby task assignment. This feels like not very clear contract,
>>>>>>>>>>> because with
>>>>>>>>>>> just TaskAssignor interface it’s not really clear they one needs to
>>>>>>>>>>> allocate
>>>>>>>>>>> standby tasks as well. We can enforce it on some level with the 
>>>>>>>>>>> return
>>>>>>>>>>> object
>>>>>>>>>>> You’ve mentioned TaskAssignor#assign has to return, but still feels
>>>>>>>>>>> error prone.
>>>>>>>>>>> In addition, I suspect in most of the cases users would want
>>>>>>>>>>> to control standby task assignment and leave active task assignment 
>>>>>>>>>>> as
>>>>>>>>>>> is.
>>>>>>>>>>> To make implementation of standby task assignment easier for users,
>>>>>>>>>>> what if
>>>>>>>>>>> we decouple active and standby task assignment from the 
>>>>>>>>>>> `TaskAssignor`?
>>>>>>>>>>> Idea I have in mind is to split TaskAssignor into ActiveTaskAssignor
>>>>>>>>>>> and StandbyTaskAssignor
>>>>>>>>>>> and let users add their own implementation for them separately if 
>>>>>>>>>>> they
>>>>>>>>>>> like via config.
>>>>>>>>>>> 
>>>>>>>>>>> If this approach sounds reasonable, I’ll work on updating KIP this 
>>>>>>>>>>> week.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Levani
>>>>>>>>>>> 
>>>>>>>>>>>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org 
>>>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>>>> <mailto:vvcep...@apache.org>> <mailto:vvcep...@apache.org 
>>>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>>>> <mailto:vvcep...@apache.org>>> <mailto:vvcep...@apache.org 
>>>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>>>> <mailto:vvcep...@apache.org>> <mailto:vvcep...@apache.org 
>>>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>>>> <mailto:vvcep...@apache.org>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks, Levani!
>>>>>>>>>>>> 
>>>>>>>>>>>> I was reflecting more on your KIP last night.
>>>>>>>>>>>> 
>>>>>>>>>>>> One thing I should mention is that I have previously used
>>>>>>>>>>>> the rack awareness feature of Elasticsearch, and found it to
>>>>>>>>>>>> be pretty intuitive and also capable of what we needed in
>>>>>>>>>>>> our AWS clusters. As you look at related work, you might
>>>>>>>>>>>> take ES into consideration.
>>>>>>>>>>>> 
>>>>>>>>>>>> I was also had some thoughts about your proposal.
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. I'm wondering if we instead allow people to add arbitrary
>>>>>>>>>>>> tags to each host, and then have a configuration to specify
>>>>>>>>>>>> a combination of tags to use for rack awareness. This seems
>>>>>>>>>>>> easier to manage than for the use case you anticipate where
>>>>>>>>>>>> people would concatenate rackId = (clusterId + AZ), and then
>>>>>>>>>>>> have to parse the rackId back out to compute the assignment.
>>>>>>>>>>>> 
>>>>>>>>>>>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm
>>>>>>>>>>>> wondering if we can change the level of abstraction a little
>>>>>>>>>>>> bit and capture even more value here. One thing we wanted to
>>>>>>>>>>>> do in KIP-441, but decided to cut from the scope, was to
>>>>>>>>>>>> define a public TaskAssignor interface so that people can
>>>>>>>>>>>> plug in the whole task assignment algorithm.
>>>>>>>>>>>> 
>>>>>>>>>>>> In fact, there is already an internal config and interface
>>>>>>>>>>>> for this (`internal.task.assignor.class`:
>>>>>>>>>>>> `org.apache.kafka.streams.processor.internals.assignment.Tas
>>>>>>>>>>>> kAssignor`).
>>>>>>>>>>>> 
>>>>>>>>>>>> We kept that interface and config internal because the
>>>>>>>>>>>> current TaskAssignor interface has a number of flaws, but if
>>>>>>>>>>>> we correct those flaws, we can offer a nice public interface
>>>>>>>>>>>> that people can use to control the standby allocation, but
>>>>>>>>>>>> also active task allocation, based on the tags I suggested
>>>>>>>>>>>> in (1).
>>>>>>>>>>>> 
>>>>>>>>>>>> I don't think we need too much work to refactor
>>>>>>>>>>>> TaskAssignor, the main problems are that the assign method
>>>>>>>>>>>> mutates its input and that it doesn't expose the full
>>>>>>>>>>>> metadata from the cluster members. Therefore, if you like
>>>>>>>>>>>> this idea, we should propose to refactor TaskAssignor with:
>>>>>>>>>>>> * input: an immutable description of the cluster, including
>>>>>>>>>>>> current lags of all stateful tasks and current stateless
>>>>>>>>>>>> task assignments, as well as metadata for each host.
>>>>>>>>>>>> * output: an object describing the new assignment as well as
>>>>>>>>>>>> a flag on whether to schedule a followup probing rebalance.
>>>>>>>>>>>> 
>>>>>>>>>>>> An even more stretchy stretch goal would be to include
>>>>>>>>>>>> metadata of the brokers, which could be used to achieve
>>>>>>>>>>>> higher levels of rack awareness. For example, we could co-
>>>>>>>>>>>> locate tasks in the same "rack" (AZ) as the partition leader
>>>>>>>>>>>> for their input or output topics, to minimize cross-AZ
>>>>>>>>>>>> traffic. I'm not sure to what extent clients can learn the
>>>>>>>>>>>> relevant broker metadata, so this stretch might not be
>>>>>>>>>>>> currently feasible, but as long as we design the
>>>>>>>>>>>> TaskAssignor for extensibility, we can do something like
>>>>>>>>>>>> this in the future.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks again for this proposal, I hope the above is more
>>>>>>>>>>>> inspiring than annoying :)
>>>>>>>>>>>> 
>>>>>>>>>>>> I really think your KIP is super high value in whatever form
>>>>>>>>>>>> you ultimately land on.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> John
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
>>>>>>>>>>>>> Hi John
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the feedback (and for the great work on KIP441 :) ).
>>>>>>>>>>>>> Makes sense, will add a section in the KIP explaining rack 
>>>>>>>>>>>>> awarenesses on high level and how it’s implemented in the 
>>>>>>>>>>>>> different distributed systems.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Levani
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org 
>>>>>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>>>>>> <mailto:vvcep...@apache.org>> <mailto:vvcep...@apache.org 
>>>>>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>>>>>> <mailto:vvcep...@apache.org>>> <mailto:vvcep...@apache.org 
>>>>>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>>>>>> <mailto:vvcep...@apache.org>> <mailto:vvcep...@apache.org 
>>>>>>>>>>>>>> <mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org 
>>>>>>>>>>>>>> <mailto:vvcep...@apache.org>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Levani,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for this KIP! I think this is really high value; it was 
>>>>>>>>>>>>>> something I was disappointed I didn’t get to do as part of 
>>>>>>>>>>>>>> KIP-441.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Rack awareness is a feature provided by other distributed 
>>>>>>>>>>>>>> systems as well. I wonder if your KIP could devote a section to 
>>>>>>>>>>>>>> summarizing what rack awareness looks like in other distributed 
>>>>>>>>>>>>>> systems, to help us put this design in context.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> John
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I’d like to start discussion on KIP-708 [1] that aims to 
>>>>>>>>>>>>>>> introduce rack
>>>>>>>>>>>>>>> aware standby task distribution in Kafka Streams.
>>>>>>>>>>>>>>> In addition to changes mentioned in the KIP, I’d like to get 
>>>>>>>>>>>>>>> some ideas
>>>>>>>>>>>>>>> on additional change I have in mind.
>>>>>>>>>>>>>>> Assuming KIP moves forward, I was wondering if it makes sense to
>>>>>>>>>>>>>>> configure Kafka Streams consumer instances with the rack ID 
>>>>>>>>>>>>>>> passed with
>>>>>>>>>>>>>>> the new StreamsConfig#RACK_ID_CONFIG property.
>>>>>>>>>>>>>>> In practice, that would mean that when “rack.id 
>>>>>>>>>>>>>>> <http://rack.id/> <http://rack.id/ <http://rack.id/>> 
>>>>>>>>>>>>>>> <http://rack.id/ <http://rack.id/> <http://rack.id/ 
>>>>>>>>>>>>>>> <http://rack.id/>>> <http://rack.id/ <http://rack.id/> 
>>>>>>>>>>>>>>> <http://rack.id/ <http://rack.id/>> <http://rack.id/ 
>>>>>>>>>>>>>>> <http://rack.id/> <http://rack.id/ <http://rack.id/>>>> 
>>>>>>>>>>>>>>> <http://rack.id/ <http://rack.id/> <http://rack.id/ 
>>>>>>>>>>>>>>> <http://rack.id/>> <http://rack.id/ <http://rack.id/> 
>>>>>>>>>>>>>>> <http://rack.id/ <http://rack.id/>>> <http://rack.id/ 
>>>>>>>>>>>>>>> <http://rack.id/> <http://rack.id/ <http://rack.id/>> 
>>>>>>>>>>>>>>> <http://rack.id/ <http://rack.id/> <http://rack.id/ 
>>>>>>>>>>>>>>> <http://rack.id/>>>>>” is
>>>>>>>>>>>>>>> configured in Kafka Streams, it will automatically translate 
>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer 
>>>>>>>>>>>>>>> clients
>>>>>>>>>>>>>>> that is used by Kafka Streams internally.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>>>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>>>>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>>>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>>>>>>>  
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>>>>>
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> P.S
>>>>>>>>>>>>>>> I have draft PR ready, if it helps the discussion moving 
>>>>>>>>>>>>>>> forward, I can
>>>>>>>>>>>>>>> provide the draft PR link in this thread.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Levani

Reply via email to