Hello all,

I’ve updated KIP-708 [1] to reflect the latest discussion outcomes. I’m looking forward to your feedback.
Regards,
Levani

[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 2. Feb 2021, at 22:03, Levani Kokhreidze <levani.co...@gmail.com> wrote:
>
> Hi John,
>
> Thanks a lot for this detailed analysis!
> Yes, that is what I had in mind as well.
> I also like the idea of having a “task.assignment.awareness” configuration
> to tell which instance tags can be used for rack awareness.
> I may borrow it for this KIP if you don’t mind :)
>
> Thanks again, John, for this discussion; it’s really valuable.
>
> I’ll update the proposal and share it once again in this discussion thread.
>
> Regards,
> Levani
>
>> On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org> wrote:
>>
>> Hi Levani,
>>
>> 1. Thanks for the details.
>>
>> I figured it must be something like this two-dimensional definition of "rack".
>>
>> It does seem like, if we make the config take a list of tags, we can define
>> the semantics to be that the system will make a best effort to distribute
>> the standbys over each rack dimension.
>>
>> In your example, there are two clusters and three AZs.
>> The example configs would be:
>>
>> Node 1:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1a
>> task.assignment.awareness: cluster,zone
>>
>> Node 2:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1b
>> task.assignment.awareness: cluster,zone
>>
>> Node 3:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1c
>> task.assignment.awareness: cluster,zone
>>
>> Node 4:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1a
>> task.assignment.awareness: cluster,zone
>>
>> Node 5:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1b
>> task.assignment.awareness: cluster,zone
>>
>> Node 6:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1c
>> task.assignment.awareness: cluster,zone
>>
>> Now, if we have a task 0_0 with an active and two replicas,
>> there are three total copies of the task to distribute over:
>> * 6 instances
>> * 2 clusters
>> * 3 zones
>>
>> There is a constraint that we _cannot_ assign two copies of a task
>> to a single instance, but it seems like the default rack awareness
>> would permit us to assign two copies of a task to a rack, if (and only
>> if) the number of copies is greater than the number of racks.
>>
>> So, the assignment we would get is like this:
>> * assigned to three different instances
>> * one copy in each of zone a, b, and c
>> * two copies in one cluster and one in the other cluster
>>
>> For example, we might have 0_0 assigned to:
>> * Node 1 (cluster 1, zone a)
>> * Node 5 (cluster 2, zone b)
>> * Node 3 (cluster 1, zone c)
>>
>> Is that what you were also thinking?
>>
>> Thanks,
>> -John
>>
>> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
>>> Hi John,
>>>
>>> 1. The main reason was that it seemed an easier change compared to having
>>> multiple tags assigned to each host.
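[Editorial illustration, not actual Kafka Streams code; all names here are hypothetical. The best-effort spread over each rack dimension described above can be sketched as a greedy pick that rewards tag values not yet used by already-placed copies of the task, while never placing two copies on one instance.]

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RackSpreadSketch {

    // Greedily pick `copies` distinct clients, preferring clients whose tag
    // values (cluster, zone, ...) differ from those of already-chosen clients.
    static List<String> pickClients(Map<String, List<String>> clientTags, int copies) {
        List<String> chosen = new ArrayList<>();
        while (chosen.size() < copies) {
            String best = null;
            int bestScore = -1;
            for (Map.Entry<String, List<String>> client : clientTags.entrySet()) {
                if (chosen.contains(client.getKey())) {
                    continue; // hard constraint: at most one copy per instance
                }
                int score = 0;
                for (int dim = 0; dim < client.getValue().size(); dim++) {
                    boolean clash = false;
                    for (String c : chosen) {
                        if (clientTags.get(c).get(dim).equals(client.getValue().get(dim))) {
                            clash = true;
                        }
                    }
                    if (!clash) {
                        score++; // reward tag values no chosen copy uses yet
                    }
                }
                if (score > bestScore) {
                    bestScore = score;
                    best = client.getKey();
                }
            }
            chosen.add(best);
        }
        return chosen;
    }

    public static void main(String[] args) {
        Map<String, List<String>> clients = new LinkedHashMap<>();
        clients.put("Node1", List.of("K8s_Cluster1", "eu-central-1a"));
        clients.put("Node2", List.of("K8s_Cluster1", "eu-central-1b"));
        clients.put("Node3", List.of("K8s_Cluster1", "eu-central-1c"));
        clients.put("Node4", List.of("K8s_Cluster2", "eu-central-1a"));
        clients.put("Node5", List.of("K8s_Cluster2", "eu-central-1b"));
        clients.put("Node6", List.of("K8s_Cluster2", "eu-central-1c"));

        // Three copies of task 0_0 (one active, two standbys).
        System.out.println(pickClients(clients, 3)); // [Node1, Node5, Node3]
    }
}
```

[On the six-node example above, this greedy sketch lands task 0_0 on Node 1, Node 5, and Node 3: all three zones covered, two copies in one cluster and one in the other, matching the assignment John walks through.]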
>>>
>>> ---
>>>
>>> Answering your question about what use case I have in mind:
>>> Let's say we have two Kubernetes clusters running the same Kafka Streams
>>> application, and each Kubernetes cluster spans multiple AZs.
>>> So the setup overall looks something like this:
>>>
>>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
>>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>>>
>>> Now, if the Kafka Streams application is launched in K8s_Cluster1:
>>> eu-central-1a, ideally I would want the standby task to be created in a
>>> different K8s cluster and a different AZ.
>>> So in this example it can be K8s_Cluster2: [eu-central-1b, eu-central-1c]
>>>
>>> But giving it a bit more thought, this can be implemented if we change
>>> the semantics of “tags” a bit.
>>> So instead of doing a full match with tags, we can do iterative matching,
>>> and it should work.
>>> (If this is what you had in mind, apologies for the misunderstanding.)
>>>
>>> If we consider the same example as mentioned above, the active task would
>>> have the following tags: [K8s_Cluster1, eu-central-1a]. In order to
>>> distribute the standby task to a different K8s cluster, plus a different
>>> AZ, the standby task assignment algorithm can compare each tag by index.
>>> So the steps would be something like:
>>>
>>> // this will result in selecting clients in a different K8s cluster
>>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
>>> // this will result in selecting the client in a different AZ
>>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != clientsInDifferentCluster[1])
>>>
>>> WDYT?
>>>
>>> If you agree with the use case I’ve mentioned, the pluggable assignor
>>> can be deferred to another KIP, yes, as it won’t be required for this
>>> KIP and the use cases I had in mind to work.
>>>
>>> Regards,
>>> Levani
>>>
>>>
>>>> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org> wrote:
>>>>
>>>> Hello Levani,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> 1. Interesting; why did you change your mind?
>>>>
>>>> I have a gut feeling that we can achieve pretty much any rack awareness
>>>> need that people have by using purely config, which is obviously much
>>>> easier to use. But if you had a case in mind where this wouldn’t work, it
>>>> would be good to know.
>>>>
>>>> In fact, if that is true, then perhaps you could just defer the whole idea
>>>> of a pluggable interface (point 2) to a separate KIP. I do think a
>>>> pluggable assignor would be extremely valuable, but it might be nice to
>>>> cut the scope of KIP-708 if just a config will suffice.
>>>>
>>>> What do you think?
>>>>
>>>> Thanks,
>>>> John
>>>>
>>>>
>>>> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
>>>>> Hi John,
>>>>>
>>>>> Thanks a lot for the thorough feedback; it’s really valuable.
>>>>>
>>>>> 1. Agree with this. I had the same idea initially.
>>>>> We can set some upper limit on the max number of tags users can set,
>>>>> to make sure it’s not overused. By default, we can create standby tasks
>>>>> where the tags differ from the active task’s (full match).
>>>>> This should mimic the default rack awareness behaviour.
>>>>>
>>>>> 2. I like the idea, and I’d be happy to work on refactoring
>>>>> TaskAssignor to accommodate the rack awareness use case.
>>>>> When I was going through the code, it felt way more natural
>>>>> to use a pluggable TaskAssignor for achieving rack awareness
>>>>> instead of introducing a new interface and contract.
>>>>> But I thought the approach mentioned in the KIP was simpler, so I
>>>>> decided to move forward with it as an initial proposal :).
>>>>> But I agree with you, it will be much better if we can have
>>>>> TaskAssignor as a pluggable interface users can use.
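[Editorial illustration of the iterative, index-by-index tag comparison sketched earlier in the thread; hypothetical code, not part of Kafka Streams.]

```java
import java.util.ArrayList;
import java.util.List;

public class TagMatchSketch {

    // Keep only clients whose tag at the given index differs from the
    // active task's tag at that index.
    static List<List<String>> clientsDifferingAt(List<String> activeTags,
                                                 List<List<String>> clients,
                                                 int tagIndex) {
        List<List<String>> result = new ArrayList<>();
        for (List<String> clientTags : clients) {
            if (!clientTags.get(tagIndex).equals(activeTags.get(tagIndex))) {
                result.add(clientTags);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> activeTags = List.of("K8s_Cluster1", "eu-central-1a");
        List<List<String>> clients = List.of(
                List.of("K8s_Cluster1", "eu-central-1b"),
                List.of("K8s_Cluster2", "eu-central-1a"),
                List.of("K8s_Cluster2", "eu-central-1c"));

        // Step 1: keep clients in a different K8s cluster (tag index 0).
        List<List<String>> differentCluster = clientsDifferingAt(activeTags, clients, 0);
        // Step 2: of those, keep clients also in a different AZ (tag index 1).
        List<List<String>> candidates = clientsDifferingAt(activeTags, differentCluster, 1);

        System.out.println(candidates); // [[K8s_Cluster2, eu-central-1c]]
    }
}
```

[Each pass narrows the candidate set along one tag dimension, which is exactly the two-step filter in the numbered pseudocode above.]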
>>>>> One potential challenge I see with this is that, if we just let
>>>>> users implement TaskAssignor in its current form, we will be forcing
>>>>> users to implement functionality for active task assignment as well as
>>>>> standby task assignment. This doesn’t feel like a very clear contract,
>>>>> because with just the TaskAssignor interface it’s not really clear that
>>>>> one needs to allocate standby tasks as well. We can enforce it on some
>>>>> level with the return object you’ve mentioned TaskAssignor#assign has
>>>>> to return, but it still feels error-prone.
>>>>> In addition, I suspect that in most cases users would want to control
>>>>> standby task assignment and leave active task assignment as is.
>>>>> To make the implementation of standby task assignment easier for users,
>>>>> what if we decouple active and standby task assignment from the
>>>>> `TaskAssignor`? The idea I have in mind is to split TaskAssignor into
>>>>> ActiveTaskAssignor and StandbyTaskAssignor and let users add their own
>>>>> implementations for them separately, if they like, via config.
>>>>>
>>>>> If this approach sounds reasonable, I’ll work on updating the KIP this week.
>>>>>
>>>>> Thanks,
>>>>> Levani
>>>>>
>>>>>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org> wrote:
>>>>>>
>>>>>> Thanks, Levani!
>>>>>>
>>>>>> I was reflecting more on your KIP last night.
>>>>>>
>>>>>> One thing I should mention is that I have previously used
>>>>>> the rack awareness feature of Elasticsearch, and found it to
>>>>>> be pretty intuitive and also capable of what we needed in
>>>>>> our AWS clusters. As you look at related work, you might
>>>>>> take ES into consideration.
>>>>>>
>>>>>> I also had some thoughts about your proposal.
>>>>>>
>>>>>> 1. I'm wondering if we could instead allow people to add arbitrary
>>>>>> tags to each host, and then have a configuration to specify
>>>>>> a combination of tags to use for rack awareness. This seems
>>>>>> easier to manage than the use case you anticipate, where
>>>>>> people would concatenate rackId = (clusterId + AZ), and then
>>>>>> have to parse the rackId back out to compute the assignment.
>>>>>>
>>>>>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm
>>>>>> wondering if we can change the level of abstraction a little
>>>>>> bit and capture even more value here. One thing we wanted to
>>>>>> do in KIP-441, but decided to cut from the scope, was to
>>>>>> define a public TaskAssignor interface so that people can
>>>>>> plug in the whole task assignment algorithm.
>>>>>>
>>>>>> In fact, there is already an internal config and interface
>>>>>> for this (`internal.task.assignor.class`:
>>>>>> `org.apache.kafka.streams.processor.internals.assignment.TaskAssignor`).
>>>>>>
>>>>>> We kept that interface and config internal because the
>>>>>> current TaskAssignor interface has a number of flaws, but if
>>>>>> we correct those flaws, we can offer a nice public interface
>>>>>> that people can use to control not only the standby allocation but
>>>>>> also the active task allocation, based on the tags I suggested in (1).
>>>>>>
>>>>>> I don't think we need too much work to refactor
>>>>>> TaskAssignor; the main problems are that the assign method
>>>>>> mutates its input and that it doesn't expose the full
>>>>>> metadata from the cluster members. Therefore, if you like
>>>>>> this idea, we should propose to refactor TaskAssignor with:
>>>>>> * input: an immutable description of the cluster, including
>>>>>> current lags of all stateful tasks and current stateless
>>>>>> task assignments, as well as metadata for each host.
>>>>>> * output: an object describing the new assignment, as well as
>>>>>> a flag on whether to schedule a follow-up probing rebalance.
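[Editorial illustration of the refactored contract described above; purely a sketch, and none of these types or names exist in Kafka Streams.]

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Immutable cluster description handed to the assignor: per-client tags
// and per-task lags. The assignor reads it and never mutates it.
record ClientMetadata(List<String> tags, Map<String, Long> taskLagsByTaskId) {}

record ClusterDescription(Map<String, ClientMetadata> clientsById) {}

// Output: the proposed assignment, plus a flag asking the group leader to
// schedule a follow-up probing rebalance.
record TaskAssignment(Map<String, Set<String>> activeTasksByClient,
                      Map<String, Set<String>> standbyTasksByClient,
                      boolean scheduleProbingRebalance) {}

interface TaskAssignor {
    TaskAssignment assign(ClusterDescription cluster);
}
```

[Separating an immutable input record from an explicit output record addresses both flaws John names: the assignor can no longer mutate its input, and all host metadata is available in one place.]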
>>>>>>
>>>>>> An even more stretchy stretch goal would be to include
>>>>>> metadata of the brokers, which could be used to achieve
>>>>>> higher levels of rack awareness. For example, we could
>>>>>> co-locate tasks in the same "rack" (AZ) as the partition leader
>>>>>> for their input or output topics, to minimize cross-AZ
>>>>>> traffic. I'm not sure to what extent clients can learn the
>>>>>> relevant broker metadata, so this stretch might not be
>>>>>> currently feasible, but as long as we design the
>>>>>> TaskAssignor for extensibility, we can do something like
>>>>>> this in the future.
>>>>>>
>>>>>> Thanks again for this proposal; I hope the above is more
>>>>>> inspiring than annoying :)
>>>>>>
>>>>>> I really think your KIP is super high value in whatever form
>>>>>> you ultimately land on.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> John
>>>>>>
>>>>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
>>>>>>> Hi John,
>>>>>>>
>>>>>>> Thanks for the feedback (and for the great work on KIP-441 :) ).
>>>>>>> Makes sense; I will add a section to the KIP explaining rack awareness
>>>>>>> at a high level and how it’s implemented in different distributed
>>>>>>> systems.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Levani
>>>>>>>
>>>>>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>
>>>>>>>> Hi Levani,
>>>>>>>>
>>>>>>>> Thanks for this KIP! I think this is really high value; it was
>>>>>>>> something I was disappointed I didn’t get to do as part of KIP-441.
>>>>>>>>
>>>>>>>> Rack awareness is a feature provided by other distributed systems as
>>>>>>>> well. I wonder if your KIP could devote a section to summarizing what
>>>>>>>> rack awareness looks like in other distributed systems, to help us put
>>>>>>>> this design in context.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> John
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> I’d like to start a discussion on KIP-708 [1], which aims to introduce
>>>>>>>>> rack-aware standby task distribution in Kafka Streams.
>>>>>>>>> In addition to the changes mentioned in the KIP, I’d like to get some
>>>>>>>>> ideas on an additional change I have in mind.
>>>>>>>>> Assuming the KIP moves forward, I was wondering if it makes sense to
>>>>>>>>> configure Kafka Streams consumer instances with the rack ID passed
>>>>>>>>> with the new StreamsConfig#RACK_ID_CONFIG property.
>>>>>>>>> In practice, that would mean that when “rack.id” is configured in
>>>>>>>>> Kafka Streams, it will automatically translate into the
>>>>>>>>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer
>>>>>>>>> clients that are used by Kafka Streams internally.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>
>>>>>>>>> P.S.
>>>>>>>>> I have a draft PR ready; if it helps move the discussion forward, I
>>>>>>>>> can provide the draft PR link in this thread.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Levani
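[Editorial note: the `rack.id` translation floated in the original message would amount to something like the fragment below. `rack.id` is only the config name proposed in this thread; `client.rack` is the existing consumer config (introduced by KIP-392) it would map onto.]

```properties
# Proposed Streams-level config (per the KIP-708 discussion):
rack.id=eu-central-1a

# Streams would then internally set the existing consumer config on every
# embedded KafkaConsumer it creates:
# client.rack=eu-central-1a
```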