Hi Justine and Luke, I started a KIP draft for this proposal here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs and would appreciate any initial feedback before opening a broader discussion.
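To make the operator-facing side concrete, here is a rough sketch of how such a limit could be set through the existing client-quota Admin API, on the same `/config/users/<user>/clients/<client-id>` entities the produce/fetch quotas already use. The quota key `producer_ids_rate` and the entity names below are placeholders for whatever we settle on in the KIP, not a final design:

// Sketch only: "producer_ids_rate" is a placeholder for the new quota key this KIP
// would introduce; "some-principal" and "some-client-id" are stand-ins.
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class SetProducerIdQuota {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Same quota entities as the existing produce/fetch quotas:
            // /config/users/<user>/clients/<client-id>
            ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(
                ClientQuotaEntity.USER, "some-principal",
                ClientQuotaEntity.CLIENT_ID, "some-client-id"));
            // Hypothetical new key limiting how many PIDs this entity may create
            // per quota window.
            ClientQuotaAlteration alteration = new ClientQuotaAlteration(
                entity,
                List.of(new ClientQuotaAlteration.Op("producer_ids_rate", 100.0)));
            admin.alterClientQuotas(List.of(alteration)).all().get();
        }
    }
}

With this approach nothing new is needed on the tooling side; only the new quota key and its enforcement on the broker would be added.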
Thanks

On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

> Hi Justine,

> My initial thought with throttling initProducerId was to get rid of the problem at its source (the client that creates too many PIDs) and fail faster, but if doing this at the produce-request level is easier, that should be fine. I am guessing it will go in the same direction as what we do in ClientQuotaManager for Produce throttling, just with a different quota window than `quota.window.size.seconds`.

> If this is good as an initial solution I can start a KIP and see what the wider community feels about it.

> Also, I noticed that at some point one of us hit "Reply" instead of "Reply to All" :) So here are the previous conversations:

> On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io> wrote:

>> Hey Omnia,

>> Thanks for the response. I think I understand your explanations here with respect to principal and clientId usage.

>> For the throttling -- handleInitProducerIdRequest will allocate the ID to the producer, but we don't actually store it on the broker or increment our metric until the first produce request for that producer is sent (or sent again after previously expiring). Would you consider throttling the produce request instead? It may be hard to get any metrics from the transaction coordinator where the initProducerId request is handled.

>> Justine

> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

>> Hey Justine,
>> > If I understand your message correctly, there are issues with identifying the source of the rogue clients? So you propose to add a new metric for that?
>> > And also proposing to throttle based on clientId as a potential follow-up?
>> I want to identify rogue clients by KafkaPrincipal (and/or clientId), similarly to how we identify clients in the Fetch/Produce/Request QuotaManagers. Using KafkaPrincipal should give the cluster admin the ability to throttle later based on principal, which is most likely a smaller set than clientIds. My initial thought was to add a metric that represents how many InitProducerIdRequests are sent per KafkaPrincipal (and/or clientId), similar to the Fetch/Produce QuotaManagers.
>> Then as a follow-up, we can throttle based on either KafkaPrincipal or clientId (maybe a default as well, to align this with the other QuotaManagers in Kafka).

>> > 1. Do we rely on the client using the same ID? What if there are many clients that all use different client IDs?
>> This is why I want to use the combination of KafkaPrincipal and/or clientId, similar to some other quotas we already have in Kafka. It carries a similar risk to the Fetch/Produce quotas, which also rely on the client using the same clientId and KafkaPrincipal.

>> > 2. Are there places where high cardinality of this metric is a concern? I can imagine many client IDs in the system. Would we treat this as a rate metric (i.e., when we get an init producer ID and return a new producer ID we emit a count for that client id?) Or something else?
>> My initial thought here was to follow the steps of ClientQuotaManager and ClientRequestQuotaManager and use a rate metric. However, I think we can emit it either:

>> 1. when we return the new PID. However, I have concerns that we may circle back to the earlier OOM concerns, since this means keeping track of ACTIVE PIDs per KafkaPrincipal (and/or clientId) in the future. Also, this would be the first time Kafka throttles IDs for any client.
>> 2. or once we receive the InitProducerIdRequest and throttle before even hitting `handleInitProducerIdRequest`. Going in this direction we may need to throttle within a different quota window than `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests per second wouldn't be efficient for most cases. I personally think this direction is easier, as it seems more consistent with the existing quota implementation, especially since Kafka already has the concept of throttling a subset of requests (in ControllerMutationQuotaManager) but has never had the concept of throttling active IDs for any client.
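>> To illustrate option 2, here is a very rough sketch of the kind of per-KafkaPrincipal/clientId sensor I have in mind, reusing the metrics and quota machinery the existing quota managers are built on. Everything here (class and metric names, the 60-second window, the limit) is illustrative only:

// Illustrative sketch only: a per-(principal, clientId) rate sensor with its own
// quota window. All names and numbers are placeholders.
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class InitProducerIdQuotaSketch {
    private final Metrics metrics = new Metrics();

    /** Records one InitProducerId request; returns true if the caller should throttle. */
    public boolean recordAndCheck(String principal, String clientId, double maxRate) {
        String name = "init-producer-id:" + principal + ":" + clientId;
        Sensor sensor = metrics.getSensor(name);
        if (sensor == null) {
            sensor = metrics.sensor(name, new MetricConfig()
                .quota(Quota.upperBound(maxRate))
                // A wider window than the 1-second default of quota.window.size.seconds,
                // since InitProducerId requests are far less frequent than produce requests.
                .timeWindow(60, TimeUnit.SECONDS)
                .samples(10));
            sensor.add(metrics.metricName("init-producer-id-rate", "producer-id-quotas",
                "Rate of InitProducerId requests for this principal/clientId",
                Map.of("user", principal, "client-id", clientId)), new Rate());
        }
        try {
            sensor.record(1.0);   // throws once the measured rate exceeds the quota
            return false;
        } catch (QuotaViolationException e) {
            return true;          // the caller would delay the response, like other quotas do
        }
    }
}

>> The only genuinely new part would be the dedicated, wider quota window; recording into a quota-bound sensor and turning a violation into a throttle is roughly what ClientQuotaManager already does for the byte-rate quotas.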
>> What do you think?

>> Thanks
>> Omnia

> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io> wrote:

>> Hey Omnia,
>> Sorry for losing track of this.

>> If I understand your message correctly, there are issues with identifying the source of the rogue clients? So you propose to add a new metric for that?
>> And also proposing to throttle based on clientId as a potential follow-up?

>> I think both of these make sense. The only things I can think of for the metric are:
>> 1. Do we rely on the client using the same ID? What if there are many clients that all use different client IDs?
>> 2. Are there places where high cardinality of this metric is a concern? I can imagine many client IDs in the system. Would we treat this as a rate metric (i.e., when we get an init producer ID and return a new producer ID we emit a count for that client id?) Or something else?

>> Thanks,
>> Justine

> Thanks
> Omnia

> On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

>> Hi Luke and Justine,
>> Are there any thoughts or updates on this? I would love to help with this as we are hitting it more frequently now.

>> best,

>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

>>> Hi Luke and Justine,

>>>> For (3), you said:
>>>> > - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>> > - How will we notify the client that the transaction can't continue due to an expired PID?
>>>> > - If a PID got marked as `expired`, `admin.DescribeProducers` will not list it, which will make `kafka-transactions.sh --list` a bit tricky as we can't identify whether there are transactions linked to this expired PID or not. The same concern applies to `kafka-transactions.sh --find-hanging`.
>>>>
>>>> --> Yes, you're right. Those are also concerns for this solution. Currently, there's no way to notify clients about the expiration. Also, the ongoing transactions will be hanging. For the admin CLI, we've never thought about it. Good point.
>>>> In summary, to adopt this solution, there are many issues that need to be fixed.

>>> Justine already clarified that if a PID is attached to a transaction it will not expire, so identifying the transactions shouldn't be a concern anymore.
>>> The only remaining concern is that this solution will not solve the problem if the rogue producer is a transactional producer with hanging transactions. Anyone facing that situation will need to abort the hanging transactions manually, and then the solution to expire a PID can work.

>>>> --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear)
>>>> Yes, we were thinking about throttling by KafkaPrincipal. Client id is also workable.
>>>> It's just that these 2 attributes are not required.
>>>> That is, it's possible we take all clients as the same one: {default KafkaPrincipal + default clientID}, and apply throttling on it.
>>>> Do you have any thoughts about it?
>>>> Maybe skip throttling for {default KafkaPrincipal + default clientID}?

>>> Throttling for the default KafkaPrincipal and default clientID is useful when we need a hard limit on the whole cluster and whoever is running the cluster doesn't know the clients' clientIds, or when a KafkaPrincipal is reused between different producer applications.
>>> I usually find it helpful to have a way to apply throttling only to the rogue clients once I identify them, without punishing everyone on the cluster. However, there are two problems with this:
>>> - There's no easy way at the moment to link PIDs to a clientId or KafkaPrincipal. This needs to be addressed first.
>>> - There's Justine's comment on throttling: it would mean we either block all requests or have to store the request in memory, which in both cases has downsides for the producer experience.

>>>> I recently had another discussion with my team and it does seem like there should be a way to make it more clear to the clients what is going on. A lot of this protocol is implicit. I'm wondering if maybe there is a way to improve the story for newer clients. (I.e., if we choose to expire based on a size limit, we should include a response indicating the ID has expired.) We also discussed ways to redefine the guarantees so that users who have stronger idempotency requirements can ensure them (over availability/memory concerns). Let me know if you have any ideas here.

>>> It may be easier to improve the experience for new clients. However, if we improve only the new clients, we may need a way to help teams who run Kafka with rogue clients on old versions, by at least giving them an easy way to identify the clientId and/or KafkaPrincipal that generated these PIDs.

>>> For context, it's very tricky to even identify which clientId is creating all the PIDs that caused the OOM, which is a contributing part of the issue at the moment. So maybe one option here could be adding a new metric that tracks the number of generated PIDs per clientId. This will help the team who runs the Kafka cluster to
>>> - contact these rogue clients and ask them to fix their clients, or to upgrade to a new client if the new client version has a better experience,
>>> - or, if we end up with a throttling solution, identify which clientId needs to be throttled.
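>>> As a sketch of what that identification metric could look like on the broker side (all names here are made up for illustration):

// Illustrative sketch only: count how many PIDs have been handed out per clientId,
// so operators can identify the offender. Names are made up.
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

public class PidsPerClientIdSketch {
    private final Metrics metrics = new Metrics();

    /** Would be called wherever the broker stores a newly seen producer ID. */
    public void recordNewPid(String clientId) {
        String name = "pids-created:" + clientId;
        Sensor sensor = metrics.getSensor(name);
        if (sensor == null) {
            sensor = metrics.sensor(name);
            sensor.add(metrics.metricName("producer-id-count", "producer-id-metrics",
                "Number of producer IDs created by this clientId",
                Map.of("client-id", clientId)), new CumulativeSum());
        }
        sensor.record(1.0);   // one more PID attributed to this clientId
    }
}

>>> The high-cardinality concern raised earlier would apply to this metric too, since it creates one sensor per clientId, so the tagging would need some care.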
>>> Maybe we can start with a solution for identifying the rogue clients first and keep looking for a solution to limit them. What do you think?

>>> Thanks

>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

>>>> Oops. I realized I just replied to Omnia 🤦♀️

>>>> Here was my response for the mailing thread:

>>>> Hey Omnia,
>>>> Sorry to hear this is a problem for you as well. :(
>>>> > I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>> We currently check if a transaction is ongoing and do not expire the producer ID if it has an ongoing transaction. I suspect we will continue to do this with any solution we pick.

>>>> My team members and I looked a bit into the throttling case and it can get a bit tricky, since it means we need to throttle the produce request before it is processed. This means we either block all requests or have to store the request in memory (which is not great if we are trying to save memory).

>>>> I recently had another discussion with my team and it does seem like there should be a way to make it more clear to the clients what is going on. A lot of this protocol is implicit. I'm wondering if maybe there is a way to improve the story for newer clients. (I.e., if we choose to expire based on a size limit, we should include a response indicating the ID has expired.) We also discussed ways to redefine the guarantees so that users who have stronger idempotency requirements can ensure them (over availability/memory concerns). Let me know if you have any ideas here.

>>>> Thanks again for commenting here, hopefully we can come to a good solution.

>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:

>>>> > Hi Omnia,

>>>> > Thanks for your reply.

>>>> > For (3), you said:
>>>> > > - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
>>>> > > - How will we notify the client that the transaction can't continue due to an expired PID?
>>>> > > - If a PID got marked as `expired`, `admin.DescribeProducers` will not list it, which will make `kafka-transactions.sh --list` a bit tricky as we can't identify whether there are transactions linked to this expired PID or not. The same concern applies to `kafka-transactions.sh --find-hanging`.

>>>> > --> Yes, you're right. Those are also concerns for this solution. Currently, there's no way to notify clients about the expiration. Also, the ongoing transactions will be hanging. For the admin CLI, we've never thought about it. Good point.
>>>> > In summary, to adopt this solution, there are many issues that need to be fixed.

>>>> > For (5), you said:
>>>> > > I am assuming you mean KafkaPrincipal here! If so, is your concern that those good clients that use the same principal as a rogue one will get throttled?
>>>> > > If this is the case, then I believe it should be okay, as other throttling in Kafka on `/config/users/<user>` has the same behaviour.
>>>> > > What about applying the limit/throttling to `/config/users/<user>/clients/<client-id>`, similar to what we have with client quotas? This should reduce the concern about throttling good clients, right?

>>>> > --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear)
>>>> > Yes, we were thinking about throttling by KafkaPrincipal. Client id is also workable.
>>>> > It's just that these 2 attributes are not required.
>>>> > That is, it's possible we take all clients as the same one: {default KafkaPrincipal + default clientID}, and apply throttling on it.
>>>> > Do you have any thoughts about it?
>>>> > Maybe skip throttling for {default KafkaPrincipal + default clientID}?

>>>> > Luke

>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

>>>> >> Hi Luke & Justine,
>>>> >> Thanks for looking into this issue, we have been experiencing this because of rogue clients as well.

>>>> >> > 3. Having a limit to the number of active producer IDs (sort of like an LRU cache)
>>>> >> > -> The idea here is that if we hit a misconfigured client, we will expire the older entries. The concern here is that we risk losing idempotency guarantees, and currently we don't have a way to notify clients about losing idempotency guarantees. Besides, the least recently used entries that get removed are not always from the "bad" clients.

>>>> >> - I have some concerns about the impact of this option on the transactional producers, for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?

>>>> >> - How will we notify the client that the transaction can't continue due to an expired PID?

>>>> >> - If a PID got marked as `expired`, `admin.DescribeProducers` will not list it, which will make `kafka-transactions.sh --list` a bit tricky as we can't identify whether there are transactions linked to this expired PID or not. The same concern applies to `kafka-transactions.sh --find-hanging`.

>>>> >> > 5. limit/throttling the producer id based on the principle
>>>> >> > -> Although we can limit the impact to a certain principle with this idea, the same concern still exists as for solutions #1 and #2.

>>>> >> I am assuming you mean KafkaPrincipal here! If so, is your concern that those good clients that use the same principal as a rogue one will get throttled?

>>>> >> If this is the case, then I believe it should be okay, as other throttling in Kafka on `/config/users/<user>` has the same behaviour.

>>>> >> What about applying the limit/throttling to `/config/users/<user>/clients/<client-id>`, similar to what we have with client quotas? This should reduce the concern about throttling good clients, right?

>>>> >> best,

>>>> >> Omnia

>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> wrote:

>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>> >> > Thanks.
>>>> >> > Luke

>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com> wrote:

>>>> >> > > Hi devs,

>>>> >> > > As stated in the motivation section in KIP-854 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry>:

>>>> >> > > With idempotent producers becoming the default in Kafka, this means that unless otherwise specified, all new producers will be given producer IDs. Some (inefficient) applications may now create many non-transactional idempotent producers. Each of these producers will be assigned a producer ID, and these IDs and their metadata are stored in broker memory, which might cause brokers to run out of memory.

>>>> >> > > Justine (in cc) and I and some other team members are working on solutions for this issue, but none of them solves it completely without side effects. Among them, "availability" vs "idempotency guarantees" is the trade-off where we can't decide which to sacrifice. Some of these solutions sacrifice availability of produce (1, 2, 5) and others sacrifice idempotency guarantees (3). It would be useful to know if people generally have a preference one way or the other, or what other better solutions there might be.

>>>> >> > > Here are the proposals we came up with:

>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>> >> > > -> This is the simplest solution. But since the OOM issue is usually caused by a rogue or misconfigured client, this solution might "punish" the good clients by blocking them from sending messages.

>>>> >> > > 2. Throttling the producer ID allocation rate.
>>>> >> > > -> Same concern as solution #1.

>>>> >> > > 3. Having a limit to the number of active producer IDs (sort of like an LRU cache).
>>>> >> > > -> The idea here is that if we hit a misconfigured client, we will expire the older entries. The concern here is that we risk losing idempotency guarantees, and currently we don't have a way to notify clients about losing idempotency guarantees. Besides, the least recently used entries that get removed are not always from the "bad" clients.

>>>> >> > > 4. Allow clients to "close" the producer ID usage.
>>>> >> > > -> We can provide a way for a producer to "close" its producer ID usage. Currently, we only have the INIT_PRODUCER_ID request to allocate one. After that, we keep the producer ID metadata in the broker even if the producer is "closed". With a close API (e.g. END_PRODUCER_ID), we can remove the entry from the broker side. On the client side, we can send it when the producer is closing. The concern is that old clients (including non-Java clients) will still suffer from the OOM issue.
>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>> >> > > -> Although we can limit the impact to a certain principle with this idea, the same concern still exists as for solutions #1 and #2.

>>>> >> > > Any thoughts/feedback are welcomed.

>>>> >> > > Thank you.
>>>> >> > > Luke
Going this direction we may need >>> to >>> throttle it within a different quota window than ` >>> quota.window.size.seconds ` as throttling INIT_PRODUCER_ID request >>> per second wouldn't be efficient for most cases. I personally think this >>> direction is easier as it seems more consistent with the existing quota >>> implementation. Specially that Kafka has already the concept of >>> throttling >>> subset of requests (in ControllerMutationQuotaManager) but never had any >>> concept of throttling active IDs for any client. >>> >>> >>> What do you think? >>> >>> Thanks >>> Omnia >>> >>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io> >>> wrote: >>> >>>> Hey Omnia, >>>> Sorry for losing track of this. >>>> >>>> If I understand your message correctly, there are issues with >>>> identifying the source of the rogue clients? So you propose to add a new >>>> metric for that? >>>> And also proposing to throttle based on clientId as a potential follow >>>> up? >>>> >>>> I think both of these make sense. The only things I can think of for >>>> the metric are: >>>> 1. Does we rely on the client using the same ID? What if there are many >>>> clients that all use different client IDs? >>>> 2. Are there places where high cardinality of this metric is a concern? >>>> I can imagine many client IDs in the system. Would we treat this as a rate >>>> metric (ie, when we get an init producer ID and return a new producer ID we >>>> emit a count for that client id?) Or something else? >>>> >>>> Thanks, >>>> Justine >>>> >>>> On Thu, Feb 2, 2023 at 8:44 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> >>>> wrote: >>>> >>>>> Hi Luke and Justine, >>>>> Are there any thoughts or updates on this? I would love to help with >>>>> this as we are hitting this more frequently now. >>>>> >>>>> best, >>>>> >>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Luke and Justine, >>>>>> >>>>>>> For (3), you said: >>>>>>> > - I have some concerns about the impact of this option on the >>>>>>> transactional >>>>>>> producers, for example, what will happen to an ongoing transaction >>>>>>> associated with an expired PID? Would this leave the transactions in >>>>>>> a >>>>>>> "hanging" state? >>>>>>> >>>>>>> - How will we notify the client that the transaction can't continue >>>>>>> due to >>>>>>> an expired PID? >>>>>>> >>>>>>> - If PID got marked as `expired` this will mean that >>>>>>> `admin.DescribeProducers` will not list them which will make >>>>>>> *`kafka-transactions.sh >>>>>>> --list`* a bit tricky as we can't identify if there are transactions >>>>>>> linked >>>>>>> to this expired PID or not. The same concern applies to >>>>>>> *`kafka-transactions.sh >>>>>>> --find-hanging`*. >>>>>>> >>>>>>> --> Yes, you're right. Those are also concerns for this solution. >>>>>>> Currently, there's no way to notify clients about the expiration. >>>>>>> Also, the ongoing transactions will be hanging. For the admin cli, >>>>>>> we've >>>>>>> never thought about it. Good point. >>>>>>> In summary, to adopt this solution, there are many issues needed to >>>>>>> get >>>>>>> fixed. >>>>>>> >>>>>> >>>>>> Justin already clarified that if PID is attached to a transaction it >>>>>> will not expire so identifying the transactions shouldn't be a concern >>>>>> anymore. >>>>>> The only concern here will be that this solution will not solve the >>>>>> problem if the rouge producer is a transactional producer with hanging >>>>>> transactions. 
>>>>>> If anyone faced this situation they will need to abort the hanging >>>>>> transactions manually and then the solution to expire a PID can then >>>>>> work. >>>>>> >>>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear) >>>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id >>>>>>> is also >>>>>>> workable. >>>>>>> It's just these 2 attributes are not required. >>>>>>> That is, it's possible we take all clients as the same one: {default >>>>>>> KafkaPrinciple + default clientID}, and apply throttling on it. >>>>>>> Do you have any thoughts about it? >>>>>>> Maybe skip throttling for {default KafkaPrinciple + default >>>>>>> clientID} >>>>>>> >>>>>> >>>>>> Throttling for default KafkaPrinciple and default ClientID is useful >>>>>> when we need to have a hard limit on the whole cluster and whoever is >>>>>> running the cluster doesn't knowclientsntIDs or if a KafkaPrinciple is >>>>>> reused between different producer applications. >>>>>> I usually find it helpful to have a way to apply throttling only on >>>>>> the rough clients only once I identify them without punishing everyone on >>>>>> the cluster. However, there are two problems with this >>>>>> - There's no easy way at the moment to link PIDs to clientId or >>>>>> KafkaPrinciple. This need to be addressed first. >>>>>> - Is Justin's comment on the throttling, and the fact that will mean >>>>>> we either block all requests or have to store the request in memory which >>>>>> in both cases has side downs on the producer experince. >>>>>> >>>>>> I recently had another discussion with my team and it does seem like >>>>>>> there >>>>>>> should be a way to make it more clear to the clients what is going >>>>>>> on. A >>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a >>>>>>> way to >>>>>>> improve the story for newer clients. (Ie if we choose to expire >>>>>>> based on a >>>>>>> size limit, we should include a response indicating the ID has >>>>>>> expired.) We >>>>>>> also discussed ways to redefine the guarantees so that users who have >>>>>>> stronger idempotency requirements can ensure them (over >>>>>>> availability/memory >>>>>>> concerns). Let me know if you have any ideas here. >>>>>>> >>>>>> >>>>>> It may be easier to improve the experience for new clients. However, >>>>>> if we improved only the new clients we may need a way to help teams who >>>>>> run >>>>>> Kafka with rough clients on old versions by at least giving them an easy >>>>>> way to identify the clientId/ or KafkaPrinciple that generated these >>>>>> PIDs. >>>>>> >>>>>> For context, it's very tricky to even identify which clientId is >>>>>> creating all these PIDs that caused OOM, which is a contributing part of >>>>>> the issue at the moment. So maybe one option here could be adding a new >>>>>> metric that tracks the number of generated PIDs per clientId. This will >>>>>> help the team who runs the Kafka cluster to >>>>>> - contact these rough clients and ask them to fix their clients or >>>>>> upgrade to a new client if the new client version has a better >>>>>> experience. >>>>>> - or if ended with a throttling solution this may help identify which >>>>>> clientId needs to be throttled. >>>>>> >>>>>> Maybe we can start with a solution for identifying the rough clients >>>>>> first and keep looking for a solution to limit them, what do you think? >>>>>> >>>>>> Thanks >>>>>> >>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan >>>>>> <jols...@confluent.io.invalid> wrote: >>>>>> >>>>>>> Oops. 
I realized I just replied to Omnia 🤦♀️ >>>>>>> >>>>>>> Here was my response for the mailing thread: >>>>>>> >>>>>>> Hey Omnia, >>>>>>> Sorry to hear this is a problem for you as well. :( >>>>>>> > * I have some concerns about the impact of this option on the >>>>>>> transactional producers, for example, what will happen to an ongoing >>>>>>> transaction associated with an expired PID? Would this leave the >>>>>>> transactions in a "hanging" state?* >>>>>>> We currently check if a transaction is ongoing and do not expire the >>>>>>> producer ID if it has an ongoing transaction. I suspect we will >>>>>>> continue to >>>>>>> do this with any solution we pick. >>>>>>> >>>>>>> My team members and I looked a bit into the throttling case and it >>>>>>> can get >>>>>>> a bit tricky since it means we need to throttle the produce request >>>>>>> before >>>>>>> it is processed. This means we either block all requests or have to >>>>>>> store >>>>>>> the request in memory (which is not great if we are trying to save >>>>>>> memory). >>>>>>> >>>>>>> I recently had another discussion with my team and it does seem like >>>>>>> there >>>>>>> should be a way to make it more clear to the clients what is going >>>>>>> on. A >>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a >>>>>>> way to >>>>>>> improve the story for newer clients. (Ie if we choose to expire >>>>>>> based on a >>>>>>> size limit, we should include a response indicating the ID has >>>>>>> expired.) We >>>>>>> also discussed ways to redefine the guarantees so that users who have >>>>>>> stronger idempotency requirements can ensure them (over >>>>>>> availability/memory >>>>>>> concerns). Let me know if you have any ideas here. >>>>>>> >>>>>>> Thanks again for commenting here, hopefully we can come to a good >>>>>>> solution. >>>>>>> >>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote: >>>>>>> >>>>>>> > Hi Omnia, >>>>>>> > >>>>>>> > Thanks for your reply. >>>>>>> > >>>>>>> > For (3), you said: >>>>>>> > > - I have some concerns about the impact of this option on the >>>>>>> > transactional >>>>>>> > producers, for example, what will happen to an ongoing transaction >>>>>>> > associated with an expired PID? Would this leave the transactions >>>>>>> in a >>>>>>> > "hanging" state? >>>>>>> > >>>>>>> > - How will we notify the client that the transaction can't >>>>>>> continue due to >>>>>>> > an expired PID? >>>>>>> > >>>>>>> > - If PID got marked as `expired` this will mean that >>>>>>> > `admin.DescribeProducers` will not list them which will make >>>>>>> > *`kafka-transactions.sh >>>>>>> > --list`* a bit tricky as we can't identify if there are >>>>>>> transactions linked >>>>>>> > to this expired PID or not. The same concern applies to >>>>>>> > *`kafka-transactions.sh >>>>>>> > --find-hanging`*. >>>>>>> > >>>>>>> > --> Yes, you're right. Those are also concerns for this solution. >>>>>>> > Currently, there's no way to notify clients about the expiration. >>>>>>> > Also, the ongoing transactions will be hanging. For the admin cli, >>>>>>> we've >>>>>>> > never thought about it. Good point. >>>>>>> > In summary, to adopt this solution, there are many issues needed >>>>>>> to get >>>>>>> > fixed. >>>>>>> > >>>>>>> > >>>>>>> > For (5), you said: >>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your >>>>>>> concern here >>>>>>> > that >>>>>>> > those good clients that use the same principal as a rogue one will >>>>>>> get >>>>>>> > throttled? 
>>>>>>> > >>>>>>> > If this is the case, then I believe it should be okay as other >>>>>>> throttling >>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour. >>>>>>> > >>>>>>> > >>>>>>> > What about applying limit/throttling to >>>>>>> > *`/config/users/<user>/clients/<client-id>` >>>>>>> > *similar to what we have with client quota? This should reduce the >>>>>>> concern >>>>>>> > about throttling good clients, right? >>>>>>> > >>>>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear) >>>>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client >>>>>>> Id is >>>>>>> > also workable. >>>>>>> > It's just these 2 attributes are not required. >>>>>>> > That is, it's possible we take all clients as the same one: >>>>>>> {default >>>>>>> > KafkaPrinciple + default clientID}, and apply throttling on it. >>>>>>> > Do you have any thoughts about it? >>>>>>> > Maybe skip throttling for {default KafkaPrinciple + default >>>>>>> clientID} ? >>>>>>> > >>>>>>> > Luke >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim < >>>>>>> o.g.h.ibra...@gmail.com> >>>>>>> > wrote: >>>>>>> > >>>>>>> >> Hi Luke & Justine, >>>>>>> >> Thanks for looking into this issue, we have been experiencing >>>>>>> this because >>>>>>> >> of rouge clients as well. >>>>>>> >> >>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort of >>>>>>> like an >>>>>>> >> LRU >>>>>>> >> >cache) >>>>>>> >> >-> The idea here is that if we hit a misconfigured client, we >>>>>>> will expire >>>>>>> >> >the older entries. The concern here is we have risks to lose >>>>>>> idempotency >>>>>>> >> >guarantees, and currently, we don't have a way to notify clients >>>>>>> about >>>>>>> >> >losing idempotency guarantees. Besides, the least recently used >>>>>>> entries >>>>>>> >> >got removed are not always from the "bad" clients. >>>>>>> >> >>>>>>> >> - I have some concerns about the impact of this option on the >>>>>>> >> transactional >>>>>>> >> producers, for example, what will happen to an ongoing transaction >>>>>>> >> associated with an expired PID? Would this leave the transactions >>>>>>> in a >>>>>>> >> "hanging" state? >>>>>>> >> >>>>>>> >> - How will we notify the client that the transaction can't >>>>>>> continue due to >>>>>>> >> an expired PID? >>>>>>> >> >>>>>>> >> - If PID got marked as `expired` this will mean that >>>>>>> >> `admin.DescribeProducers` will not list them which will make >>>>>>> >> *`kafka-transactions.sh >>>>>>> >> --list`* a bit tricky as we can't identify if there are >>>>>>> transactions >>>>>>> >> linked >>>>>>> >> to this expired PID or not. The same concern applies to >>>>>>> >> *`kafka-transactions.sh >>>>>>> >> --find-hanging`*. >>>>>>> >> >>>>>>> >> >>>>>>> >> >5. limit/throttling the producer id based on the principle >>>>>>> >> >-> Although we can limit the impact to a certain principle with >>>>>>> this >>>>>>> >> idea, >>>>>>> >> >same concern still exists as solution #1 #2. >>>>>>> >> >>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your concern >>>>>>> here >>>>>>> >> that >>>>>>> >> those good clients that use the same principal as a rogue one >>>>>>> will get >>>>>>> >> throttled? >>>>>>> >> >>>>>>> >> If this is the case, then I believe it should be okay as other >>>>>>> throttling >>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour. 
>>>>>>> >> >>>>>>> >> >>>>>>> >> What about applying limit/throttling to >>>>>>> >> *`/config/users/<user>/clients/<client-id>` >>>>>>> >> *similar to what we have with client quota? This should reduce >>>>>>> the concern >>>>>>> >> about throttling good clients, right? >>>>>>> >> >>>>>>> >> best, >>>>>>> >> >>>>>>> >> Omnia >>>>>>> >> >>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> >>>>>>> wrote: >>>>>>> >> >>>>>>> >> > Bump this thread to see if there are any comments/thoughts. >>>>>>> >> > Thanks. >>>>>>> >> > >>>>>>> >> > Luke >>>>>>> >> > >>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com> >>>>>>> wrote: >>>>>>> >> > >>>>>>> >> > > Hi devs, >>>>>>> >> > > >>>>>>> >> > > As stated in the motivation section in KIP-854 >>>>>>> >> > > < >>>>>>> >> > >>>>>>> >> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry >>>>>>> >> > >: >>>>>>> >> > > >>>>>>> >> > > With idempotent producers becoming the default in Kafka, this >>>>>>> means >>>>>>> >> that >>>>>>> >> > > unless otherwise specified, all new producers will be given >>>>>>> producer >>>>>>> >> IDs. >>>>>>> >> > > Some (inefficient) applications may now create many >>>>>>> non-transactional >>>>>>> >> > > idempotent producers. Each of these producers will be >>>>>>> assigned a >>>>>>> >> producer >>>>>>> >> > > ID and these IDs and their metadata are stored in the broker >>>>>>> memory, >>>>>>> >> > which >>>>>>> >> > > might cause brokers out of memory. >>>>>>> >> > > >>>>>>> >> > > Justine (in cc.) and I and some other team members are >>>>>>> working on the >>>>>>> >> > > solutions for this issue. But none of them solves it >>>>>>> completely >>>>>>> >> without >>>>>>> >> > > side effects. Among them, "availability" VS "idempotency >>>>>>> guarantees" >>>>>>> >> is >>>>>>> >> > > what we can't decide which to sacrifice. Some of these >>>>>>> solutions >>>>>>> >> > sacrifice >>>>>>> >> > > availability of produce (1,2,5) and others sacrifice >>>>>>> idempotency >>>>>>> >> > guarantees >>>>>>> >> > > (3). It could be useful to know if people generally have a >>>>>>> preference >>>>>>> >> one >>>>>>> >> > > way or the other. Or what other better solutions there might >>>>>>> be. >>>>>>> >> > > >>>>>>> >> > > Here are the proposals we came up with: >>>>>>> >> > > >>>>>>> >> > > 1. Limit the total active producer ID allocation number. >>>>>>> >> > > -> This is the simplest solution. But since the OOM issue is >>>>>>> usually >>>>>>> >> > > caused by a rogue or misconfigured client, and this solution >>>>>>> might >>>>>>> >> > "punish" >>>>>>> >> > > the good client from sending messages. >>>>>>> >> > > >>>>>>> >> > > 2. Throttling the producer ID allocation rate >>>>>>> >> > > -> Same concern as the solution #1. >>>>>>> >> > > >>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort >>>>>>> of like >>>>>>> >> an >>>>>>> >> > > LRU cache) >>>>>>> >> > > -> The idea here is that if we hit a misconfigured client, we >>>>>>> will >>>>>>> >> expire >>>>>>> >> > > the older entries. The concern here is we have risks to lose >>>>>>> >> idempotency >>>>>>> >> > > guarantees, and currently, we don't have a way to notify >>>>>>> clients about >>>>>>> >> > > losing idempotency guarantees. Besides, the least recently >>>>>>> used >>>>>>> >> entries >>>>>>> >> > > got removed are not always from the "bad" clients. >>>>>>> >> > > >>>>>>> >> > > 4. 
allow clients to "close" the producer ID usage >>>>>>> >> > > -> We can provide a way for producer to "close" producerID >>>>>>> usage. >>>>>>> >> > > Currently, we only have a way to INIT_PRODUCER_ID requested to >>>>>>> >> allocate >>>>>>> >> > > one. After that, we'll keep the producer ID metadata in >>>>>>> broker even if >>>>>>> >> > the >>>>>>> >> > > producer is "closed". Having a closed API (ex: >>>>>>> END_PRODUCER_ID), we >>>>>>> >> can >>>>>>> >> > > remove the entry from broker side. In client side, we can >>>>>>> send it when >>>>>>> >> > > producer closing. The concern is, the old clients (including >>>>>>> non-java >>>>>>> >> > > clients) will still suffer from the OOM issue. >>>>>>> >> > > >>>>>>> >> > > 5. limit/throttling the producer id based on the principle >>>>>>> >> > > -> Although we can limit the impact to a certain principle >>>>>>> with this >>>>>>> >> > idea, >>>>>>> >> > > same concern still exists as solution #1 #2. >>>>>>> >> > > >>>>>>> >> > > Any thoughts/feedback are welcomed. >>>>>>> >> > > >>>>>>> >> > > Thank you. >>>>>>> >> > > Luke >>>>>>> >> > > >>>>>>> >> > >>>>>>> >> >>>>>>> > >>>>>>> >>>>>>