Last question: the fencing occurs with sendOffsetsToTransaction, which includes the ConsumerGroupMetadata, and I guess the generation.id is what matters here since it is bumped with each rebalance. But couldn't this happen?

1. Client A consumes from topic partition P1 with generation.id = 1, and the producer associated with it produces to some output topic, but a long GC pause occurs before calling sendOffsetsToTransaction.
2. Client A falls out of sync and becomes a zombie due to session timeout, and the group is rebalanced.
3. Client B is assigned topic partition P1 with generation.id = 2, calls sendOffsetsToTransaction and commits the txn.
4. Client A comes back online and rejoins the group with generation.id = 3 (this happens in some internal thread).
5. The thread that was about to call sendOffsetsToTransaction is scheduled and calls sendOffsetsToTransaction with generation.id = 3, which is the current one, so it won't be fenced.
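To make the scenario concrete, this is roughly the consume-transform-produce loop I have in mind (just a sketch using the plain Java clients on 2.5+; broker address, topic names, group id and transactional.id are placeholders, and error handling / abortTransaction is left out):

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceLoop {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092"); // placeholder
        cp.put("group.id", "my-group");                // placeholder
        cp.put("enable.auto.commit", "false");
        cp.put("isolation.level", "read_committed");
        cp.put("key.deserializer", StringDeserializer.class.getName());
        cp.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
        consumer.subscribe(List.of("input"));

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("transactional.id", "my-group-input-some-uuid"); // placeholder naming
        pp.put("key.serializer", StringSerializer.class.getName());
        pp.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(pp);
        producer.initTransactions();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) continue;

            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>("output", record.key(), record.value()));
                consumedOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
            }
            // A long GC pause right here is the window I'm worried about:
            // groupMetadata() is read at this point, not when the records were
            // polled, so it reflects whatever generation the consumer is on now.
            producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
            producer.commitTransaction();
        }
    }
}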
I'm asking this because we always ask the consumer object for its current consumerGroupMetadata, not the metadata that was in effect when the offsets were consumed, like this:

producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

Couldn't this return a groupMetadata with a valid generation.id even when it is not the same one as at the moment of consuming the messages that are about to be committed? I'm sure I'm missing something (probably in step 4) that makes this scenario impossible, but I can't say what it is. Sorry if the question is too confusing.

On Tue, May 24, 2022 at 12:49, Guozhang Wang (<wangg...@gmail.com>) wrote:

> Hi Gabriel,
>
> What I meant is that with KIP-447, the fencing is achieved at the time of committing, via the consumer metadata. If within a transaction the producer would always try to commit at least once on behalf of the consumer, AND a zombie of the producer would always come from a zombie of a consumer, then the transaction would be guaranteed to be fenced. But:
>
> 1) If within a transaction there's no `sendOffset..` triggered, then fencing still needs to be done by the txn coordinator, and txn.id plays the role here ---- I think this is not your scenario.
> 2) If a consumer may be "represented" by multiple producers, and a zombie producer does not come from a zombie consumer, then we still need the fencing to be done via the txn.id --- this is the scenario I'd like to remind you about. For example, if two producers could be (mistakenly) created with different txn.ids and are paired with the same consumer, then the new API in KIP-447 would not fence one of them.
>
> Guozhang
>
> On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> >
> > Hello Guozhang,
> >
> > Thanks for the response. I have some doubts about the "N-1 producer-consumer" case you mentioned, and why and how I may need to configure the transactional id there. Is this the case of N consumers sharing the same producer?
> >
> > My current implementation creates a consumer per topic (I don't subscribe to multiple topics from the same consumer) and starts a producer per consumer, so the relation is 1 consumer/topic => 1 producer, and the transactional id is set as <consumer-group>-<topic>-<random-uuid>. Do you see any problem with this configuration?
> >
> > Thanks again.
> >
> > On Sat, May 21, 2022 at 16:37, Guozhang Wang (<wangg...@gmail.com>) wrote:
> > >
> > > Hello Gabriel,
> > >
> > > What you're asking is a very fair question :) In fact, for Streams, where the partition assignment to producer-consumer pairs is purely flexible, we think the new EOS would not have a hard requirement on transactional.id: https://issues.apache.org/jira/browse/KAFKA-9453
> > >
> > > If you implemented the transactional messaging via a DIY producer+consumer though, it depends on how you'd expect the life-time of a producer, e.g. if you do not have a 1-1 producer-consumer mapping then transactional.id is not crucial, but if you have an N-1 producer-consumer mapping then you may still need to configure that id.
> > >
> > > Guozhang
> > >
> > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> > > >
> > > > Before KIP-447 I understood the use of transactional.id as preventing zombies from introducing duplicates, as explained in this talk: https://youtu.be/j0l_zUhQaTc?t=822.
> > > > So in order to get zombie fencing working correctly we should assign producers a transactional.id that includes the partition id, something like <application><topic>-<partition-id>, as shown in this slide https://youtu.be/j0l_zUhQaTc?t=1047 where processor 2 should use the same txnl.id A as the process 1 that crashed.
> > > > This prevented process 2 from consuming the message again and committing while process 1 could come back to life and also commit the pending transaction, hence having duplicate messages being produced. In this case process 1 will be fenced by having an outdated epoch.
> > > >
> > > > With KIP-447 we no longer have that potential scenario of two pending transactions trying to produce and mark a message as committed, because we won't let process 2 even start the transaction if there is a pending one (basically by not returning any messages, since we reject the OffsetFetch if there is a pending transaction for that offset partition). This is explained in this post:
> > > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > >
> > > > Having that, I no longer see the value of transactional.id or how I should configure it in my producers. The main benefit of KIP-447 is that we no longer have to start one producer per input partition; a quote from the post:
> > > > "The only way the static assignment requirement could be met is if each input partition uses a separate producer instance, which is in fact what Kafka Streams previously relied on. However, this made running EOS applications much more costly in terms of the client resources and load on the brokers. A large number of client connections could heavily impact the stability of brokers and become a waste of resources as well."
> > > >
> > > > I guess now I can reuse my producer between different input partitions, so what transactional.id should I assign to it and why should I care, isn't zombie fencing resolved by rejecting the offset fetch already?
> > > >
> > > > Thanks.
> > >
> > > --
> > > -- Guozhang
> >
>
> --
> -- Guozhang
>
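PS: in case it makes the setup clearer, this is roughly how I build the producer that is paired with each per-topic consumer today, using the <consumer-group>-<topic>-<random-uuid> transactional.id scheme I described above (just a sketch; the broker address and class/method names are made up):

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerFactory {

    // One producer per consumer/topic pair, so the transactional.id is
    // <consumer-group>-<topic>-<random-uuid> rather than being tied to an
    // input partition as in the pre-KIP-447 style.
    public static KafkaProducer<String, String> forTopic(String consumerGroup, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("transactional.id", consumerGroup + "-" + topic + "-" + UUID.randomUUID());
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        return producer;
    }
}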