No problem. The key is that at step 4, when the consumer re-joins, it will be aware that it has lost its previously assigned partitions and will trigger `onPartitionsLost` on the rebalance callback. And since in your scenario there is a 1-1 mapping from consumer to producer, it means the producer has been fenced and hence should be closed.

So in that step 4, the old producer paired with Client A should be closed within the rebalance callback, and then one can create a new producer to pair with the re-joined consumer.
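To make that concrete, here is a rough (untested) sketch of the consume-transform-produce loop we've been discussing, with the fenced producer replaced inside `onPartitionsLost`. The bootstrap servers, topic names and the "my-group-my-topic-" transactional.id prefix are just placeholders for your own configs:

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnWorker {

    private KafkaProducer<String, String> producer = newTransactionalProducer();

    public void run(KafkaConsumer<String, String> consumer, String inputTopic, String outputTopic) {
        consumer.subscribe(List.of(inputTopic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Normal rebalance: offsets were committed inside the transaction, nothing to do.
            }

            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                // The consumer was a zombie and lost its partitions. With a 1-1
                // consumer->producer pairing the producer has been fenced, so close it
                // and pair a fresh producer with the re-joined consumer (step 4 above).
                producer.close();
                producer = newTransactionalProducer();
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (records.isEmpty()) continue;

            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
                consumedOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
            }
            // KIP-447 fencing: the group coordinator validates the generation in the
            // group metadata when the offsets are committed as part of the transaction.
            producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
            producer.commitTransaction();
        }
    }

    private static KafkaProducer<String, String> newTransactionalProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // transactional.id pattern from this thread: <consumer-group>-<topic>-<random-uuid>
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-group-my-topic-" + UUID.randomUUID());
        KafkaProducer<String, String> p = new KafkaProducer<>(props);
        p.initTransactions();
        return p;
    }
}

The important bit is that the fenced producer is replaced before the consumer resumes processing with its new generation, so each consumer is always paired with exactly one live producer.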
On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:

> Last question: the fencing occurs with sendOffsetsToTransaction, which includes the ConsumerGroupMetadata. I guess the generation.id is what matters here, since it is bumped with each rebalance.
>
> But couldn't this happen?
> 1. Client A consumes from topic partition P1 with generation.id = 1, and a producer associated to it produces to some output topic, but a long GC pause occurs before calling sendOffsetsToTransaction.
> 2. Client A gets out of sync and becomes a zombie due to session timeout; the group is rebalanced.
> 3. Client B is assigned topic partition P1 with generation.id = 2, calls sendOffsetsToTransaction and commits the txn.
> 4. Client A is back online and joins again with generation.id = 3 (this happens in some internal thread).
> 5. The thread that was about to call sendOffsetsToTransaction is scheduled and calls sendOffsetsToTransaction with generation.id = 3, which is the current one, so it won't be fenced.
>
> I'm asking this because we are always asking the consumer object for the current consumerGroupMetadata, not the one that was in effect when the offsets were consumed, like this:
> producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
>
> Couldn't this return a groupMetadata that has a valid generation.id even when it is not the same as at the moment of consuming the messages that are about to be committed?
>
> I'm sure I'm missing something (probably in step 4) that makes this not a possible scenario, but I can't say what it is.
>
> Sorry if the question is too confusing.
>
> On Tue, May 24, 2022 at 12:49 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Gabriel,
> >
> > What I meant is that with KIP-447, the fencing is achieved at the time of committing with the consumer metadata. If within a transaction the producer would always try to commit at least once on behalf of the consumer, AND a zombie of the producer would always come from a zombie of a consumer, then the transaction would be guaranteed to be fenced. But:
> >
> > 1) If within a transaction there's no `sendOffset..` triggered, then fencing still needs to be done by the txn coordinator, and txn.id plays the role here ---- I think this is not your scenario.
> > 2) If a consumer may be "represented" by multiple producers, and a zombie producer does not come from a zombie consumer, then we still need the fencing to be done via the txn.id --- this is the scenario I'd like to remind you about. For example, if two producers could be (mistakenly) created with different txn.ids and are paired with the same consumer, then the new API in KIP-447 would not fence one of them.
> >
> > Guozhang
> >
> > On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> >
> > > Hello Guozhang,
> > >
> > > Thanks for the response. I have some doubts about the "N-1 producer-consumer" case you mentioned, why I may need to configure the transactional id there, and how. Is this the case of N consumers sharing the same producer?
> > >
> > > My current implementation is creating a consumer per topic (I don't subscribe to multiple topics from the same consumer) and starting a producer per consumer, so the relation is 1 consumer/topic => 1 producer and the transactional id is set as <consumer-group>-<topic>-<random-uuid>. Do you see any problem with this configuration?
> > >
> > > Thanks again.
> > >
> > > On Sat, May 21, 2022 at 4:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Gabriel,
> > > >
> > > > What you're asking is a very fair question :) In fact, for Streams, where the partition assignment to producer-consumer pairs is purely flexible, we think the new EOS would not have a hard requirement on transactional.id: https://issues.apache.org/jira/browse/KAFKA-9453
> > > >
> > > > If you implemented the transactional messaging via a DIY producer+consumer though, it depends on how you'd expect the lifetime of a producer, e.g. if you do not have a 1-1 producer-consumer mapping then transactional.id is not crucial, but if you have an N-1 producer-consumer mapping then you may still need to configure that id.
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> > > >
> > > > > Before KIP-447 I understood the use of transactional.id to prevent us from zombies introducing duplicates, as explained in this talk: https://youtu.be/j0l_zUhQaTc?t=822.
> > > > > So in order to get zombie fencing working correctly we should assign producers a transactional.id that includes the partition id, something like <application><topic>-<partition-id>, as shown in this slide (https://youtu.be/j0l_zUhQaTc?t=1047) where processor 2 should use the same txnl.id A as process 1 that crashed.
> > > > > This prevented us from having process 2 consume the message again and commit, while process 1 could come back to life and also commit the pending transaction, hence having duplicate messages being produced. In this case process 1 will be fenced by having an outdated epoch.
> > > > >
> > > > > With KIP-447 we no longer have that potential scenario of two pending transactions trying to produce and mark a message as committed, because we won't let process 2 even start the transaction if there is a pending one (basically by not returning any messages, since we reject the Offset Fetch if there is a pending transaction for that offset partition). This is explained in this post:
> > > > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > > >
> > > > > Having that, I no longer see the value of transactional.id or how I should configure it in my producers. The main benefit of KIP-447 is that we no longer have to start one producer per input partition; a quote from the post:
> > > > > "The only way the static assignment requirement could be met is if each input partition uses a separate producer instance, which is in fact what Kafka Streams previously relied on. However, this made running EOS applications much more costly in terms of the client resources and load on the brokers. A large number of client connections could heavily impact the stability of brokers and become a waste of resources as well."
> > > > >
> > > > > I guess now I can reuse my producer between different input partitions, so what transactional.id should I assign to it and why should I care, isn't zombie fencing resolved by rejecting the offset fetch already?
> > > > >
> > > > > Thanks.
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang