Hey Omnia,

I was doing a bit of snooping (I actually just get updates for the KIP
page) and I saw this draft was in progress. I shared it with some of my
colleagues as well who I previously discussed the issue with.

The initial look was pretty promising to me. I appreciate the detailing of
the rejected options since we had quite a few we worked through :)

One question I have is how will we handle a scenario where potentially each
new client has a new Kafka Principal? Is that simply not covered by
throttling?

Thanks,
Justine

On Wed, May 31, 2023 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
wrote:

> Hi Justine and Luke,
>
> I started a KIP draft here
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
> for a proposal would appreciate it if you could provide any initial
> feedback before opening a broader discussion.
>
> Thanks
>
> On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>>
>> *Hi Justine, *
>>
>> *My initial thought of throttling the initProducerId was to get ripped
>> off the problem at the source (which creates too many PIDs per client) and
>> fail faster but if having this on the produce request level is easier this
>> should be fine. I am guessing it will be the same direction as we may
>> ClientQuotaManage for Produce throttling with a different quota window than
>> `quota.window.size.seconds `. *
>>
>> *If this is good as an initial solution I can put start a KIP and see
>> what the wider community feels about this. *
>>
>> *Also, I noticed that at some point one of us hit "Replay" instead of
>> "Replay to All" :)  So here are the previous conversations*
>>
>> *On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io
>> <jols...@confluent.io>> wrote:*
>>
>>> Hey Omnia,
>>>
>>> Thanks for the response. I think I understand your explanations here
>>> with respect to principal and clientId usage.
>>>
>>> For the throttling -- handleInitProducerIdRequest will allocate the ID
>>> to the producer, but we don't actually store it on the broker or increment
>>> our metric until the first produce request for that producer is sent (or
>>> sent again after previously expiring). Would you consider throttling the
>>> produce request instead? It may be hard to get any metrics from the
>>> transaction coordinator where the initProducerId request is handled.
>>>
>>> Justine
>>
>>
>> *On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com
>> <o.g.h.ibra...@gmail.com>> wrote:*
>>
>>> Hey Justine,
>>> > If I understand your message correctly, there are issues with
>>> identifying the source of the rogue clients? So you propose to add a new
>>> metric for that?
>>> > And also proposing to throttle based on clientId as a potential follow
>>> up?
>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>>> similarly to how we identify clients in Fetch/Produce/Request
>>> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability
>>> to throttle later based on principal which is most likely to be a smaller
>>> set than clientIds. My initial thought was to add a metrics that represent
>>> how many InitProducerIDRequest are sent by KafkaPrincipal (and/or clientId)
>>> similar to Fetch/Produce QuotaManagers.
>>> Then as a follow-up, we can throttle based on either KafkaPrinciple or
>>> clientId (maybe default as well to align this with other QuotaManagers in
>>> Kafka).
>>>
>>> >1. Does we rely on the client using the same ID? What if there are many
>>> clients that all use different client IDs?
>>> This is why I want to use the combination of KafkaPrincipal or clientId
>>> similar to some other quotas we have in Kafka already. This will be a
>>> similar risk to Fetch/Produce quota in Kafka which also relay on the client
>>> to use the same clientId and KafkaPrincipal.
>>>
>>> >2. Are there places where high cardinality of this metric is a concern?
>>> I can imagine many client IDs in the system. Would we treat this as a rate
>>> metric (ie, when we get an init producer ID and return a new producer ID we
>>> emit a count for that client id?) Or something else?
>>> My initial thought here was to follow the steps of ClientQuotaManager
>>> and ClientRequestQuotaManager and use a rate metric. However, I think we
>>> can emit it either
>>>
>>>    1. when we return the new PID. However, I have concerns that we may
>>>    circle back to the previous concerns with OMM due to keeping track of
>>>    ACTIVE PIDs per KafkaPrincipal(and/or) clientId in the future. Also this
>>>    would be the first time Kafka throttle IDs for any client.
>>>    2. or once we recieve initProducerIDRequest and throttle before even
>>>    hitting `handleInitProducerIdRequest`. Going this direction we may need 
>>> to
>>>    throttle it within a different quota window than `
>>>    quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request
>>>    per second wouldn't be efficient for most cases. I personally think this
>>>    direction is easier as it seems more consistent with the existing quota
>>>    implementation. Specially that Kafka has already the concept of 
>>> throttling
>>>    subset of requests (in ControllerMutationQuotaManager) but never had any
>>>    concept of throttling active IDs for any client.
>>>
>>>
>>> What do you think?
>>>
>>> Thanks
>>> Omnia
>>>
>>
>> *On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io
>> <jols...@confluent.io>> wrote:*
>>
>>> Hey Omnia,
>>> Sorry for losing track of this.
>>>
>>> If I understand your message correctly, there are issues with
>>> identifying the source of the rogue clients? So you propose to add a new
>>> metric for that?
>>> And also proposing to throttle based on clientId as a potential follow
>>> up?
>>>
>>> I think both of these make sense. The only things I can think of for the
>>> metric are:
>>> 1. Does we rely on the client using the same ID? What if there are many
>>> clients that all use different client IDs?
>>> 2. Are there places where high cardinality of this metric is a concern?
>>> I can imagine many client IDs in the system. Would we treat this as a rate
>>> metric (ie, when we get an init producer ID and return a new producer ID we
>>> emit a count for that client id?) Or something else?
>>>
>>> Thanks,
>>> Justine
>>>
>>
>> Thanks
>> Omnia
>>
>> On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> wrote:
>>
>>> Hi Luke and Justine,
>>> Are there any thoughts or updates on this? I would love to help with
>>> this as we are hitting this more frequently now.
>>>
>>> best,
>>>
>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Luke and Justine,
>>>>
>>>>> For (3), you said:
>>>>> > - I have some concerns about the impact of this option on the
>>>>> transactional
>>>>> producers, for example, what will happen to an ongoing transaction
>>>>> associated with an expired PID? Would this leave the transactions in a
>>>>> "hanging" state?
>>>>>
>>>>> - How will we notify the client that the transaction can't continue
>>>>> due to
>>>>> an expired PID?
>>>>>
>>>>> - If PID got marked as `expired` this will mean that
>>>>> `admin.DescribeProducers` will not list them which will make
>>>>> *`kafka-transactions.sh
>>>>> --list`* a bit tricky as we can't identify if there are transactions
>>>>> linked
>>>>> to this expired PID or not. The same concern applies to
>>>>> *`kafka-transactions.sh
>>>>> --find-hanging`*.
>>>>>
>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>> Currently, there's no way to notify clients about the expiration.
>>>>> Also, the ongoing transactions will be hanging. For the admin cli,
>>>>> we've
>>>>> never thought about it. Good point.
>>>>> In summary, to adopt this solution, there are many issues needed to get
>>>>> fixed.
>>>>>
>>>>
>>>> Justin already clarified that if PID is attached to a transaction it
>>>> will not expire so identifying the transactions shouldn't be a concern
>>>> anymore.
>>>> The only concern here will be that this solution will not solve the
>>>> problem if the rouge producer is a transactional producer with hanging
>>>> transactions.
>>>> If anyone faced this situation they will need to abort the hanging
>>>> transactions manually and then the solution to expire a PID can then work.
>>>>
>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id is
>>>>> also
>>>>> workable.
>>>>> It's just these 2 attributes are not required.
>>>>> That is, it's possible we take all clients as the same one: {default
>>>>> KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>> Do you have any thoughts about it?
>>>>> Maybe skip throttling for {default KafkaPrinciple + default clientID}
>>>>>
>>>>
>>>> Throttling for default KafkaPrinciple and default ClientID is useful
>>>> when we need to have a hard limit on the whole cluster and whoever is
>>>> running the cluster doesn't knowclientsntIDs or if a KafkaPrinciple is
>>>> reused between different producer applications.
>>>> I usually find it helpful to have a way to apply throttling only on the
>>>> rough clients only once I identify them without punishing everyone on the
>>>> cluster. However, there are two problems with this
>>>> - There's no easy way at the moment to link PIDs to clientId or
>>>> KafkaPrinciple. This need to be addressed first.
>>>> - Is Justin's comment on the throttling, and the fact that will mean we
>>>> either block all requests or have to store the request in memory which in
>>>> both cases has side downs on the producer experince.
>>>>
>>>> I recently had another discussion with my team and it does seem like
>>>>> there
>>>>> should be a way to make it more clear to the clients what is going on.
>>>>> A
>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>> way to
>>>>> improve the story for newer clients. (Ie if we choose to expire based
>>>>> on a
>>>>> size limit, we should include a response indicating the ID has
>>>>> expired.) We
>>>>> also discussed ways to redefine the guarantees so that users who have
>>>>> stronger idempotency requirements can ensure them (over
>>>>> availability/memory
>>>>> concerns). Let me know if you have any ideas here.
>>>>>
>>>>
>>>> It may be easier to improve the experience for new clients. However, if
>>>> we improved only the new clients we may need a way to help teams who run
>>>> Kafka with rough clients on old versions by at least giving them an easy
>>>> way to identify the clientId/ or KafkaPrinciple that generated these PIDs.
>>>>
>>>> For context, it's very tricky to even identify which clientId is
>>>> creating all these PIDs that caused OOM, which is a contributing part of
>>>> the issue at the moment. So maybe one option here could be adding a new
>>>> metric that tracks the number of generated PIDs per clientId. This will
>>>> help the team who runs the Kafka cluster to
>>>> - contact these rough clients and ask them to fix their clients or
>>>> upgrade to a new client if the new client version has a better experience.
>>>> - or if ended with a throttling solution this may help identify which
>>>> clientId needs to be throttled.
>>>>
>>>> Maybe we can start with a solution for identifying the rough clients
>>>> first and keep looking for a solution to limit them, what do you think?
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan
>>>> <jols...@confluent.io.invalid> wrote:
>>>>
>>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>>
>>>>> Here was my response for the mailing thread:
>>>>>
>>>>> Hey Omnia,
>>>>> Sorry to hear this is a problem for you as well. :(
>>>>> > * I have some concerns about the impact of this option on the
>>>>> transactional producers, for example, what will happen to an ongoing
>>>>> transaction associated with an expired PID? Would this leave the
>>>>> transactions in a "hanging" state?*
>>>>> We currently check if a transaction is ongoing and do not expire the
>>>>> producer ID if it has an ongoing transaction. I suspect we will
>>>>> continue to
>>>>> do this with any solution we pick.
>>>>>
>>>>> My team members and I looked a bit into the throttling case and it can
>>>>> get
>>>>> a bit tricky since it means we need to throttle the produce request
>>>>> before
>>>>> it is processed. This means we either block all requests or have to
>>>>> store
>>>>> the request in memory (which is not great if we are trying to save
>>>>> memory).
>>>>>
>>>>> I recently had another discussion with my team and it does seem like
>>>>> there
>>>>> should be a way to make it more clear to the clients what is going on.
>>>>> A
>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>> way to
>>>>> improve the story for newer clients. (Ie if we choose to expire based
>>>>> on a
>>>>> size limit, we should include a response indicating the ID has
>>>>> expired.) We
>>>>> also discussed ways to redefine the guarantees so that users who have
>>>>> stronger idempotency requirements can ensure them (over
>>>>> availability/memory
>>>>> concerns). Let me know if you have any ideas here.
>>>>>
>>>>> Thanks again for commenting here, hopefully we can come to a good
>>>>> solution.
>>>>>
>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:
>>>>>
>>>>> > Hi Omnia,
>>>>> >
>>>>> > Thanks for your reply.
>>>>> >
>>>>> > For (3), you said:
>>>>> > > - I have some concerns about the impact of this option on the
>>>>> > transactional
>>>>> > producers, for example, what will happen to an ongoing transaction
>>>>> > associated with an expired PID? Would this leave the transactions in
>>>>> a
>>>>> > "hanging" state?
>>>>> >
>>>>> > - How will we notify the client that the transaction can't continue
>>>>> due to
>>>>> > an expired PID?
>>>>> >
>>>>> > - If PID got marked as `expired` this will mean that
>>>>> > `admin.DescribeProducers` will not list them which will make
>>>>> > *`kafka-transactions.sh
>>>>> > --list`* a bit tricky as we can't identify if there are transactions
>>>>> linked
>>>>> > to this expired PID or not. The same concern applies to
>>>>> > *`kafka-transactions.sh
>>>>> > --find-hanging`*.
>>>>> >
>>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>>> > Currently, there's no way to notify clients about the expiration.
>>>>> > Also, the ongoing transactions will be hanging. For the admin cli,
>>>>> we've
>>>>> > never thought about it. Good point.
>>>>> > In summary, to adopt this solution, there are many issues needed to
>>>>> get
>>>>> > fixed.
>>>>> >
>>>>> >
>>>>> > For (5), you said:
>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your concern
>>>>> here
>>>>> > that
>>>>> > those good clients that use the same principal as a rogue one will
>>>>> get
>>>>> > throttled?
>>>>> >
>>>>> > If this is the case, then I believe it should be okay as other
>>>>> throttling
>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>> >
>>>>> >
>>>>> > What about applying limit/throttling to
>>>>> > *`/config/users/<user>/clients/<client-id>`
>>>>> > *similar to what we have with client quota? This should reduce the
>>>>> concern
>>>>> > about throttling good clients, right?
>>>>> >
>>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client Id
>>>>> is
>>>>> > also workable.
>>>>> > It's just these 2 attributes are not required.
>>>>> > That is, it's possible we take all clients as the same one: {default
>>>>> > KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>> > Do you have any thoughts about it?
>>>>> > Maybe skip throttling for {default KafkaPrinciple + default
>>>>> clientID} ?
>>>>> >
>>>>> > Luke
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <
>>>>> o.g.h.ibra...@gmail.com>
>>>>> > wrote:
>>>>> >
>>>>> >> Hi Luke & Justine,
>>>>> >> Thanks for looking into this issue, we have been experiencing this
>>>>> because
>>>>> >> of rouge clients as well.
>>>>> >>
>>>>> >> > 3. Having a limit to the number of active producer IDs (sort of
>>>>> like an
>>>>> >> LRU
>>>>> >> >cache)
>>>>> >> >-> The idea here is that if we hit a misconfigured client, we will
>>>>> expire
>>>>> >> >the older entries. The concern here is we have risks to lose
>>>>> idempotency
>>>>> >> >guarantees, and currently, we don't have a way to notify clients
>>>>> about
>>>>> >> >losing idempotency guarantees. Besides, the least  recently used
>>>>> entries
>>>>> >> >got removed are not always from the "bad" clients.
>>>>> >>
>>>>> >> - I have some concerns about the impact of this option on the
>>>>> >> transactional
>>>>> >> producers, for example, what will happen to an ongoing transaction
>>>>> >> associated with an expired PID? Would this leave the transactions
>>>>> in a
>>>>> >> "hanging" state?
>>>>> >>
>>>>> >> - How will we notify the client that the transaction can't continue
>>>>> due to
>>>>> >> an expired PID?
>>>>> >>
>>>>> >> - If PID got marked as `expired` this will mean that
>>>>> >> `admin.DescribeProducers` will not list them which will make
>>>>> >> *`kafka-transactions.sh
>>>>> >> --list`* a bit tricky as we can't identify if there are transactions
>>>>> >> linked
>>>>> >> to this expired PID or not. The same concern applies to
>>>>> >> *`kafka-transactions.sh
>>>>> >> --find-hanging`*.
>>>>> >>
>>>>> >>
>>>>> >> >5. limit/throttling the producer id based on the principle
>>>>> >> >-> Although we can limit the impact to a certain principle with
>>>>> this
>>>>> >> idea,
>>>>> >> >same concern still exists as solution #1 #2.
>>>>> >>
>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your concern
>>>>> here
>>>>> >> that
>>>>> >> those good clients that use the same principal as a rogue one will
>>>>> get
>>>>> >> throttled?
>>>>> >>
>>>>> >> If this is the case, then I believe it should be okay as other
>>>>> throttling
>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>> >>
>>>>> >>
>>>>> >> What about applying limit/throttling to
>>>>> >> *`/config/users/<user>/clients/<client-id>`
>>>>> >> *similar to what we have with client quota? This should reduce the
>>>>> concern
>>>>> >> about throttling good clients, right?
>>>>> >>
>>>>> >> best,
>>>>> >>
>>>>> >> Omnia
>>>>> >>
>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>> >> > Thanks.
>>>>> >> >
>>>>> >> > Luke
>>>>> >> >
>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com>
>>>>> wrote:
>>>>> >> >
>>>>> >> > > Hi devs,
>>>>> >> > >
>>>>> >> > > As stated in the motivation section in KIP-854
>>>>> >> > > <
>>>>> >> >
>>>>> >>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
>>>>> >> > >:
>>>>> >> > >
>>>>> >> > > With idempotent producers becoming the default in Kafka, this
>>>>> means
>>>>> >> that
>>>>> >> > > unless otherwise specified, all new producers will be given
>>>>> producer
>>>>> >> IDs.
>>>>> >> > > Some (inefficient) applications may now create many
>>>>> non-transactional
>>>>> >> > > idempotent producers. Each of these producers will be assigned a
>>>>> >> producer
>>>>> >> > > ID and these IDs and their metadata are stored in the broker
>>>>> memory,
>>>>> >> > which
>>>>> >> > > might cause brokers out of memory.
>>>>> >> > >
>>>>> >> > > Justine (in cc.) and I and some other team members are working
>>>>> on the
>>>>> >> > > solutions for this issue. But none of them solves it completely
>>>>> >> without
>>>>> >> > > side effects. Among them, "availability" VS "idempotency
>>>>> guarantees"
>>>>> >> is
>>>>> >> > > what we can't decide which to sacrifice. Some of these solutions
>>>>> >> > sacrifice
>>>>> >> > > availability of produce (1,2,5) and others sacrifice idempotency
>>>>> >> > guarantees
>>>>> >> > > (3). It could be useful to know if people generally have a
>>>>> preference
>>>>> >> one
>>>>> >> > > way or the other. Or what other better solutions there might be.
>>>>> >> > >
>>>>> >> > > Here are the proposals we came up with:
>>>>> >> > >
>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>> >> > > -> This is the simplest solution. But since the OOM issue is
>>>>> usually
>>>>> >> > > caused by a rogue or misconfigured client, and this solution
>>>>> might
>>>>> >> > "punish"
>>>>> >> > > the good client from sending messages.
>>>>> >> > >
>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>> >> > > -> Same concern as the solution #1.
>>>>> >> > >
>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort of
>>>>> like
>>>>> >> an
>>>>> >> > > LRU cache)
>>>>> >> > > -> The idea here is that if we hit a misconfigured client, we
>>>>> will
>>>>> >> expire
>>>>> >> > > the older entries. The concern here is we have risks to lose
>>>>> >> idempotency
>>>>> >> > > guarantees, and currently, we don't have a way to notify
>>>>> clients about
>>>>> >> > > losing idempotency guarantees. Besides, the least  recently used
>>>>> >> entries
>>>>> >> > > got removed are not always from the "bad" clients.
>>>>> >> > >
>>>>> >> > > 4. allow clients to "close" the producer ID usage
>>>>> >> > > -> We can provide a way for producer to "close" producerID
>>>>> usage.
>>>>> >> > > Currently, we only have a way to INIT_PRODUCER_ID requested to
>>>>> >> allocate
>>>>> >> > > one. After that, we'll keep the producer ID metadata in broker
>>>>> even if
>>>>> >> > the
>>>>> >> > > producer is "closed". Having a closed API (ex:
>>>>> END_PRODUCER_ID), we
>>>>> >> can
>>>>> >> > > remove the entry from broker side. In client side, we can send
>>>>> it when
>>>>> >> > > producer closing. The concern is, the old clients (including
>>>>> non-java
>>>>> >> > > clients) will still suffer from the OOM issue.
>>>>> >> > >
>>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>>> >> > > -> Although we can limit the impact to a certain principle with
>>>>> this
>>>>> >> > idea,
>>>>> >> > > same concern still exists as solution #1 #2.
>>>>> >> > >
>>>>> >> > > Any thoughts/feedback are welcomed.
>>>>> >> > >
>>>>> >> > > Thank you.
>>>>> >> > > Luke
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> >
>>>>>
>>>>
>> On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io>
>> wrote:
>>
>>> Hey Omnia,
>>>
>>> Thanks for the response. I think I understand your explanations here
>>> with respect to principal and clientId usage.
>>>
>>> For the throttling -- handleInitProducerIdRequest will allocate the ID
>>> to the producer, but we don't actually store it on the broker or increment
>>> our metric until the first produce request for that producer is sent (or
>>> sent again after previously expiring). Would you consider throttling the
>>> produce request instead? It may be hard to get any metrics from the
>>> transaction coordinator where the initProducerId request is handled.
>>>
>>> Justine
>>>
>>> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>>
>>>> Hey Justine,
>>>> > If I understand your message correctly, there are issues with
>>>> identifying the source of the rogue clients? So you propose to add a new
>>>> metric for that?
>>>> > And also proposing to throttle based on clientId as a potential
>>>> follow up?
>>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>>>> similarly to how we identify clients in Fetch/Produce/Request
>>>> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability
>>>> to throttle later based on principal which is most likely to be a smaller
>>>> set than clientIds. My initial thought was to add a metrics that represent
>>>> how many InitProducerIDRequest are sent by KafkaPrincipal (and/or clientId)
>>>> similar to Fetch/Produce QuotaManagers.
>>>> Then as a follow-up, we can throttle based on either KafkaPrinciple or
>>>> clientId (maybe default as well to align this with other QuotaManagers in
>>>> Kafka).
>>>>
>>>> >1. Does we rely on the client using the same ID? What if there are
>>>> many clients that all use different client IDs?
>>>> This is why I want to use the combination of KafkaPrincipal or clientId
>>>> similar to some other quotas we have in Kafka already. This will be a
>>>> similar risk to Fetch/Produce quota in Kafka which also relay on the client
>>>> to use the same clientId and KafkaPrincipal.
>>>>
>>>> >2. Are there places where high cardinality of this metric is a
>>>> concern? I can imagine many client IDs in the system. Would we treat this
>>>> as a rate metric (ie, when we get an init producer ID and return a new
>>>> producer ID we emit a count for that client id?) Or something else?
>>>> My initial thought here was to follow the steps of ClientQuotaManager
>>>> and ClientRequestQuotaManager and use a rate metric. However, I think we
>>>> can emit it either
>>>>
>>>>    1. when we return the new PID. However, I have concerns that we may
>>>>    circle back to the previous concerns with OMM due to keeping track of
>>>>    ACTIVE PIDs per KafkaPrincipal(and/or) clientId in the future. Also this
>>>>    would be the first time Kafka throttle IDs for any client.
>>>>    2. or once we recieve initProducerIDRequest and throttle before
>>>>    even hitting `handleInitProducerIdRequest`. Going this direction we may
>>>>    need to throttle it within a different quota window than `
>>>>    quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request
>>>>    per second wouldn't be efficient for most cases. I personally think this
>>>>    direction is easier as it seems more consistent with the existing quota
>>>>    implementation. Specially that Kafka has already the concept of 
>>>> throttling
>>>>    subset of requests (in ControllerMutationQuotaManager) but never had any
>>>>    concept of throttling active IDs for any client.
>>>>
>>>>
>>>> What do you think?
>>>>
>>>> Thanks
>>>> Omnia
>>>>
>>>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io>
>>>> wrote:
>>>>
>>>>> Hey Omnia,
>>>>> Sorry for losing track of this.
>>>>>
>>>>> If I understand your message correctly, there are issues with
>>>>> identifying the source of the rogue clients? So you propose to add a new
>>>>> metric for that?
>>>>> And also proposing to throttle based on clientId as a potential follow
>>>>> up?
>>>>>
>>>>> I think both of these make sense. The only things I can think of for
>>>>> the metric are:
>>>>> 1. Does we rely on the client using the same ID? What if there are
>>>>> many clients that all use different client IDs?
>>>>> 2. Are there places where high cardinality of this metric is a
>>>>> concern? I can imagine many client IDs in the system. Would we treat this
>>>>> as a rate metric (ie, when we get an init producer ID and return a new
>>>>> producer ID we emit a count for that client id?) Or something else?
>>>>>
>>>>> Thanks,
>>>>> Justine
>>>>>
>>>>> On Thu, Feb 2, 2023 at 8:44 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Luke and Justine,
>>>>>> Are there any thoughts or updates on this? I would love to help with
>>>>>> this as we are hitting this more frequently now.
>>>>>>
>>>>>> best,
>>>>>>
>>>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <
>>>>>> o.g.h.ibra...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Luke and Justine,
>>>>>>>
>>>>>>>> For (3), you said:
>>>>>>>> > - I have some concerns about the impact of this option on the
>>>>>>>> transactional
>>>>>>>> producers, for example, what will happen to an ongoing transaction
>>>>>>>> associated with an expired PID? Would this leave the transactions
>>>>>>>> in a
>>>>>>>> "hanging" state?
>>>>>>>>
>>>>>>>> - How will we notify the client that the transaction can't continue
>>>>>>>> due to
>>>>>>>> an expired PID?
>>>>>>>>
>>>>>>>> - If PID got marked as `expired` this will mean that
>>>>>>>> `admin.DescribeProducers` will not list them which will make
>>>>>>>> *`kafka-transactions.sh
>>>>>>>> --list`* a bit tricky as we can't identify if there are
>>>>>>>> transactions linked
>>>>>>>> to this expired PID or not. The same concern applies to
>>>>>>>> *`kafka-transactions.sh
>>>>>>>> --find-hanging`*.
>>>>>>>>
>>>>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>> Currently, there's no way to notify clients about the expiration.
>>>>>>>> Also, the ongoing transactions will be hanging. For the admin cli,
>>>>>>>> we've
>>>>>>>> never thought about it. Good point.
>>>>>>>> In summary, to adopt this solution, there are many issues needed to
>>>>>>>> get
>>>>>>>> fixed.
>>>>>>>>
>>>>>>>
>>>>>>> Justin already clarified that if PID is attached to a transaction it
>>>>>>> will not expire so identifying the transactions shouldn't be a concern
>>>>>>> anymore.
>>>>>>> The only concern here will be that this solution will not solve the
>>>>>>> problem if the rouge producer is a transactional producer with hanging
>>>>>>> transactions.
>>>>>>> If anyone faced this situation they will need to abort the hanging
>>>>>>> transactions manually and then the solution to expire a PID can then 
>>>>>>> work.
>>>>>>>
>>>>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id
>>>>>>>> is also
>>>>>>>> workable.
>>>>>>>> It's just these 2 attributes are not required.
>>>>>>>> That is, it's possible we take all clients as the same one: {default
>>>>>>>> KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>>>> Do you have any thoughts about it?
>>>>>>>> Maybe skip throttling for {default KafkaPrinciple + default
>>>>>>>> clientID}
>>>>>>>>
>>>>>>>
>>>>>>> Throttling for default KafkaPrinciple and default ClientID is useful
>>>>>>> when we need to have a hard limit on the whole cluster and whoever is
>>>>>>> running the cluster doesn't knowclientsntIDs or if a KafkaPrinciple is
>>>>>>> reused between different producer applications.
>>>>>>> I usually find it helpful to have a way to apply throttling only on
>>>>>>> the rough clients only once I identify them without punishing everyone 
>>>>>>> on
>>>>>>> the cluster. However, there are two problems with this
>>>>>>> - There's no easy way at the moment to link PIDs to clientId or
>>>>>>> KafkaPrinciple. This need to be addressed first.
>>>>>>> - Is Justin's comment on the throttling, and the fact that will mean
>>>>>>> we either block all requests or have to store the request in memory 
>>>>>>> which
>>>>>>> in both cases has side downs on the producer experince.
>>>>>>>
>>>>>>> I recently had another discussion with my team and it does seem like
>>>>>>>> there
>>>>>>>> should be a way to make it more clear to the clients what is going
>>>>>>>> on. A
>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>>>>> way to
>>>>>>>> improve the story for newer clients. (Ie if we choose to expire
>>>>>>>> based on a
>>>>>>>> size limit, we should include a response indicating the ID has
>>>>>>>> expired.) We
>>>>>>>> also discussed ways to redefine the guarantees so that users who
>>>>>>>> have
>>>>>>>> stronger idempotency requirements can ensure them (over
>>>>>>>> availability/memory
>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>>
>>>>>>>
>>>>>>> It may be easier to improve the experience for new clients. However,
>>>>>>> if we improved only the new clients we may need a way to help teams who 
>>>>>>> run
>>>>>>> Kafka with rough clients on old versions by at least giving them an easy
>>>>>>> way to identify the clientId/ or KafkaPrinciple that generated these 
>>>>>>> PIDs.
>>>>>>>
>>>>>>> For context, it's very tricky to even identify which clientId is
>>>>>>> creating all these PIDs that caused OOM, which is a contributing part of
>>>>>>> the issue at the moment. So maybe one option here could be adding a new
>>>>>>> metric that tracks the number of generated PIDs per clientId. This will
>>>>>>> help the team who runs the Kafka cluster to
>>>>>>> - contact these rough clients and ask them to fix their clients or
>>>>>>> upgrade to a new client if the new client version has a better 
>>>>>>> experience.
>>>>>>> - or if ended with a throttling solution this may help identify
>>>>>>> which clientId needs to be throttled.
>>>>>>>
>>>>>>> Maybe we can start with a solution for identifying the rough clients
>>>>>>> first and keep looking for a solution to limit them, what do you think?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan
>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>>
>>>>>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>>>>>
>>>>>>>> Here was my response for the mailing thread:
>>>>>>>>
>>>>>>>> Hey Omnia,
>>>>>>>> Sorry to hear this is a problem for you as well. :(
>>>>>>>> > * I have some concerns about the impact of this option on the
>>>>>>>> transactional producers, for example, what will happen to an ongoing
>>>>>>>> transaction associated with an expired PID? Would this leave the
>>>>>>>> transactions in a "hanging" state?*
>>>>>>>> We currently check if a transaction is ongoing and do not expire the
>>>>>>>> producer ID if it has an ongoing transaction. I suspect we will
>>>>>>>> continue to
>>>>>>>> do this with any solution we pick.
>>>>>>>>
>>>>>>>> My team members and I looked a bit into the throttling case and it
>>>>>>>> can get
>>>>>>>> a bit tricky since it means we need to throttle the produce request
>>>>>>>> before
>>>>>>>> it is processed. This means we either block all requests or have to
>>>>>>>> store
>>>>>>>> the request in memory (which is not great if we are trying to save
>>>>>>>> memory).
>>>>>>>>
>>>>>>>> I recently had another discussion with my team and it does seem
>>>>>>>> like there
>>>>>>>> should be a way to make it more clear to the clients what is going
>>>>>>>> on. A
>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>>>>> way to
>>>>>>>> improve the story for newer clients. (Ie if we choose to expire
>>>>>>>> based on a
>>>>>>>> size limit, we should include a response indicating the ID has
>>>>>>>> expired.) We
>>>>>>>> also discussed ways to redefine the guarantees so that users who
>>>>>>>> have
>>>>>>>> stronger idempotency requirements can ensure them (over
>>>>>>>> availability/memory
>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>>
>>>>>>>> Thanks again for commenting here, hopefully we can come to a good
>>>>>>>> solution.
>>>>>>>>
>>>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> > Hi Omnia,
>>>>>>>> >
>>>>>>>> > Thanks for your reply.
>>>>>>>> >
>>>>>>>> > For (3), you said:
>>>>>>>> > > - I have some concerns about the impact of this option on the
>>>>>>>> > transactional
>>>>>>>> > producers, for example, what will happen to an ongoing transaction
>>>>>>>> > associated with an expired PID? Would this leave the transactions
>>>>>>>> in a
>>>>>>>> > "hanging" state?
>>>>>>>> >
>>>>>>>> > - How will we notify the client that the transaction can't
>>>>>>>> continue due to
>>>>>>>> > an expired PID?
>>>>>>>> >
>>>>>>>> > - If PID got marked as `expired` this will mean that
>>>>>>>> > `admin.DescribeProducers` will not list them which will make
>>>>>>>> > *`kafka-transactions.sh
>>>>>>>> > --list`* a bit tricky as we can't identify if there are
>>>>>>>> transactions linked
>>>>>>>> > to this expired PID or not. The same concern applies to
>>>>>>>> > *`kafka-transactions.sh
>>>>>>>> > --find-hanging`*.
>>>>>>>> >
>>>>>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>> > Currently, there's no way to notify clients about the expiration.
>>>>>>>> > Also, the ongoing transactions will be hanging. For the admin
>>>>>>>> cli, we've
>>>>>>>> > never thought about it. Good point.
>>>>>>>> > In summary, to adopt this solution, there are many issues needed
>>>>>>>> to get
>>>>>>>> > fixed.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > For (5), you said:
>>>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your
>>>>>>>> concern here
>>>>>>>> > that
>>>>>>>> > those good clients that use the same principal as a rogue one
>>>>>>>> will get
>>>>>>>> > throttled?
>>>>>>>> >
>>>>>>>> > If this is the case, then I believe it should be okay as other
>>>>>>>> throttling
>>>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > What about applying limit/throttling to
>>>>>>>> > *`/config/users/<user>/clients/<client-id>`
>>>>>>>> > *similar to what we have with client quota? This should reduce
>>>>>>>> the concern
>>>>>>>> > about throttling good clients, right?
>>>>>>>> >
>>>>>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client
>>>>>>>> Id is
>>>>>>>> > also workable.
>>>>>>>> > It's just these 2 attributes are not required.
>>>>>>>> > That is, it's possible we take all clients as the same one:
>>>>>>>> {default
>>>>>>>> > KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>>>> > Do you have any thoughts about it?
>>>>>>>> > Maybe skip throttling for {default KafkaPrinciple + default
>>>>>>>> clientID} ?
>>>>>>>> >
>>>>>>>> > Luke
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <
>>>>>>>> o.g.h.ibra...@gmail.com>
>>>>>>>> > wrote:
>>>>>>>> >
>>>>>>>> >> Hi Luke & Justine,
>>>>>>>> >> Thanks for looking into this issue, we have been experiencing
>>>>>>>> this because
>>>>>>>> >> of rouge clients as well.
>>>>>>>> >>
>>>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort
>>>>>>>> of like an
>>>>>>>> >> LRU
>>>>>>>> >> >cache)
>>>>>>>> >> >-> The idea here is that if we hit a misconfigured client, we
>>>>>>>> will expire
>>>>>>>> >> >the older entries. The concern here is we have risks to lose
>>>>>>>> idempotency
>>>>>>>> >> >guarantees, and currently, we don't have a way to notify
>>>>>>>> clients about
>>>>>>>> >> >losing idempotency guarantees. Besides, the least  recently
>>>>>>>> used entries
>>>>>>>> >> >got removed are not always from the "bad" clients.
>>>>>>>> >>
>>>>>>>> >> - I have some concerns about the impact of this option on the
>>>>>>>> >> transactional
>>>>>>>> >> producers, for example, what will happen to an ongoing
>>>>>>>> transaction
>>>>>>>> >> associated with an expired PID? Would this leave the
>>>>>>>> transactions in a
>>>>>>>> >> "hanging" state?
>>>>>>>> >>
>>>>>>>> >> - How will we notify the client that the transaction can't
>>>>>>>> continue due to
>>>>>>>> >> an expired PID?
>>>>>>>> >>
>>>>>>>> >> - If PID got marked as `expired` this will mean that
>>>>>>>> >> `admin.DescribeProducers` will not list them which will make
>>>>>>>> >> *`kafka-transactions.sh
>>>>>>>> >> --list`* a bit tricky as we can't identify if there are
>>>>>>>> transactions
>>>>>>>> >> linked
>>>>>>>> >> to this expired PID or not. The same concern applies to
>>>>>>>> >> *`kafka-transactions.sh
>>>>>>>> >> --find-hanging`*.
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> >5. limit/throttling the producer id based on the principle
>>>>>>>> >> >-> Although we can limit the impact to a certain principle with
>>>>>>>> this
>>>>>>>> >> idea,
>>>>>>>> >> >same concern still exists as solution #1 #2.
>>>>>>>> >>
>>>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your
>>>>>>>> concern here
>>>>>>>> >> that
>>>>>>>> >> those good clients that use the same principal as a rogue one
>>>>>>>> will get
>>>>>>>> >> throttled?
>>>>>>>> >>
>>>>>>>> >> If this is the case, then I believe it should be okay as other
>>>>>>>> throttling
>>>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> What about applying limit/throttling to
>>>>>>>> >> *`/config/users/<user>/clients/<client-id>`
>>>>>>>> >> *similar to what we have with client quota? This should reduce
>>>>>>>> the concern
>>>>>>>> >> about throttling good clients, right?
>>>>>>>> >>
>>>>>>>> >> best,
>>>>>>>> >>
>>>>>>>> >> Omnia
>>>>>>>> >>
>>>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >>
>>>>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>>>>> >> > Thanks.
>>>>>>>> >> >
>>>>>>>> >> > Luke
>>>>>>>> >> >
>>>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >> >
>>>>>>>> >> > > Hi devs,
>>>>>>>> >> > >
>>>>>>>> >> > > As stated in the motivation section in KIP-854
>>>>>>>> >> > > <
>>>>>>>> >> >
>>>>>>>> >>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
>>>>>>>> >> > >:
>>>>>>>> >> > >
>>>>>>>> >> > > With idempotent producers becoming the default in Kafka,
>>>>>>>> this means
>>>>>>>> >> that
>>>>>>>> >> > > unless otherwise specified, all new producers will be given
>>>>>>>> producer
>>>>>>>> >> IDs.
>>>>>>>> >> > > Some (inefficient) applications may now create many
>>>>>>>> non-transactional
>>>>>>>> >> > > idempotent producers. Each of these producers will be
>>>>>>>> assigned a
>>>>>>>> >> producer
>>>>>>>> >> > > ID and these IDs and their metadata are stored in the broker
>>>>>>>> memory,
>>>>>>>> >> > which
>>>>>>>> >> > > might cause brokers out of memory.
>>>>>>>> >> > >
>>>>>>>> >> > > Justine (in cc.) and I and some other team members are
>>>>>>>> working on the
>>>>>>>> >> > > solutions for this issue. But none of them solves it
>>>>>>>> completely
>>>>>>>> >> without
>>>>>>>> >> > > side effects. Among them, "availability" VS "idempotency
>>>>>>>> guarantees"
>>>>>>>> >> is
>>>>>>>> >> > > what we can't decide which to sacrifice. Some of these
>>>>>>>> solutions
>>>>>>>> >> > sacrifice
>>>>>>>> >> > > availability of produce (1,2,5) and others sacrifice
>>>>>>>> idempotency
>>>>>>>> >> > guarantees
>>>>>>>> >> > > (3). It could be useful to know if people generally have a
>>>>>>>> preference
>>>>>>>> >> one
>>>>>>>> >> > > way or the other. Or what other better solutions there might
>>>>>>>> be.
>>>>>>>> >> > >
>>>>>>>> >> > > Here are the proposals we came up with:
>>>>>>>> >> > >
>>>>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>>>>> >> > > -> This is the simplest solution. But since the OOM issue is
>>>>>>>> usually
>>>>>>>> >> > > caused by a rogue or misconfigured client, and this solution
>>>>>>>> might
>>>>>>>> >> > "punish"
>>>>>>>> >> > > the good client from sending messages.
>>>>>>>> >> > >
>>>>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>>>>> >> > > -> Same concern as the solution #1.
>>>>>>>> >> > >
>>>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort
>>>>>>>> of like
>>>>>>>> >> an
>>>>>>>> >> > > LRU cache)
>>>>>>>> >> > > -> The idea here is that if we hit a misconfigured client,
>>>>>>>> we will
>>>>>>>> >> expire
>>>>>>>> >> > > the older entries. The concern here is we have risks to lose
>>>>>>>> >> idempotency
>>>>>>>> >> > > guarantees, and currently, we don't have a way to notify
>>>>>>>> clients about
>>>>>>>> >> > > losing idempotency guarantees. Besides, the least  recently
>>>>>>>> used
>>>>>>>> >> entries
>>>>>>>> >> > > got removed are not always from the "bad" clients.
>>>>>>>> >> > >
>>>>>>>> >> > > 4. allow clients to "close" the producer ID usage
>>>>>>>> >> > > -> We can provide a way for producer to "close" producerID
>>>>>>>> usage.
>>>>>>>> >> > > Currently, we only have a way to INIT_PRODUCER_ID requested
>>>>>>>> to
>>>>>>>> >> allocate
>>>>>>>> >> > > one. After that, we'll keep the producer ID metadata in
>>>>>>>> broker even if
>>>>>>>> >> > the
>>>>>>>> >> > > producer is "closed". Having a closed API (ex:
>>>>>>>> END_PRODUCER_ID), we
>>>>>>>> >> can
>>>>>>>> >> > > remove the entry from broker side. In client side, we can
>>>>>>>> send it when
>>>>>>>> >> > > producer closing. The concern is, the old clients (including
>>>>>>>> non-java
>>>>>>>> >> > > clients) will still suffer from the OOM issue.
>>>>>>>> >> > >
>>>>>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>>>>>> >> > > -> Although we can limit the impact to a certain principle
>>>>>>>> with this
>>>>>>>> >> > idea,
>>>>>>>> >> > > same concern still exists as solution #1 #2.
>>>>>>>> >> > >
>>>>>>>> >> > > Any thoughts/feedback are welcomed.
>>>>>>>> >> > >
>>>>>>>> >> > > Thank you.
>>>>>>>> >> > > Luke
>>>>>>>> >> > >
>>>>>>>> >> >
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>

Reply via email to