Hi Levani,

1. Thanks for the details.

I figured it must be something like this two-dimensional definition of
"rack". It does seem like, if we make the config take a list of tags, we
can define the semantics to be that the system will make a best effort
to distribute the standbys over each rack dimension.

In your example, there are two clusters and three AZs. The example
configs would be:

Node 1:
  instance.tag.cluster: K8s_Cluster1
  instance.tag.zone: eu-central-1a
  task.assignment.awareness: cluster,zone

Node 2:
  instance.tag.cluster: K8s_Cluster1
  instance.tag.zone: eu-central-1b
  task.assignment.awareness: cluster,zone

Node 3:
  instance.tag.cluster: K8s_Cluster1
  instance.tag.zone: eu-central-1c
  task.assignment.awareness: cluster,zone

Node 4:
  instance.tag.cluster: K8s_Cluster2
  instance.tag.zone: eu-central-1a
  task.assignment.awareness: cluster,zone

Node 5:
  instance.tag.cluster: K8s_Cluster2
  instance.tag.zone: eu-central-1b
  task.assignment.awareness: cluster,zone

Node 6:
  instance.tag.cluster: K8s_Cluster2
  instance.tag.zone: eu-central-1c
  task.assignment.awareness: cluster,zone

Now, if we have a task 0_0 with an active and two replicas, there are
three total copies of the task to distribute over:
* 6 instances
* 2 clusters
* 3 zones

There is a constraint that we _cannot_ assign two copies of a task to a
single instance, but it seems like the default rack awareness would
permit us to assign two copies of a task to a rack, if (and only if)
the number of copies is greater than the number of racks.

So, the assignment we would get is like this:
* assigned to three different instances
* one copy in each of zone a, b, and c
* two copies in one cluster and one copy in the other cluster

For example, we might have 0_0 assigned to:
* Node 1 (cluster 1, zone a)
* Node 5 (cluster 2, zone b)
* Node 3 (cluster 1, zone c)

Is that what you were also thinking?

Thanks,
-John
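
To make that "best effort over each dimension" semantics concrete, here
is a rough sketch of the acceptance check it implies (plain Java; the
class and method names are illustrative, not actual Streams internals):

    import java.util.*;

    // Sketch: verify that a candidate placement spreads the copies of a
    // task over each tag dimension as evenly as possible.
    public class TagDistributionCheck {

        // tagsPerNode: for each chosen node, its tag values in dimension
        // order, e.g. ["K8s_Cluster1", "eu-central-1a"].
        static boolean isWellDistributed(List<List<String>> tagsPerNode,
                                         List<Set<String>> allValuesPerDimension) {
            int copies = tagsPerNode.size();
            for (int dim = 0; dim < allValuesPerDimension.size(); dim++) {
                Map<String, Integer> counts = new HashMap<>();
                for (List<String> nodeTags : tagsPerNode) {
                    counts.merge(nodeTags.get(dim), 1, Integer::sum);
                }
                // At most ceil(copies / distinctValues) copies may share a
                // value: two copies may share a rack only when the number
                // of copies exceeds the number of racks.
                int distinct = allValuesPerDimension.get(dim).size();
                int maxPerValue = (copies + distinct - 1) / distinct;
                for (int count : counts.values()) {
                    if (count > maxPerValue) {
                        return false;
                    }
                }
            }
            return true;
        }

        public static void main(String[] args) {
            // Nodes 1, 5, and 3 from the example above: (cluster, zone).
            List<List<String>> placement = List.of(
                List.of("K8s_Cluster1", "eu-central-1a"),
                List.of("K8s_Cluster2", "eu-central-1b"),
                List.of("K8s_Cluster1", "eu-central-1c"));
            List<Set<String>> dimensions = List.of(
                Set.of("K8s_Cluster1", "K8s_Cluster2"),
                Set.of("eu-central-1a", "eu-central-1b", "eu-central-1c"));
            System.out.println(isWellDistributed(placement, dimensions)); // true
        }
    }

Run against the Node 1/5/3 placement above, the check passes: each zone
holds exactly one copy, and two copies may share cluster 1 because there
are three copies but only two clusters.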

On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
> Hi John,
>
> 1. The main reason was that it seemed an easier change compared to
> having multiple tags assigned to each host.
>
> ---
>
> Answering your question about what use-case I have in mind:
> Let's say we have two Kubernetes clusters running the same Kafka
> Streams application, and each Kubernetes cluster spans multiple AZs.
> So the setup overall looks something like this:
>
> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>
> Now, if the Kafka Streams application is launched in K8s_Cluster1:
> eu-central-1a, ideally I would want the standby task to be created in
> a different K8s cluster and a different AZ.
> So in this example it can be K8s_Cluster2: [eu-central-1b,
> eu-central-1c].
>
> But giving it a bit more thought, this can be implemented if we change
> the semantics of "tags" a bit.
> So instead of doing a full match on the tags, we can do iterative
> matching and it should work.
> (If this is what you had in mind, apologies for the misunderstanding.)
>
> If we consider the same example as mentioned above, the active task
> would have the following tags: [K8s_Cluster1, eu-central-1a]. In order
> to distribute the standby task to a different K8s cluster, plus a
> different AWS AZ, the standby task assignment algorithm can compare
> each tag by index. The steps would be something like:
>
> // this will result in selecting clients in a different K8s cluster
> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
> // this will result in selecting the client in a different AWS AZ
> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != clientsInDifferentCluster[1])
>
> WDYT?
>
> If you agree with the use-case I've mentioned, the pluggable assignor
> can be deferred to another KIP, yes.
> It won't be required for this KIP and the use-cases I had in mind to
> work.
>
> Regards,
> Levani
>
> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org> wrote:
> >
> > Hello Levani,
> >
> > Thanks for the reply.
> >
> > 1. Interesting; why did you change your mind?
> >
> > I have a gut feeling that we can achieve pretty much any rack
> > awareness need that people have by using purely config, which is
> > obviously much easier to use. But if you had a case in mind where
> > this wouldn't work, it would be good to know.
> >
> > In fact, if that is true, then perhaps you could just defer the whole
> > idea of a pluggable interface (point 2) to a separate KIP. I do think
> > a pluggable assignor would be extremely valuable, but it might be
> > nice to cut the scope of KIP-708 if just a config will suffice.
> >
> > What do you think?
> > Thanks,
> > John
> >
> > On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
> >> Hi John,
> >>
> >> Thanks a lot for the thorough feedback, it's really valuable.
> >>
> >> 1. Agree with this. I had the same idea initially.
> >> We can set some upper limit on the max number of tags users can set,
> >> to make sure the feature is not overused. By default, we can create
> >> standby tasks on clients whose tags all differ from the active
> >> task's (a full match). This should mimic the default rack awareness
> >> behaviour.
> >>
> >> 2. I like the idea, and I'd be happy to work on refactoring
> >> TaskAssignor to accommodate the rack awareness use-case.
> >> When I was going through the code, it felt much more natural to use
> >> a pluggable TaskAssignor for achieving rack awareness instead of
> >> introducing a new interface and contract.
> >> But I thought the approach mentioned in the KIP was simpler, so I
> >> decided to move forward with it as an initial proposal :).
> >> But I agree with you, it will be much better if we can have
> >> TaskAssignor as a pluggable interface users can use.
> >> One potential challenge I see with this is that, if we just let
> >> users implement TaskAssignor in its current form, we will be forcing
> >> users to implement functionality for active task assignment as well
> >> as standby task assignment. This feels like an unclear contract,
> >> because with just the TaskAssignor interface it's not really obvious
> >> that one needs to allocate standby tasks as well. We can enforce it
> >> on some level with the return object you've mentioned
> >> TaskAssignor#assign has to return, but it still feels error prone.
> >> In addition, I suspect that in most cases users would want to
> >> control standby task assignment and leave active task assignment
> >> as is.
> >> To make the implementation of standby task assignment easier for
> >> users, what if we decouple active and standby task assignment from
> >> the TaskAssignor?
> >> The idea I have in mind is to split TaskAssignor into
> >> ActiveTaskAssignor and StandbyTaskAssignor, and let users add their
> >> own implementations for them separately, if they like, via config.
> >>
> >> If this approach sounds reasonable, I'll work on updating the KIP
> >> this week.
> >>
> >> Thanks,
> >> Levani
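
For concreteness, the split Levani describes above might look roughly
like this (hypothetical interfaces and signatures, not an existing or
agreed Kafka Streams API; only TaskId is a real Streams class):

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.streams.processor.TaskId;

    // Hypothetical shape of the decoupled assignors.
    interface ActiveTaskAssignor {
        // Places active tasks, e.g. based on each client's task lags.
        Map<String, Set<TaskId>> assignActives(Map<String, Map<TaskId, Long>> taskLagsByClient);
    }

    interface StandbyTaskAssignor {
        // Receives the already-decided active placement plus each
        // client's tags, and is responsible only for placing standbys.
        Map<String, Set<TaskId>> assignStandbys(Map<String, Set<TaskId>> activeAssignment,
                                                Map<String, List<String>> clientTagsByClientId,
                                                int numStandbyReplicas);
    }

Splitting the contract this way would let a user override only the
standby placement (e.g. the iterative tag matching above) while keeping
the built-in, lag-aware active task assignment.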

> >>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org> wrote:
> >>>
> >>> Thanks, Levani!
> >>>
> >>> I was reflecting more on your KIP last night.
> >>>
> >>> One thing I should mention is that I have previously used the rack
> >>> awareness feature of Elasticsearch, and found it to be pretty
> >>> intuitive and also capable of what we needed in our AWS clusters.
> >>> As you look at related work, you might take ES into consideration.
> >>>
> >>> I also had some thoughts about your proposal.
> >>>
> >>> 1. I'm wondering if we could instead allow people to add arbitrary
> >>> tags to each host, and then have a configuration to specify a
> >>> combination of tags to use for rack awareness. This seems easier to
> >>> manage than the use case you anticipate, where people would
> >>> concatenate rackId = (clusterId + AZ) and then have to parse the
> >>> rackId back out to compute the assignment.
> >>>
> >>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm wondering
> >>> if we can change the level of abstraction a little bit and capture
> >>> even more value here. One thing we wanted to do in KIP-441, but
> >>> decided to cut from the scope, was to define a public TaskAssignor
> >>> interface so that people can plug in the whole task assignment
> >>> algorithm.
> >>>
> >>> In fact, there is already an internal config and interface for this
> >>> (`internal.task.assignor.class`:
> >>> `org.apache.kafka.streams.processor.internals.assignment.TaskAssignor`).
> >>>
> >>> We kept that interface and config internal because the current
> >>> TaskAssignor interface has a number of flaws, but if we correct
> >>> those flaws, we can offer a nice public interface that people can
> >>> use to control not only the standby allocation, but also the active
> >>> task allocation, based on the tags I suggested in (1).
> >>>
> >>> I don't think we need too much work to refactor TaskAssignor; the
> >>> main problems are that the assign method mutates its input and that
> >>> it doesn't expose the full metadata of the cluster members.
> >>> Therefore, if you like this idea, we should propose to refactor
> >>> TaskAssignor with:
> >>> * input: an immutable description of the cluster, including current
> >>> lags of all stateful tasks and current stateless task assignments,
> >>> as well as metadata for each host.
> >>> * output: an object describing the new assignment, as well as a
> >>> flag on whether to schedule a followup probing rebalance.
> >>>
> >>> An even more stretchy stretch goal would be to include metadata of
> >>> the brokers, which could be used to achieve higher levels of rack
> >>> awareness. For example, we could co-locate tasks in the same "rack"
> >>> (AZ) as the partition leader for their input or output topics, to
> >>> minimize cross-AZ traffic. I'm not sure to what extent clients can
> >>> learn the relevant broker metadata, so this stretch might not be
> >>> currently feasible, but as long as we design the TaskAssignor for
> >>> extensibility, we can do something like this in the future.
> >>>
> >>> Thanks again for this proposal; I hope the above is more inspiring
> >>> than annoying :)
> >>>
> >>> I really think your KIP is super high value in whatever form you
> >>> ultimately land on.
> >>>
> >>> Thanks,
> >>> John
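
Under those constraints, the refactored contract John outlines could
look something like the following sketch (all type and method names are
invented for illustration; only TaskId is a real Streams class):

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.streams.processor.TaskId;

    // Hypothetical public task assignor contract.
    interface PublicTaskAssignor {

        Assignment assign(ClusterDescription cluster);

        // Immutable input: per-client stateful task lags, current
        // stateless assignments, and the per-instance tags from (1).
        interface ClusterDescription {
            Map<String, Map<TaskId, Long>> statefulTaskLagsByClient();
            Map<String, Set<TaskId>> statelessTasksByClient();
            Map<String, Map<String, String>> tagsByClient();
        }

        // Output: the new active/standby placement, plus whether to
        // schedule a follow-up probing rebalance (as in KIP-441).
        interface Assignment {
            Map<String, Set<TaskId>> activeTasksByClient();
            Map<String, Set<TaskId>> standbyTasksByClient();
            boolean scheduleFollowupProbingRebalance();
        }
    }

Because the input is immutable and the result is a plain value object,
an implementation cannot corrupt the group state, which addresses the
two flaws noted above.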

> >>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
> >>>> Hi John,
> >>>>
> >>>> Thanks for the feedback (and for the great work on KIP-441 :) ).
> >>>> Makes sense; I will add a section to the KIP explaining rack
> >>>> awareness at a high level and how it's implemented in different
> >>>> distributed systems.
> >>>>
> >>>> Thanks,
> >>>> Levani
> >>>>
> >>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org> wrote:
> >>>>>
> >>>>> Hi Levani,
> >>>>>
> >>>>> Thanks for this KIP! I think this is really high value; it was
> >>>>> something I was disappointed I didn't get to do as part of
> >>>>> KIP-441.
> >>>>>
> >>>>> Rack awareness is a feature provided by other distributed systems
> >>>>> as well. I wonder if your KIP could devote a section to
> >>>>> summarizing what rack awareness looks like in other distributed
> >>>>> systems, to help us put this design in context.
> >>>>>
> >>>>> Thanks!
> >>>>> John
> >>>>>
> >>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
> >>>>>> Hello all,
> >>>>>>
> >>>>>> I'd like to start a discussion on KIP-708 [1], which aims to
> >>>>>> introduce rack-aware standby task distribution in Kafka Streams.
> >>>>>> In addition to the changes mentioned in the KIP, I'd like to get
> >>>>>> some ideas on an additional change I have in mind.
> >>>>>> Assuming the KIP moves forward, I was wondering if it makes
> >>>>>> sense to configure Kafka Streams consumer instances with the
> >>>>>> rack ID passed with the new StreamsConfig#RACK_ID_CONFIG
> >>>>>> property.
> >>>>>> In practice, that would mean that when "rack.id" is configured
> >>>>>> in Kafka Streams, it would automatically translate into the
> >>>>>> ConsumerConfig#CLIENT_RACK_CONFIG config for all the
> >>>>>> KafkaConsumer clients that are used by Kafka Streams internally.
> >>>>>>
> >>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
> >>>>>>
> >>>>>> P.S.
> >>>>>> I have a draft PR ready; if it helps move the discussion
> >>>>>> forward, I can provide the draft PR link in this thread.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Levani
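
A minimal sketch of the "rack.id" propagation proposed in that last
message (only ConsumerConfig.CLIENT_RACK_CONFIG, i.e. "client.rack", is
an existing Kafka constant; "rack.id" is the property this KIP
proposes, and the class and method names are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Sketch: if the proposed "rack.id" is set on the Streams config,
    // forward it as "client.rack" to the embedded consumers.
    final class RackIdPropagation {

        static Map<String, Object> consumerConfigs(Map<String, Object> streamsConfigs) {
            Map<String, Object> consumerConfigs = new HashMap<>(streamsConfigs);
            Object rackId = streamsConfigs.get("rack.id"); // proposed property
            if (rackId != null) {
                consumerConfigs.put(ConsumerConfig.CLIENT_RACK_CONFIG, rackId);
            }
            return consumerConfigs;
        }
    }

This would mirror how Streams already derives the configs of its
embedded clients from the top-level StreamsConfig, so a single "rack.id"
setting could drive both standby placement and the consumers'
rack-aware fetching.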