Hi John,

Thanks a lot for the thorough feedback; it’s really valuable.

1. Agreed. I had the same idea initially.
We can set an upper limit on the number of tags
users can configure, to make sure the feature
isn’t overused. By default, we can place standby tasks
on clients whose tags all differ from those of the active task’s client (full match).
This should mimic the default rack awareness behaviour.
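
To illustrate the "full match" default, here is a rough sketch of how eligible standby clients could be picked. It is only an illustration under assumptions: the class name, the method names and the MAX_TAGS value are hypothetical and are not part of the KIP or of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; nothing here exists in Kafka Streams.
final class TagAwareStandbySketch {

    // Assumed upper limit on the number of tags per client (value chosen arbitrarily).
    static final int MAX_TAGS = 5;

    // True when the candidate has a different value for every tag of the active client.
    // A tag that is missing on the candidate counts as different.
    static boolean differsOnEveryTag(Map<String, String> activeTags,
                                     Map<String, String> candidateTags) {
        for (Map.Entry<String, String> tag : activeTags.entrySet()) {
            if (tag.getValue().equals(candidateTags.get(tag.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Returns the clients that may host a standby for a task that is active on activeClient.
    // The result follows the iteration order of clientTags.
    static List<String> eligibleStandbyClients(String activeClient,
                                               Map<String, Map<String, String>> clientTags) {
        Map<String, String> activeTags = clientTags.get(activeClient);
        if (activeTags == null) {
            throw new IllegalArgumentException("Unknown client: " + activeClient);
        }
        if (activeTags.size() > MAX_TAGS) {
            throw new IllegalArgumentException("Too many tags on client: " + activeClient);
        }
        List<String> eligible = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> client : clientTags.entrySet()) {
            boolean isOtherClient = !client.getKey().equals(activeClient);
            if (isOtherClient && differsOnEveryTag(activeTags, client.getValue())) {
                eligible.add(client.getKey());
            }
        }
        return eligible;
    }
}
```

With tags such as cluster and zone, a client in the same cluster as the active task would be skipped even if it sits in a different zone, because only clients that differ on every tag qualify.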

2. I like the idea and I’d be happy to work on
refactoring TaskAssignor to accommodate the rack awareness use case.
When I was going through the code, it felt far more natural
to use a pluggable TaskAssignor to achieve rack awareness
than to introduce a new interface and contract.
I thought the approach described in the KIP was simpler, though, so I
decided to move forward with it as an initial proposal :).
But I agree with you: it will be much better if we can offer
TaskAssignor as a pluggable interface that users can implement.
One potential challenge I see is that, if we just let
users implement TaskAssignor in its current form, we will be forcing
them to implement active task assignment as well as
standby task assignment. That doesn’t feel like a very clear contract, because
the TaskAssignor interface alone doesn’t make it clear that one needs to allocate
standby tasks as well. We can enforce it to some extent with the return object
you mentioned TaskAssignor#assign has to return, but it still feels error-prone.
In addition, I suspect that in most cases users would want
to control standby task assignment and leave active task assignment as is.
To make implementing standby task assignment easier for users, what if
we decouple active and standby task assignment from the `TaskAssignor`?
The idea I have in mind is to split TaskAssignor into ActiveTaskAssignor and
StandbyTaskAssignor, and let users plug in their own implementation of each
separately via config, if they like.
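
To make the split more concrete, here is a minimal sketch of what the two contracts might look like. Everything in it is an assumption for illustration: the method signatures, the use of plain strings for client and task ids, and both example implementations are hypothetical and do not reflect the current internal TaskAssignor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical contract: decides only where active tasks run.
interface ActiveTaskAssignor {
    Map<String, Set<String>> assignActive(Set<String> clients, Set<String> tasks);
}

// Hypothetical contract: decides only where standbys go, given the active assignment.
interface StandbyTaskAssignor {
    Map<String, Set<String>> assignStandby(Map<String, Set<String>> activeAssignment);
}

// Stand-in for a default active assignor: round-robin over sorted clients.
final class RoundRobinActiveAssignor implements ActiveTaskAssignor {
    @Override
    public Map<String, Set<String>> assignActive(Set<String> clients, Set<String> tasks) {
        List<String> ordered = new ArrayList<>(new TreeSet<>(clients));
        Map<String, Set<String>> result = new TreeMap<>();
        for (String client : ordered) {
            result.put(client, new TreeSet<>());
        }
        if (ordered.isEmpty()) {
            return result; // nobody to assign to
        }
        int next = 0;
        for (String task : new TreeSet<>(tasks)) {
            result.get(ordered.get(next++ % ordered.size())).add(task);
        }
        return result;
    }
}

// Example of a user-supplied standby assignor: one standby per task,
// placed on the client that follows the active client in sorted order.
final class NextClientStandbyAssignor implements StandbyTaskAssignor {
    @Override
    public Map<String, Set<String>> assignStandby(Map<String, Set<String>> activeAssignment) {
        List<String> ordered = new ArrayList<>(new TreeSet<>(activeAssignment.keySet()));
        Map<String, Set<String>> result = new TreeMap<>();
        for (String client : ordered) {
            result.put(client, new TreeSet<>());
        }
        if (ordered.size() < 2) {
            return result; // no other client can host a standby
        }
        for (int i = 0; i < ordered.size(); i++) {
            String standbyHost = ordered.get((i + 1) % ordered.size());
            result.get(standbyHost).addAll(activeAssignment.get(ordered.get(i)));
        }
        return result;
    }
}
```

With this shape, a user who only cares about standby placement would implement StandbyTaskAssignor alone and keep whatever active assignor is the default.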

If this approach sounds reasonable, I’ll work on updating the KIP this week.

Thanks,
Levani

> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org> wrote:
> 
> Thanks, Levani!
> 
> I was reflecting more on your KIP last night.
> 
> One thing I should mention is that I have previously used
> the rack awareness feature of Elasticsearch, and found it to
> be pretty intuitive and also capable of what we needed in
> our AWS clusters. As you look at related work, you might
> take ES into consideration.
> 
> I also had some thoughts about your proposal.
> 
> 1. I'm wondering if we instead allow people to add arbitrary
> tags to each host, and then have a configuration to specify
> a combination of tags to use for rack awareness. This seems
> easier to manage than for the use case you anticipate where
> people would concatenate rackId = (clusterId + AZ), and then
> have to parse the rackId back out to compute the assignment.
> 
> 2. About the proposed RackAwareStandbyTaskAssignor, I'm
> wondering if we can change the level of abstraction a little
> bit and capture even more value here. One thing we wanted to
> do in KIP-441, but decided to cut from the scope, was to
> define a public TaskAssignor interface so that people can
> plug in the whole task assignment algorithm.
> 
> In fact, there is already an internal config and interface
> for this (`internal.task.assignor.class`:
> `org.apache.kafka.streams.processor.internals.assignment.TaskAssignor`).
> 
> We kept that interface and config internal because the
> current TaskAssignor interface has a number of flaws, but if
> we correct those flaws, we can offer a nice public interface
> that people can use to control the standby allocation, but
> also active task allocation, based on the tags I suggested
> in (1).
> 
> I don't think we need too much work to refactor
> TaskAssignor; the main problems are that the assign method
> mutates its input and that it doesn't expose the full
> metadata from the cluster members. Therefore, if you like
> this idea, we should propose to refactor TaskAssignor with:
> * input: an immutable description of the cluster, including
> current lags of all stateful tasks and current stateless
> task assignments, as well as metadata for each host.
> * output: an object describing the new assignment as well as
> a flag on whether to schedule a followup probing rebalance.
> 
> An even more stretchy stretch goal would be to include
> metadata of the brokers, which could be used to achieve
> higher levels of rack awareness. For example, we could co-
> locate tasks in the same "rack" (AZ) as the partition leader
> for their input or output topics, to minimize cross-AZ
> traffic. I'm not sure to what extent clients can learn the
> relevant broker metadata, so this stretch might not be
> currently feasible, but as long as we design the
> TaskAssignor for extensibility, we can do something like
> this in the future.
> 
> Thanks again for this proposal, I hope the above is more
> inspiring than annoying :)
> 
> I really think your KIP is super high value in whatever form
> you ultimately land on.
> 
> 
> Thanks,
> John
> 
> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
>> Hi John
>> 
>> Thanks for the feedback (and for the great work on KIP-441 :) ).
>> Makes sense, I will add a section to the KIP explaining rack awareness at a
>> high level and how it’s implemented in different distributed systems.
>> 
>> Thanks,
>> Levani
>> 
>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org> wrote:
>>> 
>>> Hi Levani,
>>> 
>>> Thanks for this KIP! I think this is really high value; it was something I 
>>> was disappointed I didn’t get to do as part of KIP-441.
>>> 
>>> Rack awareness is a feature provided by other distributed systems as well. 
>>> I wonder if your KIP could devote a section to summarizing what rack 
>>> awareness looks like in other distributed systems, to help us put this 
>>> design in context. 
>>> 
>>> Thanks!
>>> John
>>> 
>>> 
>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>>>> Hello all,
>>>> 
>>>> I’d like to start a discussion on KIP-708 [1], which aims to introduce
>>>> rack-aware standby task distribution in Kafka Streams.
>>>> In addition to the changes mentioned in the KIP, I’d like to get some ideas
>>>> on an additional change I have in mind.
>>>> Assuming the KIP moves forward, I was wondering if it makes sense to
>>>> configure Kafka Streams consumer instances with the rack ID passed with
>>>> the new StreamsConfig#RACK_ID_CONFIG property.
>>>> In practice, that would mean that when “rack.id” is
>>>> configured in Kafka Streams, it will automatically translate into the
>>>> ConsumerConfig#CLIENT_RACK_CONFIG config for all the KafkaConsumer clients
>>>> that are used by Kafka Streams internally.
>>>> 
>>>> [1] 
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>> 
>>>> P.S.
>>>> I have a draft PR ready; if it helps move the discussion forward, I can
>>>> provide the draft PR link in this thread.
>>>> 
>>>> Regards, 
>>>> Levani
>> 
> 
> 
