Hello Ryanne,
Thanks for the question.
Tag approach gives more flexibility, which otherwise could have been only
possible with pluggable custom logic Kafka Streams's user must provide (it
is briefly described in "Rejected Alternatives" section).
For instance, if we append multiple tags to form a single rack, it may not
give desired distribution to the user if the infrastructure topology is
more complex.
Let us consider the following example with appending multiple tags to form
the single rack.
Node-1:
rack.id: K8s_Cluster1-eu-central-1a
num.standby.replicas: 1
Node-2:
rack.id: K8s_Cluster1-eu-central-1b
num.standby.replicas: 1
Node-3:
rack.id: K8s_Cluster1-eu-central-1c
num.standby.replicas: 1
Node-4:
rack.id: K8s_Cluster2-eu-central-1a
num.standby.replicas: 1
Node-5:
rack.id: K8s_Cluster2-eu-central-1b
num.standby.replicas: 1
Node-6:
rack.id: K8s_Cluster2-eu-central-1c
num.standby.replicas: 1
In the example mentioned above, we have three AZs and two Kubernetes
clusters. Our use-case is to distribute standby task in the different
Kubernetes cluster and different availability zone.
For instance, if the active task is in Node1 (K8s_Cluster1-eu-central-1a),
the corresponding standby task should be in either
Node-5(K8s_Cluster2-eu-central-1b) or Node-6(K8s_Cluster2-eu-central-1c).
Unfortunately, without custom logic provided by the user, this would be
very hard to achieve with a single "rack.id" configuration. Because
without any input from the user, Kafka Streams might as well allocate
standby task for the active task either:
In the same Kubernetes cluster and different AZ (Node-2, Node-3)
In the different Kubernetes cluster but the same AZ (Node-4)
On the other hand, with the combination of the new "client.tag.*" and
"task.assignment.rack.awareness" configurations, standby task distribution
algorithm will be able to figure out what will be the most optimal
distribution by balancing the standby tasks over each client.tag dimension
individually. And it can be achieved by simply providing necessary
configurations to Kafka Streams.
The flow was described in more details in previous versions of the KIP,
but I've omitted the KIP algorithm implementation details based on received
feedback. But I acknowledge that this information can be put in the KIP for
better clarity. I took the liberty of updating the KIP with the example
mentioned above [1].
I hope this answeres your question.
Regards,
Levani
[1] -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-Benefitsoftagsvssinglerack.idconfiguration
On 28. Feb 2021, at 01:37, Ryanne Dolan <ryannedo...@gmail.com> wrote:
I guess I don't understand how multiple tags work together to achieve
rack
awareness. I realize I could go look at how Elasticseach works, but
ideally
this would be more plain in the KIP.
In particular I'm not sure how the tag approach is different than
appending
multiple tags together, e.g. how is cluster=foo, zone=bar different than
rack=foo-bar?
Ryanne
On Sat, Feb 27, 2021, 5:00 PM Levani Kokhreidze <levani.co...@gmail.com
<mailto:levani.co...@gmail.com>>
wrote:
Hi Bruno,
Thanks for the feedback. I think it makes sense.
I’ve updated the KIP [1] and tried to omit implementation details around
the algorithm.
Please let me know if the latest version looks OK.
Regards,
Levani
[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
On 25. Feb 2021, at 17:59, Bruno Cadonna <br...@confluent.io> wrote:
Hi Levani,
I discussed your KIP with John the other day and we both think it is a
really interesting KIP and you did a good job in writing it. However, we
think that the KIP exposes to many implementation details. That makes
future changes to the implementation of the distribution algorithm
harder
without a KIP. So, we would like to propose to just describe the config
and
the properties that any implementation of the distribution algorithm
should
have. We did something similar in KIP-441 for the task assignment
algorithm
[1].
Specifically, for your KIP, any possible implementation of the
distribution algorithm should read the tags to be considered for rack
awareness from the config and if the cluster allows to distribute each
active task and its replicas to Streams clients with different values
for
each tag, the algorithm will do so. How the implementation behaves, if a
cluster does not allow to distribute over all tag values can be left as
an
implementation detail. This would give us flexibility for future
changes to
the distribution algorithm.
Since there may be distribution algorithms that do not use the order of
the tags, it would be better to not mention the order of the tags in the
config doc. I would propose to omit the config doc from the KIP or
formulate it really generic.
We would also like to rename standby.replicas.awareness to
task.assignment.rack.awareness or something that does not contain
standby
and/or replica (sorry for requesting again to change this name). That
way,
we might be able to use this config also when we decide to make the
active
task assignment rack aware.
I hope all of this makes sense to you.
Thank you again for the interesting KIP!
Looking forward to your implementation!
Best,
Bruno
[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
On 22.02.21 17:42, Levani Kokhreidze wrote:
Hi Bruno,
Thanks for the quick reply
5.Sorry, maybe I am not making it clear.
What you have described is how it should work, yes. As it is stated in
KIP, with the importance semantics in standby.replicas.awareness,
if we have an active task on Node-1 and the first standby task on
Node-5, the third standby should be on Node-3 or Node-6 (both of them
are
in the different AZ compared to the active and the first standby task).
That flow is described in Partially Preferred Distribution section
[1].
Node-4 could have been a valid option IF standby.replicas.awareness
didn’t have the importance semantics because Kafka Streams could have
just
picked Node-4 in that case.
7. Yup, will do.
8. Good question, So far assumption I had was that the configuration
between different Kafka Streams instances is the same. Can we have an
extra
validation check to make sure that is the case?
If not, what you have mentioned in point 5 always preferring the
dimension where there’re enough KS instances is very valid.
On the other hand, I thought allowing to specify the importance of the
various dimensions may give users extra flexibility over standby task
allocation.
[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams#KIP708:RackawarnessforKafkaStreams-PartiallyPreferredStandbyTaskDistribution
Regards,
Levani
On 22. Feb 2021, at 16:51, Bruno Cadonna <br...@confluent.io> wrote:
Hi Levani,
Thanks for the modifications!
I have some follow up questions/comments:
5. Something is not clear to me. If the active is on Node-1 and the
first replica is on Node-5 (different cluster, different zone), why
would
the second replica go to Node-4 that has a different cluster than but
the
same zone as the active instead of Node-6 which has a different zone of
Node-1? In general wouldn't it be better to guarantee under Partially
Preferred task distribution to distribute active and standby replicas of
the same task over the dimension that has at least as many values as the
number of replicas + 1 and then over the dimensions that have less
values?
That would then also be independent on the ordering of the tags.
7. I agree with you. Could you add a sentence or two about this to
the
KIP?
New question:
8. How would the assignor react on different numbers and different
orderings of the tags in standby.replicas.awareness across Streams
clients?
Best,
Bruno
On 22.02.21 11:46, Levani Kokhreidze wrote:
Hi Bruno,
Thanks for the feedback. Please check my answers below:
1. No objections; sounds good. Updated KIP
2. No objections; sounds good. Updated KIP
3. Thanks for the information; I can change KIP only to expose
prefix
method instead of a constant if it’s the way forward.
4. Done. Updated KIP
5. Yes, order in standby.replicas.awareness config counts as stated
in the STANDBY_REPLICA_AWARENESS_DOC.
Actually, it plays a role in Partially Preferred distribution. In
the
example presented in the KIP, while one of the standby tasks can be
placed
in a different cluster and different zone compared to the active task,
we
have to choose either the same cluster or the same zone for the second
standby task. In the first example presented in the KIP, while Node-5
is in
the other cluster and other zone compared to the active task, the second
standby task's preferred options are in different zones than Node-1 and
Node-5, but in the same cluster as active task or the first standby
task.
Without importance semantics in standby.replicas.awareness, putting
second
standby task in Node-4 (different cluster, same zone as active task)
would
have been a valid option.
I’ve updated KIP to clarify this a bit more, I hope this helps.
6. Thanks for pointing that out, it was a mistake. I’ve removed that
phrase from the KIP.
7. It shouldn’t affect HighAvailabilityTaskAssignor in a “breaking
way” meaning that all the existing behavior should stay as is (e.g.,
when
new configurations are not specified). Once required configurations are
set, the main change should happen in
HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and
HighAvailabilityTaskAssignor#assignStandbyTaskMovements
I hope this answers your questions.
Regards,
Levani
On 18. Feb 2021, at 15:10, Bruno Cadonna <br...@confluent.io>
wrote:
Hi Levani,
Thank you for the KIP.
Really interesting!
Here my comments:
1. To be consistent with the other configs that involve standbys ,
I
would rename
standby.task.assignment.awareness -> standby.replicas.awareness
2. I would also rename the prefix
instance.tag -> client.tag
3. The following is a question about prefixes in general that maybe
somebody else can answer. In the config it says for other prefixes that
it
is recommended to use the method *Prefix(final String prop) instead of
the
raw prefix string.
Is the plan to make the raw prefix string private in a future
release?
Should we consider making only the prefix method for this KIP
public?
4. Could you provide a mathematical formula instead of Java code
for
absolute preferred standby task distribution and the other distributtion
properties? Could you also add an example for absolute preffered
distribution for the computation of the formula similar to what you did
for
the other properties?
5. Does the order of the tags given for
standby.task.assignment.awareness count? You mention it once, but then
for
the Partially Preferred standby task distribution property it does not
seem
to be important.
6. In the section about least preferred standby task distribution,
you state that "and one [zone] will be reserved for active task". What
do
you mean by that? All Streams clients will participate in the task
assignment of active tasks irrespective of their tags, right? The
statement
does also not really fit with the example where active stateful task
0_0 is
on Node-1, does it?
7. Could you also say some words about how this KIP affects the
current HighAvailabilityTaskAssignor?
Best,
Bruno
On 09.02.21 15:54, Levani Kokhreidze wrote:
Hello all,
I’ve updated KIP-708 [1] to reflect the latest discussion
outcomes.
I’m looking forward to your feedback.
Regards,
Levani
[1] -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams
On 2. Feb 2021, at 22:03, Levani Kokhreidze <
levani.co...@gmail.com> wrote:
Hi John.
Thanks a lot for this detailed analysis!
Yes, that is what I had in mind as well.
I also like that idea of having “task.assignment.awareness”
configuration
to tell which instance tags can be used for rack awareness.
I may borrow it for this KIP if you don’t mind :)
Thanks again John for this discussion, it’s really valuable.
I’ll update the proposal and share it once again in this
discussion thread.
Regards,
Levani
On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org
<mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org>> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>>>>> wrote:
Hi Levani,
1. Thanks for the details.
I figured it must be something like this two-dimensional
definition of "rack".
It does seem like, if we make the config take a list of tags, we
can define
the semantics to be that the system will make a best effort to
distribute
the standbys over each rack dimension.
In your example, there are two clusters and three AZs. The
example
configs would be:
Node 1:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1a
task.assignment.awareness: cluster,zone
Node 2:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1b
task.assignment.awareness: cluster,zone
Node 3:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1c
task.assignment.awareness: cluster,zone
Node 4:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1a
task.assignment.awareness: cluster,zone
Node 5:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1b
task.assignment.awareness: cluster,zone
Node 6:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1c
task.assignment.awareness: cluster,zone
Now, if we have a task 0_0 with an active and two replicas,
there are three total copies of the task to distribute over:
* 6 instances
* 2 clusters
* 3 zones
There is a constraint that we _cannot_ assign two copies of a
task
to a single instance, but it seems like the default rack
awareness
would permit us to assign two copies of a task to a rack, if
(and
only
if) the number of copies is greater than the number of racks.
So, the assignment we would get is like this:
* assigned to three different instances
* one copy in each of zone a, b, and c
* two copies in one cluster and one in the other cluster
For example, we might have 0_0 assigned to:
* Node 1 (cluster 1, zone a)
* Node 5 (cluster 2, zone b)
* Node 3 (cluster 1, zone c)
Is that what you were also thinking?
Thanks,
-John
On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
Hi John,
1. Main reason was that it seemed easier change compared to
having
multiple tags assigned to each host.
---
Answering your question what use-case I have in mind:
Lets say we have two Kubernetes clusters running the same Kafka
Streams
application.
And each Kubernetes cluster is spanned across multiple AZ.
So the setup overall looks something like this:
K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
Now, if Kafka Streams application is launched in K8s_Clister1:
eu-central-1a,
ideally I would want standby task to be created in the
different
K8s
cluster and region.
So in this example it can be K8s_Cluster2: [eu-central-1b,
eu-central-1c]
But giving it a bit more thought, this can be implemented if we
change
semantics of “tags” a bit.
So instead of doing full match with tags, we can do iterative
matching
and it should work.
(If this is what you had in mind, apologies for the
misunderstanding).
If we consider the same example as mentioned above, for the
active task
we would
have following tags: [K8s_Cluster1, eu-central-1a]. In order to
distribute standby task
in the different K8s cluster, plus in the different AWS region,
standby
task assignment
algorithm can compare each tag by index. So steps would be
something
like:
// this will result in selecting client in the different K8s
cluster
1. clientsInDifferentCluster = (tagsOfActiveTask[0] !=
allClientTags[0])
// this will result in selecting the client in different AWS
region
2. selectedClientForStandbyTask = (tagsOfActiveTask[1] !=
clientsInDifferentCluster[1] )
WDYT?
If you agree with the use-case I’ve mentioned, the pluggable
assignor
can be differed to another KIP, yes.
As it won’t be required for this KIP and use-cases I had in
mind
to
work.
Regards,
Levani
On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org
<mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org>> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>>>>> wrote:
Hello Levani,
Thanks for the reply.
1. Interesting; why did you change your mind?
I have a gut feeling that we can achieve pretty much any rack
awareness need that people have by using purely config, which is
obviously
much easier to use. But if you had a case in mind where this wouldn’t
work,
it would be good to know.
In fact, if that is true, then perhaps you could just defer
the
whole idea of a pluggable interface (point 2) to a separate KIP. I do
think
a pluggable assignor would be extremely valuable, but it might be nice
to
cut the scope of KIP-708 if just a config will suffice.
What do you think?
Thanks,
John
On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
Hi John,
Thanks a lot for thorough feedback, it’s really valuable.
1. Agree with this. Had the same idea initially.
We can set some upper limit in terms of what’s
the max number of tags users can set to make
sure it’s not overused. By default, we can create
standby tasks where tags are different from active task (full
match).
This should mimic default rack awareness behaviour.
2. I like the idea and I’d be happy to work on
refactoring TaskAssignor to accommodate rack awareness
use-case.
When I was going through the code, it felt way more natural
to use pluggable TaskAssignor for achieving rack awareness
instead of introducing new interface and contract.
But I thought approach mentioned in the KIP is simpler so
decided to move forward with it as an initial proposal :).
But I agree with you, it will be much better if we can have
TaskAssignor as pluggable interface users can use.
One potential challenge I see with this is that, if we just
let
users implement TaskAssignor in its current form, we will be
forcing
users to implement functionality for active task assignment,
as well as
standby task assignment. This feels like not very clear
contract,
because with
just TaskAssignor interface it’s not really clear they one
needs to
allocate
standby tasks as well. We can enforce it on some level with
the return
object
You’ve mentioned TaskAssignor#assign has to return, but still
feels
error prone.
In addition, I suspect in most of the cases users would want
to control standby task assignment and leave active task
assignment as
is.
To make implementation of standby task assignment easier for
users,
what if
we decouple active and standby task assignment from the
`TaskAssignor`?
Idea I have in mind is to split TaskAssignor into
ActiveTaskAssignor
and StandbyTaskAssignor
and let users add their own implementation for them
separately
if they
like via config.
If this approach sounds reasonable, I’ll work on updating KIP
this week.
Thanks,
Levani
On 28. Jan 2021, at 19:20, John Roesler <
vvcep...@apache.org
<mailto:vvcep...@apache.org> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org>> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org> <mailto:vvcep...@apache.org <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>>>>> wrote:
Thanks, Levani!
I was reflecting more on your KIP last night.
One thing I should mention is that I have previously used
the rack awareness feature of Elasticsearch, and found it to
be pretty intuitive and also capable of what we needed in
our AWS clusters. As you look at related work, you might
take ES into consideration.
I was also had some thoughts about your proposal.
1. I'm wondering if we instead allow people to add arbitrary
tags to each host, and then have a configuration to specify
a combination of tags to use for rack awareness. This seems
easier to manage than for the use case you anticipate where
people would concatenate rackId = (clusterId + AZ), and then
have to parse the rackId back out to compute the assignment.
2. About the proposed RackAwareStandbyTaskAssignor, I'm
wondering if we can change the level of abstraction a little
bit and capture even more value here. One thing we wanted to
do in KIP-441, but decided to cut from the scope, was to
define a public TaskAssignor interface so that people can
plug in the whole task assignment algorithm.
In fact, there is already an internal config and interface
for this (`internal.task.assignor.class`:
`org.apache.kafka.streams.processor.internals.assignment.Tas
kAssignor`).
We kept that interface and config internal because the
current TaskAssignor interface has a number of flaws, but if
we correct those flaws, we can offer a nice public interface
that people can use to control the standby allocation, but
also active task allocation, based on the tags I suggested
in (1).
I don't think we need too much work to refactor
TaskAssignor, the main problems are that the assign method
mutates its input and that it doesn't expose the full
metadata from the cluster members. Therefore, if you like
this idea, we should propose to refactor TaskAssignor with:
* input: an immutable description of the cluster, including
current lags of all stateful tasks and current stateless
task assignments, as well as metadata for each host.
* output: an object describing the new assignment as well as
a flag on whether to schedule a followup probing rebalance.
An even more stretchy stretch goal would be to include
metadata of the brokers, which could be used to achieve
higher levels of rack awareness. For example, we could co-
locate tasks in the same "rack" (AZ) as the partition leader
for their input or output topics, to minimize cross-AZ
traffic. I'm not sure to what extent clients can learn the
relevant broker metadata, so this stretch might not be
currently feasible, but as long as we design the
TaskAssignor for extensibility, we can do something like
this in the future.
Thanks again for this proposal, I hope the above is more
inspiring than annoying :)
I really think your KIP is super high value in whatever form
you ultimately land on.
Thanks,
John
On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
Hi John
Thanks for the feedback (and for the great work on KIP441
:)
).
Makes sense, will add a section in the KIP explaining rack
awarenesses on high level and how it’s implemented in the different
distributed systems.
Thanks,
Levani
On 27. Jan 2021, at 16:07, John Roesler <
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org> <mailto:
vvcep...@apache.org <mailto:vvcep...@apache.org>>>>>> wrote:
Hi Levani,
Thanks for this KIP! I think this is really high value; it
was something I was disappointed I didn’t get to do as part of KIP-441.
Rack awareness is a feature provided by other distributed
systems as well. I wonder if your KIP could devote a section to
summarizing
what rack awareness looks like in other distributed systems, to help us
put
this design in context.
Thanks!
John
On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
Hello all,
I’d like to start discussion on KIP-708 [1] that aims to
introduce rack
aware standby task distribution in Kafka Streams.
In addition to changes mentioned in the KIP, I’d like to
get some ideas
on additional change I have in mind.
Assuming KIP moves forward, I was wondering if it makes
sense to
configure Kafka Streams consumer instances with the rack
ID passed with
the new StreamsConfig#RACK_ID_CONFIG property.
In practice, that would mean that when “rack.id <
http://rack.id/> <http://rack.id/ <http://rack.id/>> <http://rack.id/ <
http://rack.id/> <http://rack.id/ <http://rack.id/>>> <http://rack.id/
<
http://rack.id/> <http://rack.id/ <http://rack.id/>> <http://rack.id/ <
http://rack.id/ <http://rack.id/>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>>>>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>>>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>> <http://rack.id/ <http://rack.id/> <
http://rack.id/ <http://rack.id/>>>>>>” is
configured in Kafka Streams, it will automatically
translate into
ConsumerConfig#CLIENT_RACK_ID config for all the
KafkaConsumer clients
that is used by Kafka Streams internally.
[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
P.S
I have draft PR ready, if it helps the discussion moving
forward, I can
provide the draft PR link in this thread.
Regards,
Levani