*Hi Justine, *

*My initial thought behind throttling initProducerId was to get rid of
the problem at the source (a client that creates too many PIDs) and fail
faster, but if having this at the produce request level is easier, this
should be fine. I am guessing it will go in the same direction, as we may use
ClientQuotaManager for Produce throttling with a different quota window than
`quota.window.size.seconds`.*
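
To make the produce-level idea a bit more concrete, here is a minimal sketch of what I have in mind. It is not Kafka code: the class name `NewPidQuotaSketch`, the method `shouldThrottle`, and both constructor parameters are hypothetical, and a real implementation would presumably plug into the existing quota managers rather than use a hand-rolled window. It only illustrates counting producer IDs first seen on a produce request, per KafkaPrincipal, over a window that can be much longer than one second.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: caps how many new producer IDs one principal may introduce per window. */
final class NewPidQuotaSketch {

    /** Producer IDs seen for one principal during the current window. */
    private static final class Window {
        long startMs;
        final Set<Long> pids = new HashSet<>();

        Window(long startMs) {
            this.startMs = startMs;
        }
    }

    private final int maxNewPidsPerWindow; // assumed quota value
    private final long windowMs;           // assumed window length, e.g. one hour
    private final Map<String, Window> windows = new ConcurrentHashMap<>();

    NewPidQuotaSketch(int maxNewPidsPerWindow, long windowMs) {
        this.maxNewPidsPerWindow = maxNewPidsPerWindow;
        this.windowMs = windowMs;
    }

    /** Returns true if a produce request carrying this producer ID should be throttled. */
    boolean shouldThrottle(String principal, long producerId, long nowMs) {
        Window w = windows.computeIfAbsent(principal, p -> new Window(nowMs));
        synchronized (w) {
            if (nowMs - w.startMs >= windowMs) {
                // The window has elapsed: start counting from scratch.
                w.startMs = nowMs;
                w.pids.clear();
            }
            if (w.pids.contains(producerId)) {
                return false; // already-known PID, not a new allocation
            }
            if (w.pids.size() >= maxNewPidsPerWindow) {
                return true; // too many new PIDs from this principal in this window
            }
            w.pids.add(producerId);
            return false;
        }
    }
}
```

The per-principal set never holds more than `maxNewPidsPerWindow` entries, so the tracking itself should not reintroduce the memory problem. Keying by clientId as well would only change the map key.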

*If this is good as an initial solution, I can start a KIP and see what
the wider community feels about this.*

*Also, I noticed that at some point one of us hit "Reply" instead of
"Reply to All" :)  So here are the previous conversations:*

*On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io> wrote:*

> Hey Omnia,
>
> Thanks for the response. I think I understand your explanations here with
> respect to principal and clientId usage.
>
> For the throttling -- handleInitProducerIdRequest will allocate the ID to
> the producer, but we don't actually store it on the broker or increment our
> metric until the first produce request for that producer is sent (or sent
> again after previously expiring). Would you consider throttling the produce
> request instead? It may be hard to get any metrics from the transaction
> coordinator where the initProducerId request is handled.
>
> Justine


*On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:*

> Hey Justine,
> > If I understand your message correctly, there are issues with
> identifying the source of the rogue clients? So you propose to add a new
> metric for that?
> > And also proposing to throttle based on clientId as a potential follow
> up?
> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
> similarly to how we identify clients in Fetch/Produce/Request
> QuotaManagers. Using KafkaPrincipal should give cluster admins the ability
> to throttle later based on principal, which is most likely to be a smaller
> set than clientIds. My initial thought was to add a metric that represents
> how many InitProducerIDRequests are sent by KafkaPrincipal (and/or clientId),
> similar to Fetch/Produce QuotaManagers.
> Then as a follow-up, we can throttle based on either KafkaPrincipal or
> clientId (maybe default as well to align this with other QuotaManagers in
> Kafka).
>
> >1. Do we rely on the client using the same ID? What if there are many
> clients that all use different client IDs?
> This is why I want to use the combination of KafkaPrincipal and/or clientId,
> similar to some other quotas we have in Kafka already. This will be a
> similar risk to the Fetch/Produce quotas in Kafka, which also rely on the client
> to use the same clientId and KafkaPrincipal.
>
> >2. Are there places where high cardinality of this metric is a concern? I
> can imagine many client IDs in the system. Would we treat this as a rate
> metric (ie, when we get an init producer ID and return a new producer ID we
> emit a count for that client id?) Or something else?
> My initial thought here was to follow the steps of ClientQuotaManager and
> ClientRequestQuotaManager and use a rate metric. However, I think we can
> emit it either
>
>    1. when we return the new PID. However, I have concerns that we may
>    circle back to the previous concerns with OOM due to keeping track of
>    ACTIVE PIDs per KafkaPrincipal (and/or clientId) in the future. Also, this
>    would be the first time Kafka throttles IDs for any client.
>    2. or once we receive the initProducerIDRequest, throttling before even
>    hitting `handleInitProducerIdRequest`. Going in this direction, we may need to
>    throttle it within a different quota window than
>    `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests per
>    second wouldn't be efficient for most cases. I personally think this
>    direction is easier as it seems more consistent with the existing quota
>    implementation, especially since Kafka already has the concept of throttling
>    a subset of requests (in ControllerMutationQuotaManager) but has never had any
>    concept of throttling active IDs for any client.
>
>
> What do you think?
>
> Thanks
> Omnia
>

*On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io> wrote:*

> Hey Omnia,
> Sorry for losing track of this.
>
> If I understand your message correctly, there are issues with identifying
> the source of the rogue clients? So you propose to add a new metric for
> that?
> And also proposing to throttle based on clientId as a potential follow up?
>
> I think both of these make sense. The only things I can think of for the
> metric are:
> 1. Do we rely on the client using the same ID? What if there are many
> clients that all use different client IDs?
> 2. Are there places where high cardinality of this metric is a concern? I
> can imagine many client IDs in the system. Would we treat this as a rate
> metric (ie, when we get an init producer ID and return a new producer ID we
> emit a count for that client id?) Or something else?
>
> Thanks,
> Justine
>

Thanks
Omnia

On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
wrote:

> Hi Luke and Justine,
> Are there any thoughts or updates on this? I would love to help with this
> as we are hitting this more frequently now.
>
> best,
>
> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>> Hi Luke and Justine,
>>
>>> For (3), you said:
>>> > - I have some concerns about the impact of this option on the
>>> transactional
>>> producers, for example, what will happen to an ongoing transaction
>>> associated with an expired PID? Would this leave the transactions in a
>>> "hanging" state?
>>>
>>> - How will we notify the client that the transaction can't continue due
>>> to
>>> an expired PID?
>>>
>>> - If PID got marked as `expired` this will mean that
>>> `admin.DescribeProducers` will not list them which will make
>>> *`kafka-transactions.sh
>>> --list`* a bit tricky as we can't identify if there are transactions
>>> linked
>>> to this expired PID or not. The same concern applies to
>>> *`kafka-transactions.sh
>>> --find-hanging`*.
>>>
>>> --> Yes, you're right. Those are also concerns for this solution.
>>> Currently, there's no way to notify clients about the expiration.
>>> Also, the ongoing transactions will be hanging. For the admin cli, we've
>>> never thought about it. Good point.
>>> In summary, to adopt this solution, there are many issues needed to get
>>> fixed.
>>>
>>
>> Justine already clarified that if a PID is attached to a transaction it will
>> not expire, so identifying the transactions shouldn't be a concern anymore.
>> The only concern here will be that this solution will not solve the
>> problem if the rogue producer is a transactional producer with hanging
>> transactions.
>> If anyone faces this situation, they will need to abort the hanging
>> transactions manually, and then the solution to expire a PID can work.
>>
>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id is
>>> also
>>> workable.
>>> It's just these 2 attributes are not required.
>>> That is, it's possible we take all clients as the same one: {default
>>> KafkaPrinciple + default clientID}, and apply throttling on it.
>>> Do you have any thoughts about it?
>>> Maybe skip throttling for {default KafkaPrinciple + default clientID}
>>>
>>
>> Throttling for default KafkaPrincipal and default clientId is useful when
>> we need to have a hard limit on the whole cluster and whoever is running
>> the cluster doesn't know the client IDs, or if a KafkaPrincipal is reused
>> between different producer applications.
>> I usually find it helpful to have a way to apply throttling only to the
>> rogue clients once I identify them, without punishing everyone on the
>> cluster. However, there are two problems with this:
>> - There's no easy way at the moment to link PIDs to clientId or
>> KafkaPrincipal. This needs to be addressed first.
>> - Justine's comment on the throttling: it will mean we
>> either block all requests or have to store the request in memory, which in
>> both cases has downsides for the producer experience.
>>
>> I recently had another discussion with my team and it does seem like there
>>> should be a way to make it more clear to the clients what is going on. A
>>> lot of this protocol is implicit. I'm wondering if maybe there is a way
>>> to
>>> improve the story for newer clients. (Ie if we choose to expire based on
>>> a
>>> size limit, we should include a response indicating the ID has expired.)
>>> We
>>> also discussed ways to redefine the guarantees so that users who have
>>> stronger idempotency requirements can ensure them (over
>>> availability/memory
>>> concerns). Let me know if you have any ideas here.
>>>
>>
>> It may be easier to improve the experience for new clients. However, if
>> we improve only the new clients, we may need a way to help teams who run
>> Kafka with rogue clients on old versions by at least giving them an easy
>> way to identify the clientId or KafkaPrincipal that generated these PIDs.
>>
>> For context, it's very tricky to even identify which clientId is creating
>> all these PIDs that caused the OOM, which is a contributing part of the issue
>> at the moment. So maybe one option here could be adding a new metric that
>> tracks the number of generated PIDs per clientId. This will help the team
>> who runs the Kafka cluster to
>> - contact the owners of these rogue clients and ask them to fix their clients or
>> upgrade to a new client if the new client version has a better experience.
>> - or, if we end up with a throttling solution, identify which
>> clientId needs to be throttled.
>>
>> Maybe we can start with a solution for identifying the rogue clients
>> first and keep looking for a solution to limit them. What do you think?
>>
>> Thanks
>>
>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan
>> <jols...@confluent.io.invalid> wrote:
>>
>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>
>>> Here was my response for the mailing thread:
>>>
>>> Hey Omnia,
>>> Sorry to hear this is a problem for you as well. :(
>>> > * I have some concerns about the impact of this option on the
>>> transactional producers, for example, what will happen to an ongoing
>>> transaction associated with an expired PID? Would this leave the
>>> transactions in a "hanging" state?*
>>> We currently check if a transaction is ongoing and do not expire the
>>> producer ID if it has an ongoing transaction. I suspect we will continue
>>> to
>>> do this with any solution we pick.
>>>
>>> My team members and I looked a bit into the throttling case and it can
>>> get
>>> a bit tricky since it means we need to throttle the produce request
>>> before
>>> it is processed. This means we either block all requests or have to store
>>> the request in memory (which is not great if we are trying to save
>>> memory).
>>>
>>> I recently had another discussion with my team and it does seem like
>>> there
>>> should be a way to make it more clear to the clients what is going on. A
>>> lot of this protocol is implicit. I'm wondering if maybe there is a way
>>> to
>>> improve the story for newer clients. (Ie if we choose to expire based on
>>> a
>>> size limit, we should include a response indicating the ID has expired.)
>>> We
>>> also discussed ways to redefine the guarantees so that users who have
>>> stronger idempotency requirements can ensure them (over
>>> availability/memory
>>> concerns). Let me know if you have any ideas here.
>>>
>>> Thanks again for commenting here, hopefully we can come to a good
>>> solution.
>>>
>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:
>>>
>>> > Hi Omnia,
>>> >
>>> > Thanks for your reply.
>>> >
>>> > For (3), you said:
>>> > > - I have some concerns about the impact of this option on the
>>> > transactional
>>> > producers, for example, what will happen to an ongoing transaction
>>> > associated with an expired PID? Would this leave the transactions in a
>>> > "hanging" state?
>>> >
>>> > - How will we notify the client that the transaction can't continue
>>> due to
>>> > an expired PID?
>>> >
>>> > - If PID got marked as `expired` this will mean that
>>> > `admin.DescribeProducers` will not list them which will make
>>> > *`kafka-transactions.sh
>>> > --list`* a bit tricky as we can't identify if there are transactions
>>> linked
>>> > to this expired PID or not. The same concern applies to
>>> > *`kafka-transactions.sh
>>> > --find-hanging`*.
>>> >
>>> > --> Yes, you're right. Those are also concerns for this solution.
>>> > Currently, there's no way to notify clients about the expiration.
>>> > Also, the ongoing transactions will be hanging. For the admin cli,
>>> we've
>>> > never thought about it. Good point.
>>> > In summary, to adopt this solution, there are many issues needed to get
>>> > fixed.
>>> >
>>> >
>>> > For (5), you said:
>>> > > I am assuming you mean KafkaPrincipal here! If so is your concern
>>> here
>>> > that
>>> > those good clients that use the same principal as a rogue one will get
>>> > throttled?
>>> >
>>> > If this is the case, then I believe it should be okay as other
>>> throttling
>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>> >
>>> >
>>> > What about applying limit/throttling to
>>> > *`/config/users/<user>/clients/<client-id>`
>>> > *similar to what we have with client quota? This should reduce the
>>> concern
>>> > about throttling good clients, right?
>>> >
>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client Id is
>>> > also workable.
>>> > It's just these 2 attributes are not required.
>>> > That is, it's possible we take all clients as the same one: {default
>>> > KafkaPrinciple + default clientID}, and apply throttling on it.
>>> > Do you have any thoughts about it?
>>> > Maybe skip throttling for {default KafkaPrinciple + default clientID} ?
>>> >
>>> > Luke
>>> >
>>> >
>>> >
>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com
>>> >
>>> > wrote:
>>> >
>>> >> Hi Luke & Justine,
>>> >> Thanks for looking into this issue, we have been experiencing this
>>> because
>>> >> of rogue clients as well.
>>> >>
>>> >> > 3. Having a limit to the number of active producer IDs (sort of
>>> like an
>>> >> LRU
>>> >> >cache)
>>> >> >-> The idea here is that if we hit a misconfigured client, we will
>>> expire
>>> >> >the older entries. The concern here is we have risks to lose
>>> idempotency
>>> >> >guarantees, and currently, we don't have a way to notify clients
>>> about
>>> >> >losing idempotency guarantees. Besides, the least  recently used
>>> entries
>>> >> >got removed are not always from the "bad" clients.
>>> >>
>>> >> - I have some concerns about the impact of this option on the
>>> >> transactional
>>> >> producers, for example, what will happen to an ongoing transaction
>>> >> associated with an expired PID? Would this leave the transactions in a
>>> >> "hanging" state?
>>> >>
>>> >> - How will we notify the client that the transaction can't continue
>>> due to
>>> >> an expired PID?
>>> >>
>>> >> - If PID got marked as `expired` this will mean that
>>> >> `admin.DescribeProducers` will not list them which will make
>>> >> *`kafka-transactions.sh
>>> >> --list`* a bit tricky as we can't identify if there are transactions
>>> >> linked
>>> >> to this expired PID or not. The same concern applies to
>>> >> *`kafka-transactions.sh
>>> >> --find-hanging`*.
>>> >>
>>> >>
>>> >> >5. limit/throttling the producer id based on the principle
>>> >> >-> Although we can limit the impact to a certain principle with this
>>> >> idea,
>>> >> >same concern still exists as solution #1 #2.
>>> >>
>>> >> I am assuming you mean KafkaPrincipal here! If so is your concern here
>>> >> that
>>> >> those good clients that use the same principal as a rogue one will get
>>> >> throttled?
>>> >>
>>> >> If this is the case, then I believe it should be okay as other
>>> throttling
>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>> >>
>>> >>
>>> >> What about applying limit/throttling to
>>> >> *`/config/users/<user>/clients/<client-id>`
>>> >> *similar to what we have with client quota? This should reduce the
>>> concern
>>> >> about throttling good clients, right?
>>> >>
>>> >> best,
>>> >>
>>> >> Omnia
>>> >>
>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> wrote:
>>> >>
>>> >> > Bump this thread to see if there are any comments/thoughts.
>>> >> > Thanks.
>>> >> >
>>> >> > Luke
>>> >> >
>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com>
>>> wrote:
>>> >> >
>>> >> > > Hi devs,
>>> >> > >
>>> >> > > As stated in the motivation section in KIP-854
>>> >> > > <
>>> >> >
>>> >>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
>>> >> > >:
>>> >> > >
>>> >> > > With idempotent producers becoming the default in Kafka, this
>>> means
>>> >> that
>>> >> > > unless otherwise specified, all new producers will be given
>>> producer
>>> >> IDs.
>>> >> > > Some (inefficient) applications may now create many
>>> non-transactional
>>> >> > > idempotent producers. Each of these producers will be assigned a
>>> >> producer
>>> >> > > ID and these IDs and their metadata are stored in the broker
>>> memory,
>>> >> > which
>>> >> > > might cause brokers out of memory.
>>> >> > >
>>> >> > > Justine (in cc.) and I and some other team members are working on
>>> the
>>> >> > > solutions for this issue. But none of them solves it completely
>>> >> without
>>> >> > > side effects. Among them, "availability" VS "idempotency
>>> guarantees"
>>> >> is
>>> >> > > what we can't decide which to sacrifice. Some of these solutions
>>> >> > sacrifice
>>> >> > > availability of produce (1,2,5) and others sacrifice idempotency
>>> >> > guarantees
>>> >> > > (3). It could be useful to know if people generally have a
>>> preference
>>> >> one
>>> >> > > way or the other. Or what other better solutions there might be.
>>> >> > >
>>> >> > > Here are the proposals we came up with:
>>> >> > >
>>> >> > > 1. Limit the total active producer ID allocation number.
>>> >> > > -> This is the simplest solution. But since the OOM issue is
>>> usually
>>> >> > > caused by a rogue or misconfigured client, and this solution might
>>> >> > "punish"
>>> >> > > the good client from sending messages.
>>> >> > >
>>> >> > > 2. Throttling the producer ID allocation rate
>>> >> > > -> Same concern as the solution #1.
>>> >> > >
>>> >> > > 3. Having a limit to the number of active producer IDs (sort of
>>> like
>>> >> an
>>> >> > > LRU cache)
>>> >> > > -> The idea here is that if we hit a misconfigured client, we will
>>> >> expire
>>> >> > > the older entries. The concern here is we have risks to lose
>>> >> idempotency
>>> >> > > guarantees, and currently, we don't have a way to notify clients
>>> about
>>> >> > > losing idempotency guarantees. Besides, the least  recently used
>>> >> entries
>>> >> > > got removed are not always from the "bad" clients.
>>> >> > >
>>> >> > > 4. allow clients to "close" the producer ID usage
>>> >> > > -> We can provide a way for producer to "close" producerID usage.
>>> >> > > Currently, we only have a way to INIT_PRODUCER_ID requested to
>>> >> allocate
>>> >> > > one. After that, we'll keep the producer ID metadata in broker
>>> even if
>>> >> > the
>>> >> > > producer is "closed". Having a closed API (ex: END_PRODUCER_ID),
>>> we
>>> >> can
>>> >> > > remove the entry from broker side. In client side, we can send it
>>> when
>>> >> > > producer closing. The concern is, the old clients (including
>>> non-java
>>> >> > > clients) will still suffer from the OOM issue.
>>> >> > >
>>> >> > > 5. limit/throttling the producer id based on the principle
>>> >> > > -> Although we can limit the impact to a certain principle with
>>> this
>>> >> > idea,
>>> >> > > same concern still exists as solution #1 #2.
>>> >> > >
>>> >> > > Any thoughts/feedback are welcomed.
>>> >> > >
>>> >> > > Thank you.
>>> >> > > Luke
>>> >> > >
>>> >> >
>>> >>
>>> >
>>>
>>
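
For reference, option 3 from Luke's original list (a cap on active producer IDs that behaves like an LRU cache), combined with Justine's point that a producer ID with an ongoing transaction is not expired, could be sketched roughly as follows. This is only an illustration under my own assumptions: `ProducerIdLruSketch`, `touch`, and `maxActive` are hypothetical names, and the broker's real producer state handling is considerably more involved.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: bounded set of active producer IDs with LRU eviction. */
final class ProducerIdLruSketch {

    private final int maxActive; // assumed cap on active producer IDs

    // accessOrder = true: iteration runs from least to most recently used.
    // The value records whether the producer ID has an ongoing transaction.
    private final LinkedHashMap<Long, Boolean> entries = new LinkedHashMap<>(16, 0.75f, true);

    ProducerIdLruSketch(int maxActive) {
        this.maxActive = maxActive;
    }

    /** Records activity for a producer ID and returns the evicted ID, or null if none was evicted. */
    synchronized Long touch(long producerId, boolean hasOngoingTxn) {
        entries.put(producerId, hasOngoingTxn);
        if (entries.size() <= maxActive) {
            return null;
        }
        Iterator<Map.Entry<Long, Boolean>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Boolean> e = it.next();
            long candidate = e.getKey();
            // Skip IDs with an ongoing transaction and the ID we just touched.
            if (!e.getValue() && candidate != producerId) {
                it.remove();
                return candidate;
            }
        }
        return null; // nothing evictable: the cap is exceeded for now
    }

    synchronized boolean contains(long producerId) {
        return entries.containsKey(producerId);
    }
}
```

The sketch also shows the two weaknesses raised in the thread: the evicted ID may belong to a well-behaved client, and a rogue transactional producer with hanging transactions is never evicted.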
On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io>
wrote:

> Hey Omnia,
>
> Thanks for the response. I think I understand your explanations here with
> respect to principal and clientId usage.
>
> For the throttling -- handleInitProducerIdRequest will allocate the ID to
> the producer, but we don't actually store it on the broker or increment our
> metric until the first produce request for that producer is sent (or sent
> again after previously expiring). Would you consider throttling the produce
> request instead? It may be hard to get any metrics from the transaction
> coordinator where the initProducerId request is handled.
>
> Justine
>
> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> wrote:
>
>> Hey Justine,
>> > If I understand your message correctly, there are issues with
>> identifying the source of the rogue clients? So you propose to add a new
>> metric for that?
>> > And also proposing to throttle based on clientId as a potential follow
>> up?
>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>> similarly to how we identify clients in Fetch/Produce/Request
>> QuotaManagers. Using KafkaPrincipal should give cluster admins the ability
>> to throttle later based on principal, which is most likely to be a smaller
>> set than clientIds. My initial thought was to add a metric that represents
>> how many InitProducerIDRequests are sent by KafkaPrincipal (and/or clientId),
>> similar to Fetch/Produce QuotaManagers.
>> Then as a follow-up, we can throttle based on either KafkaPrincipal or
>> clientId (maybe default as well to align this with other QuotaManagers in
>> Kafka).
>>
>> >1. Do we rely on the client using the same ID? What if there are many
>> clients that all use different client IDs?
>> This is why I want to use the combination of KafkaPrincipal and/or clientId,
>> similar to some other quotas we have in Kafka already. This will be a
>> similar risk to the Fetch/Produce quotas in Kafka, which also rely on the client
>> to use the same clientId and KafkaPrincipal.
>>
>> >2. Are there places where high cardinality of this metric is a concern?
>> I can imagine many client IDs in the system. Would we treat this as a rate
>> metric (ie, when we get an init producer ID and return a new producer ID we
>> emit a count for that client id?) Or something else?
>> My initial thought here was to follow the steps of ClientQuotaManager and
>> ClientRequestQuotaManager and use a rate metric. However, I think we can
>> emit it either
>>
>>    1. when we return the new PID. However, I have concerns that we may
>>    circle back to the previous concerns with OOM due to keeping track of
>>    ACTIVE PIDs per KafkaPrincipal (and/or clientId) in the future. Also, this
>>    would be the first time Kafka throttles IDs for any client.
>>    2. or once we receive the initProducerIDRequest, throttling before even
>>    hitting `handleInitProducerIdRequest`. Going in this direction, we may need to
>>    throttle it within a different quota window than
>>    `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests
>>    per second wouldn't be efficient for most cases. I personally think this
>>    direction is easier as it seems more consistent with the existing quota
>>    implementation, especially since Kafka already has the concept of throttling
>>    a subset of requests (in ControllerMutationQuotaManager) but has never had any
>>    concept of throttling active IDs for any client.
>>
>>
>> What do you think?
>>
>> Thanks
>> Omnia
>>
>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io>
>> wrote:
>>
>>> Hey Omnia,
>>> Sorry for losing track of this.
>>>
>>> If I understand your message correctly, there are issues with
>>> identifying the source of the rogue clients? So you propose to add a new
>>> metric for that?
>>> And also proposing to throttle based on clientId as a potential follow
>>> up?
>>>
>>> I think both of these make sense. The only things I can think of for the
>>> metric are:
>>> 1. Do we rely on the client using the same ID? What if there are many
>>> clients that all use different client IDs?
>>> 2. Are there places where high cardinality of this metric is a concern?
>>> I can imagine many client IDs in the system. Would we treat this as a rate
>>> metric (ie, when we get an init producer ID and return a new producer ID we
>>> emit a count for that client id?) Or something else?
>>>
>>> Thanks,
>>> Justine
>>>
>>> On Thu, Feb 2, 2023 at 8:44 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Luke and Justine,
>>>> Are there any thoughts or updates on this? I would love to help with
>>>> this as we are hitting this more frequently now.
>>>>
>>>> best,
>>>>
>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Luke and Justine,
>>>>>
>>>>>> For (3), you said:
>>>>>> > - I have some concerns about the impact of this option on the
>>>>>> transactional
>>>>>> producers, for example, what will happen to an ongoing transaction
>>>>>> associated with an expired PID? Would this leave the transactions in a
>>>>>> "hanging" state?
>>>>>>
>>>>>> - How will we notify the client that the transaction can't continue
>>>>>> due to
>>>>>> an expired PID?
>>>>>>
>>>>>> - If PID got marked as `expired` this will mean that
>>>>>> `admin.DescribeProducers` will not list them which will make
>>>>>> *`kafka-transactions.sh
>>>>>> --list`* a bit tricky as we can't identify if there are transactions
>>>>>> linked
>>>>>> to this expired PID or not. The same concern applies to
>>>>>> *`kafka-transactions.sh
>>>>>> --find-hanging`*.
>>>>>>
>>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>>> Currently, there's no way to notify clients about the expiration.
>>>>>> Also, the ongoing transactions will be hanging. For the admin cli,
>>>>>> we've
>>>>>> never thought about it. Good point.
>>>>>> In summary, to adopt this solution, there are many issues needed to
>>>>>> get
>>>>>> fixed.
>>>>>>
>>>>>
>>>>> Justine already clarified that if a PID is attached to a transaction it
>>>>> will not expire, so identifying the transactions shouldn't be a concern
>>>>> anymore.
>>>>> The only concern here will be that this solution will not solve the
>>>>> problem if the rogue producer is a transactional producer with hanging
>>>>> transactions.
>>>>> If anyone faces this situation, they will need to abort the hanging
>>>>> transactions manually, and then the solution to expire a PID can work.
>>>>>
>>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id
>>>>>> is also
>>>>>> workable.
>>>>>> It's just these 2 attributes are not required.
>>>>>> That is, it's possible we take all clients as the same one: {default
>>>>>> KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>> Do you have any thoughts about it?
>>>>>> Maybe skip throttling for {default KafkaPrinciple + default clientID}
>>>>>>
>>>>>
>>>>> Throttling for default KafkaPrincipal and default clientId is useful
>>>>> when we need to have a hard limit on the whole cluster and whoever is
>>>>> running the cluster doesn't know the client IDs, or if a KafkaPrincipal is
>>>>> reused between different producer applications.
>>>>> I usually find it helpful to have a way to apply throttling only to
>>>>> the rogue clients once I identify them, without punishing everyone on
>>>>> the cluster. However, there are two problems with this:
>>>>> - There's no easy way at the moment to link PIDs to clientId or
>>>>> KafkaPrincipal. This needs to be addressed first.
>>>>> - Justine's comment on the throttling: it will mean
>>>>> we either block all requests or have to store the request in memory, which
>>>>> in both cases has downsides for the producer experience.
>>>>>
>>>>> I recently had another discussion with my team and it does seem like
>>>>>> there
>>>>>> should be a way to make it more clear to the clients what is going
>>>>>> on. A
>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>>> way to
>>>>>> improve the story for newer clients. (Ie if we choose to expire based
>>>>>> on a
>>>>>> size limit, we should include a response indicating the ID has
>>>>>> expired.) We
>>>>>> also discussed ways to redefine the guarantees so that users who have
>>>>>> stronger idempotency requirements can ensure them (over
>>>>>> availability/memory
>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>
>>>>>
>>>>> It may be easier to improve the experience for new clients. However,
>>>>> if we improve only the new clients, we may need a way to help teams who
>>>>> run Kafka with rogue clients on old versions by at least giving them an easy
>>>>> way to identify the clientId or KafkaPrincipal that generated these PIDs.
>>>>>
>>>>> For context, it's very tricky to even identify which clientId is
>>>>> creating all these PIDs that caused the OOM, which is a contributing part of
>>>>> the issue at the moment. So maybe one option here could be adding a new
>>>>> metric that tracks the number of generated PIDs per clientId. This will
>>>>> help the team who runs the Kafka cluster to
>>>>> - contact the owners of these rogue clients and ask them to fix their clients or
>>>>> upgrade to a new client if the new client version has a better experience.
>>>>> - or, if we end up with a throttling solution, identify which
>>>>> clientId needs to be throttled.
>>>>>
>>>>> Maybe we can start with a solution for identifying the rogue clients
>>>>> first and keep looking for a solution to limit them. What do you think?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan
>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>
>>>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>>>
>>>>>> Here was my response for the mailing thread:
>>>>>>
>>>>>> Hey Omnia,
>>>>>> Sorry to hear this is a problem for you as well. :(
>>>>>> > * I have some concerns about the impact of this option on the
>>>>>> transactional producers, for example, what will happen to an ongoing
>>>>>> transaction associated with an expired PID? Would this leave the
>>>>>> transactions in a "hanging" state?*
>>>>>> We currently check if a transaction is ongoing and do not expire the
>>>>>> producer ID if it has an ongoing transaction. I suspect we will
>>>>>> continue to
>>>>>> do this with any solution we pick.
>>>>>>
>>>>>> My team members and I looked a bit into the throttling case and it
>>>>>> can get
>>>>>> a bit tricky since it means we need to throttle the produce request
>>>>>> before
>>>>>> it is processed. This means we either block all requests or have to
>>>>>> store
>>>>>> the request in memory (which is not great if we are trying to save
>>>>>> memory).
>>>>>>
>>>>>> I recently had another discussion with my team and it does seem like
>>>>>> there
>>>>>> should be a way to make it more clear to the clients what is going
>>>>>> on. A
>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>>> way to
>>>>>> improve the story for newer clients. (Ie if we choose to expire based
>>>>>> on a
>>>>>> size limit, we should include a response indicating the ID has
>>>>>> expired.) We
>>>>>> also discussed ways to redefine the guarantees so that users who have
>>>>>> stronger idempotency requirements can ensure them (over
>>>>>> availability/memory
>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>
>>>>>> Thanks again for commenting here, hopefully we can come to a good
>>>>>> solution.
>>>>>>
>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:
>>>>>>
>>>>>> > Hi Omnia,
>>>>>> >
>>>>>> > Thanks for your reply.
>>>>>> >
>>>>>> > For (3), you said:
>>>>>> > > - I have some concerns about the impact of this option on the
>>>>>> > > transactional producers, for example, what will happen to an
>>>>>> > > ongoing transaction associated with an expired PID? Would this
>>>>>> > > leave the transactions in a "hanging" state?
>>>>>> > >
>>>>>> > > - How will we notify the client that the transaction can't continue
>>>>>> > > due to an expired PID?
>>>>>> > >
>>>>>> > > - If a PID is marked as `expired`, `admin.DescribeProducers` will
>>>>>> > > not list it, which will make `kafka-transactions.sh --list` a bit
>>>>>> > > tricky as we can't identify if there are transactions linked to
>>>>>> > > this expired PID or not. The same concern applies to
>>>>>> > > `kafka-transactions.sh --find-hanging`.
>>>>>> >
>>>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>>>> > Currently, there's no way to notify clients about the expiration.
>>>>>> > Also, the ongoing transactions will be left hanging. For the admin
>>>>>> > CLI, we hadn't thought about it. Good point.
>>>>>> > In summary, many issues would need to be fixed before we could adopt
>>>>>> > this solution.
>>>>>> >
>>>>>> >
>>>>>> > For (5), you said:
>>>>>> > > I am assuming you mean KafkaPrincipal here! If so, is your concern
>>>>>> > > here that those good clients that use the same principal as a rogue
>>>>>> > > one will get throttled?
>>>>>> > >
>>>>>> > > If this is the case, then I believe it should be okay as other
>>>>>> > > throttling in Kafka on `/config/users/<user>` has the same behaviour.
>>>>>> > >
>>>>>> > > What about applying limit/throttling to
>>>>>> > > `/config/users/<user>/clients/<client-id>`, similar to what we have
>>>>>> > > with client quota? This should reduce the concern about throttling
>>>>>> > > good clients, right?
>>>>>> >
>>>>>> > --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear).
>>>>>> > Yes, we were thinking about throttling by KafkaPrincipal. Client ID
>>>>>> > is also workable.
>>>>>> > It's just that these two attributes are not required.
>>>>>> > That is, it's possible that we treat all clients as the same one:
>>>>>> > {default KafkaPrincipal + default clientID}, and apply throttling to it.
>>>>>> > Do you have any thoughts about it?
>>>>>> > Maybe skip throttling for {default KafkaPrincipal + default clientID}?
>>>>>> >
>>>>>> > Luke
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim
>>>>>> > <o.g.h.ibra...@gmail.com> wrote:
>>>>>> >
>>>>>> >> Hi Luke & Justine,
>>>>>> >> Thanks for looking into this issue, we have been experiencing this
>>>>>> >> because of rogue clients as well.
>>>>>> >>
>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort of
>>>>>> >> > like an LRU cache)
>>>>>> >> > -> The idea here is that if we hit a misconfigured client, we will
>>>>>> >> > expire the older entries. The concern here is that we risk losing
>>>>>> >> > idempotency guarantees, and currently, we don't have a way to
>>>>>> >> > notify clients about losing idempotency guarantees. Besides, the
>>>>>> >> > least recently used entries that get removed are not always from
>>>>>> >> > the "bad" clients.
>>>>>> >>
>>>>>> >> - I have some concerns about the impact of this option on the
>>>>>> >> transactional producers, for example, what will happen to an ongoing
>>>>>> >> transaction associated with an expired PID? Would this leave the
>>>>>> >> transactions in a "hanging" state?
>>>>>> >>
>>>>>> >> - How will we notify the client that the transaction can't continue
>>>>>> >> due to an expired PID?
>>>>>> >>
>>>>>> >> - If a PID is marked as `expired`, `admin.DescribeProducers` will not
>>>>>> >> list it, which will make `kafka-transactions.sh --list` a bit tricky
>>>>>> >> as we can't identify if there are transactions linked to this
>>>>>> >> expired PID or not. The same concern applies to
>>>>>> >> `kafka-transactions.sh --find-hanging`.
>>>>>> >>
>>>>>> >>
>>>>>> >> > 5. limit/throttling the producer id based on the principal
>>>>>> >> > -> Although we can limit the impact to a certain principal with
>>>>>> >> > this idea, the same concern still exists as in solutions #1 and #2.
>>>>>> >>
>>>>>> >> I am assuming you mean KafkaPrincipal here! If so, is your concern
>>>>>> >> here that those good clients that use the same principal as a rogue
>>>>>> >> one will get throttled?
>>>>>> >>
>>>>>> >> If this is the case, then I believe it should be okay as other
>>>>>> >> throttling in Kafka on `/config/users/<user>` has the same behaviour.
>>>>>> >>
>>>>>> >> What about applying limit/throttling to
>>>>>> >> `/config/users/<user>/clients/<client-id>`, similar to what we have
>>>>>> >> with client quota? This should reduce the concern about throttling
>>>>>> >> good clients, right?
>>>>>> >>
>>>>>> >> best,
>>>>>> >>
>>>>>> >> Omnia
>>>>>> >>
>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>>> >> > Thanks.
>>>>>> >> >
>>>>>> >> > Luke
>>>>>> >> >
>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com> wrote:
>>>>>> >> >
>>>>>> >> > > Hi devs,
>>>>>> >> > >
>>>>>> >> > > As stated in the motivation section in KIP-854
>>>>>> >> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry>:
>>>>>> >> > >
>>>>>> >> > > With idempotent producers becoming the default in Kafka, this
>>>>>> >> > > means that unless otherwise specified, all new producers will be
>>>>>> >> > > given producer IDs. Some (inefficient) applications may now create
>>>>>> >> > > many non-transactional idempotent producers. Each of these
>>>>>> >> > > producers will be assigned a producer ID and these IDs and their
>>>>>> >> > > metadata are stored in the broker memory, which might cause
>>>>>> >> > > brokers to run out of memory.
>>>>>> >> > >
>>>>>> >> > > Justine (in cc.) and I and some other team members are working on
>>>>>> >> > > solutions for this issue. But none of them solves it completely
>>>>>> >> > > without side effects. Among them, we can't decide which to
>>>>>> >> > > sacrifice: "availability" or "idempotency guarantees". Some of
>>>>>> >> > > these solutions sacrifice availability of produce (1,2,5) and
>>>>>> >> > > others sacrifice idempotency guarantees (3). It could be useful to
>>>>>> >> > > know if people generally have a preference one way or the other,
>>>>>> >> > > or what other better solutions there might be.
>>>>>> >> > >
>>>>>> >> > > Here are the proposals we came up with:
>>>>>> >> > >
>>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>>> >> > > -> This is the simplest solution. But since the OOM issue is
>>>>>> >> > > usually caused by a rogue or misconfigured client, this solution
>>>>>> >> > > might "punish" the good clients by preventing them from sending
>>>>>> >> > > messages.
>>>>>> >> > >
>>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>>> >> > > -> Same concern as the solution #1.
>>>>>> >> > >
>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort of
>>>>>> >> > > like an LRU cache)
>>>>>> >> > > -> The idea here is that if we hit a misconfigured client, we will
>>>>>> >> > > expire the older entries. The concern here is that we risk losing
>>>>>> >> > > idempotency guarantees, and currently, we don't have a way to
>>>>>> >> > > notify clients about losing idempotency guarantees. Besides, the
>>>>>> >> > > least recently used entries that get removed are not always from
>>>>>> >> > > the "bad" clients.
>>>>>> >> > >
>>>>>> >> > > 4. allow clients to "close" the producer ID usage
>>>>>> >> > > -> We can provide a way for a producer to "close" its producer ID
>>>>>> >> > > usage. Currently, we only have the INIT_PRODUCER_ID request to
>>>>>> >> > > allocate one. After that, we'll keep the producer ID metadata in
>>>>>> >> > > the broker even if the producer is "closed". With a close API
>>>>>> >> > > (e.g. END_PRODUCER_ID), we can remove the entry on the broker
>>>>>> >> > > side. On the client side, we can send it when the producer is
>>>>>> >> > > closing. The concern is that old clients (including non-Java
>>>>>> >> > > clients) will still suffer from the OOM issue.
>>>>>> >> > >
>>>>>> >> > > 5. limit/throttling the producer id based on the principal
>>>>>> >> > > -> Although we can limit the impact to a certain principal with
>>>>>> >> > > this idea, the same concern still exists as in solutions #1 and #2.
>>>>>> >> > >
>>>>>> >> > > Any thoughts/feedback are welcome.
>>>>>> >> > >
>>>>>> >> > > Thank you.
>>>>>> >> > > Luke
>>>>>> >> > >
>>>>>> >> >
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
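
For anyone skimming the quoted thread: proposal 3 (a cap on active producer IDs, "sort of like an LRU cache") combined with Justine's note that a producer ID with an ongoing transaction is not expired could look roughly like the sketch below. It is only an illustration in Python, not Kafka's broker code. The class name `ProducerIdCache`, the `touch` method, and the `max_active` parameter are all made up for this example, and it assumes the broker learns about a producer ID on its produce requests.

```python
from collections import OrderedDict


class ProducerIdCache:
    """Hypothetical bounded store of active producer IDs (illustration only)."""

    def __init__(self, max_active):
        self.max_active = max_active
        # Maps producer ID -> True if it has an ongoing transaction.
        # Insertion order doubles as recency order, oldest first.
        self._entries = OrderedDict()

    def touch(self, pid, in_transaction=False):
        """Record a produce request for pid and return the PIDs evicted by it."""
        self._entries[pid] = in_transaction
        self._entries.move_to_end(pid)
        evicted = []
        # Walk from least to most recently used until we are under the cap.
        for candidate in list(self._entries):
            if len(self._entries) <= self.max_active:
                break
            # Never evict the PID just seen, or one with an ongoing transaction.
            if candidate == pid or self._entries[candidate]:
                continue
            del self._entries[candidate]
            evicted.append(candidate)
        return evicted

    def active(self):
        """Return the tracked PIDs, least recently used first."""
        return list(self._entries)


cache = ProducerIdCache(max_active=2)
cache.touch(1, in_transaction=True)  # pinned by its ongoing transaction
cache.touch(2)
print(cache.touch(3))  # PID 2 is evicted, PID 1 is skipped
print(cache.active())
```

Two things the sketch makes visible: the evicted PID is simply the oldest unpinned one, which is not necessarily from the "bad" client, and the cache can grow past `max_active` if every older entry is pinned by a transaction. Nothing here tells the evicted producer that it lost its idempotency state, which is the notification gap discussed in the thread.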
