Hi Justine and Luke,

I started a KIP draft here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
for a proposal would appreciate it if you could provide any initial
feedback before opening a broader discussion.

Thanks

On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
wrote:

>
> *Hi Justine, *
>
> *My initial thought of throttling the initProducerId was to get ripped off
> the problem at the source (which creates too many PIDs per client) and fail
> faster but if having this on the produce request level is easier this
> should be fine. I am guessing it will be the same direction as we may
> ClientQuotaManage for Produce throttling with a different quota window than
> `quota.window.size.seconds `. *
>
> *If this is good as an initial solution I can put start a KIP and see what
> the wider community feels about this. *
>
> *Also, I noticed that at some point one of us hit "Replay" instead of
> "Replay to All" :)  So here are the previous conversations*
>
> *On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io
> <jols...@confluent.io>> wrote:*
>
>> Hey Omnia,
>>
>> Thanks for the response. I think I understand your explanations here with
>> respect to principal and clientId usage.
>>
>> For the throttling -- handleInitProducerIdRequest will allocate the ID to
>> the producer, but we don't actually store it on the broker or increment our
>> metric until the first produce request for that producer is sent (or sent
>> again after previously expiring). Would you consider throttling the produce
>> request instead? It may be hard to get any metrics from the transaction
>> coordinator where the initProducerId request is handled.
>>
>> Justine
>
>
> *On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com
> <o.g.h.ibra...@gmail.com>> wrote:*
>
>> Hey Justine,
>> > If I understand your message correctly, there are issues with
>> identifying the source of the rogue clients? So you propose to add a new
>> metric for that?
>> > And also proposing to throttle based on clientId as a potential follow
>> up?
>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>> similarly to how we identify clients in Fetch/Produce/Request
>> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability
>> to throttle later based on principal which is most likely to be a smaller
>> set than clientIds. My initial thought was to add a metrics that represent
>> how many InitProducerIDRequest are sent by KafkaPrincipal (and/or clientId)
>> similar to Fetch/Produce QuotaManagers.
>> Then as a follow-up, we can throttle based on either KafkaPrinciple or
>> clientId (maybe default as well to align this with other QuotaManagers in
>> Kafka).
>>
>> >1. Does we rely on the client using the same ID? What if there are many
>> clients that all use different client IDs?
>> This is why I want to use the combination of KafkaPrincipal or clientId
>> similar to some other quotas we have in Kafka already. This will be a
>> similar risk to Fetch/Produce quota in Kafka which also relay on the client
>> to use the same clientId and KafkaPrincipal.
>>
>> >2. Are there places where high cardinality of this metric is a concern?
>> I can imagine many client IDs in the system. Would we treat this as a rate
>> metric (ie, when we get an init producer ID and return a new producer ID we
>> emit a count for that client id?) Or something else?
>> My initial thought here was to follow the steps of ClientQuotaManager and
>> ClientRequestQuotaManager and use a rate metric. However, I think we can
>> emit it either
>>
>>    1. when we return the new PID. However, I have concerns that we may
>>    circle back to the previous concerns with OMM due to keeping track of
>>    ACTIVE PIDs per KafkaPrincipal(and/or) clientId in the future. Also this
>>    would be the first time Kafka throttle IDs for any client.
>>    2. or once we recieve initProducerIDRequest and throttle before even
>>    hitting `handleInitProducerIdRequest`. Going this direction we may need to
>>    throttle it within a different quota window than `
>>    quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request
>>    per second wouldn't be efficient for most cases. I personally think this
>>    direction is easier as it seems more consistent with the existing quota
>>    implementation. Specially that Kafka has already the concept of throttling
>>    subset of requests (in ControllerMutationQuotaManager) but never had any
>>    concept of throttling active IDs for any client.
>>
>>
>> What do you think?
>>
>> Thanks
>> Omnia
>>
>
> *On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io
> <jols...@confluent.io>> wrote:*
>
>> Hey Omnia,
>> Sorry for losing track of this.
>>
>> If I understand your message correctly, there are issues with identifying
>> the source of the rogue clients? So you propose to add a new metric for
>> that?
>> And also proposing to throttle based on clientId as a potential follow up?
>>
>> I think both of these make sense. The only things I can think of for the
>> metric are:
>> 1. Does we rely on the client using the same ID? What if there are many
>> clients that all use different client IDs?
>> 2. Are there places where high cardinality of this metric is a concern? I
>> can imagine many client IDs in the system. Would we treat this as a rate
>> metric (ie, when we get an init producer ID and return a new producer ID we
>> emit a count for that client id?) Or something else?
>>
>> Thanks,
>> Justine
>>
>
> Thanks
> Omnia
>
> On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>> Hi Luke and Justine,
>> Are there any thoughts or updates on this? I would love to help with this
>> as we are hitting this more frequently now.
>>
>> best,
>>
>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> wrote:
>>
>>> Hi Luke and Justine,
>>>
>>>> For (3), you said:
>>>> > - I have some concerns about the impact of this option on the
>>>> transactional
>>>> producers, for example, what will happen to an ongoing transaction
>>>> associated with an expired PID? Would this leave the transactions in a
>>>> "hanging" state?
>>>>
>>>> - How will we notify the client that the transaction can't continue due
>>>> to
>>>> an expired PID?
>>>>
>>>> - If PID got marked as `expired` this will mean that
>>>> `admin.DescribeProducers` will not list them which will make
>>>> *`kafka-transactions.sh
>>>> --list`* a bit tricky as we can't identify if there are transactions
>>>> linked
>>>> to this expired PID or not. The same concern applies to
>>>> *`kafka-transactions.sh
>>>> --find-hanging`*.
>>>>
>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>> Currently, there's no way to notify clients about the expiration.
>>>> Also, the ongoing transactions will be hanging. For the admin cli, we've
>>>> never thought about it. Good point.
>>>> In summary, to adopt this solution, there are many issues needed to get
>>>> fixed.
>>>>
>>>
>>> Justin already clarified that if PID is attached to a transaction it
>>> will not expire so identifying the transactions shouldn't be a concern
>>> anymore.
>>> The only concern here will be that this solution will not solve the
>>> problem if the rouge producer is a transactional producer with hanging
>>> transactions.
>>> If anyone faced this situation they will need to abort the hanging
>>> transactions manually and then the solution to expire a PID can then work.
>>>
>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id is
>>>> also
>>>> workable.
>>>> It's just these 2 attributes are not required.
>>>> That is, it's possible we take all clients as the same one: {default
>>>> KafkaPrinciple + default clientID}, and apply throttling on it.
>>>> Do you have any thoughts about it?
>>>> Maybe skip throttling for {default KafkaPrinciple + default clientID}
>>>>
>>>
>>> Throttling for default KafkaPrinciple and default ClientID is useful
>>> when we need to have a hard limit on the whole cluster and whoever is
>>> running the cluster doesn't knowclientsntIDs or if a KafkaPrinciple is
>>> reused between different producer applications.
>>> I usually find it helpful to have a way to apply throttling only on the
>>> rough clients only once I identify them without punishing everyone on the
>>> cluster. However, there are two problems with this
>>> - There's no easy way at the moment to link PIDs to clientId or
>>> KafkaPrinciple. This need to be addressed first.
>>> - Is Justin's comment on the throttling, and the fact that will mean we
>>> either block all requests or have to store the request in memory which in
>>> both cases has side downs on the producer experince.
>>>
>>> I recently had another discussion with my team and it does seem like
>>>> there
>>>> should be a way to make it more clear to the clients what is going on. A
>>>> lot of this protocol is implicit. I'm wondering if maybe there is a way
>>>> to
>>>> improve the story for newer clients. (Ie if we choose to expire based
>>>> on a
>>>> size limit, we should include a response indicating the ID has
>>>> expired.) We
>>>> also discussed ways to redefine the guarantees so that users who have
>>>> stronger idempotency requirements can ensure them (over
>>>> availability/memory
>>>> concerns). Let me know if you have any ideas here.
>>>>
>>>
>>> It may be easier to improve the experience for new clients. However, if
>>> we improved only the new clients we may need a way to help teams who run
>>> Kafka with rough clients on old versions by at least giving them an easy
>>> way to identify the clientId/ or KafkaPrinciple that generated these PIDs.
>>>
>>> For context, it's very tricky to even identify which clientId is
>>> creating all these PIDs that caused OOM, which is a contributing part of
>>> the issue at the moment. So maybe one option here could be adding a new
>>> metric that tracks the number of generated PIDs per clientId. This will
>>> help the team who runs the Kafka cluster to
>>> - contact these rough clients and ask them to fix their clients or
>>> upgrade to a new client if the new client version has a better experience.
>>> - or if ended with a throttling solution this may help identify which
>>> clientId needs to be throttled.
>>>
>>> Maybe we can start with a solution for identifying the rough clients
>>> first and keep looking for a solution to limit them, what do you think?
>>>
>>> Thanks
>>>
>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan
>>> <jols...@confluent.io.invalid> wrote:
>>>
>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>
>>>> Here was my response for the mailing thread:
>>>>
>>>> Hey Omnia,
>>>> Sorry to hear this is a problem for you as well. :(
>>>> > * I have some concerns about the impact of this option on the
>>>> transactional producers, for example, what will happen to an ongoing
>>>> transaction associated with an expired PID? Would this leave the
>>>> transactions in a "hanging" state?*
>>>> We currently check if a transaction is ongoing and do not expire the
>>>> producer ID if it has an ongoing transaction. I suspect we will
>>>> continue to
>>>> do this with any solution we pick.
>>>>
>>>> My team members and I looked a bit into the throttling case and it can
>>>> get
>>>> a bit tricky since it means we need to throttle the produce request
>>>> before
>>>> it is processed. This means we either block all requests or have to
>>>> store
>>>> the request in memory (which is not great if we are trying to save
>>>> memory).
>>>>
>>>> I recently had another discussion with my team and it does seem like
>>>> there
>>>> should be a way to make it more clear to the clients what is going on. A
>>>> lot of this protocol is implicit. I'm wondering if maybe there is a way
>>>> to
>>>> improve the story for newer clients. (Ie if we choose to expire based
>>>> on a
>>>> size limit, we should include a response indicating the ID has
>>>> expired.) We
>>>> also discussed ways to redefine the guarantees so that users who have
>>>> stronger idempotency requirements can ensure them (over
>>>> availability/memory
>>>> concerns). Let me know if you have any ideas here.
>>>>
>>>> Thanks again for commenting here, hopefully we can come to a good
>>>> solution.
>>>>
>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:
>>>>
>>>> > Hi Omnia,
>>>> >
>>>> > Thanks for your reply.
>>>> >
>>>> > For (3), you said:
>>>> > > - I have some concerns about the impact of this option on the
>>>> > transactional
>>>> > producers, for example, what will happen to an ongoing transaction
>>>> > associated with an expired PID? Would this leave the transactions in a
>>>> > "hanging" state?
>>>> >
>>>> > - How will we notify the client that the transaction can't continue
>>>> due to
>>>> > an expired PID?
>>>> >
>>>> > - If PID got marked as `expired` this will mean that
>>>> > `admin.DescribeProducers` will not list them which will make
>>>> > *`kafka-transactions.sh
>>>> > --list`* a bit tricky as we can't identify if there are transactions
>>>> linked
>>>> > to this expired PID or not. The same concern applies to
>>>> > *`kafka-transactions.sh
>>>> > --find-hanging`*.
>>>> >
>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>> > Currently, there's no way to notify clients about the expiration.
>>>> > Also, the ongoing transactions will be hanging. For the admin cli,
>>>> we've
>>>> > never thought about it. Good point.
>>>> > In summary, to adopt this solution, there are many issues needed to
>>>> get
>>>> > fixed.
>>>> >
>>>> >
>>>> > For (5), you said:
>>>> > > I am assuming you mean KafkaPrincipal here! If so is your concern
>>>> here
>>>> > that
>>>> > those good clients that use the same principal as a rogue one will get
>>>> > throttled?
>>>> >
>>>> > If this is the case, then I believe it should be okay as other
>>>> throttling
>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>> >
>>>> >
>>>> > What about applying limit/throttling to
>>>> > *`/config/users/<user>/clients/<client-id>`
>>>> > *similar to what we have with client quota? This should reduce the
>>>> concern
>>>> > about throttling good clients, right?
>>>> >
>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client Id is
>>>> > also workable.
>>>> > It's just these 2 attributes are not required.
>>>> > That is, it's possible we take all clients as the same one: {default
>>>> > KafkaPrinciple + default clientID}, and apply throttling on it.
>>>> > Do you have any thoughts about it?
>>>> > Maybe skip throttling for {default KafkaPrinciple + default clientID}
>>>> ?
>>>> >
>>>> > Luke
>>>> >
>>>> >
>>>> >
>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <
>>>> o.g.h.ibra...@gmail.com>
>>>> > wrote:
>>>> >
>>>> >> Hi Luke & Justine,
>>>> >> Thanks for looking into this issue, we have been experiencing this
>>>> because
>>>> >> of rouge clients as well.
>>>> >>
>>>> >> > 3. Having a limit to the number of active producer IDs (sort of
>>>> like an
>>>> >> LRU
>>>> >> >cache)
>>>> >> >-> The idea here is that if we hit a misconfigured client, we will
>>>> expire
>>>> >> >the older entries. The concern here is we have risks to lose
>>>> idempotency
>>>> >> >guarantees, and currently, we don't have a way to notify clients
>>>> about
>>>> >> >losing idempotency guarantees. Besides, the least  recently used
>>>> entries
>>>> >> >got removed are not always from the "bad" clients.
>>>> >>
>>>> >> - I have some concerns about the impact of this option on the
>>>> >> transactional
>>>> >> producers, for example, what will happen to an ongoing transaction
>>>> >> associated with an expired PID? Would this leave the transactions in
>>>> a
>>>> >> "hanging" state?
>>>> >>
>>>> >> - How will we notify the client that the transaction can't continue
>>>> due to
>>>> >> an expired PID?
>>>> >>
>>>> >> - If PID got marked as `expired` this will mean that
>>>> >> `admin.DescribeProducers` will not list them which will make
>>>> >> *`kafka-transactions.sh
>>>> >> --list`* a bit tricky as we can't identify if there are transactions
>>>> >> linked
>>>> >> to this expired PID or not. The same concern applies to
>>>> >> *`kafka-transactions.sh
>>>> >> --find-hanging`*.
>>>> >>
>>>> >>
>>>> >> >5. limit/throttling the producer id based on the principle
>>>> >> >-> Although we can limit the impact to a certain principle with this
>>>> >> idea,
>>>> >> >same concern still exists as solution #1 #2.
>>>> >>
>>>> >> I am assuming you mean KafkaPrincipal here! If so is your concern
>>>> here
>>>> >> that
>>>> >> those good clients that use the same principal as a rogue one will
>>>> get
>>>> >> throttled?
>>>> >>
>>>> >> If this is the case, then I believe it should be okay as other
>>>> throttling
>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>> >>
>>>> >>
>>>> >> What about applying limit/throttling to
>>>> >> *`/config/users/<user>/clients/<client-id>`
>>>> >> *similar to what we have with client quota? This should reduce the
>>>> concern
>>>> >> about throttling good clients, right?
>>>> >>
>>>> >> best,
>>>> >>
>>>> >> Omnia
>>>> >>
>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> wrote:
>>>> >>
>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>> >> > Thanks.
>>>> >> >
>>>> >> > Luke
>>>> >> >
>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > > Hi devs,
>>>> >> > >
>>>> >> > > As stated in the motivation section in KIP-854
>>>> >> > > <
>>>> >> >
>>>> >>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
>>>> >> > >:
>>>> >> > >
>>>> >> > > With idempotent producers becoming the default in Kafka, this
>>>> means
>>>> >> that
>>>> >> > > unless otherwise specified, all new producers will be given
>>>> producer
>>>> >> IDs.
>>>> >> > > Some (inefficient) applications may now create many
>>>> non-transactional
>>>> >> > > idempotent producers. Each of these producers will be assigned a
>>>> >> producer
>>>> >> > > ID and these IDs and their metadata are stored in the broker
>>>> memory,
>>>> >> > which
>>>> >> > > might cause brokers out of memory.
>>>> >> > >
>>>> >> > > Justine (in cc.) and I and some other team members are working
>>>> on the
>>>> >> > > solutions for this issue. But none of them solves it completely
>>>> >> without
>>>> >> > > side effects. Among them, "availability" VS "idempotency
>>>> guarantees"
>>>> >> is
>>>> >> > > what we can't decide which to sacrifice. Some of these solutions
>>>> >> > sacrifice
>>>> >> > > availability of produce (1,2,5) and others sacrifice idempotency
>>>> >> > guarantees
>>>> >> > > (3). It could be useful to know if people generally have a
>>>> preference
>>>> >> one
>>>> >> > > way or the other. Or what other better solutions there might be.
>>>> >> > >
>>>> >> > > Here are the proposals we came up with:
>>>> >> > >
>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>> >> > > -> This is the simplest solution. But since the OOM issue is
>>>> usually
>>>> >> > > caused by a rogue or misconfigured client, and this solution
>>>> might
>>>> >> > "punish"
>>>> >> > > the good client from sending messages.
>>>> >> > >
>>>> >> > > 2. Throttling the producer ID allocation rate
>>>> >> > > -> Same concern as the solution #1.
>>>> >> > >
>>>> >> > > 3. Having a limit to the number of active producer IDs (sort of
>>>> like
>>>> >> an
>>>> >> > > LRU cache)
>>>> >> > > -> The idea here is that if we hit a misconfigured client, we
>>>> will
>>>> >> expire
>>>> >> > > the older entries. The concern here is we have risks to lose
>>>> >> idempotency
>>>> >> > > guarantees, and currently, we don't have a way to notify clients
>>>> about
>>>> >> > > losing idempotency guarantees. Besides, the least  recently used
>>>> >> entries
>>>> >> > > got removed are not always from the "bad" clients.
>>>> >> > >
>>>> >> > > 4. allow clients to "close" the producer ID usage
>>>> >> > > -> We can provide a way for producer to "close" producerID usage.
>>>> >> > > Currently, we only have a way to INIT_PRODUCER_ID requested to
>>>> >> allocate
>>>> >> > > one. After that, we'll keep the producer ID metadata in broker
>>>> even if
>>>> >> > the
>>>> >> > > producer is "closed". Having a closed API (ex: END_PRODUCER_ID),
>>>> we
>>>> >> can
>>>> >> > > remove the entry from broker side. In client side, we can send
>>>> it when
>>>> >> > > producer closing. The concern is, the old clients (including
>>>> non-java
>>>> >> > > clients) will still suffer from the OOM issue.
>>>> >> > >
>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>> >> > > -> Although we can limit the impact to a certain principle with
>>>> this
>>>> >> > idea,
>>>> >> > > same concern still exists as solution #1 #2.
>>>> >> > >
>>>> >> > > Any thoughts/feedback are welcomed.
>>>> >> > >
>>>> >> > > Thank you.
>>>> >> > > Luke
>>>> >> > >
>>>> >> >
>>>> >>
>>>> >
>>>>
>>>
> On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io>
> wrote:
>
>> Hey Omnia,
>>
>> Thanks for the response. I think I understand your explanations here with
>> respect to principal and clientId usage.
>>
>> For the throttling -- handleInitProducerIdRequest will allocate the ID to
>> the producer, but we don't actually store it on the broker or increment our
>> metric until the first produce request for that producer is sent (or sent
>> again after previously expiring). Would you consider throttling the produce
>> request instead? It may be hard to get any metrics from the transaction
>> coordinator where the initProducerId request is handled.
>>
>> Justine
>>
>> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> wrote:
>>
>>> Hey Justine,
>>> > If I understand your message correctly, there are issues with
>>> identifying the source of the rogue clients? So you propose to add a new
>>> metric for that?
>>> > And also proposing to throttle based on clientId as a potential follow
>>> up?
>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>>> similarly to how we identify clients in Fetch/Produce/Request
>>> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability
>>> to throttle later based on principal which is most likely to be a smaller
>>> set than clientIds. My initial thought was to add a metrics that represent
>>> how many InitProducerIDRequest are sent by KafkaPrincipal (and/or clientId)
>>> similar to Fetch/Produce QuotaManagers.
>>> Then as a follow-up, we can throttle based on either KafkaPrinciple or
>>> clientId (maybe default as well to align this with other QuotaManagers in
>>> Kafka).
>>>
>>> >1. Does we rely on the client using the same ID? What if there are many
>>> clients that all use different client IDs?
>>> This is why I want to use the combination of KafkaPrincipal or clientId
>>> similar to some other quotas we have in Kafka already. This will be a
>>> similar risk to Fetch/Produce quota in Kafka which also relay on the client
>>> to use the same clientId and KafkaPrincipal.
>>>
>>> >2. Are there places where high cardinality of this metric is a concern?
>>> I can imagine many client IDs in the system. Would we treat this as a rate
>>> metric (ie, when we get an init producer ID and return a new producer ID we
>>> emit a count for that client id?) Or something else?
>>> My initial thought here was to follow the steps of ClientQuotaManager
>>> and ClientRequestQuotaManager and use a rate metric. However, I think we
>>> can emit it either
>>>
>>>    1. when we return the new PID. However, I have concerns that we may
>>>    circle back to the previous concerns with OMM due to keeping track of
>>>    ACTIVE PIDs per KafkaPrincipal(and/or) clientId in the future. Also this
>>>    would be the first time Kafka throttle IDs for any client.
>>>    2. or once we recieve initProducerIDRequest and throttle before even
>>>    hitting `handleInitProducerIdRequest`. Going this direction we may need 
>>> to
>>>    throttle it within a different quota window than `
>>>    quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request
>>>    per second wouldn't be efficient for most cases. I personally think this
>>>    direction is easier as it seems more consistent with the existing quota
>>>    implementation. Specially that Kafka has already the concept of 
>>> throttling
>>>    subset of requests (in ControllerMutationQuotaManager) but never had any
>>>    concept of throttling active IDs for any client.
>>>
>>>
>>> What do you think?
>>>
>>> Thanks
>>> Omnia
>>>
>>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io>
>>> wrote:
>>>
>>>> Hey Omnia,
>>>> Sorry for losing track of this.
>>>>
>>>> If I understand your message correctly, there are issues with
>>>> identifying the source of the rogue clients? So you propose to add a new
>>>> metric for that?
>>>> And also proposing to throttle based on clientId as a potential follow
>>>> up?
>>>>
>>>> I think both of these make sense. The only things I can think of for
>>>> the metric are:
>>>> 1. Does we rely on the client using the same ID? What if there are many
>>>> clients that all use different client IDs?
>>>> 2. Are there places where high cardinality of this metric is a concern?
>>>> I can imagine many client IDs in the system. Would we treat this as a rate
>>>> metric (ie, when we get an init producer ID and return a new producer ID we
>>>> emit a count for that client id?) Or something else?
>>>>
>>>> Thanks,
>>>> Justine
>>>>
>>>> On Thu, Feb 2, 2023 at 8:44 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Luke and Justine,
>>>>> Are there any thoughts or updates on this? I would love to help with
>>>>> this as we are hitting this more frequently now.
>>>>>
>>>>> best,
>>>>>
>>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Luke and Justine,
>>>>>>
>>>>>>> For (3), you said:
>>>>>>> > - I have some concerns about the impact of this option on the
>>>>>>> transactional
>>>>>>> producers, for example, what will happen to an ongoing transaction
>>>>>>> associated with an expired PID? Would this leave the transactions in
>>>>>>> a
>>>>>>> "hanging" state?
>>>>>>>
>>>>>>> - How will we notify the client that the transaction can't continue
>>>>>>> due to
>>>>>>> an expired PID?
>>>>>>>
>>>>>>> - If PID got marked as `expired` this will mean that
>>>>>>> `admin.DescribeProducers` will not list them which will make
>>>>>>> *`kafka-transactions.sh
>>>>>>> --list`* a bit tricky as we can't identify if there are transactions
>>>>>>> linked
>>>>>>> to this expired PID or not. The same concern applies to
>>>>>>> *`kafka-transactions.sh
>>>>>>> --find-hanging`*.
>>>>>>>
>>>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>>>> Currently, there's no way to notify clients about the expiration.
>>>>>>> Also, the ongoing transactions will be hanging. For the admin cli,
>>>>>>> we've
>>>>>>> never thought about it. Good point.
>>>>>>> In summary, to adopt this solution, there are many issues needed to
>>>>>>> get
>>>>>>> fixed.
>>>>>>>
>>>>>>
>>>>>> Justin already clarified that if PID is attached to a transaction it
>>>>>> will not expire so identifying the transactions shouldn't be a concern
>>>>>> anymore.
>>>>>> The only concern here will be that this solution will not solve the
>>>>>> problem if the rouge producer is a transactional producer with hanging
>>>>>> transactions.
>>>>>> If anyone faced this situation they will need to abort the hanging
>>>>>> transactions manually and then the solution to expire a PID can then 
>>>>>> work.
>>>>>>
>>>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id
>>>>>>> is also
>>>>>>> workable.
>>>>>>> It's just these 2 attributes are not required.
>>>>>>> That is, it's possible we take all clients as the same one: {default
>>>>>>> KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>>> Do you have any thoughts about it?
>>>>>>> Maybe skip throttling for {default KafkaPrinciple + default
>>>>>>> clientID}
>>>>>>>
>>>>>>
>>>>>> Throttling for default KafkaPrinciple and default ClientID is useful
>>>>>> when we need to have a hard limit on the whole cluster and whoever is
>>>>>> running the cluster doesn't knowclientsntIDs or if a KafkaPrinciple is
>>>>>> reused between different producer applications.
>>>>>> I usually find it helpful to have a way to apply throttling only on
>>>>>> the rough clients only once I identify them without punishing everyone on
>>>>>> the cluster. However, there are two problems with this
>>>>>> - There's no easy way at the moment to link PIDs to clientId or
>>>>>> KafkaPrinciple. This need to be addressed first.
>>>>>> - Is Justin's comment on the throttling, and the fact that will mean
>>>>>> we either block all requests or have to store the request in memory which
>>>>>> in both cases has side downs on the producer experince.
>>>>>>
>>>>>> I recently had another discussion with my team and it does seem like
>>>>>>> there
>>>>>>> should be a way to make it more clear to the clients what is going
>>>>>>> on. A
>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>>>> way to
>>>>>>> improve the story for newer clients. (Ie if we choose to expire
>>>>>>> based on a
>>>>>>> size limit, we should include a response indicating the ID has
>>>>>>> expired.) We
>>>>>>> also discussed ways to redefine the guarantees so that users who have
>>>>>>> stronger idempotency requirements can ensure them (over
>>>>>>> availability/memory
>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>
>>>>>>
>>>>>> It may be easier to improve the experience for new clients. However,
>>>>>> if we improved only the new clients we may need a way to help teams who 
>>>>>> run
>>>>>> Kafka with rough clients on old versions by at least giving them an easy
>>>>>> way to identify the clientId/ or KafkaPrinciple that generated these 
>>>>>> PIDs.
>>>>>>
>>>>>> For context, it's very tricky to even identify which clientId is
>>>>>> creating all these PIDs that caused OOM, which is a contributing part of
>>>>>> the issue at the moment. So maybe one option here could be adding a new
>>>>>> metric that tracks the number of generated PIDs per clientId. This will
>>>>>> help the team who runs the Kafka cluster to
>>>>>> - contact these rough clients and ask them to fix their clients or
>>>>>> upgrade to a new client if the new client version has a better 
>>>>>> experience.
>>>>>> - or if ended with a throttling solution this may help identify which
>>>>>> clientId needs to be throttled.
>>>>>>
>>>>>> Maybe we can start with a solution for identifying the rough clients
>>>>>> first and keep looking for a solution to limit them, what do you think?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan
>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>
>>>>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>>>>
>>>>>>> Here was my response for the mailing thread:
>>>>>>>
>>>>>>> Hey Omnia,
>>>>>>> Sorry to hear this is a problem for you as well. :(
>>>>>>> > * I have some concerns about the impact of this option on the
>>>>>>> transactional producers, for example, what will happen to an ongoing
>>>>>>> transaction associated with an expired PID? Would this leave the
>>>>>>> transactions in a "hanging" state?*
>>>>>>> We currently check if a transaction is ongoing and do not expire the
>>>>>>> producer ID if it has an ongoing transaction. I suspect we will
>>>>>>> continue to
>>>>>>> do this with any solution we pick.
>>>>>>>
>>>>>>> My team members and I looked a bit into the throttling case and it
>>>>>>> can get
>>>>>>> a bit tricky since it means we need to throttle the produce request
>>>>>>> before
>>>>>>> it is processed. This means we either block all requests or have to
>>>>>>> store
>>>>>>> the request in memory (which is not great if we are trying to save
>>>>>>> memory).
>>>>>>>
>>>>>>> I recently had another discussion with my team and it does seem like
>>>>>>> there
>>>>>>> should be a way to make it more clear to the clients what is going
>>>>>>> on. A
>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>>>> way to
>>>>>>> improve the story for newer clients. (Ie if we choose to expire
>>>>>>> based on a
>>>>>>> size limit, we should include a response indicating the ID has
>>>>>>> expired.) We
>>>>>>> also discussed ways to redefine the guarantees so that users who have
>>>>>>> stronger idempotency requirements can ensure them (over
>>>>>>> availability/memory
>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>
>>>>>>> Thanks again for commenting here, hopefully we can come to a good
>>>>>>> solution.
>>>>>>>
>>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:
>>>>>>>
>>>>>>> > Hi Omnia,
>>>>>>> >
>>>>>>> > Thanks for your reply.
>>>>>>> >
>>>>>>> > For (3), you said:
>>>>>>> > > - I have some concerns about the impact of this option on the
>>>>>>> > transactional
>>>>>>> > producers, for example, what will happen to an ongoing transaction
>>>>>>> > associated with an expired PID? Would this leave the transactions
>>>>>>> in a
>>>>>>> > "hanging" state?
>>>>>>> >
>>>>>>> > - How will we notify the client that the transaction can't
>>>>>>> continue due to
>>>>>>> > an expired PID?
>>>>>>> >
>>>>>>> > - If PID got marked as `expired` this will mean that
>>>>>>> > `admin.DescribeProducers` will not list them which will make
>>>>>>> > *`kafka-transactions.sh
>>>>>>> > --list`* a bit tricky as we can't identify if there are
>>>>>>> transactions linked
>>>>>>> > to this expired PID or not. The same concern applies to
>>>>>>> > *`kafka-transactions.sh
>>>>>>> > --find-hanging`*.
>>>>>>> >
>>>>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>>>>> > Currently, there's no way to notify clients about the expiration.
>>>>>>> > Also, the ongoing transactions will be hanging. For the admin cli,
>>>>>>> we've
>>>>>>> > never thought about it. Good point.
>>>>>>> > In summary, to adopt this solution, there are many issues needed
>>>>>>> to get
>>>>>>> > fixed.
>>>>>>> >
>>>>>>> >
>>>>>>> > For (5), you said:
>>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your
>>>>>>> concern here
>>>>>>> > that
>>>>>>> > those good clients that use the same principal as a rogue one will
>>>>>>> get
>>>>>>> > throttled?
>>>>>>> >
>>>>>>> > If this is the case, then I believe it should be okay as other
>>>>>>> throttling
>>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>> >
>>>>>>> >
>>>>>>> > What about applying limit/throttling to
>>>>>>> > *`/config/users/<user>/clients/<client-id>`
>>>>>>> > *similar to what we have with client quota? This should reduce the
>>>>>>> concern
>>>>>>> > about throttling good clients, right?
>>>>>>> >
>>>>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client
>>>>>>> Id is
>>>>>>> > also workable.
>>>>>>> > It's just these 2 attributes are not required.
>>>>>>> > That is, it's possible we take all clients as the same one:
>>>>>>> {default
>>>>>>> > KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>>> > Do you have any thoughts about it?
>>>>>>> > Maybe skip throttling for {default KafkaPrinciple + default
>>>>>>> clientID} ?
>>>>>>> >
>>>>>>> > Luke
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <
>>>>>>> o.g.h.ibra...@gmail.com>
>>>>>>> > wrote:
>>>>>>> >
>>>>>>> >> Hi Luke & Justine,
>>>>>>> >> Thanks for looking into this issue, we have been experiencing
>>>>>>> this because
>>>>>>> >> of rouge clients as well.
>>>>>>> >>
>>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort of
>>>>>>> like an
>>>>>>> >> LRU
>>>>>>> >> >cache)
>>>>>>> >> >-> The idea here is that if we hit a misconfigured client, we
>>>>>>> will expire
>>>>>>> >> >the older entries. The concern here is we have risks to lose
>>>>>>> idempotency
>>>>>>> >> >guarantees, and currently, we don't have a way to notify clients
>>>>>>> about
>>>>>>> >> >losing idempotency guarantees. Besides, the least  recently used
>>>>>>> entries
>>>>>>> >> >got removed are not always from the "bad" clients.
>>>>>>> >>
>>>>>>> >> - I have some concerns about the impact of this option on the
>>>>>>> >> transactional
>>>>>>> >> producers, for example, what will happen to an ongoing transaction
>>>>>>> >> associated with an expired PID? Would this leave the transactions
>>>>>>> in a
>>>>>>> >> "hanging" state?
>>>>>>> >>
>>>>>>> >> - How will we notify the client that the transaction can't
>>>>>>> continue due to
>>>>>>> >> an expired PID?
>>>>>>> >>
>>>>>>> >> - If PID got marked as `expired` this will mean that
>>>>>>> >> `admin.DescribeProducers` will not list them which will make
>>>>>>> >> *`kafka-transactions.sh
>>>>>>> >> --list`* a bit tricky as we can't identify if there are
>>>>>>> transactions
>>>>>>> >> linked
>>>>>>> >> to this expired PID or not. The same concern applies to
>>>>>>> >> *`kafka-transactions.sh
>>>>>>> >> --find-hanging`*.
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> >5. limit/throttling the producer id based on the principle
>>>>>>> >> >-> Although we can limit the impact to a certain principle with
>>>>>>> this
>>>>>>> >> idea,
>>>>>>> >> >same concern still exists as solution #1 #2.
>>>>>>> >>
>>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your concern
>>>>>>> here
>>>>>>> >> that
>>>>>>> >> those good clients that use the same principal as a rogue one
>>>>>>> will get
>>>>>>> >> throttled?
>>>>>>> >>
>>>>>>> >> If this is the case, then I believe it should be okay as other
>>>>>>> throttling
>>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> What about applying limit/throttling to
>>>>>>> >> *`/config/users/<user>/clients/<client-id>`
>>>>>>> >> *similar to what we have with client quota? This should reduce
>>>>>>> the concern
>>>>>>> >> about throttling good clients, right?
>>>>>>> >>
>>>>>>> >> best,
>>>>>>> >>
>>>>>>> >> Omnia
>>>>>>> >>
>>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com>
>>>>>>> wrote:
>>>>>>> >>
>>>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>>>> >> > Thanks.
>>>>>>> >> >
>>>>>>> >> > Luke
>>>>>>> >> >
>>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com>
>>>>>>> wrote:
>>>>>>> >> >
>>>>>>> >> > > Hi devs,
>>>>>>> >> > >
>>>>>>> >> > > As stated in the motivation section in KIP-854
>>>>>>> >> > > <
>>>>>>> >> >
>>>>>>> >>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
>>>>>>> >> > >:
>>>>>>> >> > >
>>>>>>> >> > > With idempotent producers becoming the default in Kafka, this
>>>>>>> means
>>>>>>> >> that
>>>>>>> >> > > unless otherwise specified, all new producers will be given
>>>>>>> producer
>>>>>>> >> IDs.
>>>>>>> >> > > Some (inefficient) applications may now create many
>>>>>>> non-transactional
>>>>>>> >> > > idempotent producers. Each of these producers will be
>>>>>>> assigned a
>>>>>>> >> producer
>>>>>>> >> > > ID and these IDs and their metadata are stored in the broker
>>>>>>> memory,
>>>>>>> >> > which
>>>>>>> >> > > might cause brokers out of memory.
>>>>>>> >> > >
>>>>>>> >> > > Justine (in cc.) and I and some other team members are
>>>>>>> working on the
>>>>>>> >> > > solutions for this issue. But none of them solves it
>>>>>>> completely
>>>>>>> >> without
>>>>>>> >> > > side effects. Among them, "availability" VS "idempotency
>>>>>>> guarantees"
>>>>>>> >> is
>>>>>>> >> > > what we can't decide which to sacrifice. Some of these
>>>>>>> solutions
>>>>>>> >> > sacrifice
>>>>>>> >> > > availability of produce (1,2,5) and others sacrifice
>>>>>>> idempotency
>>>>>>> >> > guarantees
>>>>>>> >> > > (3). It could be useful to know if people generally have a
>>>>>>> preference
>>>>>>> >> one
>>>>>>> >> > > way or the other. Or what other better solutions there might
>>>>>>> be.
>>>>>>> >> > >
>>>>>>> >> > > Here are the proposals we came up with:
>>>>>>> >> > >
>>>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>>>> >> > > -> This is the simplest solution. But since the OOM issue is
>>>>>>> usually
>>>>>>> >> > > caused by a rogue or misconfigured client, and this solution
>>>>>>> might
>>>>>>> >> > "punish"
>>>>>>> >> > > the good client from sending messages.
>>>>>>> >> > >
>>>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>>>> >> > > -> Same concern as the solution #1.
>>>>>>> >> > >
>>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort
>>>>>>> of like
>>>>>>> >> an
>>>>>>> >> > > LRU cache)
>>>>>>> >> > > -> The idea here is that if we hit a misconfigured client, we
>>>>>>> will
>>>>>>> >> expire
>>>>>>> >> > > the older entries. The concern here is we have risks to lose
>>>>>>> >> idempotency
>>>>>>> >> > > guarantees, and currently, we don't have a way to notify
>>>>>>> clients about
>>>>>>> >> > > losing idempotency guarantees. Besides, the least  recently
>>>>>>> used
>>>>>>> >> entries
>>>>>>> >> > > got removed are not always from the "bad" clients.
>>>>>>> >> > >
>>>>>>> >> > > 4. allow clients to "close" the producer ID usage
>>>>>>> >> > > -> We can provide a way for producer to "close" producerID
>>>>>>> usage.
>>>>>>> >> > > Currently, we only have a way to INIT_PRODUCER_ID requested to
>>>>>>> >> allocate
>>>>>>> >> > > one. After that, we'll keep the producer ID metadata in
>>>>>>> broker even if
>>>>>>> >> > the
>>>>>>> >> > > producer is "closed". Having a closed API (ex:
>>>>>>> END_PRODUCER_ID), we
>>>>>>> >> can
>>>>>>> >> > > remove the entry from broker side. In client side, we can
>>>>>>> send it when
>>>>>>> >> > > producer closing. The concern is, the old clients (including
>>>>>>> non-java
>>>>>>> >> > > clients) will still suffer from the OOM issue.
>>>>>>> >> > >
>>>>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>>>>> >> > > -> Although we can limit the impact to a certain principle
>>>>>>> with this
>>>>>>> >> > idea,
>>>>>>> >> > > same concern still exists as solution #1 #2.
>>>>>>> >> > >
>>>>>>> >> > > Any thoughts/feedback are welcomed.
>>>>>>> >> > >
>>>>>>> >> > > Thank you.
>>>>>>> >> > > Luke
>>>>>>> >> > >
>>>>>>> >> >
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>

Reply via email to