I think "commitTransaction" should not throw CommitFailedException. Here admittedly we are overusing the term "commit" here, as we use it for two operations: committing the offsets (used for consumer, in either EOS or ALOS), and committing the transaction. The exception is meant for the former and would not be expected in `commitTransaction`.
Guozhang

On Thu, Jun 2, 2022 at 5:45 AM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> "I think we may have overlooked it in documentation to emphasize that, in case 1), it should not expect ProducerFencedException. If so, we can fix the javadoc."
>
> IMHO that would be nice. I'm reviewing an existing codebase where we were only handling ProducerFencedException, because the javadoc and the method signature are explicit only about that, and CommitFailedException is not even referenced but falls under the general KafkaException.
> I think this could happen in both sendOffsetsToTransaction and commitTransaction, right?
>
> Thanks.
>
> On Tue, May 31, 2022 at 2:49 PM Guozhang Wang (<wangg...@gmail.com>) wrote:
> > The CommitFailedException should be expected, since the fencing happens at the consumer coordinator. I.e. we can only fence the consumer-producer pair by the consumer's generation; we cannot do so via the producer epoch, since there's no other producer that has just grabbed the same txn.id and bumped the producer epoch.
> >
> > So just to clarify, when the zombie comes back, it could be fenced either when:
> >
> > 1) it tries to complete the ongoing transaction via `sendOffsetsToTransaction`, in which case it would see the CommitFailedException. The caller is then responsible for handling the thrown exception that indicates being fenced.
> > 2) it tries to heartbeat in the background thread and gets an InvalidGeneration error code, in which case it would trigger onPartitionsLost. The callback impl class is then responsible for handling that case, which indicates being fenced.
> >
> > I think we may have overlooked it in documentation to emphasize that, in case 1), it should not expect ProducerFencedException. If so, we can fix the javadoc.
> >
> > On Tue, May 31, 2022 at 8:26 AM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> > > But there is no guarantee that the onPartitionsLost callback will be called before a zombie producer coming back to life tries to continue with the transaction, e.g. sending offsets or committing, so I should handle the exception first, and I could directly create a new producer there instead of doing it in the callback.
> > > The curious part for me is that I was able to reproduce a case that simulates a zombie producer trying to send offsets after a rebalance, and instead of failing with a ProducerFencedException it fails with a CommitFailedException with this message: "Transaction offset Commit failed due to consumer group metadata mismatch: Specified group generation id is not valid.", which makes sense but is not even documented in KafkaProducer#sendOffsetsToTransaction.
> > > Is this the expected behaviour, or should it fail with a ProducerFencedException when the generation.id is outdated?
> > > The case I reproduced is like this:
> > > 1. Consumer A subscribes to topic X partitions 0 and 1, and starts a producer with transactional.id = "tid.123"
> > > 2. It consumes a message from partition 1 and sends it to another thread to be processed (so the poll thread is not blocked)
> > > 3. Producer A begins a transaction, sends to the output topic and gets blocked (I'm using a lock here to simulate long processing) before calling sendOffsetsToTransaction
> > > 4. Consumer B is created and gets assigned partition 1 (I'm using CooperativeStickyAssignor) and creates a producer with transactional.id = "tid.456"
> > > 5. Consumer B fetches the same message, processes it and commits the transaction successfully
> > > 6. Producer A calls sendOffsetsToTransaction (because the lock was released) and fails with CommitFailedException
> > >
> > > This behaviour reflects what is described here https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification, but I was actually expecting a ProducerFencedException instead. Does that exception only correspond to fencing done by transactional.id?
> > >
> > > Thanks
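
Case 2) above is the path where fencing shows up through the group membership rather than as an exception from the producer. Here is a rough sketch of how a 1-1 consumer/producer pairing could react to it; the `producerHolder` and `recreateProducer` names are invented for illustration and are not Kafka APIs, and as noted above this callback is not guaranteed to fire before a zombie thread attempts to commit, so the exception handling is still needed.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

// Pairs one consumer with one producer and swaps the producer when the consumer
// loses its partitions, i.e. when it has effectively been fenced as a group member.
class FencingAwareListener implements ConsumerRebalanceListener {
    private final AtomicReference<KafkaProducer<String, String>> producerHolder;
    private final Supplier<KafkaProducer<String, String>> recreateProducer;

    FencingAwareListener(AtomicReference<KafkaProducer<String, String>> producerHolder,
                         Supplier<KafkaProducer<String, String>> recreateProducer) {
        this.producerHolder = producerHolder;
        this.recreateProducer = recreateProducer;
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // The old generation is gone: the paired producer must not commit anything anymore.
        // The supplier is expected to return a fresh producer that has called initTransactions().
        KafkaProducer<String, String> old = producerHolder.getAndSet(recreateProducer.get());
        old.close();
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Normal cooperative revocation: nothing special to do for the producer here.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }
}
```
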
> > > On Tue, May 24, 2022 at 8:30 PM Guozhang Wang (<wangg...@gmail.com>) wrote:
> > > > No problem.
> > > >
> > > > The key is that at step 4, when the consumer re-joins it will be aware that it has lost its previously assigned partitions and will trigger `onPartitionsLost` on the rebalance callback. And since in your scenario it's a 1-1 mapping from consumer to producer, it means the producer has been fenced and hence should be closed.
> > > >
> > > > So in that step 4, the old producer with Client A should be closed within the rebalance callback, and then one can create a new producer to pair with the re-joined consumer.
> > > >
> > > > On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> > > > > Last question: the fencing occurs with sendOffsetsToTransaction, which includes ConsumerGroupMetadata, and I guess the generation.id is what matters here since it is bumped with each rebalance.
> > > > > But couldn't this happen?
> > > > > 1. Client A consumes from topic partition P1 with generation.id = 1, and a producer associated to it produces to some output topic, but a long GC pause occurs before calling sendOffsetsToTransaction
> > > > > 2. Client A gets out of sync and becomes a zombie due to session timeout, and the group is rebalanced
> > > > > 3. Client B is assigned topic partition P1 with generation.id = 2, calls sendOffsetsToTransaction and commits the txn
> > > > > 4. Client A is back online and joins again with generation.id = 3 (this happens in some internal thread)
> > > > > 5. The thread that was about to call sendOffsetsToTransaction is scheduled and calls sendOffsetsToTransaction with generation.id = 3, which is the current one, so it won't be fenced
> > > > >
> > > > > I'm asking this because we always ask the consumer object for the current consumerGroupMetadata, not the one that was in effect when the offsets were consumed, like this:
> > > > > producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
> > > > >
> > > > > Couldn't this return a groupMetadata with a valid generation.id even when it is not the same as at the moment of consuming the messages that are about to be committed?
> > > > >
> > > > > I'm sure I'm missing something (probably in step 4) that makes this not a possible scenario, but I can't say what it is.
> > > > >
> > > > > Sorry if the question is too confusing.
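
One way to make the concern in step 5 visible in code is to capture the group metadata together with the polled records and hand that snapshot to the committing thread, instead of asking the consumer again at commit time; whether that is actually necessary is exactly the question being asked above. A minimal sketch under that assumption (the `PolledBatch` holder is invented purely for illustration):

```java
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

class GroupMetadataSnapshotExample {

    // Hypothetical holder pairing a polled batch with the group metadata seen at poll time.
    static final class PolledBatch {
        final ConsumerRecords<String, String> records;
        final ConsumerGroupMetadata metadataAtPoll;

        PolledBatch(ConsumerRecords<String, String> records, ConsumerGroupMetadata metadataAtPoll) {
            this.records = records;
            this.metadataAtPoll = metadataAtPoll;
        }
    }

    static PolledBatch pollWithMetadata(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // Capture the generation that was current when these records were fetched.
        return new PolledBatch(records, consumer.groupMetadata());
    }

    static void commitOffsets(KafkaProducer<String, String> producer,
                              Map<TopicPartition, OffsetAndMetadata> consumedOffsets,
                              PolledBatch batch) {
        // Pass the snapshot taken at poll time rather than consumer.groupMetadata() at commit
        // time, so a generation bump between poll and commit cannot go unnoticed.
        producer.sendOffsetsToTransaction(consumedOffsets, batch.metadataAtPoll);
    }
}
```
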
> > > > > On Tue, May 24, 2022 at 12:49 PM Guozhang Wang (<wangg...@gmail.com>) wrote:
> > > > > > Hi Gabriel,
> > > > > >
> > > > > > What I meant is that with KIP-447, the fencing is achieved at the time of committing, with the consumer metadata. If, within a transaction, the producer would always try to commit at least once on behalf of the consumer, AND a zombie of the producer would always come from a zombie of the consumer, then the transaction would be guaranteed to be fenced. But:
> > > > > >
> > > > > > 1) If within a transaction there's no `sendOffsets..` triggered, then fencing still needs to be done by the txn coordinator, and txn.id plays the role here ---- I think this is not your scenario.
> > > > > > 2) If a consumer may be "represented" by multiple producers, and a zombie producer does not come from a zombie consumer, then we still need the fencing to be done via the txn.id --- this is the scenario I'd like to remind you about. For example, if two producers could be (mistakenly) created with different txn.ids and are paired with the same consumer, then the new API in KIP-447 would not fence one of them.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> > > > > > > Hello Guozhang,
> > > > > > >
> > > > > > > Thanks for the response. I have some doubts about the "N-1 producer-consumer" case you mentioned, and why I may need to configure the transactional id there and how. Is this the case of N consumers sharing the same producer?
> > > > > > >
> > > > > > > My current implementation is creating a consumer per topic (I don't subscribe to multiple topics from the same consumer) and starting a producer per consumer, so the relation is 1 consumer/topic => 1 producer, and the transactional id is set as <consumer-group>-<topic>-<random-uuid>. Do you see any problem with this configuration?
> > > > > > >
> > > > > > > Thanks again.
> > > > > > >
> > > > > > > On Sat, May 21, 2022 at 4:37 PM Guozhang Wang (<wangg...@gmail.com>) wrote:
> > > > > > > > Hello Gabriel,
> > > > > > > >
> > > > > > > > What you're asking is a very fair question :) In fact, for Streams, where the partition assignment to producer-consumer pairs is purely flexible, we think the new EOS would not have a hard requirement on transactional.id: https://issues.apache.org/jira/browse/KAFKA-9453
> > > > > > > >
> > > > > > > > If you implemented the transactional messaging via a DIY producer+consumer though, it depends on how you'd expect the life-time of a producer: e.g. if you do not have a 1-1 producer-consumer mapping then transactional.id is not crucial, but if you have an N-1 producer-consumer mapping then you may still need to configure that id.
> > > > > > > >
> > > > > > > > Guozhang
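
As a concrete illustration of the 1 consumer/topic => 1 producer pairing described above, here is a sketch of how such a pair might be configured; the transactional.id follows the <consumer-group>-<topic>-<random-uuid> scheme from the thread, while the bootstrap address and the other settings are assumptions for the example.

```java
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

class PairFactory {
    static KafkaConsumer<String, String> newConsumer(String groupId) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Read only committed data so aborted transactions are not visible downstream.
        p.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(p);
    }

    static KafkaProducer<String, String> newProducer(String groupId, String topic) {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // One producer per consumer/topic; a random suffix keeps the id unique per instance,
        // relying on KIP-447 consumer-group fencing rather than a static per-partition id.
        p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, groupId + "-" + topic + "-" + UUID.randomUUID());
        p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(p);
        producer.initTransactions();
        return producer;
    }
}
```
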
> > > > > > > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:
> > > > > > > > > Before KIP-447 I understood the use of transactional.id to prevent zombies from introducing duplicates, as explained in this talk: https://youtu.be/j0l_zUhQaTc?t=822.
> > > > > > > > > So in order to get zombie fencing working correctly we should assign producers a transactional.id that includes the partition id, something like <application><topic>-<partition-id>, as shown in this slide https://youtu.be/j0l_zUhQaTc?t=1047 where processor 2 should use the same txnl.id A as process 1 that crashed.
> > > > > > > > > This prevented us from having process 2 consume the message again and commit while process 1 could come back to life and also commit the pending transaction, hence having duplicate messages produced. In this case process 1 will be fenced by having an outdated epoch.
> > > > > > > > >
> > > > > > > > > With KIP-447 we no longer have that potential scenario of two pending transactions trying to produce and mark a message as committed, because we won't let process 2 even start the transaction if there is a pending one (basically by not returning any messages, since we reject the OffsetFetch if there is a pending transaction for that offset partition). This is explained in this post: https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > > > > > > >
> > > > > > > > > Having that, I no longer see the value of transactional.id or how I should configure it in my producers. The main benefit of KIP-447 is that we no longer have to start one producer per input partition; a quote from the post:
> > > > > > > > > "The only way the static assignment requirement could be met is if each input partition uses a separate producer instance, which is in fact what Kafka Streams previously relied on. However, this made running EOS applications much more costly in terms of the client resources and load on the brokers. A large number of client connections could heavily impact the stability of brokers and become a waste of resources as well."
> > > > > > > > >
> > > > > > > > > I guess now I can reuse my producer between different input partitions, so what transactional.id should I assign to it and why should I care; isn't zombie fencing resolved by rejecting the offset fetch already?
> > > > > > > > >
> > > > > > > > > Thanks.
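
For contrast with the KIP-447 style configuration sketched earlier, this is roughly what the pre-KIP-447 pattern described above would look like: one producer per input partition with a static, partition-derived transactional.id. The naming follows the <application><topic>-<partition-id> scheme from the talk; the bootstrap address and everything else are assumptions for the example.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

class PerPartitionProducers {
    // Pre-KIP-447 style: the transactional.id is a pure function of the input partition,
    // so whichever instance takes over a partition reuses the same id and fences the old
    // owner via the bumped producer epoch.
    static KafkaProducer<String, String> producerFor(String application, String topic, int partition) {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, application + "-" + topic + "-" + partition);
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(p);
        // initTransactions() is what bumps the epoch and fences any zombie with the same id.
        producer.initTransactions();
        return producer;
    }
}
```
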
--
-- Guozhang