But there is no guarantee that the onPartitionsLost callback will be invoked before a zombie producer coming back to life tries to continue the transaction (e.g. sending offsets or committing), so I should handle the exception first, and I could create the new producer right there instead of doing it in the callback.

The curious part for me is that I was able to reproduce a case simulating a zombie producer that tries to send offsets after a rebalance, but instead of failing with a ProducerFencedException it fails with a CommitFailedException carrying this message: "Transaction offset Commit failed due to consumer group metadata mismatch: Specified group generation id is not valid." That makes sense, but it isn't even documented on KafkaProducer#sendOffsetsToTransaction. Is this the expected behaviour, or should it fail with a ProducerFencedException when the generation.id is outdated?

The case I reproduced is like this (a trimmed sketch follows the steps):

1. Consumer A subscribes to topic X, partitions 0 and 1, and starts a producer with transactional.id = "tid.123".
2. Consumer A consumes a message from partition 1 and hands it off to another thread to be processed (so the polling thread is not blocked).
3. Producer A begins a transaction, sends to the output topic, and gets blocked before calling sendOffsetsToTransaction (I'm using a lock here to simulate long processing).
4. Consumer B is created, gets assigned partition 1 (I'm using the CooperativeStickyAssignor), and creates a producer with transactional.id = "tid.456".
5. Consumer B fetches the same message, processes it and commits the transaction successfully.
6. Producer A calls sendOffsetsToTransaction (because the lock was released) and fails with the CommitFailedException.
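Here is the trimmed sketch (the real test is longer: topic names, group id and configs are simplified, the hand-off to the processing thread and the lock from step 3 are elided, and error handling is reduced to the part in question):

    import java.time.Duration;
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.*;

    public class ZombieProducerRepro {
        public static void main(String[] args) {
            Properties cp = new Properties();
            cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            cp.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
            cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            cp.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                   CooperativeStickyAssignor.class.getName());
            cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);

            Properties pp = new Properties();
            pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tid.123"); // step 1
            pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            KafkaProducer<String, String> producer = new KafkaProducer<>(pp);
            producer.initTransactions();

            consumer.subscribe(List.of("X"), new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> parts) {}
                public void onPartitionsAssigned(Collection<TopicPartition> parts) {}
                @Override
                public void onPartitionsLost(Collection<TopicPartition> parts) {
                    // The paired producer is presumably a zombie now, but there is
                    // no guarantee this runs before the producer's thread resumes
                    // its in-flight transaction.
                }
            });

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> msg : records) {
                // In the real test this block runs on another thread and parks on a
                // lock (step 3) until Consumer B has committed the same offset (step 5).
                try {
                    producer.beginTransaction();
                    producer.send(new ProducerRecord<>("output", msg.key(), msg.value()));
                    producer.sendOffsetsToTransaction(
                        Map.of(new TopicPartition(msg.topic(), msg.partition()),
                               new OffsetAndMetadata(msg.offset() + 1)),
                        consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException e) {
                    // What I expected in step 6: fencing via the producer epoch.
                    producer.close();
                } catch (CommitFailedException e) {
                    // What I actually get: "Transaction offset Commit failed due to
                    // consumer group metadata mismatch: Specified group generation
                    // id is not valid."
                    // (abort may itself fail if the txn is already fatal; simplified)
                    producer.abortTransaction();
                    producer.close();
                }
            }
        }
    }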
This behaviour reflects what is described here
https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification,
but I was actually expecting a ProducerFencedException instead. Does that
exception only correspond to fencing done via the transactional.id?

Thanks

On Tue, May 24, 2022 at 8:30 PM Guozhang Wang <wangg...@gmail.com> wrote:

> No problem.
>
> The key is that at step 4, when the consumer re-joins it will be aware
> that it has lost its previously assigned partitions and will trigger
> `onPartitionsLost` on the rebalance callback. And since in your scenario
> it's a 1-1 mapping from consumer to producer, it means the producer has
> been fenced and hence should be closed.
>
> So in that step 4, the old producer paired with Client A should be closed
> within the rebalance callback, and then one can create a new producer to
> pair with the re-joined consumer.
>
> On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi <gabrielgiu...@gmail.com>
> wrote:
>
> > Last question: the fencing occurs with sendOffsetsToTransaction, which
> > includes the ConsumerGroupMetadata. I guess the generation.id is what
> > matters here, since it is bumped with each rebalance.
> > But couldn't this happen?
> > 1. Client A consumes from topic partition P1 with generation.id = 1,
> > and a producer associated with it produces to some output topic, but a
> > long GC pause occurs before calling sendOffsetsToTransaction.
> > 2. Client A falls out of sync and becomes a zombie due to a session
> > timeout; the group rebalances.
> > 3. Client B is assigned topic partition P1 with generation.id = 2,
> > calls sendOffsetsToTransaction and commits the txn.
> > 4. Client A comes back online and joins again with generation.id = 3
> > (this happens in some internal thread).
> > 5. The thread that was about to call sendOffsetsToTransaction is
> > scheduled and calls it with generation.id = 3, which is the current
> > one, so it won't be fenced.
> >
> > I'm asking this because we always ask the consumer object for its
> > current ConsumerGroupMetadata, not the metadata in effect when the
> > offsets were consumed, like this:
> > producer.sendOffsetsToTransaction(consumedOffsets,
> > consumer.groupMetadata());
> >
> > Couldn't this return a groupMetadata with a valid generation.id even
> > when it is not the one in effect at the moment of consuming the
> > messages that are about to be committed?
> >
> > I'm sure I'm missing something (probably in step 4) that makes this not
> > a possible scenario, but I can't say what it is.
> >
> > Sorry if the question is too confusing.
> >
> > On Tue, May 24, 2022 at 12:49 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hi Gabriel,
> > >
> > > What I meant is that with KIP-447, the fencing is achieved at the
> > > time of committing, with the consumer metadata. If within a
> > > transaction the producer always tries to commit at least once on
> > > behalf of the consumer, AND a zombie of the producer always comes
> > > from a zombie of a consumer, then the transaction is guaranteed to
> > > be fenced. But:
> > >
> > > 1) If within a transaction there is no `sendOffset..` triggered, then
> > > fencing still needs to be done by the txn coordinator, and txn.id
> > > plays the role here ---- I think this is not your scenario.
> > > 2) If a consumer may be "represented" by multiple producers, and a
> > > zombie producer does not come from a zombie consumer, then we still
> > > need the fencing to be done via the txn.id --- this is the scenario
> > > I'd like to remind you about. For example, if two producers could be
> > > (mistakenly) created with different txn.ids and paired with the same
> > > consumer, then the new API in KIP-447 would not fence one of them.
> > >
> > > Guozhang
> > >
> > > On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi
> > > <gabrielgiu...@gmail.com> wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > Thanks for the response. I have some doubts about the "N-1
> > > > producer-consumer" case you mentioned, why I may need to configure
> > > > the transactional id there, and how. Is this the case of N
> > > > consumers sharing the same producer?
> > > >
> > > > My current implementation creates a consumer per topic (I don't
> > > > subscribe to multiple topics from the same consumer) and starts a
> > > > producer per consumer, so the relation is 1 consumer/topic => 1
> > > > producer and the transactional id is set as
> > > > <consumer-group>-<topic>-<random-uuid>.
> > > > Do you see any problem with this configuration?
> > > >
> > > > Thanks again.
> > > >
> > > > On Sat, May 21, 2022 at 4:37 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Gabriel,
> > > > >
> > > > > What you're asking is a very fair question :) In fact, for
> > > > > Streams, where the assignment of partitions to producer-consumer
> > > > > pairs is purely flexible, we think the new EOS would not have a
> > > > > hard requirement on transactional.id:
> > > > > https://issues.apache.org/jira/browse/KAFKA-9453
> > > > >
> > > > > If you implemented the transactional messaging via a DIY
> > > > > producer+consumer though, it depends on the expected lifetime of
> > > > > a producer: e.g. if you do not have a 1-1 producer-consumer
> > > > > mapping then transactional.id is not crucial, but if you have an
> > > > > N-1 producer-consumer mapping then you may still need to
> > > > > configure that id.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi
> > > > > <gabrielgiu...@gmail.com> wrote:
> > > > >
> > > > > > Before KIP-447 I understood the use of transactional.id as
> > > > > > preventing zombies from introducing duplicates, as explained
> > > > > > in this talk: https://youtu.be/j0l_zUhQaTc?t=822.
> > > > > > So in order to get zombie fencing working correctly we should
> > > > > > assign producers a transactional.id that includes the
> > > > > > partition id, something like
> > > > > > <application><topic>-<partition-id>, as shown in this slide
> > > > > > (https://youtu.be/j0l_zUhQaTc?t=1047) where processor 2 should
> > > > > > use the same txn.id A as process 1 that crashed.
> > > > > > This prevented process 2 from consuming the message again and
> > > > > > committing while process 1 came back to life and also
> > > > > > committed the pending transaction, hence producing duplicate
> > > > > > messages. In this case process 1 would be fenced by having an
> > > > > > outdated epoch.
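[Inline note from me, to make the static mapping just described concrete: a rough pre-KIP-447-style sketch, one transactional producer per assigned input partition. "myapp" and the configs are placeholders, and it assumes the imports from the sketch near the top.]

    // One producer per input partition; the transactional.id is a pure
    // function of the partition, so a restarted process re-uses the id
    // of the crashed one.
    Map<TopicPartition, KafkaProducer<String, String>> producers = new HashMap<>();
    for (TopicPartition tp : consumer.assignment()) {
        Properties pp = new Properties();
        pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
               "myapp-" + tp.topic() + "-" + tp.partition());
        KafkaProducer<String, String> p = new KafkaProducer<>(pp);
        // initTransactions() bumps the producer epoch for this id, fencing
        // any zombie still holding the previous epoch.
        p.initTransactions();
        producers.put(tp, p);
    }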
> > > > > > With KIP-447 we no longer have that potential scenario of two
> > > > > > pending transactions trying to produce and mark a message as
> > > > > > committed, because we won't even let process 2 start the
> > > > > > transaction if there is a pending one (basically by not
> > > > > > returning any messages, since we reject the OffsetFetch if
> > > > > > there is a pending transaction for that offset partition).
> > > > > > This is explained in this post:
> > > > > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > > > >
> > > > > > Given that, I no longer see the value of transactional.id or
> > > > > > how I should configure it in my producers. The main benefit of
> > > > > > KIP-447 is that we no longer have to start one producer per
> > > > > > input partition; a quote from the post:
> > > > > > "The only way the static assignment requirement could be met
> > > > > > is if each input partition uses a separate producer instance,
> > > > > > which is in fact what Kafka Streams previously relied on.
> > > > > > However, this made running EOS applications much more costly
> > > > > > in terms of the client resources and load on the brokers. A
> > > > > > large number of client connections could heavily impact the
> > > > > > stability of brokers and become a waste of resources as well."
> > > > > >
> > > > > > I guess now I can reuse my producer between different input
> > > > > > partitions, so what transactional.id should I assign to it,
> > > > > > and why should I care? Isn't zombie fencing already resolved
> > > > > > by rejecting the offset fetch?
> > > > > >
> > > > > > Thanks.
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> > > --
> > > -- Guozhang
> >
>
> --
> -- Guozhang
>
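PS: on my generation.id question above, what I had in mind as a safeguard was capturing the group metadata at poll time, next to the polled records, instead of asking the consumer again at commit time. Roughly like this (same setup as the sketch near the top; consumedOffsets is built from the polled records as usual, and I'm not sure this is actually necessary):

    // Capture the metadata that was in effect when the records were fetched...
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    ConsumerGroupMetadata metadataAtPoll = consumer.groupMetadata();
    // ...hand (records, metadataAtPoll) to the processing thread, then commit
    // with the captured metadata rather than re-reading consumer.groupMetadata():
    producer.sendOffsetsToTransaction(consumedOffsets, metadataAtPoll);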