Hey Omnia, I was doing a bit of snooping (I actually just get updates for the KIP page) and I saw this draft was in progress. I also shared it with some of my colleagues with whom I had previously discussed the issue.
The initial look was pretty promising to me. I appreciate the detailing of the rejected options since we had quite a few we worked through :) One question I have is how we will handle a scenario where potentially each new client has a new Kafka principal. Is that simply not covered by throttling?

Thanks,
Justine

On Wed, May 31, 2023 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

> Hi Justine and Luke,
>
> I started a KIP draft here
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
> for this proposal and would appreciate it if you could provide any initial feedback before opening a broader discussion.
>
> Thanks
>
> On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>
>> Hi Justine,
>>
>> My initial thought with throttling initProducerId was to cut the problem off at the source (the client that creates too many PIDs) and fail faster, but if having this at the produce-request level is easier, that should be fine. I am guessing it will go in the same direction, as we may use ClientQuotaManager (like we do for Produce throttling) but with a different quota window than `quota.window.size.seconds`.
>>
>> If this is good as an initial solution I can start a KIP and see what the wider community feels about this.
>>
>> Also, I noticed that at some point one of us hit "Reply" instead of "Reply All" :) So here are the previous conversations.
>>
>> On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io> wrote:
>>
>>> Hey Omnia,
>>>
>>> Thanks for the response. I think I understand your explanations here with respect to principal and clientId usage.
>>>
>>> For the throttling -- handleInitProducerIdRequest will allocate the ID to the producer, but we don't actually store it on the broker or increment our metric until the first produce request for that producer is sent (or sent again after previously expiring). Would you consider throttling the produce request instead? It may be hard to get any metrics from the transaction coordinator where the initProducerId request is handled.
>>>
>>> Justine
>>
>> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>
>>> Hey Justine,
>>>
>>> > If I understand your message correctly, there are issues with identifying the source of the rogue clients? So you propose to add a new metric for that?
>>> > And also proposing to throttle based on clientId as a potential follow up?
>>>
>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId), similarly to how we identify clients in the Fetch/Produce/Request quota managers. Using KafkaPrincipal should give cluster admins the ability to throttle later based on principal, which is most likely a smaller set than clientIds. My initial thought was to add a metric that represents how many InitProducerId requests are sent per KafkaPrincipal (and/or clientId), similar to the Fetch/Produce quota managers.
>>> Then as a follow-up, we can throttle based on either KafkaPrincipal or clientId (and maybe a default as well, to align this with the other quota managers in Kafka).
>>>
>>> > 1. Do we rely on the client using the same ID? What if there are many clients that all use different client IDs?
>>>
>>> This is why I want to use the combination of KafkaPrincipal or clientId, similar to some other quotas we have in Kafka already.
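>>> To make that concrete, here is a rough, purely illustrative sketch (class, metric, and config names are made up; this is not broker code) of the kind of per-user/clientId rate tracking I mean, reusing the common metrics/quota classes that the existing quota managers already use, with a wider window than `quota.window.size.seconds`:
>>>
>>>     import java.util.Map;
>>>     import java.util.concurrent.TimeUnit;
>>>     import org.apache.kafka.common.metrics.MetricConfig;
>>>     import org.apache.kafka.common.metrics.Metrics;
>>>     import org.apache.kafka.common.metrics.Quota;
>>>     import org.apache.kafka.common.metrics.QuotaViolationException;
>>>     import org.apache.kafka.common.metrics.Sensor;
>>>     import org.apache.kafka.common.metrics.stats.Rate;
>>>
>>>     public class InitProducerIdQuotaSketch {
>>>         private final Metrics metrics;
>>>         private final double maxRatePerWindow; // hypothetical quota bound
>>>
>>>         public InitProducerIdQuotaSketch(Metrics metrics, double maxRatePerWindow) {
>>>             this.metrics = metrics;
>>>             this.maxRatePerWindow = maxRatePerWindow;
>>>         }
>>>
>>>         // One sensor per (user, client-id), tagged like the Fetch/Produce quota sensors.
>>>         private Sensor sensorFor(String user, String clientId) {
>>>             String name = "init-producer-id:" + user + ":" + clientId;
>>>             Sensor sensor = metrics.getSensor(name);
>>>             if (sensor == null) {
>>>                 MetricConfig config = new MetricConfig()
>>>                         .timeWindow(1, TimeUnit.HOURS)   // wider than the 1s produce/fetch windows
>>>                         .samples(11)
>>>                         .quota(Quota.upperBound(maxRatePerWindow));
>>>                 sensor = metrics.sensor(name, config);
>>>                 sensor.add(metrics.metricName("init-producer-id-rate", "producer-id-quota",
>>>                                 "InitProducerId requests per second",
>>>                                 Map.of("user", user, "client-id", clientId)),
>>>                         new Rate());
>>>             }
>>>             return sensor;
>>>         }
>>>
>>>         /** Record one InitProducerId request; returns true if the quota is violated. */
>>>         public boolean recordAndCheck(String user, String clientId) {
>>>             try {
>>>                 sensorFor(user, clientId).record(1.0);
>>>                 return false;
>>>             } catch (QuotaViolationException e) {
>>>                 return true; // caller would compute a throttle time, like the other quota managers
>>>             }
>>>         }
>>>     }
>>>
>>> On `QuotaViolationException` the caller would compute a throttle time and return it to the client, the same way the existing quota managers do.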
>>> This will be a similar risk to the Fetch/Produce quotas in Kafka, which also rely on the client using the same clientId and KafkaPrincipal.
>>>
>>> > 2. Are there places where high cardinality of this metric is a concern? I can imagine many client IDs in the system. Would we treat this as a rate metric (i.e., when we get an init producer ID and return a new producer ID we emit a count for that client id?) Or something else?
>>>
>>> My initial thought here was to follow the steps of ClientQuotaManager and ClientRequestQuotaManager and use a rate metric. However, I think we can emit it either
>>>
>>> 1. when we return the new PID. However, I have concerns that we may circle back to the previous concerns about OOM due to keeping track of ACTIVE PIDs per KafkaPrincipal (and/or) clientId in the future. Also, this would be the first time Kafka throttles IDs for any client.
>>> 2. or once we receive the InitProducerId request and throttle before even hitting `handleInitProducerIdRequest`. Going this direction we may need to throttle it within a different quota window than `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests per second wouldn't be efficient for most cases. I personally think this direction is easier as it seems more consistent with the existing quota implementation, especially since Kafka already has the concept of throttling a subset of requests (in ControllerMutationQuotaManager) but has never had the concept of throttling active IDs for any client.
>>>
>>> What do you think?
>>>
>>> Thanks
>>> Omnia
>>
>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io> wrote:
>>
>>> Hey Omnia,
>>> Sorry for losing track of this.
>>>
>>> If I understand your message correctly, there are issues with identifying the source of the rogue clients? So you propose to add a new metric for that?
>>> And also proposing to throttle based on clientId as a potential follow up?
>>>
>>> I think both of these make sense. The only things I can think of for the metric are:
>>> 1. Do we rely on the client using the same ID? What if there are many clients that all use different client IDs?
>>> 2. Are there places where high cardinality of this metric is a concern? I can imagine many client IDs in the system. Would we treat this as a rate metric (i.e., when we get an init producer ID and return a new producer ID we emit a count for that client id?) Or something else?
>>>
>>> Thanks,
>>> Justine
>>
>> Thanks
>> Omnia
>>
>> On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>
>>> Hi Luke and Justine,
>>> Are there any thoughts or updates on this? I would love to help with this as we are hitting this more frequently now.
>>>
>>> best,
>>>
>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>
>>>> Hi Luke and Justine,
>>>>
>>>>> For (3), you said:
>>>>> > - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>>> > - How will we notify the client that the transaction can't continue due to an expired PID?
>>>>> > - If a PID got marked as `expired`, this will mean that `admin.DescribeProducers` will not list them, which will make `kafka-transactions.sh --list` a bit tricky as we can't identify whether there are transactions linked to this expired PID or not. The same concern applies to `kafka-transactions.sh --find-hanging`.
>>>>>
>>>>> --> Yes, you're right. Those are also concerns for this solution. Currently, there's no way to notify clients about the expiration. Also, the ongoing transactions will be hanging. For the admin CLI, we've never thought about it. Good point. In summary, to adopt this solution, there are many issues that need to get fixed.
>>>>
>>>> Justine already clarified that if a PID is attached to a transaction it will not expire, so identifying the transactions shouldn't be a concern anymore.
>>>> The only concern here will be that this solution will not solve the problem if the rogue producer is a transactional producer with hanging transactions. If anyone faces this situation they will need to abort the hanging transactions manually, and then the solution to expire a PID can work.
>>>>
>>>>> --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear)
>>>>> Yes, we were thinking about throttling by KafkaPrincipal. Client ID is also workable.
>>>>> It's just these 2 attributes are not required.
>>>>> That is, it's possible we take all clients as the same one: {default KafkaPrincipal + default clientID}, and apply throttling on it.
>>>>> Do you have any thoughts about it?
>>>>> Maybe skip throttling for {default KafkaPrincipal + default clientID}
>>>>
>>>> Throttling for the default KafkaPrincipal and default clientID is useful when we need a hard limit on the whole cluster and whoever is running the cluster doesn't know the client IDs, or when a KafkaPrincipal is reused between different producer applications.
>>>> I usually find it helpful to have a way to apply throttling only to the rogue clients once I identify them, without punishing everyone on the cluster. However, there are two problems with this:
>>>> - There's no easy way at the moment to link PIDs to a clientId or KafkaPrincipal. This needs to be addressed first.
>>>> - Justine's comment on the throttling: it means we either block all requests or have to store the request in memory, and both cases have downsides for the producer experience.
>>>>
>>>>> I recently had another discussion with my team and it does seem like there should be a way to make it more clear to the clients what is going on. A lot of this protocol is implicit. I'm wondering if maybe there is a way to improve the story for newer clients. (I.e., if we choose to expire based on a size limit, we should include a response indicating the ID has expired.) We also discussed ways to redefine the guarantees so that users who have stronger idempotency requirements can ensure them (over availability/memory concerns). Let me know if you have any ideas here.
>>>>
>>>> It may be easier to improve the experience for new clients. However, if we improve only the new clients we may need a way to help teams who run Kafka with rogue clients on old versions, at least by giving them an easy way to identify the clientId and/or KafkaPrincipal that generated these PIDs.
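>>>> To make that concrete, the kind of per-clientId accounting I mean is roughly the following (illustrative only, with made-up names; not a concrete proposal for where it would live in the broker):
>>>>
>>>>     import java.util.Comparator;
>>>>     import java.util.LinkedHashMap;
>>>>     import java.util.Map;
>>>>     import java.util.concurrent.ConcurrentHashMap;
>>>>     import java.util.concurrent.atomic.LongAdder;
>>>>     import java.util.stream.Collectors;
>>>>
>>>>     // Hypothetical helper: count how many PIDs each clientId has been allocated,
>>>>     // so whoever runs the cluster can spot the client responsible for the PID growth.
>>>>     public class PidsPerClientTracker {
>>>>         private final Map<String, LongAdder> pidsPerClient = new ConcurrentHashMap<>();
>>>>
>>>>         // Would be called wherever a new PID is handed out for the given clientId.
>>>>         public void onPidAllocated(String clientId) {
>>>>             pidsPerClient.computeIfAbsent(clientId, id -> new LongAdder()).increment();
>>>>         }
>>>>
>>>>         // Top-N offenders, e.g. to back a metric or a log line.
>>>>         public Map<String, Long> topOffenders(int n) {
>>>>             return pidsPerClient.entrySet().stream()
>>>>                     .sorted(Comparator.comparingLong(
>>>>                             (Map.Entry<String, LongAdder> e) -> e.getValue().sum()).reversed())
>>>>                     .limit(n)
>>>>                     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().sum(),
>>>>                             (a, b) -> a, LinkedHashMap::new));
>>>>         }
>>>>     }
>>>>
>>>> A real implementation would expose this as a metric and would need to bound the map itself (so the tracking doesn't become another source of unbounded memory), but this is the shape of it.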
>>>> For context, it's very tricky to even identify which clientId is creating all these PIDs that caused OOM, which is a contributing part of the issue at the moment. So maybe one option here could be adding a new metric that tracks the number of generated PIDs per clientId. This will help the team who runs the Kafka cluster to
>>>> - contact these rogue clients and ask them to fix their clients, or to upgrade to a new client if the new client version has a better experience,
>>>> - or, if we end up with a throttling solution, this may help identify which clientId needs to be throttled.
>>>>
>>>> Maybe we can start with a solution for identifying the rogue clients first and keep looking for a solution to limit them. What do you think?
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
>>>>
>>>>> Oops. I realized I just replied to Omnia 🤦‍♀️
>>>>>
>>>>> Here was my response for the mailing thread:
>>>>>
>>>>> Hey Omnia,
>>>>> Sorry to hear this is a problem for you as well. :(
>>>>> > I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>>> We currently check if a transaction is ongoing and do not expire the producer ID if it has an ongoing transaction. I suspect we will continue to do this with any solution we pick.
>>>>>
>>>>> My team members and I looked a bit into the throttling case and it can get a bit tricky since it means we need to throttle the produce request before it is processed. This means we either block all requests or have to store the request in memory (which is not great if we are trying to save memory).
>>>>>
>>>>> I recently had another discussion with my team and it does seem like there should be a way to make it more clear to the clients what is going on. A lot of this protocol is implicit. I'm wondering if maybe there is a way to improve the story for newer clients. (I.e., if we choose to expire based on a size limit, we should include a response indicating the ID has expired.) We also discussed ways to redefine the guarantees so that users who have stronger idempotency requirements can ensure them (over availability/memory concerns). Let me know if you have any ideas here.
>>>>>
>>>>> Thanks again for commenting here, hopefully we can come to a good solution.
>>>>>
>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:
>>>>>
>>>>> > Hi Omnia,
>>>>> >
>>>>> > Thanks for your reply.
>>>>> >
>>>>> > For (3), you said:
>>>>> > > - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>>> > > - How will we notify the client that the transaction can't continue due to an expired PID?
>>>>> > > - If a PID got marked as `expired` this will mean that `admin.DescribeProducers` will not list them, which will make `kafka-transactions.sh --list` a bit tricky as we can't identify whether there are transactions linked to this expired PID or not.
>>>>> > > The same concern applies to `kafka-transactions.sh --find-hanging`.
>>>>> >
>>>>> > --> Yes, you're right. Those are also concerns for this solution. Currently, there's no way to notify clients about the expiration. Also, the ongoing transactions will be hanging. For the admin CLI, we've never thought about it. Good point. In summary, to adopt this solution, there are many issues that need to get fixed.
>>>>> >
>>>>> > For (5), you said:
>>>>> > > I am assuming you mean KafkaPrincipal here! If so, is your concern here that those good clients that use the same principal as a rogue one will get throttled? If this is the case, then I believe it should be okay, as other throttling in Kafka on `/config/users/<user>` has the same behaviour.
>>>>> > > What about applying the limit/throttling to `/config/users/<user>/clients/<client-id>`, similar to what we have with client quotas? This should reduce the concern about throttling good clients, right?
>>>>> >
>>>>> > --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear)
>>>>> > Yes, we were thinking about throttling by KafkaPrincipal. Client ID is also workable.
>>>>> > It's just these 2 attributes are not required.
>>>>> > That is, it's possible we take all clients as the same one: {default KafkaPrincipal + default clientID}, and apply throttling on it.
>>>>> > Do you have any thoughts about it?
>>>>> > Maybe skip throttling for {default KafkaPrincipal + default clientID}?
>>>>> >
>>>>> > Luke
>>>>> >
>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>> >
>>>>> >> Hi Luke & Justine,
>>>>> >> Thanks for looking into this issue, we have been experiencing this because of rogue clients as well.
>>>>> >>
>>>>> >> > 3. Having a limit to the number of active producer IDs (sort of like an LRU cache)
>>>>> >> > -> The idea here is that if we hit a misconfigured client, we will expire the older entries. The concern here is we have risks to lose idempotency guarantees, and currently, we don't have a way to notify clients about losing idempotency guarantees. Besides, the least recently used entries that got removed are not always from the "bad" clients.
>>>>> >>
>>>>> >> - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>>> >>
>>>>> >> - How will we notify the client that the transaction can't continue due to an expired PID?
>>>>> >>
>>>>> >> - If a PID got marked as `expired` this will mean that `admin.DescribeProducers` will not list them, which will make `kafka-transactions.sh --list` a bit tricky as we can't identify whether there are transactions linked to this expired PID or not. The same concern applies to `kafka-transactions.sh --find-hanging`.
>>>>> >>
>>>>> >> > 5. limit/throttling the producer ID based on the principal
>>>>> >> > -> Although we can limit the impact to a certain principal with this idea, the same concern still exists as for solutions #1 and #2.
>>>>> >>
>>>>> >> I am assuming you mean KafkaPrincipal here! If so, is your concern here that those good clients that use the same principal as a rogue one will get throttled?
>>>>> >>
>>>>> >> If this is the case, then I believe it should be okay, as other throttling in Kafka on `/config/users/<user>` has the same behaviour.
>>>>> >>
>>>>> >> What about applying the limit/throttling to `/config/users/<user>/clients/<client-id>`, similar to what we have with client quotas? This should reduce the concern about throttling good clients, right?
>>>>> >>
>>>>> >> best,
>>>>> >>
>>>>> >> Omnia
>>>>> >>
>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> wrote:
>>>>> >>
>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>> >> > Thanks.
>>>>> >> >
>>>>> >> > Luke
>>>>> >> >
>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com> wrote:
>>>>> >> >
>>>>> >> > > Hi devs,
>>>>> >> > >
>>>>> >> > > As stated in the motivation section of KIP-854
>>>>> >> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry>:
>>>>> >> > >
>>>>> >> > > With idempotent producers becoming the default in Kafka, this means that unless otherwise specified, all new producers will be given producer IDs. Some (inefficient) applications may now create many non-transactional idempotent producers. Each of these producers will be assigned a producer ID, and these IDs and their metadata are stored in the broker memory, which might cause brokers to run out of memory.
>>>>> >> > >
>>>>> >> > > Justine (in cc) and I and some other team members are working on solutions for this issue, but none of them solves it completely without side effects. Among them, "availability" vs. "idempotency guarantees" is what we can't decide which to sacrifice: some of these solutions sacrifice availability of produce (1, 2, 5) and others sacrifice idempotency guarantees (3). It would be useful to know if people generally have a preference one way or the other, or what other better solutions there might be.
>>>>> >> > >
>>>>> >> > > Here are the proposals we came up with:
>>>>> >> > >
>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>> >> > > -> This is the simplest solution, but since the OOM issue is usually caused by a rogue or misconfigured client, this solution might "punish" the good clients and prevent them from sending messages.
>>>>> >> > >
>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>> >> > > -> Same concern as solution #1.
>>>>> >> > >
>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort of like an LRU cache)
>>>>> >> > > -> The idea here is that if we hit a misconfigured client, we will expire the older entries.
>>>>> >> > > The concern here is that we risk losing idempotency guarantees, and currently we don't have a way to notify clients about losing idempotency guarantees. Besides, the least recently used entries that get removed are not always from the "bad" clients. (See the sketch in the P.S. at the end of this mail.)
>>>>> >> > >
>>>>> >> > > 4. Allow clients to "close" the producer ID usage
>>>>> >> > > -> We can provide a way for a producer to "close" its producer ID usage. Currently, we only have the INIT_PRODUCER_ID request to allocate one; after that, we keep the producer ID metadata in the broker even if the producer is "closed". With a close API (e.g. END_PRODUCER_ID), we could remove the entry on the broker side, and on the client side we could send it when the producer closes. The concern is that old clients (including non-Java clients) will still suffer from the OOM issue.
>>>>> >> > >
>>>>> >> > > 5. Limit/throttle the producer IDs based on the principal
>>>>> >> > > -> Although we can limit the impact to a certain principal with this idea, the same concern still exists as for solutions #1 and #2.
>>>>> >> > >
>>>>> >> > > Any thoughts/feedback are welcomed.
>>>>> >> > >
>>>>> >> > > Thank you.
>>>>> >> > > Luke
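>>>>> >> > > P.S. For (3), a minimal sketch of what "sort of like an LRU cache" could mean (the names and the state type are made up; this is not the actual broker data structure):
>>>>> >> > >
>>>>> >> > >     import java.util.LinkedHashMap;
>>>>> >> > >     import java.util.Map;
>>>>> >> > >
>>>>> >> > >     // Illustration of option (3): cap the number of active PIDs kept in memory and
>>>>> >> > >     // evict the least recently used entry once the cap is hit. V stands in for whatever
>>>>> >> > >     // per-producer-ID state the broker keeps (sequence numbers, epoch, timestamps, ...).
>>>>> >> > >     public class ActivePidCache<V> {
>>>>> >> > >         private final Map<Long, V> cache;
>>>>> >> > >
>>>>> >> > >         public ActivePidCache(int maxActivePids) {
>>>>> >> > >             // accessOrder=true: iteration order is least-recently-accessed first
>>>>> >> > >             this.cache = new LinkedHashMap<Long, V>(16, 0.75f, true) {
>>>>> >> > >                 @Override
>>>>> >> > >                 protected boolean removeEldestEntry(Map.Entry<Long, V> eldest) {
>>>>> >> > >                     // Evicting here silently drops the idempotency state for that PID,
>>>>> >> > >                     // which is exactly the guarantee/notification concern above.
>>>>> >> > >                     return size() > maxActivePids;
>>>>> >> > >                 }
>>>>> >> > >             };
>>>>> >> > >         }
>>>>> >> > >
>>>>> >> > >         public synchronized V get(long producerId) {
>>>>> >> > >             return cache.get(producerId);
>>>>> >> > >         }
>>>>> >> > >
>>>>> >> > >         public synchronized void put(long producerId, V state) {
>>>>> >> > >             cache.put(producerId, state);
>>>>> >> > >         }
>>>>> >> > >     }
>>>>> >> > >
>>>>> >> > > The eviction is what trades idempotency guarantees for bounded memory, so any response or notification story for evicted PIDs would have to be layered on top.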
This will be a >>>> similar risk to Fetch/Produce quota in Kafka which also relay on the client >>>> to use the same clientId and KafkaPrincipal. >>>> >>>> >2. Are there places where high cardinality of this metric is a >>>> concern? I can imagine many client IDs in the system. Would we treat this >>>> as a rate metric (ie, when we get an init producer ID and return a new >>>> producer ID we emit a count for that client id?) Or something else? >>>> My initial thought here was to follow the steps of ClientQuotaManager >>>> and ClientRequestQuotaManager and use a rate metric. However, I think we >>>> can emit it either >>>> >>>> 1. when we return the new PID. However, I have concerns that we may >>>> circle back to the previous concerns with OMM due to keeping track of >>>> ACTIVE PIDs per KafkaPrincipal(and/or) clientId in the future. Also this >>>> would be the first time Kafka throttle IDs for any client. >>>> 2. or once we recieve initProducerIDRequest and throttle before >>>> even hitting `handleInitProducerIdRequest`. Going this direction we may >>>> need to throttle it within a different quota window than ` >>>> quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request >>>> per second wouldn't be efficient for most cases. I personally think this >>>> direction is easier as it seems more consistent with the existing quota >>>> implementation. Specially that Kafka has already the concept of >>>> throttling >>>> subset of requests (in ControllerMutationQuotaManager) but never had any >>>> concept of throttling active IDs for any client. >>>> >>>> >>>> What do you think? >>>> >>>> Thanks >>>> Omnia >>>> >>>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io> >>>> wrote: >>>> >>>>> Hey Omnia, >>>>> Sorry for losing track of this. >>>>> >>>>> If I understand your message correctly, there are issues with >>>>> identifying the source of the rogue clients? So you propose to add a new >>>>> metric for that? >>>>> And also proposing to throttle based on clientId as a potential follow >>>>> up? >>>>> >>>>> I think both of these make sense. The only things I can think of for >>>>> the metric are: >>>>> 1. Does we rely on the client using the same ID? What if there are >>>>> many clients that all use different client IDs? >>>>> 2. Are there places where high cardinality of this metric is a >>>>> concern? I can imagine many client IDs in the system. Would we treat this >>>>> as a rate metric (ie, when we get an init producer ID and return a new >>>>> producer ID we emit a count for that client id?) Or something else? >>>>> >>>>> Thanks, >>>>> Justine >>>>> >>>>> On Thu, Feb 2, 2023 at 8:44 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Luke and Justine, >>>>>> Are there any thoughts or updates on this? I would love to help with >>>>>> this as we are hitting this more frequently now. >>>>>> >>>>>> best, >>>>>> >>>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim < >>>>>> o.g.h.ibra...@gmail.com> wrote: >>>>>> >>>>>>> Hi Luke and Justine, >>>>>>> >>>>>>>> For (3), you said: >>>>>>>> > - I have some concerns about the impact of this option on the >>>>>>>> transactional >>>>>>>> producers, for example, what will happen to an ongoing transaction >>>>>>>> associated with an expired PID? Would this leave the transactions >>>>>>>> in a >>>>>>>> "hanging" state? >>>>>>>> >>>>>>>> - How will we notify the client that the transaction can't continue >>>>>>>> due to >>>>>>>> an expired PID? 
>>>>>>>> >>>>>>>> - If PID got marked as `expired` this will mean that >>>>>>>> `admin.DescribeProducers` will not list them which will make >>>>>>>> *`kafka-transactions.sh >>>>>>>> --list`* a bit tricky as we can't identify if there are >>>>>>>> transactions linked >>>>>>>> to this expired PID or not. The same concern applies to >>>>>>>> *`kafka-transactions.sh >>>>>>>> --find-hanging`*. >>>>>>>> >>>>>>>> --> Yes, you're right. Those are also concerns for this solution. >>>>>>>> Currently, there's no way to notify clients about the expiration. >>>>>>>> Also, the ongoing transactions will be hanging. For the admin cli, >>>>>>>> we've >>>>>>>> never thought about it. Good point. >>>>>>>> In summary, to adopt this solution, there are many issues needed to >>>>>>>> get >>>>>>>> fixed. >>>>>>>> >>>>>>> >>>>>>> Justin already clarified that if PID is attached to a transaction it >>>>>>> will not expire so identifying the transactions shouldn't be a concern >>>>>>> anymore. >>>>>>> The only concern here will be that this solution will not solve the >>>>>>> problem if the rouge producer is a transactional producer with hanging >>>>>>> transactions. >>>>>>> If anyone faced this situation they will need to abort the hanging >>>>>>> transactions manually and then the solution to expire a PID can then >>>>>>> work. >>>>>>> >>>>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear) >>>>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id >>>>>>>> is also >>>>>>>> workable. >>>>>>>> It's just these 2 attributes are not required. >>>>>>>> That is, it's possible we take all clients as the same one: {default >>>>>>>> KafkaPrinciple + default clientID}, and apply throttling on it. >>>>>>>> Do you have any thoughts about it? >>>>>>>> Maybe skip throttling for {default KafkaPrinciple + default >>>>>>>> clientID} >>>>>>>> >>>>>>> >>>>>>> Throttling for default KafkaPrinciple and default ClientID is useful >>>>>>> when we need to have a hard limit on the whole cluster and whoever is >>>>>>> running the cluster doesn't knowclientsntIDs or if a KafkaPrinciple is >>>>>>> reused between different producer applications. >>>>>>> I usually find it helpful to have a way to apply throttling only on >>>>>>> the rough clients only once I identify them without punishing everyone >>>>>>> on >>>>>>> the cluster. However, there are two problems with this >>>>>>> - There's no easy way at the moment to link PIDs to clientId or >>>>>>> KafkaPrinciple. This need to be addressed first. >>>>>>> - Is Justin's comment on the throttling, and the fact that will mean >>>>>>> we either block all requests or have to store the request in memory >>>>>>> which >>>>>>> in both cases has side downs on the producer experince. >>>>>>> >>>>>>> I recently had another discussion with my team and it does seem like >>>>>>>> there >>>>>>>> should be a way to make it more clear to the clients what is going >>>>>>>> on. A >>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a >>>>>>>> way to >>>>>>>> improve the story for newer clients. (Ie if we choose to expire >>>>>>>> based on a >>>>>>>> size limit, we should include a response indicating the ID has >>>>>>>> expired.) We >>>>>>>> also discussed ways to redefine the guarantees so that users who >>>>>>>> have >>>>>>>> stronger idempotency requirements can ensure them (over >>>>>>>> availability/memory >>>>>>>> concerns). Let me know if you have any ideas here. >>>>>>>> >>>>>>> >>>>>>> It may be easier to improve the experience for new clients. 
However, >>>>>>> if we improved only the new clients we may need a way to help teams who >>>>>>> run >>>>>>> Kafka with rough clients on old versions by at least giving them an easy >>>>>>> way to identify the clientId/ or KafkaPrinciple that generated these >>>>>>> PIDs. >>>>>>> >>>>>>> For context, it's very tricky to even identify which clientId is >>>>>>> creating all these PIDs that caused OOM, which is a contributing part of >>>>>>> the issue at the moment. So maybe one option here could be adding a new >>>>>>> metric that tracks the number of generated PIDs per clientId. This will >>>>>>> help the team who runs the Kafka cluster to >>>>>>> - contact these rough clients and ask them to fix their clients or >>>>>>> upgrade to a new client if the new client version has a better >>>>>>> experience. >>>>>>> - or if ended with a throttling solution this may help identify >>>>>>> which clientId needs to be throttled. >>>>>>> >>>>>>> Maybe we can start with a solution for identifying the rough clients >>>>>>> first and keep looking for a solution to limit them, what do you think? >>>>>>> >>>>>>> Thanks >>>>>>> >>>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan >>>>>>> <jols...@confluent.io.invalid> wrote: >>>>>>> >>>>>>>> Oops. I realized I just replied to Omnia 🤦♀️ >>>>>>>> >>>>>>>> Here was my response for the mailing thread: >>>>>>>> >>>>>>>> Hey Omnia, >>>>>>>> Sorry to hear this is a problem for you as well. :( >>>>>>>> > * I have some concerns about the impact of this option on the >>>>>>>> transactional producers, for example, what will happen to an ongoing >>>>>>>> transaction associated with an expired PID? Would this leave the >>>>>>>> transactions in a "hanging" state?* >>>>>>>> We currently check if a transaction is ongoing and do not expire the >>>>>>>> producer ID if it has an ongoing transaction. I suspect we will >>>>>>>> continue to >>>>>>>> do this with any solution we pick. >>>>>>>> >>>>>>>> My team members and I looked a bit into the throttling case and it >>>>>>>> can get >>>>>>>> a bit tricky since it means we need to throttle the produce request >>>>>>>> before >>>>>>>> it is processed. This means we either block all requests or have to >>>>>>>> store >>>>>>>> the request in memory (which is not great if we are trying to save >>>>>>>> memory). >>>>>>>> >>>>>>>> I recently had another discussion with my team and it does seem >>>>>>>> like there >>>>>>>> should be a way to make it more clear to the clients what is going >>>>>>>> on. A >>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a >>>>>>>> way to >>>>>>>> improve the story for newer clients. (Ie if we choose to expire >>>>>>>> based on a >>>>>>>> size limit, we should include a response indicating the ID has >>>>>>>> expired.) We >>>>>>>> also discussed ways to redefine the guarantees so that users who >>>>>>>> have >>>>>>>> stronger idempotency requirements can ensure them (over >>>>>>>> availability/memory >>>>>>>> concerns). Let me know if you have any ideas here. >>>>>>>> >>>>>>>> Thanks again for commenting here, hopefully we can come to a good >>>>>>>> solution. >>>>>>>> >>>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>> > Hi Omnia, >>>>>>>> > >>>>>>>> > Thanks for your reply. 
>>>>>>>> > >>>>>>>> > For (3), you said: >>>>>>>> > > - I have some concerns about the impact of this option on the >>>>>>>> > transactional >>>>>>>> > producers, for example, what will happen to an ongoing transaction >>>>>>>> > associated with an expired PID? Would this leave the transactions >>>>>>>> in a >>>>>>>> > "hanging" state? >>>>>>>> > >>>>>>>> > - How will we notify the client that the transaction can't >>>>>>>> continue due to >>>>>>>> > an expired PID? >>>>>>>> > >>>>>>>> > - If PID got marked as `expired` this will mean that >>>>>>>> > `admin.DescribeProducers` will not list them which will make >>>>>>>> > *`kafka-transactions.sh >>>>>>>> > --list`* a bit tricky as we can't identify if there are >>>>>>>> transactions linked >>>>>>>> > to this expired PID or not. The same concern applies to >>>>>>>> > *`kafka-transactions.sh >>>>>>>> > --find-hanging`*. >>>>>>>> > >>>>>>>> > --> Yes, you're right. Those are also concerns for this solution. >>>>>>>> > Currently, there's no way to notify clients about the expiration. >>>>>>>> > Also, the ongoing transactions will be hanging. For the admin >>>>>>>> cli, we've >>>>>>>> > never thought about it. Good point. >>>>>>>> > In summary, to adopt this solution, there are many issues needed >>>>>>>> to get >>>>>>>> > fixed. >>>>>>>> > >>>>>>>> > >>>>>>>> > For (5), you said: >>>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your >>>>>>>> concern here >>>>>>>> > that >>>>>>>> > those good clients that use the same principal as a rogue one >>>>>>>> will get >>>>>>>> > throttled? >>>>>>>> > >>>>>>>> > If this is the case, then I believe it should be okay as other >>>>>>>> throttling >>>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour. >>>>>>>> > >>>>>>>> > >>>>>>>> > What about applying limit/throttling to >>>>>>>> > *`/config/users/<user>/clients/<client-id>` >>>>>>>> > *similar to what we have with client quota? This should reduce >>>>>>>> the concern >>>>>>>> > about throttling good clients, right? >>>>>>>> > >>>>>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear) >>>>>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client >>>>>>>> Id is >>>>>>>> > also workable. >>>>>>>> > It's just these 2 attributes are not required. >>>>>>>> > That is, it's possible we take all clients as the same one: >>>>>>>> {default >>>>>>>> > KafkaPrinciple + default clientID}, and apply throttling on it. >>>>>>>> > Do you have any thoughts about it? >>>>>>>> > Maybe skip throttling for {default KafkaPrinciple + default >>>>>>>> clientID} ? >>>>>>>> > >>>>>>>> > Luke >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim < >>>>>>>> o.g.h.ibra...@gmail.com> >>>>>>>> > wrote: >>>>>>>> > >>>>>>>> >> Hi Luke & Justine, >>>>>>>> >> Thanks for looking into this issue, we have been experiencing >>>>>>>> this because >>>>>>>> >> of rouge clients as well. >>>>>>>> >> >>>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort >>>>>>>> of like an >>>>>>>> >> LRU >>>>>>>> >> >cache) >>>>>>>> >> >-> The idea here is that if we hit a misconfigured client, we >>>>>>>> will expire >>>>>>>> >> >the older entries. The concern here is we have risks to lose >>>>>>>> idempotency >>>>>>>> >> >guarantees, and currently, we don't have a way to notify >>>>>>>> clients about >>>>>>>> >> >losing idempotency guarantees. Besides, the least recently >>>>>>>> used entries >>>>>>>> >> >got removed are not always from the "bad" clients. 
>>>>>>>> >> >>>>>>>> >> - I have some concerns about the impact of this option on the >>>>>>>> >> transactional >>>>>>>> >> producers, for example, what will happen to an ongoing >>>>>>>> transaction >>>>>>>> >> associated with an expired PID? Would this leave the >>>>>>>> transactions in a >>>>>>>> >> "hanging" state? >>>>>>>> >> >>>>>>>> >> - How will we notify the client that the transaction can't >>>>>>>> continue due to >>>>>>>> >> an expired PID? >>>>>>>> >> >>>>>>>> >> - If PID got marked as `expired` this will mean that >>>>>>>> >> `admin.DescribeProducers` will not list them which will make >>>>>>>> >> *`kafka-transactions.sh >>>>>>>> >> --list`* a bit tricky as we can't identify if there are >>>>>>>> transactions >>>>>>>> >> linked >>>>>>>> >> to this expired PID or not. The same concern applies to >>>>>>>> >> *`kafka-transactions.sh >>>>>>>> >> --find-hanging`*. >>>>>>>> >> >>>>>>>> >> >>>>>>>> >> >5. limit/throttling the producer id based on the principle >>>>>>>> >> >-> Although we can limit the impact to a certain principle with >>>>>>>> this >>>>>>>> >> idea, >>>>>>>> >> >same concern still exists as solution #1 #2. >>>>>>>> >> >>>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your >>>>>>>> concern here >>>>>>>> >> that >>>>>>>> >> those good clients that use the same principal as a rogue one >>>>>>>> will get >>>>>>>> >> throttled? >>>>>>>> >> >>>>>>>> >> If this is the case, then I believe it should be okay as other >>>>>>>> throttling >>>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour. >>>>>>>> >> >>>>>>>> >> >>>>>>>> >> What about applying limit/throttling to >>>>>>>> >> *`/config/users/<user>/clients/<client-id>` >>>>>>>> >> *similar to what we have with client quota? This should reduce >>>>>>>> the concern >>>>>>>> >> about throttling good clients, right? >>>>>>>> >> >>>>>>>> >> best, >>>>>>>> >> >>>>>>>> >> Omnia >>>>>>>> >> >>>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> >>>>>>>> wrote: >>>>>>>> >> >>>>>>>> >> > Bump this thread to see if there are any comments/thoughts. >>>>>>>> >> > Thanks. >>>>>>>> >> > >>>>>>>> >> > Luke >>>>>>>> >> > >>>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com> >>>>>>>> wrote: >>>>>>>> >> > >>>>>>>> >> > > Hi devs, >>>>>>>> >> > > >>>>>>>> >> > > As stated in the motivation section in KIP-854 >>>>>>>> >> > > < >>>>>>>> >> > >>>>>>>> >> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry >>>>>>>> >> > >: >>>>>>>> >> > > >>>>>>>> >> > > With idempotent producers becoming the default in Kafka, >>>>>>>> this means >>>>>>>> >> that >>>>>>>> >> > > unless otherwise specified, all new producers will be given >>>>>>>> producer >>>>>>>> >> IDs. >>>>>>>> >> > > Some (inefficient) applications may now create many >>>>>>>> non-transactional >>>>>>>> >> > > idempotent producers. Each of these producers will be >>>>>>>> assigned a >>>>>>>> >> producer >>>>>>>> >> > > ID and these IDs and their metadata are stored in the broker >>>>>>>> memory, >>>>>>>> >> > which >>>>>>>> >> > > might cause brokers out of memory. >>>>>>>> >> > > >>>>>>>> >> > > Justine (in cc.) and I and some other team members are >>>>>>>> working on the >>>>>>>> >> > > solutions for this issue. But none of them solves it >>>>>>>> completely >>>>>>>> >> without >>>>>>>> >> > > side effects. Among them, "availability" VS "idempotency >>>>>>>> guarantees" >>>>>>>> >> is >>>>>>>> >> > > what we can't decide which to sacrifice. 
Some of these >>>>>>>> solutions >>>>>>>> >> > sacrifice >>>>>>>> >> > > availability of produce (1,2,5) and others sacrifice >>>>>>>> idempotency >>>>>>>> >> > guarantees >>>>>>>> >> > > (3). It could be useful to know if people generally have a >>>>>>>> preference >>>>>>>> >> one >>>>>>>> >> > > way or the other. Or what other better solutions there might >>>>>>>> be. >>>>>>>> >> > > >>>>>>>> >> > > Here are the proposals we came up with: >>>>>>>> >> > > >>>>>>>> >> > > 1. Limit the total active producer ID allocation number. >>>>>>>> >> > > -> This is the simplest solution. But since the OOM issue is >>>>>>>> usually >>>>>>>> >> > > caused by a rogue or misconfigured client, and this solution >>>>>>>> might >>>>>>>> >> > "punish" >>>>>>>> >> > > the good client from sending messages. >>>>>>>> >> > > >>>>>>>> >> > > 2. Throttling the producer ID allocation rate >>>>>>>> >> > > -> Same concern as the solution #1. >>>>>>>> >> > > >>>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort >>>>>>>> of like >>>>>>>> >> an >>>>>>>> >> > > LRU cache) >>>>>>>> >> > > -> The idea here is that if we hit a misconfigured client, >>>>>>>> we will >>>>>>>> >> expire >>>>>>>> >> > > the older entries. The concern here is we have risks to lose >>>>>>>> >> idempotency >>>>>>>> >> > > guarantees, and currently, we don't have a way to notify >>>>>>>> clients about >>>>>>>> >> > > losing idempotency guarantees. Besides, the least recently >>>>>>>> used >>>>>>>> >> entries >>>>>>>> >> > > got removed are not always from the "bad" clients. >>>>>>>> >> > > >>>>>>>> >> > > 4. allow clients to "close" the producer ID usage >>>>>>>> >> > > -> We can provide a way for producer to "close" producerID >>>>>>>> usage. >>>>>>>> >> > > Currently, we only have a way to INIT_PRODUCER_ID requested >>>>>>>> to >>>>>>>> >> allocate >>>>>>>> >> > > one. After that, we'll keep the producer ID metadata in >>>>>>>> broker even if >>>>>>>> >> > the >>>>>>>> >> > > producer is "closed". Having a closed API (ex: >>>>>>>> END_PRODUCER_ID), we >>>>>>>> >> can >>>>>>>> >> > > remove the entry from broker side. In client side, we can >>>>>>>> send it when >>>>>>>> >> > > producer closing. The concern is, the old clients (including >>>>>>>> non-java >>>>>>>> >> > > clients) will still suffer from the OOM issue. >>>>>>>> >> > > >>>>>>>> >> > > 5. limit/throttling the producer id based on the principle >>>>>>>> >> > > -> Although we can limit the impact to a certain principle >>>>>>>> with this >>>>>>>> >> > idea, >>>>>>>> >> > > same concern still exists as solution #1 #2. >>>>>>>> >> > > >>>>>>>> >> > > Any thoughts/feedback are welcomed. >>>>>>>> >> > > >>>>>>>> >> > > Thank you. >>>>>>>> >> > > Luke >>>>>>>> >> > > >>>>>>>> >> > >>>>>>>> >> >>>>>>>> > >>>>>>>> >>>>>>>