Hi, Artem,

Thanks for the reply.
20A. One option is to make the API initTransactions(boolean enable2PC).
Then, it's clear from the code whether 2PC-related logic should be added.
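To illustrate the point, here is a minimal sketch (this is not the real KafkaProducer API; the class and method names are hypothetical, just modeling the semantics discussed in this thread) of how an explicit boolean parameter makes the intent visible at the call site:

```java
// Toy model only: a stand-in for a producer's transactional state after a
// restart, illustrating how initTransactions(boolean enable2PC) would make
// the recovery semantics explicit in the calling code.
public class InitOverloadSketch {
    static final class FakeProducer {
        boolean preparedTxnPending = true; // pretend a prepared txn survived restart
        boolean abortedOnInit = false;

        void initTransactions(boolean enable2PC) {
            if (enable2PC) {
                // 2PC path: keep the prepared txn; the app must commit or abort it.
            } else {
                // Classic path: any ongoing txn is aborted, as today.
                preparedTxnPending = false;
                abortedOnInit = true;
            }
        }
    }

    static boolean keepsPreparedTxn(boolean enable2PC) {
        FakeProducer p = new FakeProducer();
        p.initTransactions(enable2PC);
        return p.preparedTxnPending;
    }

    public static void main(String[] args) {
        System.out.println("enable2PC=true  keeps prepared txn: " + keepsPreparedTxn(true));
        System.out.println("enable2PC=false keeps prepared txn: " + keepsPreparedTxn(false));
    }
}
```

A reviewer seeing initTransactions(true) knows the code must go on to commit or abort the kept transaction, while initTransactions(false) signals the classic abort-on-init behavior.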

20B. But realistically, we want Flink (and other apps) to have a single
implementation of the 2PC logic, not two different implementations, right?

32. My suggestion is to change
kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
to something like:

Metric Name: active-transaction-open-time-max
Type:        Max
Group:       transaction-coordinator-metrics
Tags:        none
Description: The max time a currently-open transaction has been open
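For clarity, the difference between the two naming conventions under discussion can be sketched as follows (the helper methods below are illustrative only, not Kafka code; only the metric name and group strings come from this thread):

```java
// Contrast of the two metric-naming conventions discussed in point 32.
public class MetricNamingSketch {
    // Yammer-style JMX ObjectName: has an explicit "type" attribute.
    static String yammerName(String domain, String type, String name) {
        return domain + ":type=" + type + ",name=" + name;
    }

    // Kafka-metrics style: just a (name, group) pair plus optional tags, no type.
    static String kafkaName(String name, String group) {
        return "name=" + name + ", group=" + group;
    }

    public static void main(String[] args) {
        System.out.println(yammerName("kafka.server",
                "transaction-coordinator-metrics",
                "active-transaction-open-time-max"));
        System.out.println(kafkaName("active-transaction-open-time-max",
                "transaction-coordinator-metrics"));
    }
}
```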

Jun

On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> Hi Jun,
>
> > 20A.  This only takes care of the abort case. The application still needs
> to be changed to handle the commit case properly
>
> My point here is that looking at the initTransactions() call it's not
> clear what the semantics are.  Say I'm doing a code review: I cannot tell
> whether the code is correct, because if the config (something that's
> theoretically not known at the time of review) is going to enable 2PC,
> the correct code should look one way; otherwise it would need to look
> different.  Also, if code is written with initTransactions() without an
> explicit abort and then for whatever reason gets used with 2PC enabled
> (it could be a library in a bigger product), it'll start breaking in a
> non-intuitive way.
>
> > 20B. Hmm, if the admin disables 2PC, there is likely a reason behind that
>
> That's true, but reality may be more complicated.  Say a user wants to run
> self-managed Flink with Confluent Cloud.  The Confluent Cloud admin may not
> be comfortable enabling 2PC for general user accounts that use services not
> managed by Confluent (the same way Confluent doesn't allow increasing the
> max transaction timeout for general user accounts).  Right now, self-managed
> Flink works because it uses reflection; if it moves to the public APIs
> provided by KIP-939, it'll break.
>
> > 32. Ok. That's the kafka metric. In that case, the metric name has a
> group and a name. There is no type and no package name.
>
> Is this a suggestion to change it, or confirmation that the current naming
> is ok?  I just copied an existing metric but can change it if needed.
>
> -Artem
>
> On Tue, Feb 20, 2024 at 11:25 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 20. "Say if an application
> > currently uses initTransactions() to achieve the current semantics, it
> > would need to be rewritten to use initTransactions() + abort to achieve
> the
> > same semantics if the config is changed. "
> >
> > This only takes care of the abort case. The application still needs to be
> > changed to handle the commit case properly
> > if transaction.two.phase.commit.enable is set to true.
> >
> > "Even when KIP-939 is implemented,
> > there would be situations when 2PC is disabled by the admin (e.g. Kafka
> > service providers may be reluctant to enable 2PC for Flink services that
> > users host themselves), so we either have to perpetuate the
> > reflection-based implementation in Flink or enable keepPreparedTxn=true
> > without 2PC."
> >
> > Hmm, if the admin disables 2PC, there is likely a reason behind that. I
> am
> > not sure that we should provide an API to encourage the application to
> > circumvent that.
> >
> > 32. Ok. That's the kafka metric. In that case, the metric name has a
> group
> > and a name. There is no type and no package name.
> >
> > Jun
> >
> >
> > On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > Hi Jun,
> > >
> > > Thank you for your questions.
> > >
> > > > 20. So to abort a prepared transaction after the producer start, we
> > could
> > > use ...
> > >
> > > I agree, initTransaction(true) + abort would accomplish the behavior of
> > > initTransactions(false), so we could technically have fewer ways to
> > achieve
> > > the same thing, which is generally valuable.  I wonder, though, if that
> > > would be intuitive from the application perspective.  Say if an
> > application
> > > currently uses initTransactions() to achieve the current semantics, it
> > > would need to be rewritten to use initTransactions() + abort to achieve
> > the
> > > same semantics if the config is changed.  I think this could create
> > > subtle confusion, as the config change is generally decoupled from
> > changing
> > > application implementation.
> > >
> > > >  The use case mentioned for keepPreparedTxn=true without 2PC doesn't
> > seem
> > > very important
> > >
> > > I agree, it's not a strict requirement.  It is, however, a missing
> option
> > > in the public API, so currently Flink has to use reflection to emulate
> > this
> > > functionality without 2PC support.   Even when KIP-939 is implemented,
> > > there would be situations when 2PC is disabled by the admin (e.g. Kafka
> > > service providers may be reluctant to enable 2PC for Flink services
> that
> > > users host themselves), so we either have to perpetuate the
> > > reflection-based implementation in Flink or enable keepPreparedTxn=true
> > > without 2PC.
> > >
> > > > 32.
> > >
> > >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > >
> > > I just followed the existing metric implementation example
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
> > > ,
> > > which maps to
> > >
> > >
> >
> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
> > >
> > > > 33. "If the value is 'true' then the corresponding field is set
> > >
> > > That's correct.  Updated the KIP.
> > >
> > > -Artem
> > >
> > > On Wed, Feb 7, 2024 at 10:06 AM Jun Rao <j...@confluent.io.invalid>
> > wrote:
> > >
> > > > Hi, Artem,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 20. So to abort a prepared transaction after producer start, we could
> > use
> > > > either
> > > >   producer.initTransactions(false)
> > > > or
> > > >   producer.initTransactions(true)
> > > >   producer.abortTransaction
> > > > Could we just always use the latter API? If we do this, we could
> > > > potentially eliminate the keepPreparedTxn flag in initTransactions().
> > > After
> > > > the initTransactions() call, the outstanding txn is always preserved
> if
> > > 2pc
> > > > is enabled and aborted if 2pc is disabled. The use case mentioned for
> > > > keepPreparedTxn=true without 2PC doesn't seem very important. If we
> > could
> > > > do that, it seems that we have (1) less redundant and simpler APIs;
> (2)
> > > > more symmetric syntax for aborting/committing a prepared txn after
> > > producer
> > > > restart.
> > > >
> > > > 32.
> > > >
> > > >
> > >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > > > Is this a Yammer or kafka metric? The former uses the camel case for
> > name
> > > > and type. The latter uses the hyphen notation, but doesn't have the
> > type
> > > > attribute.
> > > >
> > > > 33. "If the value is 'true' then the corresponding field is set in
> the
> > > > InitProducerIdRequest and the KafkaProducer object is set into a
> state
> > > > which only allows calling .commitTransaction or .abortTransaction."
> > > > We should also allow .completeTransaction, right?
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > > 20. For Flink usage, it seems that the APIs used to abort and
> > commit
> > > a
> > > > > prepared txn are not symmetric.
> > > > >
> > > > > For Flink it is expected that Flink would call .commitTransaction
> or
> > > > > .abortTransaction directly, it wouldn't need to deal with
> > > > PreparedTxnState,
> > > > > the outcome is actually determined by Flink's job manager, not
> by
> > > > > comparison of PreparedTxnState.  So for Flink, if the Kafka sink
> > > crashes
> > > > > and restarts there are 2 cases:
> > > > >
> > > > > 1. Transaction is not prepared.  In that case just call
> > > > > producer.initTransactions(false) and then can start transactions as
> > > > needed.
> > > > > 2. Transaction is prepared.  In that case call
> > > > > producer.initTransactions(true) and wait for the decision from the
> > job
> > > > > manager.  Note that it's not given that the transaction will get
> > > > committed,
> > > > > the decision could also be an abort.
> > > > >
> > > > >  > 21. transaction.max.timeout.ms could in theory be MAX_INT.
> > Perhaps
> > > we
> > > > > could use a negative timeout in the record to indicate 2PC?
> > > > >
> > > > > -1 sounds good, updated.
> > > > >
> > > > > > 30. The KIP has two different APIs to abort an ongoing txn. Do we
> > > need
> > > > > both?
> > > > >
> > > > > I think of producer.initTransactions() as an implementation of
> > > > > adminClient.forceTerminateTransaction(transactionalId).
> > > > >
> > > > > > 31. "This would flush all the pending messages and transition the
> > > > > producer
> > > > >
> > > > > Updated the KIP to clarify that IllegalStateException will be
> thrown.
> > > > >
> > > > > -Artem
> > > > >
> > > > >
> > > > > On Mon, Feb 5, 2024 at 2:22 PM Jun Rao <j...@confluent.io.invalid>
> > > wrote:
> > > > >
> > > > > > Hi, Artem,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > 20. For Flink usage, it seems that the APIs used to abort and
> > commit
> > > a
> > > > > > prepared txn are not symmetric.
> > > > > > To abort, the app will just call
> > > > > >   producer.initTransactions(false)
> > > > > >
> > > > > > To commit, the app needs to call
> > > > > >   producer.initTransactions(true)
> > > > > >   producer.completeTransaction(preparedTxnState)
> > > > > >
> > > > > > Will this be a concern? For the dual-writer usage, both
> > abort/commit
> > > > use
> > > > > > the same API.
> > > > > >
> > > > > > 21. transaction.max.timeout.ms could in theory be MAX_INT.
> Perhaps
> > > we
> > > > > > could
> > > > > > use a negative timeout in the record to indicate 2PC?
> > > > > >
> > > > > > 30. The KIP has two different APIs to abort an ongoing txn. Do we
> > > need
> > > > > > both?
> > > > > >   producer.initTransactions(false)
> > > > > >   adminClient.forceTerminateTransaction(transactionalId)
> > > > > >
> > > > > > 31. "This would flush all the pending messages and transition the
> > > > > producer
> > > > > > into a mode where only .commitTransaction, .abortTransaction, or
> > > > > > .completeTransaction could be called.  If the call is successful
> > (all
> > > > > > messages successfully got flushed to all partitions) the
> > transaction
> > > is
> > > > > > prepared."
> > > > > >  If the producer calls send() in that state, what exception will
> > the
> > > > > caller
> > > > > > receive?
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > >  Then, should we change the following in the example to use
> > > > > > > InitProducerId(true) instead?
> > > > > > >
> > > > > > > We could. I just thought that it's good to make the example
> > > > > > self-contained
> > > > > > > by starting from a clean state.
> > > > > > >
> > > > > > > > Also, could Flink just follow the dual-write recipe?
> > > > > > >
> > > > > > > I think it would bring some unnecessary logic to Flink (or any
> > > other
> > > > > > system
> > > > > > > that already has a transaction coordinator and just wants to
> > drive
> > > > > Kafka
> > > > > > to
> > > > > > > the desired state).  We could discuss it with Flink folks; the
> > > > current
> > > > > > > proposal was developed in collaboration with them.
> > > > > > >
> > > > > > > > 21. Could a non 2pc user explicitly set the
> > TransactionTimeoutMs
> > > to
> > > > > > > Integer.MAX_VALUE?
> > > > > > >
> > > > > > > The server would reject this for regular transactions, it only
> > > > accepts
> > > > > > > values that are <= *transaction.max.timeout.ms
> > > > > > > <http://transaction.max.timeout.ms> *(a broker config).
> > > > > > >
> > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the
> > > > endTxn
> > > > > > > request to use the ongoing pid. ...
> > > > > > >
> > > > > > > Without 2PC there is no case where the pid could change between
> > > > > starting
> > > > > > a
> > > > > > > transaction and endTxn (InitProducerId would abort any ongoing
> > > > > > > transaction).  With 2PC there is now a case where there could
> be
> > > > > > > InitProducerId that can change the pid without aborting the
> > > > > transaction,
> > > > > > so
> > > > > > > we need to handle that.  I wouldn't say that the flow is
> > different,
> > > > but
> > > > > > > it's rather extended to handle new cases.  The main principle
> is
> > > > still
> > > > > > the
> > > > > > > same -- for all operations we use the latest "operational" pid
> > and
> > > > > epoch
> > > > > > > known to the client, this way we guarantee that we can fence
> > > zombie /
> > > > > > split
> > > > > > > brain clients by disrupting the "latest known" pid + epoch
> > > > progression.
> > > > > > >
> > > > > > > > 25. "We send out markers using the original ongoing
> transaction
> > > > > > > ProducerId and ProducerEpoch" ...
> > > > > > >
> > > > > > > Updated.
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao
> <j...@confluent.io.invalid
> > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Artem,
> > > > > > > >
> > > > > > > > Thanks for the reply.
> > > > > > > >
> > > > > > > > 20. So for the dual-write recipe, we should always call
> > > > > > > > InitProducerId(keepPreparedTxn=true) from the producer? Then,
> > > > should
> > > > > we
> > > > > > > > change the following in the example to use
> InitProducerId(true)
> > > > > > instead?
> > > > > > > > 1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
> > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
> > > > > > > > NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
> > > > > > > > OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> > > > > > > > Also, could Flink just follow the dual-write recipe? It's
> > simpler
> > > > if
> > > > > > > there
> > > > > > > > is one way to solve the 2pc issue.
> > > > > > > >
> > > > > > > > 21. Could a non 2pc user explicitly set the
> > TransactionTimeoutMs
> > > to
> > > > > > > > Integer.MAX_VALUE?
> > > > > > > >
> > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the
> > > > endTxn
> > > > > > > > request to use the ongoing pid. With 2pc, the coordinator now
> > > > expects
> > > > > > the
> > > > > > > > endTxn request to use the next pid. So, the flow is
> different,
> > > > right?
> > > > > > > >
> > > > > > > > 25. "We send out markers using the original ongoing
> transaction
> > > > > > > ProducerId
> > > > > > > > and ProducerEpoch"
> > > > > > > > We should use ProducerEpoch + 1 in the marker, right?
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > > 20.  I am a bit confused by how we set keepPreparedTxn.
> > ...
> > > > > > > > >
> > > > > > > > > keepPreparedTxn=true informs the transaction coordinator
> that
> > > it
> > > > > > should
> > > > > > > > > keep the ongoing transaction, if any.  If the
> > > > > keepPreparedTxn=false,
> > > > > > > then
> > > > > > > > > any ongoing transaction is aborted (this is exactly the
> > current
> > > > > > > > behavior).
> > > > > > > > > enable2Pc is a separate argument that is controlled by the
> > > > > > > > > *transaction.two.phase.commit.enable *setting on the
> client.
> > > > > > > > >
> > > > > > > > > To start 2PC, the client just needs to set
> > > > > > > > > *transaction.two.phase.commit.enable*=true in the config.
> > Then
> > > > if
> > > > > > the
> > > > > > > > > client knows the status of the transaction upfront (in the
> > case
> > > > of
> > > > > > > Flink,
> > > > > > > > > Flink keeps the knowledge if the transaction is prepared in
> > its
> > > > own
> > > > > > > > store,
> > > > > > > > > so it always knows upfront), it can set keepPreparedTxn
> > > > > accordingly,
> > > > > > > then
> > > > > > > > > if the transaction was prepared, it'll be ready for the
> > client
> > > to
> > > > > > > > complete
> > > > > > > > > the appropriate action; if the client doesn't have a
> > knowledge
> > > > that
> > > > > > the
> > > > > > > > > transaction is prepared, keepPreparedTxn is going to be
> > false,
> > > in
> > > > > > which
> > > > > > > > > case we'll get to a clean state (the same way we do today).
> > > > > > > > >
> > > > > > > > > For the dual-write recipe, the client doesn't know upfront
> if
> > > the
> > > > > > > > > transaction is prepared, this information is implicitly
> > > > > > > > > encoded in the
> > > > > > > > > PreparedTxnState value that can be used to resolve the
> > > > transaction
> > > > > > > state.
> > > > > > > > > In that case, keepPreparedTxn should always be true,
> because
> > we
> > > > > don't
> > > > > > > > know
> > > > > > > > > upfront and we don't want to accidentally abort a committed
> > > > > > > transaction.
> > > > > > > > >
> > > > > > > > > The forceTerminateTransaction call can just use
> > > > > > keepPreparedTxn=false,
> > > > > > > it
> > > > > > > > > actually doesn't matter if it sets Enable2Pc flag.
> > > > > > > > >
> > > > > > > > > > 21. TransactionLogValue: Do we need some field to
> identify
> > > > > whether
> > > > > > > this
> > > > > > > > > is written for 2PC so that ongoing txn is never auto
> aborted?
> > > > > > > > >
> > > > > > > > > The TransactionTimeoutMs would be set to Integer.MAX_VALUE
> if
> > > 2PC
> > > > > was
> > > > > > > > > enabled.  I've added a note to the KIP about this.
> > > > > > > > >
> > > > > > > > > > 22
> > > > > > > > >
> > > > > > > > > You're right it's a typo.  I fixed it as well as step 9
> > > (REQUEST:
> > > > > > > > > ProducerId=73, ProducerEpoch=MAX).
> > > > > > > > >
> > > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config
> > > while
> > > > > > > > > KeepPreparedTxn is from an API param ...
> > > > > > > > >
> > > > > > > > > The intent to use 2PC doesn't change from transaction to
> > > > > transaction,
> > > > > > > but
> > > > > > > > > the intent to keep prepared txn may change from transaction
> > to
> > > > > > > > > transaction.  In dual-write recipes the distinction is not
> > > clear,
> > > > > but
> > > > > > > for
> > > > > > > > > use cases where keepPreparedTxn value is known upfront
> (e.g.
> > > > Flink)
> > > > > > > it's
> > > > > > > > > more prominent.  E.g. a Flink's Kafka sink operator could
> be
> > > > > deployed
> > > > > > > > with
> > > > > > > > > *transaction.two.phase.commit.enable*=true hardcoded in the
> > > > image,
> > > > > > but
> > > > > > > > > keepPreparedTxn cannot be hardcoded in the image, because
> it
> > > > > depends
> > > > > > on
> > > > > > > > the
> > > > > > > > > job manager's state.
> > > > > > > > >
> > > > > > > > > > 24
> > > > > > > > >
> > > > > > > > > The flow is actually going to be the same way as it is now
> --
> > > the
> > > > > > > "main"
> > > > > > > > > producer id + epoch needs to be used in all operations to
> > > prevent
> > > > > > > fencing
> > > > > > > > > (it's sort of a common "header" in all RPC calls that
> follow
> > > the
> > > > > same
> > > > > > > > > rules).  The ongoing txn info is just additional info for
> > > making
> > > > a
> > > > > > > > commit /
> > > > > > > > > abort decision based on the PreparedTxnState from the DB.
> > > > > > > > >
> > > > > > > > > --Artem
> > > > > > > > >
> > > > > > > > > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao
> > > > <j...@confluent.io.invalid
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Artem,
> > > > > > > > > >
> > > > > > > > > > Thanks for the reply. A few more comments.
> > > > > > > > > >
> > > > > > > > > > 20. I am a bit confused by how we set keepPreparedTxn.
> From
> > > the
> > > > > > KIP,
> > > > > > > I
> > > > > > > > > got
> > > > > > > > > > the following (1) to start 2pc, we call
> > > > > > > > > > InitProducerId(keepPreparedTxn=false); (2) when the
> > producer
> > > > > fails
> > > > > > > and
> > > > > > > > > > needs to do recovery, it calls
> > > > > > InitProducerId(keepPreparedTxn=true);
> > > > > > > > (3)
> > > > > > > > > > Admin.forceTerminateTransaction() calls
> > > > > > > > > > InitProducerId(keepPreparedTxn=false).
> > > > > > > > > > 20.1 In (1), when a producer calls InitProducerId(false)
> > with
> > > > 2pc
> > > > > > > > > enabled,
> > > > > > > > > > and there is an ongoing txn, should the server return an
> > > error
> > > > to
> > > > > > the
> > > > > > > > > > InitProducerId request? If so, what would be the error
> > code?
> > > > > > > > > > 20.2 How do we distinguish between (1) and (3)? It's the
> > same
> > > > API
> > > > > > > call
> > > > > > > > > but
> > > > > > > > > > (1) doesn't abort ongoing txn and (3) does.
> > > > > > > > > > 20.3 The usage in (1) seems unintuitive. 2pc implies
> > keeping
> > > > the
> > > > > > > > ongoing
> > > > > > > > > > txn. So, setting keepPreparedTxn to false to start 2pc
> > seems
> > > > > > counter
> > > > > > > > > > intuitive.
> > > > > > > > > >
> > > > > > > > > > 21. TransactionLogValue: Do we need some field to
> identify
> > > > > whether
> > > > > > > this
> > > > > > > > > is
> > > > > > > > > > written for 2PC so that ongoing txn is never auto
> aborted?
> > > > > > > > > >
> > > > > > > > > > 22. "8. InitProducerId(true); TC STATE: Ongoing,
> > > ProducerId=42,
> > > > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1,
> NextProducerId=73,
> > > > > > > > > > NextProducerEpoch=MAX; RESPONSE ProducerId=73,
> Epoch=MAX-1,
> > > > > > > > > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
> > > > > > > > > > It seems in the above example, Epoch in RESPONSE should
> be
> > > MAX
> > > > to
> > > > > > > match
> > > > > > > > > > NextProducerEpoch?
> > > > > > > > > >
> > > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config
> > > > > > > > > > while KeepPreparedTxn is from an API param. Should we
> make
> > > them
> > > > > > more
> > > > > > > > > > consistent since they seem related?
> > > > > > > > > >
> > > > > > > > > > 24. "9. Commit; REQUEST: ProducerId=73,
> > ProducerEpoch=MAX-1;
> > > TC
> > > > > > > STATE:
> > > > > > > > > > PrepareCommit, ProducerId=42, ProducerEpoch=MAX,
> > > > > PrevProducerId=73,
> > > > > > > > > > NextProducerId=85, NextProducerEpoch=0; RESPONSE
> > > ProducerId=85,
> > > > > > > > Epoch=0,
> > > > > > > > > > When a commit request is sent, it uses the latest
> > ProducerId
> > > > and
> > > > > > > > > > ProducerEpoch."
> > > > > > > > > > The step where we use the next produceId to commit an old
> > txn
> > > > > > works,
> > > > > > > > but
> > > > > > > > > > can be confusing. It's going to be hard for people
> > > implementing
> > > > > > this
> > > > > > > > new
> > > > > > > > > > client protocol to figure out when to use the current or
> > the
> > > > new
> > > > > > > > > producerId
> > > > > > > > > > in the EndTxnRequest. One potential way to improve this
> is
> > to
> > > > > > extend
> > > > > > > > > > EndTxnRequest with a new field like
> expectedNextProducerId.
> > > > Then
> > > > > we
> > > > > > > can
> > > > > > > > > > always use the old produceId in the existing field, but
> set
> > > > > > > > > > expectedNextProducerId to bypass the fencing logic when
> > > needed.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
> > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Jun,
> > > > > > > > > > >
> > > > > > > > > > > Thank you for the comments.
> > > > > > > > > > >
> > > > > > > > > > > > 10. For the two new fields in Enable2Pc and
> > > KeepPreparedTxn
> > > > > ...
> > > > > > > > > > >
> > > > > > > > > > > I added a note that all combinations are valid.
> > > > > Enable2Pc=false
> > > > > > &
> > > > > > > > > > > KeepPreparedTxn=true could be potentially useful for
> > > backward
> > > > > > > > > > compatibility
> > > > > > > > > > > with Flink, when the new version of Flink that
> implements
> > > > > KIP-939
> > > > > > > > tries
> > > > > > > > > > to
> > > > > > > > > > > work with a cluster that doesn't authorize 2PC.
> > > > > > > > > > >
> > > > > > > > > > > > 11.  InitProducerIdResponse: If there is no ongoing
> > txn,
> > > > what
> > > > > > > will
> > > > > > > > > > > OngoingTxnProducerId and OngoingTxnEpoch be set?
> > > > > > > > > > >
> > > > > > > > > > > I added a note that they will be set to -1.  The client
> > > then
> > > > > will
> > > > > > > > know
> > > > > > > > > > that
> > > > > > > > > > > there is no ongoing txn and .completeTransaction
> becomes
> > a
> > > > > no-op
> > > > > > > (but
> > > > > > > > > > still
> > > > > > > > > > > required before .send is enabled).
> > > > > > > > > > >
> > > > > > > > > > > > 12. ListTransactionsRequest related changes: It seems
> > > those
> > > > > are
> > > > > > > > > already
> > > > > > > > > > > covered by KIP-994?
> > > > > > > > > > >
> > > > > > > > > > > Removed from this KIP.
> > > > > > > > > > >
> > > > > > > > > > > > 13. TransactionalLogValue ...
> > > > > > > > > > >
> > > > > > > > > > > This is now updated to work on top of KIP-890.
> > > > > > > > > > >
> > > > > > > > > > > > 14. "Note that the (producerId, epoch) pair that
> > > > corresponds
> > > > > to
> > > > > > > the
> > > > > > > > > > > ongoing transaction ...
> > > > > > > > > > >
> > > > > > > > > > > This is now updated to work on top of KIP-890.
> > > > > > > > > > >
> > > > > > > > > > > > 15. active-transaction-total-time-max : ...
> > > > > > > > > > >
> > > > > > > > > > > Updated.
> > > > > > > > > > >
> > > > > > > > > > > > 16. "transaction.two.phase.commit.enable The default
> > > would
> > > > be
> > > > > > > > > ‘false’.
> > > > > > > > > > > If it’s ‘false’, 2PC functionality is disabled even if
> > the
> > > > ACL
> > > > > is
> > > > > > > set
> > > > > > > > > ...
> > > > > > > > > > >
> > > > > > > > > > > Disabling 2PC effectively removes all authorization to
> > use
> > > > it,
> > > > > > > hence
> > > > > > > > I
> > > > > > > > > > > thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be
> > > > > > appropriate.
> > > > > > > > > > >
> > > > > > > > > > > Do you suggest using a different error code for 2PC
> > > > > authorization
> > > > > > > vs
> > > > > > > > > some
> > > > > > > > > > > other authorization (e.g.
> > > > > > > TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED)
> > > > > > > > > or a
> > > > > > > > > > > different code for disabled vs. unauthorised (e.g.
> > > > > > > > > > > TWO_PHASE_COMMIT_DISABLED) or both?
> > > > > > > > > > >
> > > > > > > > > > > > 17. completeTransaction(). We expect this to be only
> > used
> > > > > > during
> > > > > > > > > > > recovery.
> > > > > > > > > > >
> > > > > > > > > > > It can also be used if, say, a commit to the database
> > fails
> > > > and
> > > > > > the
> > > > > > > > > > result
> > > > > > > > > > > is inconclusive, e.g.
> > > > > > > > > > >
> > > > > > > > > > > 1. Begin DB transaction
> > > > > > > > > > > 2. Begin Kafka transaction
> > > > > > > > > > > 3. Prepare Kafka transaction
> > > > > > > > > > > 4. Commit DB transaction
> > > > > > > > > > > 5. The DB commit fails, figure out the state of the
> > > > transaction
> > > > > > by
> > > > > > > > > > reading
> > > > > > > > > > > the PreparedTxnState from DB
> > > > > > > > > > > 6. Complete Kafka transaction with the
> PreparedTxnState.
> > > > > > > > > > >
> > > > > > > > > > > > 18. "either prepareTransaction was called or
> > > > > > > initTransaction(true)
> > > > > > > > > was
> > > > > > > > > > > called": "either" should be "neither"?
> > > > > > > > > > >
> > > > > > > > > > > Updated.
> > > > > > > > > > >
> > > > > > > > > > > > 19. Since InitProducerId always bumps up the epoch,
> it
> > > > > creates
> > > > > > a
> > > > > > > > > > > situation ...
> > > > > > > > > > >
> > > > > > > > > > > InitProducerId only bumps the producer epoch, the
> ongoing
> > > > > > > transaction
> > > > > > > > > > epoch
> > > > > > > > > > > stays the same, no matter how many times the
> > InitProducerId
> > > > is
> > > > > > > called
> > > > > > > > > > > before the transaction is completed.  Eventually the
> > epoch
> > > > may
> > > > > > > > > overflow,
> > > > > > > > > > > and then a new producer id would be allocated, but the
> > > > ongoing
> > > > > > > > > > transaction
> > > > > > > > > > > producer id would stay the same.
> > > > > > > > > > >
> > > > > > > > > > > I've added a couple examples in the KIP (
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges
> > > > > > > > > > > )
> > > > > > > > > > > that walk through some scenarios and show how the state
> > is
> > > > > > changed.
> > > > > > > > > > >
> > > > > > > > > > > -Artem
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao
> > > > > <j...@confluent.io.invalid
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, Artem,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP. A few comments below.
> > > > > > > > > > > >
> > > > > > > > > > > > 10. For the two new fields in Enable2Pc and
> > > KeepPreparedTxn
> > > > > in
> > > > > > > > > > > > InitProducerId, it would be useful to document a bit
> > more
> > > > > > detail
> > > > > > > on
> > > > > > > > > > what
> > > > > > > > > > > > values are set under what cases. For example, are all
> > > four
> > > > > > > > > combinations
> > > > > > > > > > > > valid?
> > > > > > > > > > > >
> > > > > > > > > > > > 11.  InitProducerIdResponse: If there is no ongoing
> > txn,
> > > > what
> > > > > > > will
> > > > > > > > > > > > OngoingTxnProducerId and OngoingTxnEpoch be set?
> > > > > > > > > > > >
> > > > > > > > > > > > 12. ListTransactionsRequest related changes: It seems
> > > those
> > > > > are
> > > > > > > > > already
> > > > > > > > > > > > covered by KIP-994?
> > > > > > > > > > > >
> > > > > > > > > > > > 13. TransactionalLogValue: Could we name
> > > > > TransactionProducerId
> > > > > > > and
> > > > > > > > > > > > ProducerId better? It's not clear from the name which
> > is
> > > > for
> > > > > > > which.
> > > > > > > > > > > >
> > > > > > > > > > > > 14. "Note that the (producerId, epoch) pair that
> > > > corresponds
> > > > > to
> > > > > > > the
> > > > > > > > > > > ongoing
> > > > > > > > > > > > transaction is going to be written instead of the
> > > existing
> > > > > > > > ProducerId
> > > > > > > > > > and
> > > > > > > > > > > > ProducerEpoch fields (which are renamed to reflect
> the
> > > > > > semantics)
> > > > > > > > to
> > > > > > > > > > > > support downgrade.": I am a bit confused on that. Are
> > we
> > > > > > writing
> > > > > > > > > > > different
> > > > > > > > > > > > values to the existing fields? Then, we can't
> > downgrade,
> > > > > right?
> > > > > > > > > > > >
> > > > > > > > > > > > 15. active-transaction-total-time-max: Would
> > > > > > > > > > > > active-transaction-open-time-max be more intuitive? Also,
> > > > > > > > > > > > could we include the full name (group, tags, etc.)?
> > > > > > > > > > > >
> > > > > > > > > > > > 16. "transaction.two.phase.commit.enable: The default
> > > > > > > > > > > > would be ‘false’. If it’s ‘false’, 2PC functionality is
> > > > > > > > > > > > disabled even if the ACL is set; clients that attempt to
> > > > > > > > > > > > use this functionality would receive a
> > > > > > > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
> > > > > > > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive
> > > > > > > > > > > > for the client to understand what the actual cause is.
> > > > > > > > > > > >
> > > > > > > > > > > > 17. completeTransaction(): We expect this to be used
> > > > > > > > > > > > only during recovery. Could we document this clearly?
> > > > > > > > > > > > Could we prevent it from being used incorrectly (e.g.
> > > > > > > > > > > > throw an exception if the producer has called other
> > > > > > > > > > > > methods like send())?
> > > > > > > > > > > >
> > > > > > > > > > > > 18. "either prepareTransaction was called or
> > > > > > > > > > > > initTransaction(true) was called": Should "either" be
> > > > > > > > > > > > "neither"?
> > > > > > > > > > > >
> > > > > > > > > > > > 19. Since InitProducerId always bumps up the epoch, it
> > > > > > > > > > > > creates a situation where there could be multiple
> > > > > > > > > > > > outstanding txns. The following is an example of a
> > > > > > > > > > > > potential problem during recovery.
> > > > > > > > > > > >    The last txn epoch in the external store is 41 when
> > > > > > > > > > > >    the app dies.
> > > > > > > > > > > >    Instance1 is created for recovery.
> > > > > > > > > > > >      1. (instance1) InitProducerId(keepPreparedTxn=true),
> > > > > > > > > > > >         epoch=42, ongoingEpoch=41
> > > > > > > > > > > >      2. (instance1) dies before completeTxn(41) can be
> > > > > > > > > > > >         called.
> > > > > > > > > > > >    Instance2 is created for recovery.
> > > > > > > > > > > >      3. (instance2) InitProducerId(keepPreparedTxn=true),
> > > > > > > > > > > >         epoch=43, ongoingEpoch=42
> > > > > > > > > > > >      4. (instance2) completeTxn(41) => abort
> > > > > > > > > > > >    The first problem is that 41 is now aborted when it
> > > > > > > > > > > > should be committed. The second is that it's not clear
> > > > > > > > > > > > who could abort epoch 42, which is still open.
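The recovery race in point 19 can be modeled in a few lines (a sketch of the protocol logic, not Kafka code; `recover` stands in for the comparison completeTransaction() makes between the prepared state read from the external store and the ongoing transaction):

```python
# Sketch (not Kafka code): completeTransaction() effectively compares the
# prepared epoch read from the external store against the ongoing
# transaction's epoch and commits only on a match.
def recover(stored_epoch, ongoing_epoch):
    return "commit" if stored_epoch == ongoing_epoch else "abort"

# Instance1: InitProducerId bumps to epoch=42, ongoingEpoch=41 -> matches
# the external store (41), so completing here would commit.
assert recover(41, 41) == "commit"

# Instance1 dies first. Instance2: InitProducerId bumps again, so the
# ongoing transaction is now at epoch 42 while the store still says 41 --
# the prepared transaction is wrongly aborted, and epoch 42 stays open.
assert recover(41, 42) == "abort"
```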
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan
> > > > > > > > > > > > <jols...@confluent.io.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Artem,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the updates. I think what you say makes
> > > > > > > > > > > > > sense. I just updated my KIP, so I want to reconcile
> > > > > > > > > > > > > some of the changes we made, especially with respect
> > > > > > > > > > > > > to the TransactionLogValue.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Firstly, I believe tagged fields require a default
> > > > > > > > > > > > > value so that if they are not filled, we return the
> > > > > > > > > > > > > default (and know that they were empty). For my KIP, I
> > > > > > > > > > > > > proposed that the default for producer ID tagged
> > > > > > > > > > > > > fields should be -1. I was wondering if we could
> > > > > > > > > > > > > update the KIP to include the default values for
> > > > > > > > > > > > > producer ID and epoch.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Next, I noticed we decided to rename the fields. I
> > > > > > > > > > > > > guess the field "NextProducerId" in my KIP correlates
> > > > > > > > > > > > > to "ProducerId" in this KIP. Is that correct? So we
> > > > > > > > > > > > > would have "TransactionProducerId" for the non-tagged
> > > > > > > > > > > > > field, and have "ProducerId" (NextProducerId) and
> > > > > > > > > > > > > "PrevProducerId" as tagged fields in the final version
> > > > > > > > > > > > > after KIP-890 and KIP-936 are implemented. Is this
> > > > > > > > > > > > > correct? I think the tags will need updating, but that
> > > > > > > > > > > > > is trivial.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The final question I had was with respect to storing
> > > > > > > > > > > > > the new epoch. In KIP-890 part 2 (epoch bumps), I
> > > > > > > > > > > > > think we concluded that we don't need to store the
> > > > > > > > > > > > > epoch since we can interpret the previous epoch based
> > > > > > > > > > > > > on the producer ID. But here we could call
> > > > > > > > > > > > > InitProducerId multiple times, and we only want the
> > > > > > > > > > > > > producer with the correct epoch to be able to commit
> > > > > > > > > > > > > the transaction. Is that the correct reasoning for why
> > > > > > > > > > > > > we need the epoch here but not the Prepare/Commit
> > > > > > > > > > > > > state?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Justine
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
> > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Justine,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > After thinking a bit about supporting atomic dual
> > > > > > > > > > > > > > writes for Kafka + a NoSQL database, I came to the
> > > > > > > > > > > > > > conclusion that we do need to bump the epoch even
> > > > > > > > > > > > > > with InitProducerId(keepPreparedTxn=true). As I
> > > > > > > > > > > > > > described in my previous email, we wouldn't need to
> > > > > > > > > > > > > > bump the epoch to protect from zombies, so that
> > > > > > > > > > > > > > reasoning is still true. But we cannot protect from
> > > > > > > > > > > > > > split-brain scenarios when two or more instances of
> > > > > > > > > > > > > > a producer with the same transactional id try to
> > > > > > > > > > > > > > produce at the same time. The dual-write example for
> > > > > > > > > > > > > > SQL databases
> > > > > > > > > > > > > > (https://github.com/apache/kafka/pull/14231/files)
> > > > > > > > > > > > > > doesn't have a split-brain problem because execution
> > > > > > > > > > > > > > is protected by the update lock on the transaction
> > > > > > > > > > > > > > state record; however, NoSQL databases may not have
> > > > > > > > > > > > > > this protection (I'll write an example for NoSQL
> > > > > > > > > > > > > > database dual-write soon).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In a nutshell, here is an example of a split-brain
> > > > > > > > > > > > > > scenario:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >    1. (instance1) InitProducerId(keepPreparedTxn=true),
> > > > > > > > > > > > > >       got epoch=42
> > > > > > > > > > > > > >    2. (instance2) InitProducerId(keepPreparedTxn=true),
> > > > > > > > > > > > > >       got epoch=42
> > > > > > > > > > > > > >    3. (instance1) CommitTxn, epoch bumped to 43
> > > > > > > > > > > > > >    4. (instance2) CommitTxn, this is considered a
> > > > > > > > > > > > > >       retry, so it got epoch 43 as well
> > > > > > > > > > > > > >    5. (instance1) Produce messageA w/sequence 1
> > > > > > > > > > > > > >    6. (instance2) Produce messageB w/sequence 1, this
> > > > > > > > > > > > > >       is considered a duplicate
> > > > > > > > > > > > > >    7. (instance2) Produce messageC w/sequence 2
> > > > > > > > > > > > > >    8. (instance1) Produce messageD w/sequence 2, this
> > > > > > > > > > > > > >       is considered a duplicate
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Now if either of those commits the transaction, it
> > > > > > > > > > > > > > would have a mix of messages from the two instances
> > > > > > > > > > > > > > (messageA and messageC). With the proper epoch bump,
> > > > > > > > > > > > > > instance1 would get fenced at step 3.
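The interleaving above can be simulated with a toy model of the broker-side idempotence check (illustrative only; a real broker tracks per-partition sequence state with much more detail):

```python
# Sketch (not broker code): a produce with an already-seen (epoch, sequence)
# is treated as a retry/duplicate; a produce with a stale epoch is fenced.
class PartitionState:
    def __init__(self):
        self.epoch = 0
        self.last_seq = 0
        self.messages = []

    def produce(self, epoch, seq, msg):
        if epoch < self.epoch:
            return "FENCED"        # stale epoch is rejected
        if epoch == self.epoch and seq <= self.last_seq:
            return "DUPLICATE"     # retry of an already-appended sequence
        self.epoch, self.last_seq = epoch, seq
        self.messages.append(msg)
        return "OK"

# Both instances ended up with epoch 43, so their writes interleave:
p = PartitionState()
p.produce(43, 1, "messageA")   # instance1 -> OK
p.produce(43, 1, "messageB")   # instance2 -> DUPLICATE
p.produce(43, 2, "messageC")   # instance2 -> OK
p.produce(43, 2, "messageD")   # instance1 -> DUPLICATE
# p.messages is now ["messageA", "messageC"] -- a mix from two instances.
# Had InitProducerId bumped the epoch, instance1's writes at the older
# epoch would have returned FENCED instead.
```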
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In order to update the epoch in
> > > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true), we need to
> > > > > > > > > > > > > > preserve the ongoing transaction's epoch (and
> > > > > > > > > > > > > > producerId, if the epoch overflows), because we'd
> > > > > > > > > > > > > > need to make a correct decision when we compare the
> > > > > > > > > > > > > > PreparedTxnState that we read from the database with
> > > > > > > > > > > > > > the (producerId, epoch) of the ongoing transaction.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I've updated the KIP with the following:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >    - The ongoing transaction now has 2 (producerId,
> > > > > > > > > > > > > >      epoch) pairs -- one pair describes the ongoing
> > > > > > > > > > > > > >      transaction, the other pair describes the
> > > > > > > > > > > > > >      expected epoch for operations on this
> > > > > > > > > > > > > >      transactional id
> > > > > > > > > > > > > >    - InitProducerIdResponse now returns 2
> > > > > > > > > > > > > >      (producerId, epoch) pairs
> > > > > > > > > > > > > >    - TransactionalLogValue now has 2 (producerId,
> > > > > > > > > > > > > >      epoch) pairs; the new values are added as
> > > > > > > > > > > > > >      tagged fields, so it's easy to downgrade
> > > > > > > > > > > > > >    - Added a note about downgrade in the
> > > > > > > > > > > > > >      Compatibility section
> > > > > > > > > > > > > >    - Added a rejected alternative
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Artem
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits
> > > > > > > > > > > > > > <alivsh...@confluent.io> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Justine,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you for the questions. Currently
> > > > > > > > > > > > > > > (pre-KIP-939) we always bump the epoch on
> > > > > > > > > > > > > > > InitProducerId and abort an ongoing transaction
> > > > > > > > > > > > > > > (if any). I expect this behavior will continue
> > > > > > > > > > > > > > > with KIP-890 as well.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > With KIP-939 we need to support the case when the
> > > > > > > > > > > > > > > ongoing transaction needs to be preserved when
> > > > > > > > > > > > > > > keepPreparedTxn=true. Bumping the epoch without
> > > > > > > > > > > > > > > aborting or committing a transaction is tricky
> > > > > > > > > > > > > > > because the epoch is a short value and it's easy
> > > > > > > > > > > > > > > to overflow. Currently, the overflow case is
> > > > > > > > > > > > > > > handled by aborting the ongoing transaction, which
> > > > > > > > > > > > > > > would send out transaction markers with
> > > > > > > > > > > > > > > epoch=Short.MAX_VALUE to the partition leaders,
> > > > > > > > > > > > > > > which would fence off any messages with the
> > > > > > > > > > > > > > > producer id that started the transaction (they
> > > > > > > > > > > > > > > would have an epoch that is less than
> > > > > > > > > > > > > > > Short.MAX_VALUE). Then it is safe to allocate a
> > > > > > > > > > > > > > > new producer id and use it in new transactions.
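Since the epoch is a 16-bit signed value, the overflow handling described above amounts to retiring the producer id and starting over. A minimal sketch of that decision (`next_producer_id` is an illustrative stand-in for the coordinator allocating a fresh id, not an actual API):

```python
# Sketch: bumping a short-valued epoch must handle overflow by
# switching to a freshly allocated producer id.
SHORT_MAX = 32767  # Java Short.MAX_VALUE

def bump_epoch(producer_id, epoch, next_producer_id):
    """Return the (producerId, epoch) pair to use after a bump."""
    if epoch + 1 >= SHORT_MAX:
        # The old id is retired with markers at epoch=Short.MAX_VALUE,
        # which fences any in-flight messages carrying a smaller epoch;
        # the fresh producer id starts over at epoch 0.
        return next_producer_id, 0
    return producer_id, epoch + 1

assert bump_epoch(100, 41, 200) == (100, 42)
assert bump_epoch(100, SHORT_MAX - 1, 200) == (200, 0)
```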
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > We could say that maybe when keepPreparedTxn=true
> > > > > > > > > > > > > > > we bump the epoch unless it leads to overflow, and
> > > > > > > > > > > > > > > don't bump the epoch in the overflow case. I don't
> > > > > > > > > > > > > > > think that's a good solution, because if it's not
> > > > > > > > > > > > > > > safe to keep the same epoch when
> > > > > > > > > > > > > > > keepPreparedTxn=true, then we must handle the
> > > > > > > > > > > > > > > epoch overflow case as well. So either we should
> > > > > > > > > > > > > > > convince ourselves that it's safe to keep the
> > > > > > > > > > > > > > > epoch and do it in the general case, or we always
> > > > > > > > > > > > > > > bump the epoch and handle the overflow.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > With KIP-890, we bump the epoch on every
> > > > > > > > > > > > > > > transaction commit / abort. This guarantees that
> > > > > > > > > > > > > > > even if InitProducerId(keepPreparedTxn=true)
> > > > > > > > > > > > > > > doesn't increment the epoch on the ongoing
> > > > > > > > > > > > > > > transaction, the client will have to call commit
> > > > > > > > > > > > > > > or abort to finish the transaction and will
> > > > > > > > > > > > > > > increment the epoch (and handle epoch overflow, if
> > > > > > > > > > > > > > > needed). If the ongoing transaction was in a bad
> > > > > > > > > > > > > > > state and had some zombies waiting to arrive, the
> > > > > > > > > > > > > > > abort operation would fence them, because with
> > > > > > > > > > > > > > > KIP-890 every abort bumps the epoch.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > We could also look at this from the following
> > > > > > > > > > > > > > > perspective. With KIP-890, zombies won't be able
> > > > > > > > > > > > > > > to cross transaction boundaries; each transaction
> > > > > > > > > > > > > > > completion creates a boundary and any activity in
> > > > > > > > > > > > > > > the past gets confined in the boundary. Then data
> > > > > > > > > > > > > > > in any partition would look like this:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. message1, epoch=42
> > > > > > > > > > > > > > > 2. message2, epoch=42
> > > > > > > > > > > > > > > 3. message3, epoch=42
> > > > > > > > > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Now if we inject steps 3a and 3b like this:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. message1, epoch=42
> > > > > > > > > > > > > > > 2. message2, epoch=42
> > > > > > > > > > > > > > > 3. message3, epoch=42
> > > > > > > > > > > > > > > 3a. crash
> > > > > > > > > > > > > > > 3b. InitProducerId(keepPreparedTxn=true)
> > > > > > > > > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The invariant still holds even with steps 3a and
> > > > > > > > > > > > > > > 3b -- whatever activity was in the past will get
> > > > > > > > > > > > > > > confined in the past by the mandatory abort /
> > > > > > > > > > > > > > > commit that must follow
> > > > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true).
> > > > > > > > > > > > > > >
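The boundary invariant can be checked with a tiny model (a sketch of the fencing rule, not broker code): once a commit/abort marker with a bumped epoch is in the log, any late message carrying a smaller epoch is rejected.

```python
# Sketch: a marker's bumped epoch confines all earlier activity --
# a late ("zombie") message with a smaller epoch is fenced.
def accept(log, msg_epoch):
    marker_epochs = [e for kind, e in log if kind == "marker"]
    current = max(marker_epochs, default=0)
    return msg_epoch >= current

log = [("msg", 42), ("msg", 42), ("msg", 42), ("marker", 43)]
assert accept(log, 42) is False   # zombie from the completed transaction
assert accept(log, 43) is True    # new activity at the bumped epoch
```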
> > > > > > > > > > > > > > > So KIP-890 provides the proper isolation between
> > > > > > > > > > > > > > > transactions, so injecting crash +
> > > > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true) into the
> > > > > > > > > > > > > > > transaction sequence is safe from the zombie
> > > > > > > > > > > > > > > protection perspective.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > That said, I'm still thinking about it and
> > > > > > > > > > > > > > > looking for cases that might break because we
> > > > > > > > > > > > > > > don't bump the epoch on
> > > > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true); if such
> > > > > > > > > > > > > > > cases exist, we'll need to develop the logic to
> > > > > > > > > > > > > > > handle epoch overflow for ongoing transactions.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -Artem
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
> > > > > > > > > > > > > > > <jols...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> Hey Artem,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks for the KIP. I had a question about epoch
> > > > > > > > > > > > > > >> bumping.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Previously, when we send an InitProducerId
> > > > > > > > > > > > > > >> request on producer startup, we bump the epoch
> > > > > > > > > > > > > > >> and abort the transaction. Is it correct to
> > > > > > > > > > > > > > >> assume that we will still bump the epoch, but
> > > > > > > > > > > > > > >> just not abort the transaction?
> > > > > > > > > > > > > > >> If we still bump the epoch in this case, how does
> > > > > > > > > > > > > > >> this interact with KIP-890, where we also bump
> > > > > > > > > > > > > > >> the epoch on every transaction? (I think this
> > > > > > > > > > > > > > >> means that we may skip epochs and the data itself
> > > > > > > > > > > > > > >> will all have the same epoch.)
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> I may have follow-ups depending on the answer to
> > > > > > > > > > > > > > >> this. :)
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > > >> Justine
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
> > > > > > > > > > > > > > >> <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > Hi Alex,
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Thank you for your questions.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > > the purpose of having broker-level
> > > > > > > > > > > > > > >> > > transaction.two.phase.commit.enable
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > The thinking is that 2PC is a bit of an
> > > > > > > > > > > > > > >> > advanced construct, so enabling 2PC in a Kafka
> > > > > > > > > > > > > > >> > cluster should be an explicit decision. If it
> > > > > > > > > > > > > > >> > is set to 'false', InitProducerId (and
> > > > > > > > > > > > > > >> > initTransactions) would return
> > > > > > > > > > > > > > >> > TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > > WDYT about adding an AdminClient method that
> > > > > > > > > > > > > > >> > > returns the state of
> > > > > > > > > > > > > > >> > > transaction.two.phase.commit.enable
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > I wonder if the client could just try to use
> > > > > > > > > > > > > > >> > 2PC and then handle the error (e.g. if it needs
> > > > > > > > > > > > > > >> > to fall back to ordinary transactions). This
> > > > > > > > > > > > > > >> > way it could uniformly handle cases when the
> > > > > > > > > > > > > > >> > Kafka cluster doesn't support 2PC completely
> > > > > > > > > > > > > > >> > and cases when 2PC is restricted to certain
> > > > > > > > > > > > > > >> > users. We could also expose this config in
> > > > > > > > > > > > > > >> > describeConfigs, if the fallback approach
> > > > > > > > > > > > > > >> > doesn't work for some scenarios.
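The try-then-fall-back idea amounts to the control flow below (Python pseudocode of the client-side logic; the function names are illustrative, not the actual producer API):

```python
# Sketch: attempt 2PC first; if the cluster rejects it with an
# authorization error, retry with ordinary transactions.
def init_transactions(cluster_allows_2pc, want_2pc):
    # Stand-in for KafkaProducer#initTransactions against a broker
    # whose transaction.two.phase.commit.enable may be false.
    if want_2pc and not cluster_allows_2pc:
        raise PermissionError("TRANSACTIONAL_ID_AUTHORIZATION_FAILED")
    return "2pc" if want_2pc else "ordinary"

def init_with_fallback(cluster_allows_2pc):
    try:
        return init_transactions(cluster_allows_2pc, want_2pc=True)
    except PermissionError:
        return init_transactions(cluster_allows_2pc, want_2pc=False)

assert init_with_fallback(True) == "2pc"
assert init_with_fallback(False) == "ordinary"
```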
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > -Artem
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > On Tue, Sep 5, 2023 at 12:45 PM Alexander
> > > > > > > > > > > > > > >> > Sorokoumov <asorokou...@confluent.io.invalid>
> > > > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > > Hi Artem,
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Thanks for publishing this KIP!
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Can you please clarify the purpose of having
> > > > > > > > > > > > > > >> > > the broker-level
> > > > > > > > > > > > > > >> > > transaction.two.phase.commit.enable config in
> > > > > > > > > > > > > > >> > > addition to the new ACL? If the brokers are
> > > > > > > > > > > > > > >> > > configured with
> > > > > > > > > > > > > > >> > > transaction.two.phase.commit.enable=false, at
> > > > > > > > > > > > > > >> > > what point will a client configured with
> > > > > > > > > > > > > > >> > > transaction.two.phase.commit.enable=true
> > > > > > > > > > > > > > >> > > fail? Will it happen at
> > > > > > > > > > > > > > >> > > KafkaProducer#initTransactions?
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > WDYT about adding an AdminClient method that
> > > > > > > > > > > > > > >> > > returns the state of
> > > > > > > > > > > > > > >> > > transaction.two.phase.commit.enable? This
> > > > > > > > > > > > > > >> > > way, clients would know in advance if 2PC is
> > > > > > > > > > > > > > >> > > enabled on the brokers.
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > Best,
> > > > > > > > > > > > > > >> > > Alex
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover
> > > > > > > > > > > > > > >> > > <roger.hoo...@gmail.com> wrote:
> > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > >> > > > Other than supporting multiplexing
> > > > > > > > > > > > > > >> > > > transactional streams on a single producer,
> > > > > > > > > > > > > > >> > > > I don't see how to improve it.
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem
> > > > > > > > > > > > > > >> > > > Livshits <alivsh...@confluent.io.invalid>
> > > > > > > > > > > > > > >> > > > wrote:
> > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > >> > > > > Hi Roger,
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > Thank you for summarizing the cons. I
> > > > > > > > > > > > > > >> > > > > agree, and I'm curious what the
> > > > > > > > > > > > > > >> > > > > alternatives would be to solve these
> > > > > > > > > > > > > > >> > > > > problems better, and whether they can be
> > > > > > > > > > > > > > >> > > > > incorporated into this proposal (or built
> > > > > > > > > > > > > > >> > > > > independently, in addition to or on top
> > > > > > > > > > > > > > >> > > > > of this proposal). E.g. one potential
> > > > > > > > > > > > > > >> > > > > extension we discussed earlier in the
> > > > > > > > > > > > > > >> > > > > thread could be multiplexing logical
> > > > > > > > > > > > > > >> > > > > transactional "streams" with a single
> > > > > > > > > > > > > > >> > > > > producer.
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > -Artem
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger
> > > > > > > > > > > > > > >> > > > > Hoover <roger.hoo...@gmail.com> wrote:
> > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > >> > > > > > Thanks. I like that you're moving
> > > > > > > > > > > > > > >> > > > > > Kafka toward supporting this dual-write
> > > > > > > > > > > > > > >> > > > > > pattern. Each use case needs to
> > > > > > > > > > > > > > >> > > > > > consider the tradeoffs. You already
> > > > > > > > > > > > > > >> > > > > > summarized the pros very well in the
> > > > > > > > > > > > > > >> > > > > > KIP. I would summarize the cons as
> > > > > > > > > > > > > > >> > > > > > follows:
> > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > >> > > > > > - you sacrifice availability - each
> > > > > > > > > > > > > > >> > > > > >   write requires both the DB and Kafka
> > > > > > > > > > > > > > >> > > > > >   to be available, so your overall
> > > > > > > > > > > > > > >> > > > > >   application availability is roughly
> > > > > > > > > > > > > > >> > > > > >   (1 - p(DB is unavailable)) *
> > > > > > > > > > > > > > >> > > > > >   (1 - p(Kafka is unavailable)).
> > > > > > > > > > > > > > >> > > > > > - latency will be higher and
> > > > > > > > > > > > > > >> > > > > >   throughput lower - each write
> > > > > > > > > > > > > > >> > > > > >   requires both writes to the DB and
> > > > > > > > > > > > > > >> > > > > >   Kafka while holding an exclusive lock
> > > > > > > > > > > > > > >> > > > > >   in the DB.
> > > > > > > > > > > > > > >> > > > > > - you need to create a producer per
> > > > > > > > > > > > > > >> > > > > >   unit of concurrency in your app,
> > > > > > > > > > > > > > >> > > > > >   which has some overhead on the app
> > > > > > > > > > > > > > >> > > > > >   and Kafka side (number of
> > > > > > > > > > > > > > >> > > > > >   connections, poor batching). I assume
> > > > > > > > > > > > > > >> > > > > >   the producers would need to be
> > > > > > > > > > > > > > >> > > > > >   configured for low latency
> > > > > > > > > > > > > > >> > > > > >   (linger.ms=0).
> > > > > > > > > > > > > > >> > > > > > - there's some complexity in managing
> > > > > > > > > > > > > > >> > > > > >   stable transactional ids for each
> > > > > > > > > > > > > > >> > > > > >   producer/concurrency unit in your
> > > > > > > > > > > > > > >> > > > > >   application. With a k8s deployment,
> > > > > > > > > > > > > > >> > > > > >   you may need to switch to something
> > > > > > > > > > > > > > >> > > > > >   like a StatefulSet that gives each
> > > > > > > > > > > > > > >> > > > > >   pod a stable identity across
> > > > > > > > > > > > > > >> > > > > >   restarts. On top of that pod
> > > > > > > > > > > > > > >> > > > > >   identity, which you can use as a
> > > > > > > > > > > > > > >> > > > > >   prefix, you then assign unique
> > > > > > > > > > > > > > >> > > > > >   transactional ids to each concurrency
> > > > > > > > > > > > > > >> > > > > >   unit (thread/goroutine).
> > > > > > > > > > > > > > >> > > > > >
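The stable-id scheme described above (pod identity as a prefix plus a per-thread slot) can be sketched as follows; the `<pod>-txn-<slot>` naming pattern is illustrative, not prescribed by the KIP:

```java
// Sketch of deriving stable transactional ids for a pool of producer
// threads, assuming a k8s StatefulSet whose pod name (e.g. "myapp-3")
// is stable across restarts.  The id pattern here is illustrative.
public class TransactionalIds {
    static String[] idsFor(String podName, int concurrency) {
        String[] ids = new String[concurrency];
        for (int slot = 0; slot < concurrency; slot++) {
            // One id per concurrency unit; a restarted pod reuses the same
            // ids, so each new producer fences its previous incarnation.
            ids[slot] = podName + "-txn-" + slot;
        }
        return ids;
    }

    public static void main(String[] args) {
        for (String id : idsFor("myapp-3", 4)) {
            System.out.println(id);  // myapp-3-txn-0 ... myapp-3-txn-3
        }
    }
}
```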
> On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Roger,
> >
> > Thank you for the feedback.  You make a very good point that we also
> > discussed internally.  Adding support for multiple concurrent
> > transactions in one producer could be valuable, but it seems to be a
> > fairly large and independent change that would deserve a separate KIP.
> > If such support is added we could modify 2PC functionality to
> > incorporate that.
> >
> > > Maybe not too bad but a bit of pain to manage these ids inside each
> > > process and across all application processes.
> >
> > I'm not sure if supporting multiple transactions in one producer would
> > make id management simpler: we'd need to store a piece of data per
> > transaction, so whether it's N producers with a single transaction or
> > N transactions with a single producer, it's still roughly the same
> > amount of data to manage.  In fact, managing transactional ids
> > (current proposal) might be easier, because the id is controlled by
> > the application and it knows how to complete the transaction after
> > crash / restart; while a TID would be generated by Kafka, and that
> > would create a question of starting a Kafka transaction but not saving
> > its TID, then crashing, then figuring out which transactions to abort,
> > etc.
> >
> > > 2) creating a separate producer for each concurrency slot in the
> > > application
> >
> > This is a very valid concern.  Maybe we'd need to have some
> > multiplexing of transactional logical "streams" over the same
> > connection.  Seems like a separate KIP, though.
> >
> > > Otherwise, it seems you're left with single-threaded model per
> > > application process?
> >
> > That's a fair assessment.  Not necessarily exactly single-threaded per
> > application, but a single producer per thread model (i.e. an
> > application could have a pool of threads + producers to increase
> > concurrency).
> >
> > -Artem
> >
> > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com>
> > wrote:
> >
> > > Artem,
> > >
> > > Thanks for the reply.
> > >
> > > If I understand correctly, Kafka does not support concurrent
> > > transactions from the same producer (transactional id).  I think
> > > this means that applications that want to support in-process
> > > concurrency (say thread-level concurrency with row-level DB locking)
> > > would need to manage separate transactional ids and producers per
> > > thread and then store txn state accordingly.  The potential
> > > usability downsides I see are
> > > 1) managing a set of transactional ids for each application process
> > > that scales up to its max concurrency.  Maybe not too bad but a bit
> > > of pain to manage these ids inside each process and across all
> > > application processes.
> > > 2) creating a separate producer for each concurrency slot in the
> > > application - this could create a lot more producers and resultant
> > > connections to Kafka than the typical model of a single producer per
> > > process.
> > >
> > > Otherwise, it seems you're left with single-threaded model per
> > > application process?
> > >
> > > Thanks,
> > >
> > > Roger
> > >
> > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Roger, Arjun,
> > > >
> > > > Thank you for the questions.
> > > >
> > > > > It looks like the application must have stable transactional ids
> > > > > over time?
> > > >
> > > > The transactional id should uniquely identify a producer instance
> > > > and needs to be stable across restarts.  If the transactional id
> > > > is not stable across restarts, then zombie messages from a
> > > > previous incarnation of the producer may violate atomicity.  If
> > > > there are 2 producer instances concurrently producing data with
> > > > the same transactional id, they are going to constantly fence each
> > > > other and most likely make little or no progress.
> > > >
> > > > The name might be a little bit confusing as it may be mistaken for
> > > > a transaction id / TID that uniquely identifies every transaction.
> > > > The name and the semantics were defined in the original
> > > > exactly-once-semantics (EoS) proposal (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > ) and KIP-939 just builds on top of that.
> > > >
> > > > > I'm curious to understand what happens if the producer dies, and
> > > > > does not come up and recover the pending transaction within the
> > > > > transaction timeout interval.
> > > >
> > > > If the producer / application never comes back, the transaction
> > > > will remain in prepared (a.k.a. "in-doubt") state until an
> > > > operator forcefully terminates the transaction.  That's why there
> > > > is a new ACL defined in this proposal -- this functionality should
> > > > only be provided to applications that implement proper recovery
> > > > logic.
> > > >
> > > > -Artem
> > > >
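For context on the timeout discussion in the messages above: a transactional producer today is configured roughly as below (values illustrative). The broker-side `transaction.max.timeout.ms` caps how large a timeout a client may request, which is why long-lived prepared transactions need the separate mechanism this KIP proposes.

```properties
# Illustrative transactional producer settings (current Kafka,
# not KIP-939-specific).
transactional.id=myapp-3-txn-0     # must be stable across restarts
enable.idempotence=true
transaction.timeout.ms=60000       # capped by the broker's
                                   # transaction.max.timeout.ms
```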
> > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish
> > > > <arjun.sat...@gmail.com> wrote:
> > > >
> > > > > Hello Artem,
> > > > >
> > > > > Thanks for the KIP.
> > > > >
> > > > > I have the same question as Roger on concurrent writes, and an
> > > > > additional one on consumer behavior.  Typically, transactions
> > > > > will time out if not committed within some time interval.  With
> > > > > the proposed changes in this KIP, consumers cannot consume past
> > > > > the ongoing transaction.  I'm curious to understand what happens
> > > > > if the producer dies, and does not come up and recover the
> > > > > pending transaction within the transaction timeout interval.  Or
> > > > > are we saying that when used in this 2PC context, we should
> > > > > configure these transaction timeouts to very large durations?
> > > > >
> > > > > Thanks in advance!
> > > > >
> > > > > Best,
> > > > > Arjun
> > > > >
> > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover
> > > > > <roger.hoo...@gmail.com> wrote:
> > > > >
> > > > > > Hi Artem,
> > > > > >
> > > > > > Thanks for writing this KIP.  Can you clarify the requirements
> > > > > > a bit more for managing transaction state?  It looks like the
> > > > > > application must have stable transactional ids over time?
> > > > > > What is the granularity of those ids and producers?  Say the
> > > > > > application is a multi-threaded Java web server, can/should
> > > > > > all the concurrent threads share a transactional id and
> > > > > > producer?  That doesn't seem right to me unless the
> > > > > > application is using global DB locks that serialize all
> > > > > > requests.  Instead, if the application uses row-level DB
> > > > > > locks, there could be multiple, concurrent, independent txns
> > > > > > happening in the same JVM, so it seems like the granularity of
> > > > > > managing transactional ids and txn state needs to line up with
> > > > > > the granularity of the DB locking.
> > > > > >
> > > > > > Does that make sense or am I misunderstanding?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Roger
> > > > > >
> > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > This is a discussion thread for
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > > > > .
> > > > > > >
> > > > > > > The KIP proposes extending Kafka transaction support (that
> > > > > > > already uses 2PC under the hood) to enable atomicity of dual
> > > > > > > writes to Kafka and an external database, and helps to fix a
> > > > > > > long-standing Flink issue.
> > > > > > >
> > > > > > > An example of code that uses the dual-write recipe with JDBC
> > > > > > > and should work for most SQL databases is here:
> > > > > > > https://github.com/apache/kafka/pull/14231.
> > > > > > >
> > > > > > > The FLIP for the sister fix in Flink is here:
> > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > > > >
> > > > > > > -Artem
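For readers following the PR linked in the announcement above, the dual-write recipe boils down to ordering the two commits so that the DB commit durably records the prepared Kafka transaction first, then the Kafka side is completed. A rough sketch with an in-memory stand-in; `KafkaTx` and its `begin`/`prepare`/`commit` methods are placeholders, not the actual producer or KIP-939 API:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in abstraction for the Kafka side of the two-phase commit.
// NOT the real producer API; method names are placeholders.
interface KafkaTx {
    void begin();
    String prepare();           // phase 1: flush, return a token for the txn
    void commit(String token);  // phase 2: complete the prepared txn
}

public class DualWrite {
    // The DB commit atomically stores both the business row and the prepared
    // Kafka txn token; after a crash, recovery reads the token back and
    // commits (or aborts) the matching Kafka transaction.
    static void write(KafkaTx kafka, Map<String, String> db,
                      String key, String value) {
        kafka.begin();
        String token = kafka.prepare();
        db.put(key, value);
        db.put("__kafka_txn", token);  // both puts stand in for one DB commit
        kafka.commit(token);           // second phase on the Kafka side
    }

    public static void main(String[] args) {
        Map<String, String> db = new HashMap<>();
        KafkaTx fake = new KafkaTx() {  // in-memory stand-in for the sketch
            public void begin() {}
            public String prepare() { return "txn-1"; }
            public void commit(String token) {}
        };
        write(fake, db, "order-42", "shipped");
        System.out.println(db.get("order-42") + " " + db.get("__kafka_txn"));
    }
}
```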