Hi John. Thanks a lot for this detailed analysis! Yes, that is what I had in mind as well. I also like the idea of having a “task.assignment.awareness” configuration that tells which instance tags can be used for rack awareness. I may borrow it for this KIP if you don’t mind :)
Thanks again John for this discussion, it’s really valuable. I’ll update the proposal and share it once again in this discussion thread.

Regards,
Levani

> On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org> wrote:
>
> Hi Levani,
>
> 1. Thanks for the details.
>
> I figured it must be something like this two-dimensional definition of "rack".
>
> It does seem like, if we make the config take a list of tags, we can define
> the semantics to be that the system will make a best effort to distribute
> the standbys over each rack dimension.
>
> In your example, there are two clusters and three AZs. The example
> configs would be:
>
> Node 1:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1a
> task.assignment.awareness: cluster,zone
>
> Node 2:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1b
> task.assignment.awareness: cluster,zone
>
> Node 3:
> instance.tag.cluster: K8s_Cluster1
> instance.tag.zone: eu-central-1c
> task.assignment.awareness: cluster,zone
>
> Node 4:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1a
> task.assignment.awareness: cluster,zone
>
> Node 5:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1b
> task.assignment.awareness: cluster,zone
>
> Node 6:
> instance.tag.cluster: K8s_Cluster2
> instance.tag.zone: eu-central-1c
> task.assignment.awareness: cluster,zone
>
> Now, if we have a task 0_0 with an active and two replicas,
> there are three total copies of the task to distribute over:
> * 6 instances
> * 2 clusters
> * 3 zones
>
> There is a constraint that we _cannot_ assign two copies of a task
> to a single instance, but it seems like the default rack awareness
> would permit us to assign two copies of a task to a rack, if (and only
> if) the number of copies is greater than the number of racks.
>
> So, the assignment we would get is like this:
> * assigned to three different instances
> * one copy in each of zone a, b, and c
> * two copies in one cluster and one in the other cluster
>
> For example, we might have 0_0 assigned to:
> * Node 1 (cluster 1, zone a)
> * Node 5 (cluster 2, zone b)
> * Node 3 (cluster 1, zone c)
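>
> In code, a best-effort-per-dimension placement could look roughly like the
> greedy sketch below. This is purely illustrative (the names and structure
> are made up, and it is not necessarily the algorithm the KIP would use):
> each copy goes to the client whose tag values collide with the fewest
> values already used, and no instance ever gets two copies.
>
> import java.util.*;
>
> final class RackAwarePlacementSketch {
>     // Place numCopies copies of one task, greedily minimizing per-dimension
>     // tag collisions with the copies placed so far.
>     static List<String> placeCopies(final Map<String, Map<String, String>> clientTags,
>                                     final List<String> dimensions,
>                                     final int numCopies) {
>         final Map<String, Set<String>> used = new HashMap<>();
>         dimensions.forEach(dim -> used.put(dim, new HashSet<>()));
>         final List<String> chosen = new ArrayList<>();
>         for (int copy = 0; copy < numCopies; copy++) {
>             String best = null;
>             int bestCollisions = Integer.MAX_VALUE;
>             for (final Map.Entry<String, Map<String, String>> client : clientTags.entrySet()) {
>                 if (chosen.contains(client.getKey())) {
>                     continue; // hard constraint: at most one copy per instance
>                 }
>                 int collisions = 0;
>                 for (final String dim : dimensions) {
>                     if (used.get(dim).contains(client.getValue().get(dim))) {
>                         collisions++;
>                     }
>                 }
>                 if (collisions < bestCollisions) {
>                     bestCollisions = collisions;
>                     best = client.getKey();
>                 }
>             }
>             chosen.add(best); // assumes numCopies <= number of clients
>             for (final String dim : dimensions) {
>                 used.get(dim).add(clientTags.get(best).get(dim));
>             }
>         }
>         return chosen;
>     }
> }
>
> On the six-node example above, any run of this picks three distinct
> instances, covers all three zones, and splits the copies 2/1 across the
> two clusters.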
> Is that what you were also thinking?
>
> Thanks,
> -John
>
> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
>> Hi John,
>>
>> 1. The main reason was that it seemed an easier change compared to having
>> multiple tags assigned to each host.
>>
>> ---
>>
>> Answering your question about what use case I have in mind:
>> let's say we have two Kubernetes clusters running the same Kafka Streams
>> application, and each Kubernetes cluster spans multiple AZs.
>> So the setup overall looks something like this:
>>
>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>>
>> Now, if a Kafka Streams instance is launched in K8s_Cluster1:
>> eu-central-1a, ideally I would want the standby task to be created in a
>> different K8s cluster and a different availability zone.
>> So in this example it could be K8s_Cluster2: [eu-central-1b, eu-central-1c].
>>
>> But giving it a bit more thought, this can be implemented if we change
>> the semantics of “tags” a bit: instead of doing a full match on the tags,
>> we can do iterative matching and it should work.
>> (If this is what you had in mind, apologies for the misunderstanding.)
>>
>> If we consider the same example as mentioned above, the active task would
>> have the following tags: [K8s_Cluster1, eu-central-1a]. In order to
>> distribute the standby task to a different K8s cluster, plus a different
>> availability zone, the standby task assignment algorithm can compare each
>> tag by index. The steps would be something like:
>>
>> // this will result in selecting the clients in a different K8s cluster
>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
>> // this will result in selecting a client in a different availability zone
>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != clientsInDifferentCluster[1])
>>
>> WDYT?
>>
>> If you agree with the use case I’ve mentioned, the pluggable assignor can
>> be deferred to another KIP, yes, as it won’t be required for this KIP and
>> the use cases I had in mind to work.
>>
>> Regards,
>> Levani
>>
>>> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org> wrote:
>>>
>>> Hello Levani,
>>>
>>> Thanks for the reply.
>>>
>>> 1. Interesting; why did you change your mind?
>>>
>>> I have a gut feeling that we can achieve pretty much any rack-awareness
>>> need that people have by using purely config, which is obviously much
>>> easier to use. But if you had a case in mind where this wouldn’t work, it
>>> would be good to know.
>>>
>>> In fact, if that is true, then perhaps you could just defer the whole idea
>>> of a pluggable interface (point 2) to a separate KIP. I do think a
>>> pluggable assignor would be extremely valuable, but it might be nice to cut
>>> the scope of KIP-708 if just a config will suffice.
>>>
>>> What do you think?
>>> Thanks,
>>> John
>>>
>>> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
>>>> Hi John,
>>>>
>>>> Thanks a lot for the thorough feedback, it’s really valuable.
>>>>
>>>> 1. Agree with this. I had the same idea initially.
>>>> We can set an upper limit on the number of tags users can set, to make
>>>> sure the feature is not overused. By default, we can create standby
>>>> tasks on instances whose tags all differ from the active task’s (a full
>>>> match). This should mimic the default rack-awareness behaviour.
>>>>
>>>> 2. I like the idea, and I’d be happy to work on refactoring TaskAssignor
>>>> to accommodate the rack-awareness use case.
>>>> When I was going through the code, it felt much more natural to use a
>>>> pluggable TaskAssignor for achieving rack awareness instead of
>>>> introducing a new interface and contract, but I thought the approach
>>>> mentioned in the KIP was simpler, so I decided to move forward with it
>>>> as the initial proposal :).
>>>> But I agree with you: it will be much better if we can have TaskAssignor
>>>> as a pluggable interface users can implement.
>>>> One potential challenge I see with this is that if we just let users
>>>> implement TaskAssignor in its current form, we will be forcing them to
>>>> implement functionality for active task assignment as well as standby
>>>> task assignment. That doesn’t feel like a very clear contract, because
>>>> with just the TaskAssignor interface it’s not obvious that one needs to
>>>> allocate standby tasks as well. We can enforce it to some extent with
>>>> the return object you mentioned TaskAssignor#assign has to return, but
>>>> it still feels error-prone.
>>>> In addition, I suspect that in most cases users would want to control
>>>> standby task assignment and leave active task assignment as is.
>>>> To make implementing standby task assignment easier for users, what if
>>>> we decouple active and standby task assignment from the `TaskAssignor`?
>>>> The idea I have in mind is to split TaskAssignor into ActiveTaskAssignor
>>>> and StandbyTaskAssignor, and let users plug in their own implementations
>>>> for them separately via config.
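>>>>
>>>> As a rough sketch of the standby half of that contract (every name below
>>>> is a placeholder, not a proposed API):
>>>>
>>>> import java.util.Map;
>>>> import java.util.Set;
>>>> import java.util.UUID;
>>>> import org.apache.kafka.streams.processor.TaskId;
>>>>
>>>> public interface StandbyTaskAssignor {
>>>>     // Given where each active task lives and each client's tags, decide
>>>>     // which clients should host the standby replicas of each task.
>>>>     Map<TaskId, Set<UUID>> assignStandbys(Map<TaskId, UUID> activeAssignment,
>>>>                                           Map<UUID, Map<String, String>> clientTags,
>>>>                                           int numStandbyReplicas);
>>>> }
>>>>
>>>> With something like this, users who only care about standby placement
>>>> would not have to touch active task assignment at all.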
>>>>
>>>> If this approach sounds reasonable, I’ll work on updating the KIP this week.
>>>>
>>>> Thanks,
>>>> Levani
>>>>
>>>>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org> wrote:
>>>>>
>>>>> Thanks, Levani!
>>>>>
>>>>> I was reflecting more on your KIP last night.
>>>>>
>>>>> One thing I should mention is that I have previously used
>>>>> the rack awareness feature of Elasticsearch, and found it to
>>>>> be pretty intuitive and also capable of what we needed in
>>>>> our AWS clusters. As you look at related work, you might
>>>>> take ES into consideration.
>>>>>
>>>>> I also had some thoughts about your proposal.
>>>>>
>>>>> 1. I'm wondering if we could instead allow people to add arbitrary
>>>>> tags to each host, and then have a configuration to specify
>>>>> a combination of tags to use for rack awareness. This seems
>>>>> easier to manage for the use case you anticipate than having
>>>>> people concatenate rackId = (clusterId + AZ) and then
>>>>> parse the rackId back out to compute the assignment.
>>>>>
>>>>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm
>>>>> wondering if we can change the level of abstraction a little
>>>>> bit and capture even more value here. One thing we wanted to
>>>>> do in KIP-441, but decided to cut from the scope, was to
>>>>> define a public TaskAssignor interface so that people can
>>>>> plug in the whole task assignment algorithm.
>>>>>
>>>>> In fact, there is already an internal config and interface
>>>>> for this (`internal.task.assignor.class`:
>>>>> `org.apache.kafka.streams.processor.internals.assignment.TaskAssignor`).
>>>>>
>>>>> We kept that interface and config internal because the
>>>>> current TaskAssignor interface has a number of flaws, but if
>>>>> we correct those flaws, we can offer a nice public interface
>>>>> that people can use to control the standby allocation, but
>>>>> also active task allocation, based on the tags I suggested
>>>>> in (1).
>>>>>
>>>>> I don't think we need too much work to refactor
>>>>> TaskAssignor; the main problems are that the assign method
>>>>> mutates its input and that it doesn't expose the full
>>>>> metadata from the cluster members. Therefore, if you like
>>>>> this idea, we should propose to refactor TaskAssignor with:
>>>>> * input: an immutable description of the cluster, including
>>>>> current lags of all stateful tasks and current stateless
>>>>> task assignments, as well as metadata for each host.
>>>>> * output: an object describing the new assignment as well as
>>>>> a flag on whether to schedule a followup probing rebalance.
>>>>>
>>>>> An even more stretchy stretch goal would be to include
>>>>> metadata of the brokers, which could be used to achieve
>>>>> higher levels of rack awareness. For example, we could co-
>>>>> locate tasks in the same "rack" (AZ) as the partition leader
>>>>> for their input or output topics, to minimize cross-AZ
>>>>> traffic. I'm not sure to what extent clients can learn the
>>>>> relevant broker metadata, so this stretch might not be
>>>>> currently feasible, but as long as we design the
>>>>> TaskAssignor for extensibility, we can do something like
>>>>> this in the future.
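>>>>>
>>>>> To make that shape concrete, here is a sketch of the refactored
>>>>> interface (every name below is made up for illustration; this is not
>>>>> the existing internal interface):
>>>>>
>>>>> import java.util.Map;
>>>>> import java.util.Set;
>>>>> import java.util.UUID;
>>>>> import org.apache.kafka.streams.processor.TaskId;
>>>>>
>>>>> public interface TaskAssignor {
>>>>>     // The input is read-only; the assignor returns a new assignment
>>>>>     // instead of mutating what it was given.
>>>>>     TaskAssignment assign(ApplicationState state);
>>>>>
>>>>>     interface ApplicationState {
>>>>>         Map<UUID, Map<String, String>> clientTags();      // per-host metadata
>>>>>         Map<UUID, Map<TaskId, Long>> statefulTaskLags();  // per-client lags
>>>>>         Map<UUID, Set<TaskId>> statelessTaskAssignment(); // current stateless tasks
>>>>>     }
>>>>>
>>>>>     interface TaskAssignment {
>>>>>         Map<UUID, Set<TaskId>> activeTasks();
>>>>>         Map<UUID, Set<TaskId>> standbyTasks();
>>>>>         boolean followupRebalanceNeeded(); // schedule a probing rebalance?
>>>>>     }
>>>>> }
>>>>>
>>>>> Broker metadata could then later be exposed as one more accessor on
>>>>> ApplicationState without changing the assignor contract.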
>>>>>
>>>>> Thanks again for this proposal, I hope the above is more
>>>>> inspiring than annoying :)
>>>>>
>>>>> I really think your KIP is super high value in whatever form
>>>>> you ultimately land on.
>>>>>
>>>>> Thanks,
>>>>> John
>>>>>
>>>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
>>>>>> Hi John,
>>>>>>
>>>>>> Thanks for the feedback (and for the great work on KIP-441 :) ).
>>>>>> Makes sense; I will add a section to the KIP explaining rack awareness
>>>>>> at a high level and how it’s implemented in other distributed systems.
>>>>>>
>>>>>> Thanks,
>>>>>> Levani
>>>>>>
>>>>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org> wrote:
>>>>>>>
>>>>>>> Hi Levani,
>>>>>>>
>>>>>>> Thanks for this KIP! I think this is really high value; it was
>>>>>>> something I was disappointed I didn’t get to do as part of KIP-441.
>>>>>>>
>>>>>>> Rack awareness is a feature provided by other distributed systems as
>>>>>>> well. I wonder if your KIP could devote a section to summarizing what
>>>>>>> rack awareness looks like in other distributed systems, to help us put
>>>>>>> this design in context.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> John
>>>>>>>
>>>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> I’d like to start a discussion on KIP-708 [1], which aims to introduce
>>>>>>>> rack-aware standby task distribution in Kafka Streams.
>>>>>>>> In addition to the changes mentioned in the KIP, I’d like to get some
>>>>>>>> ideas on an additional change I have in mind.
>>>>>>>> Assuming the KIP moves forward, I was wondering if it makes sense to
>>>>>>>> configure Kafka Streams consumer instances with the rack ID passed
>>>>>>>> via the new StreamsConfig#RACK_ID_CONFIG property.
>>>>>>>> In practice, that would mean that when “rack.id” is configured in
>>>>>>>> Kafka Streams, it would automatically translate into
>>>>>>>> ConsumerConfig#CLIENT_RACK_CONFIG for all the KafkaConsumer clients
>>>>>>>> that are used by Kafka Streams internally.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>
>>>>>>>> P.S.
>>>>>>>> I have a draft PR ready; if it helps move the discussion forward, I
>>>>>>>> can provide the link in this thread.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Levani
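
P.S. Regarding the rack.id idea in my original message quoted above: the translation I have in mind is small. A rough sketch (assumed names and wiring, not the actual Streams internals):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

final class RackIdPropagationSketch {
    // Copy a Streams-level rack.id into the consumer's client.rack,
    // unless the user has already set client.rack explicitly.
    static Map<String, Object> withClientRack(final Map<String, Object> consumerConfigs,
                                              final String rackId) {
        final Map<String, Object> result = new HashMap<>(consumerConfigs);
        if (rackId != null) {
            result.putIfAbsent(ConsumerConfig.CLIENT_RACK_CONFIG, rackId);
        }
        return result;
    }
}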