No problem.

The key is that at step 4, when the consumer re-joins, it will learn that
it has lost its previously assigned partitions and will trigger
`onPartitionsLost` on the rebalance callback. And since in your scenario
it's a 1-1 mapping from consumer to producer, it means the producer has
been fenced and hence should be closed.

So at step 4, Client A's old producer should be closed within the rebalance
callback, and then a new producer can be created to pair with the re-joined
consumer.
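
A minimal sketch of that pattern, written as a toy model in plain Java rather
than against the Kafka client API: `ToyProducer`, `PairedClient` and all of
their methods are hypothetical names, and the rule that a closed producer
rejects further calls is a modelling assumption. In a real application the
body of `onPartitionsLost` would presumably live in the
`ConsumerRebalanceListener` passed to `subscribe`.

```java
import java.util.function.Supplier;

// Toy model only: every class here is a hypothetical stand-in, not the Kafka client API.
class FencedProducerSketch {
    public static void main(String[] args) {
        PairedClient clientA = new PairedClient(ToyProducer::new);
        ToyProducer stale = clientA.producer(); // reference still held by the paused thread
        clientA.onPartitionsLost();             // step 4: re-join reports the lost partitions
        System.out.println("stale producer closed: " + stale.isClosed());
        System.out.println("replacement closed: " + clientA.producer().isClosed());
    }
}

// Stand-in for a transactional producer; it only tracks whether it was closed.
class ToyProducer {
    private boolean closed;

    void close() { closed = true; }

    boolean isClosed() { return closed; }

    // Modelling assumption: a closed producer refuses any further transactional call.
    void sendOffsetsToTransaction() {
        if (closed) throw new IllegalStateException("producer is closed");
    }
}

// One consumer paired with exactly one producer (the 1-1 mapping in this thread).
class PairedClient {
    private final Supplier<ToyProducer> factory;
    private ToyProducer producer;

    PairedClient(Supplier<ToyProducer> factory) {
        this.factory = factory;
        this.producer = factory.get();
    }

    ToyProducer producer() { return producer; }

    // Mirrors what the rebalance callback would do when partitions are lost.
    void onPartitionsLost() {
        producer.close();         // the old producer is treated as fenced
        producer = factory.get(); // fresh producer for the re-joined consumer
    }
}
```

In this model the processing thread has to ask the pairing object for the
producer each time; a cached reference would still point at the closed
instance.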

On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi <gabrielgiu...@gmail.com>
wrote:

> Last question: the fencing occurs with sendOffsetsToTransaction, which
> includes the ConsumerGroupMetadata. I guess the generation.id is what
> matters here, since it is bumped with each rebalance.
> But couldn't this happen?
> 1. Client A consumes from topic partition P1 with generation.id = 1 and a
> producer associated with it produces to some output topic, but a long GC
> pause occurs before calling sendOffsetsToTransaction
> 2. Client A gets out of sync and becomes a zombie due to session timeout;
> the group rebalances.
> 3. Client B is assigned topic partition P1 with generation.id = 2, calls
> sendOffsetsToTransaction and commits the txn
> 4. Client A is back online and joins again with generation.id = 3 (this
> happens in some internal thread)
> 5. The thread that was about to call sendOffsetsToTransaction is scheduled
> and calls sendOffsetsToTransaction with generation.id = 3 which is the
> current one so it won't be fenced.
>
> I'm asking this because we always ask the consumer object for the current
> ConsumerGroupMetadata, not the one that was in effect when the offsets were
> consumed, like this:
> producer.sendOffsetsToTransaction(consumedOffsets,
> consumer.groupMetadata());
>
> Couldn't this return a groupMetadata that has a valid generation.id even
> when it is not the same as at the moment of consuming the messages that are
> about to be committed?
>
> I'm sure I'm missing something (probably in step 4) that makes this not a
> possible scenario, but I can't say what it is.
>
> Sorry if the question is too confusing.
>
> On Tue, May 24, 2022 at 12:49, Guozhang Wang (<wangg...@gmail.com>)
> wrote:
>
> > Hi Gabriel,
> >
> > What I meant is that with KIP-447, the fencing is achieved by the time of
> > committing with the consumer metadata. If within a transaction, the
> > producer would always try to commit at least once on behalf of the
> > consumer, AND a zombie of the producer would always come from a zombie of
> > a consumer, then the transaction would be guaranteed to be fenced. But:
> >
> > 1) If within a transaction, there's no `sendOffset..` triggered, then
> > fencing still needs to be done by the txn coordinator, and txn.id plays
> > the role here. I think this is not your scenario.
> > 2) If a consumer may be "represented" by multiple producers, and a zombie
> > producer does not come from a zombie consumer, then we still need the
> > fencing to be done via the txn.id. This is the scenario I'd like to
> > remind you about. For example, if two producers could be (mistakenly)
> > created with different txn.ids and are paired with the same consumer, then
> > the new API in KIP-447 would not fence one of them.
> >
> > Guozhang
> >
> > On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi <gabrielgiu...@gmail.com>
> > wrote:
> >
> > > Hello Guozhang,
> > >
> > > thanks for the response. I have some doubts about the "N-1
> > > producer-consumer" case you mentioned, and about why I may need to
> > > configure the transactional id there and how. This is a case of N
> > > consumers sharing the same producer, right?
> > >
> > > My current implementation is creating a consumer per topic (I don't
> > > subscribe to multiple topics from the same consumer) and starting a
> > > producer per consumer, so the relation is 1 consumer/topic => 1 producer
> > > and the transactional id is set as <consumer-group>-<topic>-<random-uuid>.
> > > Do you see any problem with this configuration?
> > >
> > > Thanks again.
> > >
> > > On Sat, May 21, 2022 at 16:37, Guozhang Wang (<wangg...@gmail.com>)
> > > wrote:
> > >
> > > > Hello Gabriel,
> > > >
> > > > What you're asking is a very fair question :) In fact, for Streams,
> > > > where the partition assignment to producer-consumer pairs is purely
> > > > flexible, we think the new EOS would not have a hard requirement on
> > > > transactional.id:
> > > > https://issues.apache.org/jira/browse/KAFKA-9453
> > > >
> > > > If you implemented the transactional messaging via a DIY
> > > > producer+consumer
> > > > though, it depends on how you'd expect the lifetime of a producer to
> > > > be, e.g. if you have a 1-1 producer-consumer mapping then
> > > > transactional.id is not crucial, but if you have an N-1
> > > > producer-consumer mapping then you may still need to configure that id.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi <gabrielgiu...@gmail.com>
> > > > wrote:
> > > >
> > > > > Before KIP-447 I understood the use of transactional.id to prevent
> > > > > zombies from introducing duplicates, as explained in this talk:
> > > > > https://youtu.be/j0l_zUhQaTc?t=822.
> > > > > So in order to get zombie fencing working correctly we should assign
> > > > > producers a transactional.id that includes the partition id,
> > > > > something like <application><topic>-<partition-id>, as shown in this
> > > > > slide https://youtu.be/j0l_zUhQaTc?t=1047 where process 2 should use
> > > > > the same txnl.id A as the process 1 that crashed.
> > > > > This prevented the case where process 2 consumes the message again
> > > > > and commits, while process 1 comes back to life and also commits its
> > > > > pending transaction, which would produce duplicate messages. In this
> > > > > case process 1 will be fenced for having an outdated epoch.
> > > > >
> > > > > With KIP-447 we no longer have that potential scenario of two pending
> > > > > transactions trying to produce and mark a message as committed,
> > > > > because we won't let process 2 even start the transaction if there is
> > > > > a pending one (basically by not returning any messages, since we
> > > > > reject the Offset Fetch if there is a pending transaction for that
> > > > > offset partition). This is explained in this post:
> > > > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > > >
> > > > > Having that, I no longer see the value of transactional.id or how I
> > > > > should configure it in my producers. The main benefit of KIP-447 is
> > > > > that we no longer have to start one producer per input partition. A
> > > > > quote from the post:
> > > > > "The only way the static assignment requirement could be met is if
> > > > > each input partition uses a separate producer instance, which is in
> > > > > fact what Kafka Streams previously relied on. However, this made
> > > > > running EOS applications much more costly in terms of the client
> > > > > resources and load on the brokers. A large number of client
> > > > > connections could heavily impact the stability of brokers and become
> > > > > a waste of resources as well."
> > > > >
> > > > > I guess now I can reuse my producer between different input
> > > > > partitions, so what transactional.id should I assign to it, and why
> > > > > should I care? Isn't zombie fencing already resolved by rejecting the
> > > > > offset fetch?
> > > > >
> > > > > Thanks.
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
