Hi John.

Thanks a lot for this detailed analysis! 
Yes, that is what I had in mind as well. 
I also like that idea of having “task.assignment.awareness” configuration
to tell which instance tags can be used for rack awareness.
I may borrow it for this KIP if you don’t mind :) 

Thanks again John for this discussion, it’s really valuable.

I’ll update the proposal and share it once again in this discussion thread.

Regards,
Levani 

> On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org> wrote:
> 
> Hi Levani,
> 
> 1. Thanks for the details.
> 
> I figured it must be something like this two-dimensional definition of "rack".
> 
> It does seem like, if we make the config take a list of tags, we can define
> the semantics to be that the system will make a best effort to distribute
> the standbys over each rack dimension.
> 
> In your example, there are two clusters and three AZs. The example
> configs would be:
> 
> Node 1:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1a
> task.assignment.awareness: cluster,zone
> 
> Node 2:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1b
> task.assignment.awareness: cluster,zone
> 
> Node 3:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1c
> task.assignment.awareness: cluster,zone
> 
> Node 4:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1a
> task.assignment.awareness: cluster,zone
> 
> Node 5:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1b
> task.assignment.awareness: cluster,zone
> 
> Node 6:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1c
> task.assignment.awareness: cluster,zone
> 
> 
> Now, if we have a task 0_0 with an active and two replicas,
> there are three total copies of the task to distribute over:
> * 6 instances
> * 2 clusters
> * 3 zones
> 
> There is a constraint that we _cannot_ assign two copies of a task
> to a single instance, but it seems like the default rack awareness
> would permit us to assign two copies of a task to a rack, if (and only
> if) the number of copies is greater than the number of racks.
> 
> So, the assignment we would get is like this:
> * assigned to three different instances
> * one copy in each of zone a, b, and c
> * two copies in one cluster and one in the other cluster
> 
> For example, we might have 0_0 assigned to:
> * Node 1 (cluster 1, zone a)
> * Node 5 (cluster 2, zone b)
> * Node 3 (cluster 1, zone c)
> 
> Is that what you were also thinking?
> 
> Thanks,
> -John
> 
> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
>> Hi John,
>> 
>> 1. Main reason was that it seemed easier change compared to having 
>> multiple tags assigned to each host.
>> 
>> ---
>> 
>> Answering your question what use-case I have in mind:
>> Lets say we have two Kubernetes clusters running the same Kafka Streams 
>> application. 
>> And each Kubernetes cluster is spanned across multiple AZ. 
>> So the setup overall looks something like this:
>> 
>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>> 
>> Now, if Kafka Streams application is launched in K8s_Clister1: 
>> eu-central-1a,
>> ideally I would want standby task to be created in the different K8s 
>> cluster and region.
>> So in this example it can be K8s_Cluster2: [eu-central-1b, 
>> eu-central-1c]
>> 
>> But giving it a bit more thought, this can be implemented if we change 
>> semantics of “tags” a bit.
>> So instead of doing full match with tags, we can do iterative matching 
>> and it should work.
>> (If this is what you had in mind, apologies for the misunderstanding).
>> 
>> If we consider the same example as mentioned above, for the active task 
>> we would
>> have following tags: [K8s_Cluster1, eu-central-1a]. In order to 
>> distribute standby task
>> in the different K8s cluster, plus in the different AWS region, standby 
>> task assignment 
>> algorithm can compare each tag by index. So steps would be something 
>> like:
>> 
>> // this will result in selecting client in the different K8s cluster
>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
>> // this will result in selecting the client in different AWS region
>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != 
>> clientsInDifferentCluster[1] )
>> 
>> WDYT?
>> 
>> If you agree with the use-case I’ve mentioned, the pluggable assignor 
>> can be differed to another KIP, yes.
>> As it won’t be required for this KIP and use-cases I had in mind to 
>> work.
>> 
>> Regards,
>> Levani 
>> 
>> 
>>> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org> wrote:
>>> 
>>> Hello Levani,
>>> 
>>> Thanks for the reply. 
>>> 
>>> 1. Interesting; why did you change your mind?
>>> 
>>> I have a gut feeling that we can achieve pretty much any rack awareness 
>>> need that people have by using purely config, which is obviously much 
>>> easier to use. But if you had a case in mind where this wouldn’t work, it 
>>> would be good to know. 
>>> 
>>> In fact, if that is true, then perhaps you could just defer the whole idea 
>>> of a pluggable interface (point 2) to a separate KIP. I do think a 
>>> pluggable assignor would be extremely valuable, but it might be nice to cut 
>>> the scope of KIP-708 if just a config will suffice.
>>> 
>>> What do you think?
>>> Thanks,
>>> John
>>> 
>>> 
>>> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
>>>> Hi John,
>>>> 
>>>> Thanks a lot for thorough feedback, it’s really valuable.
>>>> 
>>>> 1. Agree with this. Had the same idea initially.
>>>> We can set some upper limit in terms of what’s 
>>>> the max number of tags users can set to make 
>>>> sure it’s not overused. By default, we can create 
>>>> standby tasks where tags are different from active task (full match). 
>>>> This should mimic default rack awareness behaviour.
>>>> 
>>>> 2. I like the idea and I’d be happy to work on 
>>>> refactoring TaskAssignor to accommodate rack awareness use-case. 
>>>> When I was going through the code, it felt way more natural 
>>>> to use pluggable TaskAssignor for achieving rack awareness 
>>>> instead of introducing new interface and contract. 
>>>> But I thought approach mentioned in the KIP is simpler so 
>>>> decided to move forward with it as an initial proposal :). 
>>>> But I agree with you, it will be much better if we can have 
>>>> TaskAssignor as pluggable interface users can use.
>>>> One potential challenge I see with this is that, if we just let
>>>> users implement TaskAssignor in its current form, we will be forcing
>>>> users to implement functionality for active task assignment, as well as
>>>> standby task assignment. This feels like not very clear contract, 
>>>> because with
>>>> just TaskAssignor interface it’s not really clear they one needs to 
>>>> allocate 
>>>> standby tasks as well. We can enforce it on some level with the return 
>>>> object
>>>> You’ve mentioned TaskAssignor#assign has to return, but still feels 
>>>> error prone.
>>>> In addition, I suspect in most of the cases users would want
>>>> to control standby task assignment and leave active task assignment as 
>>>> is. 
>>>> To make implementation of standby task assignment easier for users, 
>>>> what if
>>>> we decouple active and standby task assignment from the `TaskAssignor`?
>>>> Idea I have in mind is to split TaskAssignor into ActiveTaskAssignor 
>>>> and StandbyTaskAssignor
>>>> and let users add their own implementation for them separately if they 
>>>> like via config.
>>>> 
>>>> If this approach sounds reasonable, I’ll work on updating KIP this week. 
>>>> 
>>>> Thanks,
>>>> Levani
>>>> 
>>>>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org> wrote:
>>>>> 
>>>>> Thanks, Levani!
>>>>> 
>>>>> I was reflecting more on your KIP last night.
>>>>> 
>>>>> One thing I should mention is that I have previously used
>>>>> the rack awareness feature of Elasticsearch, and found it to
>>>>> be pretty intuitive and also capable of what we needed in
>>>>> our AWS clusters. As you look at related work, you might
>>>>> take ES into consideration.
>>>>> 
>>>>> I was also had some thoughts about your proposal.
>>>>> 
>>>>> 1. I'm wondering if we instead allow people to add arbitrary
>>>>> tags to each host, and then have a configuration to specify
>>>>> a combination of tags to use for rack awareness. This seems
>>>>> easier to manage than for the use case you anticipate where
>>>>> people would concatenate rackId = (clusterId + AZ), and then
>>>>> have to parse the rackId back out to compute the assignment.
>>>>> 
>>>>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm
>>>>> wondering if we can change the level of abstraction a little
>>>>> bit and capture even more value here. One thing we wanted to
>>>>> do in KIP-441, but decided to cut from the scope, was to
>>>>> define a public TaskAssignor interface so that people can
>>>>> plug in the whole task assignment algorithm.
>>>>> 
>>>>> In fact, there is already an internal config and interface
>>>>> for this (`internal.task.assignor.class`:
>>>>> `org.apache.kafka.streams.processor.internals.assignment.Tas
>>>>> kAssignor`).
>>>>> 
>>>>> We kept that interface and config internal because the
>>>>> current TaskAssignor interface has a number of flaws, but if
>>>>> we correct those flaws, we can offer a nice public interface
>>>>> that people can use to control the standby allocation, but
>>>>> also active task allocation, based on the tags I suggested
>>>>> in (1).
>>>>> 
>>>>> I don't think we need too much work to refactor
>>>>> TaskAssignor, the main problems are that the assign method
>>>>> mutates its input and that it doesn't expose the full
>>>>> metadata from the cluster members. Therefore, if you like
>>>>> this idea, we should propose to refactor TaskAssignor with:
>>>>> * input: an immutable description of the cluster, including
>>>>> current lags of all stateful tasks and current stateless
>>>>> task assignments, as well as metadata for each host.
>>>>> * output: an object describing the new assignment as well as
>>>>> a flag on whether to schedule a followup probing rebalance.
>>>>> 
>>>>> An even more stretchy stretch goal would be to include
>>>>> metadata of the brokers, which could be used to achieve
>>>>> higher levels of rack awareness. For example, we could co-
>>>>> locate tasks in the same "rack" (AZ) as the partition leader
>>>>> for their input or output topics, to minimize cross-AZ
>>>>> traffic. I'm not sure to what extent clients can learn the
>>>>> relevant broker metadata, so this stretch might not be
>>>>> currently feasible, but as long as we design the
>>>>> TaskAssignor for extensibility, we can do something like
>>>>> this in the future.
>>>>> 
>>>>> Thanks again for this proposal, I hope the above is more
>>>>> inspiring than annoying :)
>>>>> 
>>>>> I really think your KIP is super high value in whatever form
>>>>> you ultimately land on.
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> John
>>>>> 
>>>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
>>>>>> Hi John
>>>>>> 
>>>>>> Thanks for the feedback (and for the great work on KIP441 :) ). 
>>>>>> Makes sense, will add a section in the KIP explaining rack awarenesses 
>>>>>> on high level and how it’s implemented in the different distributed 
>>>>>> systems.
>>>>>> 
>>>>>> Thanks,
>>>>>> Levani
>>>>>> 
>>>>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org> wrote:
>>>>>>> 
>>>>>>> Hi Levani,
>>>>>>> 
>>>>>>> Thanks for this KIP! I think this is really high value; it was 
>>>>>>> something I was disappointed I didn’t get to do as part of KIP-441.
>>>>>>> 
>>>>>>> Rack awareness is a feature provided by other distributed systems as 
>>>>>>> well. I wonder if your KIP could devote a section to summarizing what 
>>>>>>> rack awareness looks like in other distributed systems, to help us put 
>>>>>>> this design in context. 
>>>>>>> 
>>>>>>> Thanks!
>>>>>>> John
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>>>>>>>> Hello all,
>>>>>>>> 
>>>>>>>> I’d like to start discussion on KIP-708 [1] that aims to introduce 
>>>>>>>> rack 
>>>>>>>> aware standby task distribution in Kafka Streams.
>>>>>>>> In addition to changes mentioned in the KIP, I’d like to get some 
>>>>>>>> ideas 
>>>>>>>> on additional change I have in mind. 
>>>>>>>> Assuming KIP moves forward, I was wondering if it makes sense to 
>>>>>>>> configure Kafka Streams consumer instances with the rack ID passed 
>>>>>>>> with 
>>>>>>>> the new StreamsConfig#RACK_ID_CONFIG property. 
>>>>>>>> In practice, that would mean that when “rack.id <http://rack.id/>” is 
>>>>>>>> configured in Kafka Streams, it will automatically translate into 
>>>>>>>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer clients 
>>>>>>>> that is used by Kafka Streams internally.
>>>>>>>> 
>>>>>>>> [1] 
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>  
>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>> 
>>>>>>>> P.S 
>>>>>>>> I have draft PR ready, if it helps the discussion moving forward, I 
>>>>>>>> can 
>>>>>>>> provide the draft PR link in this thread.
>>>>>>>> 
>>>>>>>> Regards, 
>>>>>>>> Levani
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>> 
>> 

Reply via email to