But there is no guarantee that the onPartitionsLost callback will be called
before a zombie producer that comes back to life tries to continue with the
transaction, e.g. sending offsets or committing. So I should handle the
exception first, and I could directly create a new producer there instead of
doing it in the callback.
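Just to make sure I'm describing the same thing, this is roughly what I mean,
as a minimal sketch (the configs, topic names and helper structure are made up
for illustration, not our actual code): catch the fencing-related exceptions
around the transactional calls and recreate the producer right there.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceLoop {

    private final KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;

    public ExactlyOnceLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;          // already subscribed to its single topic
        this.producer = createProducer();
    }

    private KafkaProducer<String, String> createProducer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // <consumer-group>-<topic>-<random-uuid>, as described later in this thread
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-group-topicX-" + UUID.randomUUID());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> p = new KafkaProducer<>(props);
        p.initTransactions();
        return p;
    }

    public void pollAndProcess() {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) {
            return;
        }
        try {
            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            // KIP-447 style fencing: the group metadata carries the generation id
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (ProducerFencedException | CommitFailedException e) {
            // We were a zombie: drop this producer and create a fresh one; the
            // uncommitted records will be re-processed by whoever owns the
            // partitions after the rebalance.
            producer.close();
            producer = createProducer();
        }
    }
}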
The curious part for me is that I was able to reproduce a case that
simulates a zombie producer trying to send offsets after a rebalance,
but instead of failing with a ProducerFencedException it fails with a
CommitFailedException with this message: "Transaction offset Commit failed
due to consumer group metadata mismatch: Specified group generation id is
not valid." That makes sense, but it is not even documented in
KafkaProducer#sendOffsetsToTransaction.
Is this the expected behaviour, or should it fail with a
ProducerFencedException when the generation.id is outdated?
The case I reproduced is like this (there's a rough sketch of the blocked
producer after the steps):
1. Consumer A subscribes to topic X partitions 0 and 1, and starts a
producer with transactional.id = "tid.123"
2. Consumer A consumes a message from partition 1 and hands it to another
thread to be processed (so the poll thread is not blocked)
3. Producer A begins a transaction, sends to the output topic and gets blocked
(I'm using a lock here to simulate long processing) before calling
sendOffsetsToTransaction
4. Consumer B is created and gets assigned partition 1 (I'm using
CooperativeStickyAssignor) and creates a producer with transactional.id =
"tid.456"
5. Consumer B fetches the same message, processes it and commits the
transaction successfully
6. Producer A calls sendOffsetsToTransaction (because the lock was
released) and fails with CommitFailedException
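This is roughly the blocked-producer part of the test, just as an illustration
of steps 3 and 6 (the latch standing in for the lock, the offset value and the
helper structure are made up; groupMetadata is whatever consumer A returned
before the rebalance):

import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

class BlockedProducerSketch {

    // Producer A has transactional.id = "tid.123"; groupMetadata was obtained
    // from consumer A before the rebalance.
    static void processBlocked(KafkaProducer<String, String> producerA,
                               ConsumerGroupMetadata groupMetadata,
                               CountDownLatch lock) throws InterruptedException {

        // Step 3: begin the transaction and produce to the output topic
        producerA.beginTransaction();
        producerA.send(new ProducerRecord<>("output-topic", "key", "value"));

        // Simulated long processing: while we wait here, the rebalance happens,
        // consumer B gets partition 1 and its producer ("tid.456") commits.
        lock.await();

        // Step 6: the generation id inside groupMetadata is now outdated, so this
        // fails with CommitFailedException ("... consumer group metadata mismatch ...")
        // rather than ProducerFencedException.
        Map<TopicPartition, OffsetAndMetadata> offsets =
                Map.of(new TopicPartition("topic-X", 1), new OffsetAndMetadata(42L));
        try {
            producerA.sendOffsetsToTransaction(offsets, groupMetadata);
            producerA.commitTransaction();
        } catch (CommitFailedException e) {
            producerA.abortTransaction(); // or close this producer and create a new one
        }
    }
}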

This behaviour reflects what is described here
https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification,
but I was actually expecting a ProducerFencedException instead. Does that
exception only correspond to fencing done by transactional.id?

Thanks

On Tue, May 24, 2022 at 8:30 PM Guozhang Wang (<wangg...@gmail.com>) wrote:

> No problem.
>
> The key is that at step 4, when the consumer re-joins it will be aware that
> it has lost its previously assigned partitions and will trigger
> `onPartitionsLost` on the rebalance callback. And since in your scenario
> it's a 1-1 mapping from consumer to producer, it means the producer has
> been fenced and hence should be closed.
>
> So in that step 4, the old producer with Client A should be closed within
> the rebalance callback, and then one can create a new producer to pair with
> the re-joined consumer.
>
> On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi <gabrielgiu...@gmail.com>
> wrote:
>
> > Last question, the fencing occurs with the sendOffsetsToTransaction which
> > includes ConsumerGroupMetadata, I guess the generation.id is what matters
> > here since it is bumped with each rebalance.
> > But couldn't this happen?
> > 1. Client A consumes from topic partition P1 with generation.id = 1 and a
> > producer associated to it produces to some output topic but a long GC pause
> > occurs before calling sendOffsetsToTransaction
> > 2. Client A gets out of sync and becomes a zombie due to session timeout,
> > group rebalanced.
> > 3. Client B is assigned topic partition P1 with generation.id = 2, calls
> > sendOffsetsToTransaction and commits the txn
> > 4. Client A is back online and joins again with generation.id = 3 (this
> > happens in some internal thread)
> > 5. The thread that was about to call sendOffsetsToTransaction is scheduled
> > and calls sendOffsetsToTransaction with generation.id = 3 which is the
> > current one so it won't be fenced.
> >
> > I'm asking this because we always ask the consumer object for its current
> > consumerGroupMetadata, not the one that was in effect when the offsets were
> > consumed, like this:
> > producer.sendOffsetsToTransaction(consumedOffsets,
> > consumer.groupMetadata());
> >
> > Couldn't this return a groupMetadata that has a valid generation.id even
> > when it is not the same one that was current at the moment of consuming the
> > messages that are about to be committed?
> >
> > I'm sure I'm missing something (probably in step 4) that makes this not a
> > possible scenario, but I can't say what it is.
> >
> > Sorry if the question is too confusing.
> >
> >
> > On Tue, May 24, 2022 at 12:49 PM Guozhang Wang (<wangg...@gmail.com>) wrote:
> >
> > > Hi Gabriel,
> > >
> > > What I meant is that with KIP-447, the fencing is achieved by the time of
> > > committing with the consumer metadata. If within a transaction, the
> > > producer would always try to commit at least once on behalf of the
> > > consumer, AND a zombie of the producer would always come from a zombie of
> > > a consumer, then the transaction would be guaranteed to be fenced. But:
> > >
> > > 1) If within a transaction, there's no `sendOffset..` triggered, then
> > > fencing still needs to be done by the txn coordinator, and txn.id plays
> > > the role here ---- I think this is not your scenario.
> > > 2) If a consumer may be "represented" by multiple producers, and a zombie
> > > producer does not come from a zombie consumer, then we still need the
> > > fencing to be done via the txn.id --- this is the scenario I'd like to
> > > remind you about. For example, if two producers could be (mistakenly)
> > > created with different txn.ids and are paired with the same consumer, then
> > > the new API in KIP-447 would not fence one of them.
> > >
> > > Guozhang
> > >
> > > On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi <gabrielgiu...@gmail.com>
> > > wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > thanks for the response. I have some doubts about the "N-1
> > > > producer-consumer" case you mentioned, and why I may need to configure
> > > > the transactional id there and how. This is a case of N consumers sharing
> > > > the same producer, right?
> > > >
> > > > My current implementation is creating a consumer per topic (I don't
> > > > subscribe to multiple topics from the same consumer) and starting a
> > > > producer per consumer, so the relation is 1 consumer/topic => 1 producer
> > > > and the transactional id is set as <consumer-group>-<topic>-<random-uuid>.
> > > > Do you see any problem with this configuration?
> > > >
> > > > Thanks again.
> > > >
> > > > On Sat, May 21, 2022 at 4:37 PM Guozhang Wang (<wangg...@gmail.com>) wrote:
> > > >
> > > > > Hello Gabriel,
> > > > >
> > > > > What you're asking is a very fair question :) In fact, for Streams,
> > > > > where the partition assignment to producer-consumer pairs is purely
> > > > > flexible, we think the new EOS would not have a hard requirement on
> > > > > transactional.id:
> > > > > https://issues.apache.org/jira/browse/KAFKA-9453
> > > > >
> > > > > If you implemented the transactional messaging via a DIY producer+consumer
> > > > > though, it depends on how you'd expect the life-time of a producer, e.g. if
> > > > > you do not have a 1-1 producer-consumer mapping then transactional.id is
> > > > > not crucial, but if you have an N-1 producer-consumer mapping then you may
> > > > > still need to configure that id.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi <gabrielgiu...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Before KIP-447 I understood the use of transactional.id to prevent us
> > > > > > from zombies introducing duplicates, as explained in this talk
> > > > > > https://youtu.be/j0l_zUhQaTc?t=822.
> > > > > > So in order to get zombie fencing working correctly we should assign
> > > > > > producers a transactional.id that includes the partition id, something
> > > > > > like <application><topic>-<partition-id>, as shown in this slide
> > > > > > https://youtu.be/j0l_zUhQaTc?t=1047 where processor 2 should use the
> > > > > > same txnl.id A as process 1 that crashed.
> > > > > > This prevented us from having process 2 consume the message again and
> > > > > > commit while process 1 could come back to life and also commit the
> > > > > > pending transaction, hence having duplicate messages being produced.
> > > > > > In this case process 1 would be fenced by having an outdated epoch.
> > > > > >
> > > > > > With KIP-447 we no longer have that potential scenario of two pending
> > > > > > transactions trying to produce and mark a message as committed, because
> > > > > > we won't let process 2 even start the transaction if there is a pending
> > > > > > one (basically by not returning any messages, since we reject the Offset
> > > > > > Fetch if there is a pending transaction for that offset partition). This
> > > > > > is explained in this post:
> > > > > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > > > >
> > > > > > Given that, I don't see the value of transactional.id anymore, or how I
> > > > > > should configure it in my producers. The main benefit of KIP-447 is that
> > > > > > we no longer have to start one producer per input partition; a quote from
> > > > > > the post:
> > > > > > "The only way the static assignment requirement could be met is if each
> > > > > > input partition uses a separate producer instance, which is in fact what
> > > > > > Kafka Streams previously relied on. However, this made running EOS
> > > > > > applications much more costly in terms of the client resources and load
> > > > > > on the brokers. A large number of client connections could heavily impact
> > > > > > the stability of brokers and become a waste of resources as well."
> > > > > >
> > > > > > I guess now I can reuse my producer between different input partitions,
> > > > > > so what transactional.id should I assign to it, and why should I care?
> > > > > > Isn't zombie fencing resolved by rejecting the offset fetch already?
> > > > > >
> > > > > > Thanks.
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
