Hi Justine,

After thinking a bit about supporting atomic dual writes for Kafka + a NoSQL
database, I came to the conclusion that we do need to bump the epoch even
with InitProducerId(keepPreparedTxn=true).  As I described in my previous
email, we wouldn't need to bump the epoch to protect against zombies, and
that reasoning still holds.  But without a bump we cannot protect against
split-brain scenarios, where two or more instances of a producer with the
same transactional id try to produce at the same time.  The dual-write
example for SQL databases (
https://github.com/apache/kafka/pull/14231/files) doesn't have a
split-brain problem because execution is protected by the update lock on
the transaction state record; however, NoSQL databases may not have this
protection (I'll write an example for NoSQL database dual-write soon).
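
For reference, here is roughly how the SQL recipe serializes concurrent
instances (a minimal JDBC sketch; the table and column names are
illustrative, not the actual ones from the PR):

  // A second instance with the same transactional id blocks here until the
  // first one commits or rolls back, so the dual writes cannot interleave.
  db.setAutoCommit(false);
  try (PreparedStatement lock = db.prepareStatement(
      "SELECT producer_id, epoch FROM transaction_state " +
      "WHERE transactional_id = ? FOR UPDATE")) {
    lock.setString(1, transactionalId);
    lock.executeQuery();               // acquires the row-level update lock
    // ... apply the DB changes and save the new PreparedTxnState ...
    db.commit();                       // releases the lock
  }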

In a nutshell, here is an example of a split-brain scenario:

   1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
   2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
   3. (instance1) CommitTxn, epoch bumped to 43
   4. (instance2) CommitTxn, this is considered a retry, so it got epoch 43
   as well
   5. (instance1) Produce messageA w/sequence 1
   6. (instance2) Produce messageB w/sequence 1, this is considered a
   duplicate
   7. (instance2) Produce messageC w/sequence 2
   8. (instance1) Produce messageD w/sequence 2, this is considered a
   duplicate

Now if either of these instances commits the transaction, it will contain a
mix of messages from the two instances (messageA and messageC).  With a
proper epoch bump, instance1 would get fenced at step 3.
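
For intuition on why steps 6 and 8 read as duplicates: the broker's
idempotence check cannot tell the two instances apart, because they present
the same (producerId, epoch).  Here is a simplified model of that check
(not the actual broker code; ProducerState is just an illustrative holder
for the last seen epoch and sequence):

  // Both instances share one (producerId, epoch), so their sequence numbers
  // are validated against a single shared counter on the partition leader.
  boolean isDuplicate(ProducerState state, short epoch, int sequence) {
    if (epoch > state.epoch)
      return false;                         // bumped epoch: accepted as new
    return sequence <= state.lastSequence;  // same epoch: looks like a retry
  }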

In order to update the epoch in InitProducerId(keepPreparedTxn=true), we
need to preserve the ongoing transaction's epoch (and producerId, if the
epoch overflows), because we need to make the correct decision when we
compare the PreparedTxnState that we read from the database with the
(producerId, epoch) of the ongoing transaction.
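
To illustrate, after a crash the application would recover roughly like
this (a sketch; the PreparedTxnState field accessors are illustrative, and
I'm using the existing commit / abort calls):

  // Compare what the database remembers committing against the ongoing
  // transaction returned by InitProducerId(keepPreparedTxn=true).
  PreparedTxnState dbState = readPreparedTxnStateFromDb();
  if (dbState != null
      && dbState.producerId() == ongoingTxn.producerId()
      && dbState.epoch() == ongoingTxn.epoch()) {
    producer.commitTransaction();   // DB committed first -- finish Kafka side
  } else {
    producer.abortTransaction();    // DB never committed -- roll Kafka back
  }

If InitProducerId overwrote the ongoing transaction's (producerId, epoch)
with the bumped values, this comparison would be meaningless, which is why
both pairs have to be preserved and returned.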

I've updated the KIP with the following:

   - Ongoing transaction now has 2 (producerId, epoch) pairs -- one pair
   describes the ongoing transaction, the other pair describes the expected
   epoch for new operations on this transactional id
   - InitProducerIdResponse now returns 2 (producerId, epoch) pairs
   - TransactionalLogValue now has 2 (producerId, epoch) pairs, with the new
   values added as tagged fields, so it's easy to downgrade
   - Added a note about downgrade in the Compatibility section
   - Added a rejected alternative

-Artem

On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <alivsh...@confluent.io>
wrote:

> Hi Justine,
>
> Thank you for the questions.  Currently (pre-KIP-939) we always bump the
> epoch on InitProducerId and abort an ongoing transaction (if any).  I
> expect this behavior will continue with KIP-890 as well.
>
> With KIP-939 we need to support the case when the ongoing transaction
> needs to be preserved when keepPreparedTxn=true.  Bumping the epoch without
> aborting or committing a transaction is tricky because the epoch is a short
> value and it's easy to overflow.  Currently, the overflow case is handled
> by aborting the ongoing transaction, which would send out transaction
> markers with epoch=Short.MAX_VALUE to the partition leaders, which would
> fence off any messages with the producer id that started the transaction
> (they would have an epoch less than Short.MAX_VALUE).  Then it is safe
> to allocate a new producer id and use it in new transactions.
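>
> (A sketch of that overflow path -- simplified, the helper names are made
> up, not the actual coordinator code:)
>
>   if (currentEpoch == Short.MAX_VALUE) {
>     // Abort markers at epoch Short.MAX_VALUE fence any in-flight messages
>     // from this producerId, since their epoch is < Short.MAX_VALUE.
>     abortOngoingTransaction(producerId, Short.MAX_VALUE);
>     producerId = allocateNewProducerId();  // fresh id, epoch restarts at 0
>     currentEpoch = 0;
>   } else {
>     currentEpoch++;
>   }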
>
> We could say that maybe when keepPreparedTxn=true we bump epoch unless it
> leads to overflow, and don't bump epoch in the overflow case.  I don't
> think it's a good solution because if it's not safe to keep the same epoch
> when keepPreparedTxn=true, then we must handle the epoch overflow case as
> well.  So either we should convince ourselves that it's safe to keep the
> epoch and do it in the general case, or we always bump the epoch and handle
> the overflow.
>
> With KIP-890, we bump the epoch on every transaction commit / abort.  This
> guarantees that even if InitProducerId(keepPreparedTxn=true) doesn't
> increment epoch on the ongoing transaction, the client will have to call
> commit or abort to finish the transaction and will increment the epoch (and
> handle epoch overflow, if needed).  If the ongoing transaction was in a bad
> state and had some zombies waiting to arrive, the abort operation would
> fence them because with KIP-890 every abort would bump the epoch.
>
> We could also look at this from the following perspective.  With KIP-890,
> zombies won't be able to cross transaction boundaries; each transaction
> completion creates a boundary and any activity in the past gets confined in
> the boundary.  Then data in any partition would look like this:
>
> 1. message1, epoch=42
> 2. message2, epoch=42
> 3. message3, epoch=42
> 4. marker (commit or abort), epoch=43
>
> Now if we inject steps 3a and 3b like this:
>
> 1. message1, epoch=42
> 2. message2, epoch=42
> 3. message3, epoch=42
> 3a. crash
> 3b. InitProducerId(keepPreparedTxn=true)
> 4. marker (commit or abort), epoch=43
>
> The invariant still holds even with steps 3a and 3b -- whatever activity
> was in the past will get confined in the past by the mandatory abort /
> commit that must follow InitProducerId(keepPreparedTxn=true).
>
> KIP-890 thus provides the proper isolation between transactions, so
> injecting crash + InitProducerId(keepPreparedTxn=true) into the
> transaction sequence is safe from the zombie protection perspective.
>
> That said, I'm still thinking about it and looking for cases that might
> break because we don't bump the epoch on
> InitProducerId(keepPreparedTxn=true); if such cases exist, we'll need to
> develop the logic to handle epoch overflow for ongoing transactions.
>
> -Artem
>
>
>
> On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
> <jols...@confluent.io.invalid> wrote:
>
>> Hey Artem,
>>
>> Thanks for the KIP. I had a question about epoch bumping.
>>
>> Previously, when we sent an InitProducerId request on producer startup, we
>> bumped the epoch and aborted the transaction. Is it correct to assume that
>> we will still bump the epoch, but just not abort the transaction?
>> If we still bump the epoch in this case, how does this interact with
>> KIP-890, where we also bump the epoch on every transaction?  (I think this
>> means that we may skip epochs and the data itself will all have the same
>> epoch.)
>>
>> I may have follow ups depending on the answer to this. :)
>>
>> Thanks,
>> Justine
>>
>> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
>> <alivsh...@confluent.io.invalid> wrote:
>>
>> > Hi Alex,
>> >
>> > Thank you for your questions.
>> >
>> > > the purpose of having broker-level transaction.two.phase.commit.enable
>> >
>> > The thinking is that 2PC is a bit of an advanced construct, so enabling
>> > 2PC in a Kafka cluster should be an explicit decision.  If it is set to
>> > 'false', InitProducerId (and initTransactions) would return
>> > TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
>> >
>> > > WDYT about adding an AdminClient method that returns the state of
>> > > transaction.two.phase.commit.enable
>> >
>> > I wonder if the client could just try to use 2PC and then handle the
>> > error (e.g. if it needs to fall back to ordinary transactions).  This
>> > way it could uniformly handle both cases when the Kafka cluster doesn't
>> > support 2PC at all and cases when 2PC is restricted to certain users.
>> > We could also expose this config in describeConfigs, if the fallback
>> > approach doesn't work for some scenarios.
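>> >
>> > (Roughly, the fallback could look like this -- just a sketch, not a
>> > settled API; the fallback helper is made up:)
>> >
>> >   try {
>> >     // producer configured with transaction.two.phase.commit.enable=true
>> >     producer.initTransactions();
>> >   } catch (TransactionalIdAuthorizationException e) {
>> >     // Cluster has 2PC disabled, or this principal lacks the new ACL;
>> >     // fall back to a producer configured for ordinary transactions.
>> >     producer = createOrdinaryTransactionalProducer();
>> >     producer.initTransactions();
>> >   }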
>> >
>> > -Artem
>> >
>> >
>> > On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
>> > <asorokou...@confluent.io.invalid> wrote:
>> >
>> > > Hi Artem,
>> > >
>> > > Thanks for publishing this KIP!
>> > >
>> > > Can you please clarify the purpose of having broker-level
>> > > transaction.two.phase.commit.enable config in addition to the new ACL?
>> > > If the brokers are configured with
>> > > transaction.two.phase.commit.enable=false, at what point will a client
>> > > configured with transaction.two.phase.commit.enable=true fail? Will it
>> > > happen at KafkaProducer#initTransactions?
>> > >
>> > > WDYT about adding an AdminClient method that returns the state of
>> > > transaction.two.phase.commit.enable? This way, clients would know in
>> > > advance if 2PC is enabled on the brokers.
>> > >
>> > > Best,
>> > > Alex
>> > >
>> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com>
>> > > wrote:
>> > >
>> > > > Other than supporting multiplexing transactional streams on a single
>> > > > producer, I don't see how to improve it.
>> > > >
>> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
>> > > > <alivsh...@confluent.io.invalid> wrote:
>> > > >
>> > > > > Hi Roger,
>> > > > >
>> > > > > Thank you for summarizing the cons.  I agree, and I'm curious what
>> > > > > would be the alternatives to solve these problems better and if
>> > > > > they can be incorporated into this proposal (or built independently
>> > > > > in addition to or on top of this proposal).  E.g. one potential
>> > > > > extension we discussed earlier in the thread could be multiplexing
>> > > > > logical transactional "streams" with a single producer.
>> > > > >
>> > > > > -Artem
>> > > > >
>> > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Thanks.  I like that you're moving Kafka toward supporting this
>> > > > > > dual-write pattern.  Each use case needs to consider the
>> > > > > > tradeoffs.  You already summarized the pros very well in the KIP.
>> > > > > > I would summarize the cons as follows:
>> > > > > >
>> > > > > > - you sacrifice availability - each write requires both DB and
>> > > > > > Kafka to be available, so I think your overall application
>> > > > > > availability is (1 - p(DB is unavailable)) * (1 - p(Kafka is
>> > > > > > unavailable)).
>> > > > > > - latency will be higher and throughput lower - each write
>> > > > > > requires both writes to DB and Kafka while holding an exclusive
>> > > > > > lock in DB.
>> > > > > > - you need to create a producer per unit of concurrency in your
>> > > > > > app, which has some overhead in the app and on the Kafka side
>> > > > > > (number of connections, poor batching).  I assume the producers
>> > > > > > would need to be configured for low latency (linger.ms=0)
>> > > > > > - there's some complexity in managing stable transactional ids
>> > > > > > for each producer/concurrency unit in your application.  With k8s
>> > > > > > deployment, you may need to switch to something like a
>> > > > > > StatefulSet that gives each pod a stable identity across
>> > > > > > restarts.  On top of that pod identity, which you can use as a
>> > > > > > prefix, you then assign unique transactional ids to each
>> > > > > > concurrency unit (thread/goroutine); see the sketch below.
>> > > > > >
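>> > > > > > For that last item, I mean something like this (a sketch; the env
>> > > > > > var and naming scheme are just an example):
>> > > > > >
>> > > > > >   // Stable per-pod identity from a StatefulSet (e.g. "myapp-3")
>> > > > > >   // plus a thread index gives a stable transactional.id per
>> > > > > >   // concurrency unit.
>> > > > > >   String podName = System.getenv("POD_NAME");
>> > > > > >   props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
>> > > > > >       podName + "-txn-" + threadIndex);
>> > > > > >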
>> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
>> > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > >
>> > > > > > > Hi Roger,
>> > > > > > >
>> > > > > > > Thank you for the feedback.  You make a very good point that
>> > > > > > > we also discussed internally.  Adding support for multiple
>> > > > > > > concurrent transactions in one producer could be valuable, but
>> > > > > > > it seems to be a fairly large and independent change that would
>> > > > > > > deserve a separate KIP.  If such support is added we could
>> > > > > > > modify 2PC functionality to incorporate that.
>> > > > > > >
>> > > > > > > > Maybe not too bad but a bit of pain to manage these ids
>> > > > > > > > inside each process and across all application processes.
>> > > > > > >
>> > > > > > > I'm not sure if supporting multiple transactions in one
>> > > > > > > producer would make id management simpler: we'd need to store a
>> > > > > > > piece of data per transaction, so whether it's N producers with
>> > > > > > > a single transaction or N transactions with a single producer,
>> > > > > > > it's still roughly the same amount of data to manage.  In fact,
>> > > > > > > managing transactional ids (current proposal) might be easier,
>> > > > > > > because the id is controlled by the application and it knows
>> > > > > > > how to complete the transaction after crash / restart; while a
>> > > > > > > TID would be generated by Kafka, and that would create the
>> > > > > > > question of starting a Kafka transaction but not saving its TID
>> > > > > > > and then crashing, then figuring out which transactions to
>> > > > > > > abort, etc.
>> > > > > > >
>> > > > > > > > 2) creating a separate producer for each concurrency slot in
>> > > > > > > > the application
>> > > > > > >
>> > > > > > > This is a very valid concern.  Maybe we'd need to have some
>> > > > > > > multiplexing of transactional logical "streams" over the same
>> > > > > > > connection.  Seems like a separate KIP, though.
>> > > > > > >
>> > > > > > > > Otherwise, it seems you're left with a single-threaded model
>> > > > > > > > per application process?
>> > > > > > >
>> > > > > > > That's a fair assessment.  Not necessarily exactly
>> > > > > > > single-threaded per application, but a single producer per
>> > > > > > > thread model (i.e. an application could have a pool of threads
>> > > > > > > + producers to increase concurrency).
>> > > > > > >
>> > > > > > > -Artem
>> > > > > > >
>> > > > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Artem,
>> > > > > > > >
>> > > > > > > > Thanks for the reply.
>> > > > > > > >
>> > > > > > > > If I understand correctly, Kafka does not support concurrent
>> > > > > > > > transactions from the same producer (transactional id).  I
>> > > > > > > > think this means that applications that want to support
>> > > > > > > > in-process concurrency (say thread-level concurrency with
>> > > > > > > > row-level DB locking) would need to manage separate
>> > > > > > > > transactional ids and producers per thread and then store txn
>> > > > > > > > state accordingly.  The potential usability downsides I see
>> > > > > > > > are
>> > > > > > > > 1) managing a set of transactional ids for each application
>> > > > > > > > process that scales up to its max concurrency.  Maybe not too
>> > > > > > > > bad but a bit of pain to manage these ids inside each process
>> > > > > > > > and across all application processes.
>> > > > > > > > 2) creating a separate producer for each concurrency slot in
>> > > > > > > > the application - this could create a lot more producers and
>> > > > > > > > resultant connections to Kafka than the typical model of a
>> > > > > > > > single producer per process.
>> > > > > > > >
>> > > > > > > > Otherwise, it seems you're left with a single-threaded model
>> > > > > > > > per application process?
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > >
>> > > > > > > > Roger
>> > > > > > > >
>> > > > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
>> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > >
>> > > > > > > > > Hi Roger, Arjun,
>> > > > > > > > >
>> > > > > > > > > Thank you for the questions.
>> > > > > > > > >
>> > > > > > > > > > It looks like the application must have stable
>> > > > > > > > > > transactional ids over time?
>> > > > > > > > >
>> > > > > > > > > The transactional id should uniquely identify a producer
>> > > > > > > > > instance and needs to be stable across restarts.  If the
>> > > > > > > > > transactional id is not stable across restarts, then zombie
>> > > > > > > > > messages from a previous incarnation of the producer may
>> > > > > > > > > violate atomicity.  If there are 2 producer instances
>> > > > > > > > > concurrently producing data with the same transactional id,
>> > > > > > > > > they are going to constantly fence each other and most
>> > > > > > > > > likely make little or no progress.
>> > > > > > > > >
>> > > > > > > > > The name might be a little bit confusing as it may be
>> > > > > > > > > mistaken for a transaction id / TID that uniquely
>> > > > > > > > > identifies every transaction.  The name and the semantics
>> > > > > > > > > were defined in the original exactly-once-semantics (EoS)
>> > > > > > > > > proposal (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>> > > > > > > > > ) and KIP-939 just builds on top of that.
>> > > > > > > > >
>> > > > > > > > > > I'm curious to understand what happens if the producer
>> > > > > > > > > > dies, and does not come up and recover the pending
>> > > > > > > > > > transaction within the transaction timeout interval.
>> > > > > > > > >
>> > > > > > > > > If the producer / application never comes back, the
>> > > > > > > > > transaction will remain in prepared (a.k.a. "in-doubt")
>> > > > > > > > > state until an operator forcefully terminates the
>> > > > > > > > > transaction.  That's why a new ACL is defined in this
>> > > > > > > > > proposal -- this functionality should only be provided to
>> > > > > > > > > applications that implement proper recovery logic.
>> > > > > > > > >
>> > > > > > > > > -Artem
>> > > > > > > > >
>> > > > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hello Artem,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for the KIP.
>> > > > > > > > > >
>> > > > > > > > > > I have the same question as Roger on concurrent writes,
>> > > > > > > > > > and an additional one on consumer behavior. Typically,
>> > > > > > > > > > transactions will time out if not committed within some
>> > > > > > > > > > time interval. With the proposed changes in this KIP,
>> > > > > > > > > > consumers cannot consume past the ongoing transaction.
>> > > > > > > > > > I'm curious to understand what happens if the producer
>> > > > > > > > > > dies, and does not come up and recover the pending
>> > > > > > > > > > transaction within the transaction timeout interval. Or
>> > > > > > > > > > are we saying that when used in this 2PC context, we
>> > > > > > > > > > should configure these transaction timeouts to very large
>> > > > > > > > > > durations?
>> > > > > > > > > >
>> > > > > > > > > > Thanks in advance!
>> > > > > > > > > >
>> > > > > > > > > > Best,
>> > > > > > > > > > Arjun
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Artem,
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks for writing this KIP.  Can you clarify the
>> > > > > > > > > > > requirements a bit more for managing transaction
>> > > > > > > > > > > state?  It looks like the application must have stable
>> > > > > > > > > > > transactional ids over time?  What is the granularity
>> > > > > > > > > > > of those ids and producers?  Say the application is a
>> > > > > > > > > > > multi-threaded Java web server, can/should all the
>> > > > > > > > > > > concurrent threads share a transactional id and
>> > > > > > > > > > > producer?  That doesn't seem right to me unless the
>> > > > > > > > > > > application is using global DB locks that serialize all
>> > > > > > > > > > > requests.  Instead, if the application uses row-level
>> > > > > > > > > > > DB locks, there could be multiple, concurrent,
>> > > > > > > > > > > independent txns happening in the same JVM, so it seems
>> > > > > > > > > > > like the granularity of managing transactional ids and
>> > > > > > > > > > > txn state needs to line up with the granularity of the
>> > > > > > > > > > > DB locking.
>> > > > > > > > > > >
>> > > > > > > > > > > Does that make sense or am I misunderstanding?
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > >
>> > > > > > > > > > > Roger
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
>> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hello,
>> > > > > > > > > > > >
>> > > > > > > > > > > > This is a discussion thread for
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
>> > > > > > > > > > > >
>> > > > > > > > > > > > The KIP proposes extending Kafka transaction support
>> > > > > > > > > > > > (that already uses 2PC under the hood) to enable
>> > > > > > > > > > > > atomicity of dual writes to Kafka and an external
>> > > > > > > > > > > > database, and helps to fix a long-standing Flink
>> > > > > > > > > > > > issue.
>> > > > > > > > > > > >
>> > > > > > > > > > > > An example of code that uses the dual write recipe
>> > > > > > > > > > > > with JDBC and should work for most SQL databases is
>> > > > > > > > > > > > here: https://github.com/apache/kafka/pull/14231.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The FLIP for the sister fix in Flink is here:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>> > > > > > > > > > > >
>> > > > > > > > > > > > -Artem
