I think "commitTransaction" should not throw CommitFailedException. Here
admittedly we are overusing the term "commit" here, as we use it for two
operations: committing the offsets (used for consumer, in either EOS or
ALOS), and committing the transaction. The exception is meant for the
former and would not be expected in `commitTransaction`.
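
To make the distinction concrete (a minimal sketch; `producer`, `consumedOffsets`
and `groupMetadata` are assumed to be set up elsewhere):

    // committing the *offsets* as part of the transaction: CommitFailedException
    // can surface here, since this fencing is based on the consumer group generation
    producer.sendOffsetsToTransaction(consumedOffsets, groupMetadata);

    // committing the *transaction* itself: CommitFailedException is not expected
    // here; fencing on this path shows up as ProducerFencedException
    producer.commitTransaction();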


Guozhang

On Thu, Jun 2, 2022 at 5:45 AM Gabriel Giussi <gabrielgiu...@gmail.com>
wrote:

> "I think we may overlooked it in documentation to emphasize that, in case
> 1), it should not expect ProducerFencedException. If so, we can fix the
> javadoc."
>
> IMHO that would be nice, I'm reviewing an existing codebase where we were
> only handling ProducerFencedException, because the javadoc and the method
> signature is explicit only about that, and CommitFailedException is not
> even referenced but falls under the general KafkaException.
> I think this could happen in both sendOffsetsToTransaction and
> commitTransaction right?
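> The handling I mean is roughly shaped like this (a simplified sketch, not the
> actual code); following only the javadoc, the generic KafkaException branch is
> where a CommitFailedException would end up:
>
>     try {
>         producer.beginTransaction();
>         producer.send(record);
>         producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
>         producer.commitTransaction();
>     } catch (ProducerFencedException e) {
>         // the only fencing case the code was written to recognize
>         producer.close();
>     } catch (KafkaException e) {
>         // CommitFailedException lands here, since it is just a KafkaException subclass
>         producer.abortTransaction();
>     }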
>
> Thanks.
>
> On Tue, May 31, 2022 at 2:49 PM Guozhang Wang (<wangg...@gmail.com>)
> wrote:
>
> > The CommitFailedException should be expected, since the fencing happens at
> > the consumer coordinator. I.e. we can only fence the consumer-producer pair
> > by the consumer's generation; we cannot fence via the producer epoch, since
> > there's no other producer that has just grabbed the same txn.id and bumped
> > the producer epoch.
> >
> > So just to clarify, when the zombie comes back, it could be fenced either
> > when:
> >
> > 1) it tries to complete the ongoing transaction via `sendOffset`, in which
> > case it would see the CommitFailedException. The caller is then responsible
> > for handling the thrown exception, which indicates being fenced.
> > 2) it tries to heartbeat in the background thread and gets an
> > InvalidGeneration error code, in which case onPartitionsLost is triggered.
> > The callback implementation is then responsible for handling that case,
> > which indicates being fenced.
> >
> > I think we may have overlooked emphasizing in the documentation that, in
> > case 1), it should not expect ProducerFencedException. If so, we can fix the
> > javadoc.
> >
> >
> >
> >
> > On Tue, May 31, 2022 at 8:26 AM Gabriel Giussi <gabrielgiu...@gmail.com>
> > wrote:
> >
> > > But there is no guarantee that the onPartitionsLost callback will be called
> > > before a zombie producer coming back to life tries to continue with the
> > > transaction, e.g. sending offsets or committing, so I should handle the
> > > exception first, and I could directly create a new producer there instead
> > > of doing it in the callback.
> > > The curious part for me is that I was able to reproduce a case that
> > > simulates a zombie producer that tries to send offsets after a rebalance,
> > > but instead of failing with a ProducerFencedException it fails with a
> > > CommitFailedException with this message: "Transaction offset Commit failed
> > > due to consumer group metadata mismatch: Specified group generation id is
> > > not valid.", which makes sense but is not even documented in
> > > KafkaProducer#sendOffsetsToTransaction.
> > > Is this the expected behaviour, or should it fail with a
> > > ProducerFencedException when the generation.id is outdated?
> > > The case I reproduced is like this:
> > > 1. Consumer A subscribes to topic X partitions 0 and 1, and starts a
> > > producer with transactional.id = "tid.123"
> > > 2. Consumer A consumes a message from partition 1 and sends it to another
> > > thread to be processed (so the poll thread is not blocked)
> > > 3. Producer A begins a transaction, sends to the output topic and gets
> > > blocked (I'm using a lock here to simulate long processing) before calling
> > > sendOffsetsToTransaction
> > > 4. Consumer B is created, gets assigned partition 1 (I'm using
> > > CooperativeStickyAssignor) and creates a producer with
> > > transactional.id = "tid.456"
> > > 5. Consumer B fetches the same message, processes it and commits the
> > > transaction successfully
> > > 6. Producer A calls sendOffsetsToTransaction (because the lock was
> > > released) and fails with CommitFailedException
> > >
> > > This behaviour reflects what is described here:
> > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > but I was actually expecting a ProducerFencedException instead. Does that
> > > exception only correspond to fencing done by transactional.id?
> > >
> > > Thanks
> > >
> > > On Tue, May 24, 2022 at 8:30 PM Guozhang Wang (<wangg...@gmail.com>)
> > > wrote:
> > >
> > > > No problem.
> > > >
> > > > The key is that at step 4, when the consumer re-joins, it will be aware
> > > > that it has lost its previously assigned partitions and will trigger
> > > > `onPartitionsLost` on the rebalance callback. And since in your scenario
> > > > it's a 1-1 mapping from consumer to producer, it means the producer has
> > > > been fenced and hence should be closed.
> > > >
> > > > So in that step 4, the old producer of Client A should be closed within
> > > > the rebalance callback, and then one can create a new producer to pair
> > > > with the re-joined consumer.
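> > > >
> > > > Roughly like this (just a sketch: it assumes `producer` is a mutable field
> > > > on the owning class, and `createTransactionalProducer()` is whatever
> > > > factory you already use):
> > > >
> > > >     consumer.subscribe(topics, new ConsumerRebalanceListener() {
> > > >         public void onPartitionsLost(Collection<TopicPartition> partitions) {
> > > >             // the consumer's generation is gone, so the paired producer is
> > > >             // effectively fenced: dispose of it and build a fresh one
> > > >             producer.close();
> > > >             producer = createTransactionalProducer(); // hypothetical factory
> > > >             producer.initTransactions();
> > > >         }
> > > >         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
> > > >         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
> > > >     });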
> > > >
> > > > On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi <gabrielgiu...@gmail.com>
> > > > wrote:
> > > >
> > > > > Last question: the fencing occurs with sendOffsetsToTransaction, which
> > > > > includes the ConsumerGroupMetadata, and I guess the generation.id is
> > > > > what matters here since it is bumped with each rebalance.
> > > > > But couldn't this happen?
> > > > > 1. Client A consumes from topic partition P1 with generation.id = 1, and
> > > > > a producer associated to it produces to some output topic, but a long GC
> > > > > pause occurs before calling sendOffsetsToTransaction
> > > > > 2. Client A gets out of sync and becomes a zombie due to session timeout;
> > > > > the group is rebalanced.
> > > > > 3. Client B is assigned topic partition P1 with generation.id = 2, calls
> > > > > sendOffsetsToTransaction and commits the txn
> > > > > 4. Client A is back online and joins again with generation.id = 3 (this
> > > > > happens in some internal thread)
> > > > > 5. The thread that was about to call sendOffsetsToTransaction is
> > > > > scheduled and calls sendOffsetsToTransaction with generation.id = 3,
> > > > > which is the current one, so it won't be fenced.
> > > > >
> > > > > I'm asking this because we always ask the consumer object for the
> > > > > current consumerGroupMetadata, not the one that was in effect when the
> > > > > offsets were consumed, like this:
> > > > > producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
> > > > >
> > > > > Couldn't this return a groupMetadata that has a valid generation.id even
> > > > > when it is not the same as at the moment of consuming the messages that
> > > > > are about to be committed?
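> > > > >
> > > > > In other words, the alternative would be to capture the metadata
> > > > > together with the consumed records (purely an illustrative sketch, not
> > > > > something we actually run today):
> > > > >
> > > > >     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
> > > > >     // snapshot taken right after poll, alongside the consumed offsets
> > > > >     ConsumerGroupMetadata metadataAtPollTime = consumer.groupMetadata();
> > > > >     // hand records + metadataAtPollTime to the processing thread, and later:
> > > > >     producer.sendOffsetsToTransaction(consumedOffsets, metadataAtPollTime);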
> > > > >
> > > > > I'm sure I'm missing something (probably in step 4) that makes this not
> > > > > a possible scenario, but I can't say what it is.
> > > > >
> > > > > Sorry if the question is too confusing.
> > > > >
> > > > >
> > > > > On Tue, May 24, 2022 at 12:49 PM Guozhang Wang (<wangg...@gmail.com>)
> > > > > wrote:
> > > > >
> > > > > > Hi Gabriel,
> > > > > >
> > > > > > What I meant is that with KIP-447, the fencing is achieved at the time
> > > > > > of committing with the consumer metadata. If within a transaction the
> > > > > > producer always tries to commit at least once on behalf of the
> > > > > > consumer, AND a zombie of the producer always comes from a zombie of
> > > > > > the consumer, then the transaction is guaranteed to be fenced. But:
> > > > > >
> > > > > > 1) If within a transaction there's no `sendOffset..` triggered, then
> > > > > > fencing still needs to be done by the txn coordinator, and txn.id plays
> > > > > > the role here ---- I think this is not your scenario.
> > > > > > 2) If a consumer may be "represented" by multiple producers, and a
> > > > > > zombie producer does not come from a zombie consumer, then we still
> > > > > > need the fencing to be done via the txn.id --- this is the scenario I'd
> > > > > > like to remind you about. For example, if two producers could be
> > > > > > (mistakenly) created with different txn.ids and paired with the same
> > > > > > consumer, then the new API in KIP-447 would not fence one of them.
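> > > > > >
> > > > > > To make the commit-time fencing concrete, the usual pattern looks
> > > > > > roughly like this (a sketch only; topic names are made up and
> > > > > > `offsetsFrom` is a hypothetical helper that maps each partition to its
> > > > > > next offset to commit):
> > > > > >
> > > > > >     consumer.subscribe(Collections.singletonList("input-topic"));
> > > > > >     producer.initTransactions();
> > > > > >     while (running) {
> > > > > >         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
> > > > > >         if (records.isEmpty()) continue;
> > > > > >         producer.beginTransaction();
> > > > > >         for (ConsumerRecord<String, String> record : records) {
> > > > > >             producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
> > > > > >         }
> > > > > >         // the consumer group metadata (its generation) is what fences a zombie here
> > > > > >         producer.sendOffsetsToTransaction(offsetsFrom(records), consumer.groupMetadata());
> > > > > >         producer.commitTransaction();
> > > > > >     }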
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi <gabrielgiu...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hello Guozhang,
> > > > > > >
> > > > > > > thanks for the response. I have some doubts about the "N-1
> > > > > > > producer-consumer" case you mentioned, and why and how I may need to
> > > > > > > configure the transactional id there. Is this the case of N consumers
> > > > > > > sharing the same producer?
> > > > > > >
> > > > > > > My current implementation creates a consumer per topic (I don't
> > > > > > > subscribe to multiple topics from the same consumer) and starts a
> > > > > > > producer per consumer, so the relation is 1 consumer/topic => 1
> > > > > > > producer and the transactional id is set as
> > > > > > > <consumer-group>-<topic>-<random-uuid>.
> > > > > > > Do you see any problem with this configuration?
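> > > > > > >
> > > > > > > Concretely, each producer is configured more or less like this
> > > > > > > (simplified sketch; the variable names are just for illustration):
> > > > > > >
> > > > > > >     Properties props = new Properties();
> > > > > > >     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> > > > > > >     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> > > > > > >     // one producer per consumer/topic pair, so the id embeds both plus a random suffix
> > > > > > >     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
> > > > > > >         groupId + "-" + topic + "-" + UUID.randomUUID());
> > > > > > >     KafkaProducer<String, String> producer =
> > > > > > >         new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
> > > > > > >     producer.initTransactions();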
> > > > > > >
> > > > > > > Thanks again.
> > > > > > >
> > > > > > > On Sat, May 21, 2022 at 4:37 PM Guozhang Wang (<wangg...@gmail.com>)
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Gabriel,
> > > > > > > >
> > > > > > > > What you're asking is a very fair question :) In fact, for Streams,
> > > > > > > > where the partition assignment to producer-consumer pairs is purely
> > > > > > > > flexible, we think the new EOS would not have a hard requirement on
> > > > > > > > transactional.id:
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-9453
> > > > > > > >
> > > > > > > > If you implemented the transactional messaging via a DIY
> > > > > > > > producer+consumer though, it depends on the expected lifetime of a
> > > > > > > > producer: e.g. if you do not have a 1-1 producer-consumer mapping
> > > > > > > > then transactional.id is not crucial, but if you have an N-1
> > > > > > > > producer-consumer mapping then you may still need to configure that
> > > > > > > > id.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi <gabrielgiu...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Before KIP-447 I understood the use of transactional.id as
> > > > > > > > > preventing zombies from introducing duplicates, as explained in
> > > > > > > > > this talk:
> > > > > > > > > https://youtu.be/j0l_zUhQaTc?t=822
> > > > > > > > > So in order to get zombie fencing working correctly we should
> > > > > > > > > assign producers a transactional.id that includes the partition
> > > > > > > > > id, something like <application><topic>-<partition-id>, as shown
> > > > > > > > > in this slide:
> > > > > > > > > https://youtu.be/j0l_zUhQaTc?t=1047
> > > > > > > > > where processor 2 should use the same txn.id A as process 1,
> > > > > > > > > which crashed.
> > > > > > > > > This prevented us from having process 2 consume the message again
> > > > > > > > > and commit, while process 1 could come back to life and also
> > > > > > > > > commit the pending transaction, hence having duplicate messages
> > > > > > > > > produced. In this case process 1 will be fenced by having an
> > > > > > > > > outdated epoch.
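> > > > > > > > >
> > > > > > > > > So the pre-KIP-447 setup I had in mind looks roughly like this
> > > > > > > > > (sketch only; the names are made up):
> > > > > > > > >
> > > > > > > > >     // one producer per assigned input partition, fenced by transactional.id
> > > > > > > > >     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
> > > > > > > > >         applicationId + topic + "-" + partition);
> > > > > > > > >     // a replacement process for the same partition reuses the same id,
> > > > > > > > >     // so initTransactions() bumps the epoch and fences the zombie
> > > > > > > > >     producer.initTransactions();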
> > > > > > > > >
> > > > > > > > > With KIP-447 we no longer have that potential scenario of two
> > > > > > > > > pending transactions trying to produce and mark a message as
> > > > > > > > > committed, because we won't let process 2 even start the
> > > > > > > > > transaction if there is a pending one (basically by not returning
> > > > > > > > > any messages, since we reject the Offset Fetch if there is a
> > > > > > > > > pending transaction for that offset partition). This is explained
> > > > > > > > > in this post:
> > > > > > > > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > > > > > > >
> > > > > > > > > Having that, I no longer see the value of transactional.id or how
> > > > > > > > > I should configure it in my producers. The main benefit of KIP-447
> > > > > > > > > is that we no longer have to start one producer per input
> > > > > > > > > partition; a quote from the post:
> > > > > > > > > "The only way the static assignment requirement could be met is if
> > > > > > > > > each input partition uses a separate producer instance, which is
> > > > > > > > > in fact what Kafka Streams previously relied on. However, this
> > > > > > > > > made running EOS applications much more costly in terms of the
> > > > > > > > > client resources and load on the brokers. A large number of client
> > > > > > > > > connections could heavily impact the stability of brokers and
> > > > > > > > > become a waste of resources as well."
> > > > > > > > >
> > > > > > > > > I guess now I can reuse my producer between different input
> > > > > > > > > partitions, so what transactional.id should I assign to it, and
> > > > > > > > > why should I care? Isn't zombie fencing resolved by rejecting the
> > > > > > > > > offset fetch already?
> > > > > > > > >
> > > > > > > > > Thanks.
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
