Hi, Artem,

Thanks for the reply.

I understand your concern about a timeout breaking the 2PC guarantees.
However, the fallback plan of disabling 2PC with an independent
keepPreparedTxn is subject to the timeout too, so it doesn't provide the
same guarantees as 2PC either.

To me, if we provide new functionality, we should make it simple: the
application developer should only need to implement it in one way, which is
always correct. Then we can consider what additional controls are needed to
make the operator comfortable enabling it.
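
For example, from the application developer's point of view I'd want that
single pattern to look roughly like the sketch below. This is only an
illustration assuming the producer API shape proposed in KIP-939
(initTransactions(keepPreparedTxn), PreparedTxnState, completeTransaction);
readPreparedTxnStateFromDb is a hypothetical helper.

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("transactional.id", "txn-app-1");
  props.put("transaction.two.phase.commit.enable", "true");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  KafkaProducer<String, String> producer = new KafkaProducer<>(props);

  // Always keep a prepared transaction and let the externally stored
  // decision resolve it -- the same code regardless of whether a
  // transaction was actually left open.
  producer.initTransactions(true);
  PreparedTxnState stored = readPreparedTxnStateFromDb();  // hypothetical helper
  producer.completeTransaction(stored);                    // commits or aborts based on the stored state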

Jun

On Tue, Feb 27, 2024 at 4:45 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> Hi Jun,
>
> Thank you for the discussion.
>
> > For 3b, it would be useful to understand the reason why an admin doesn't
> authorize 2PC for self-hosted Flink
>
> I think the nuance here is that for cloud there is a cloud admin
> (operator) and there is a cluster admin (who, for example, could manage
> ACLs on topics, etc.).  The 2PC functionality can affect cloud operations,
> because a long-running transaction can block the last stable offset and
> prevent compaction or data tiering.  In a multi-tenant environment, a
> long-running transaction that involves consumer offsets may affect data
> that is shared by multiple tenants (Flink transactions don't use consumer
> offsets, so this is not an issue for Flink, but we'd need a separate ACL or
> some other way to express this permission if we wanted to go in that
> direction).
>
> For that reason, I expect 2PC to be controlled by the cloud operator and it
> just may not be scalable for the cloud operator to manage all potential
> interactions required to resolve in-doubt transactions (communicate to the
> end users, etc.).  In general, we make no assumptions about Kafka
> applications -- they may come and go, they may abandon transactional ids
> and generate new ones.  For 2PC we need to make sure that the application
> is highly available and wouldn't easily abandon an open transaction in
> Kafka.
>
> > If so, another way to address that is to allow the admin to set a timeout
> even for the 2PC case.
>
> This effectively abandons the 2PC guarantee because it creates a case for
> Kafka to unilaterally make an automatic decision on a prepared
> transaction.  I think it's fundamental for 2PC to give up this ability and
> wait for the external coordinator's decision; after all, the coordinator
> may legitimately be unavailable for an arbitrary amount of time.  Also, we
> already have a timeout on regular Kafka transactions, so having another
> "special" timeout could be confusing, and a large enough timeout could
> still produce the undesirable effects on cloud operations (so we'd get the
> worst of both options -- we don't provide the guarantee and still have an
> impact on operations).
>
> -Artem
>
> On Fri, Feb 23, 2024 at 8:55 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > For 3b, it would be useful to understand the reason why an admin doesn't
> > authorize 2PC for self-hosted Flink. Is the main reason that 2PC has an
> > unbounded timeout that could lead to unbounded outstanding transactions? If
> > so, another way to address that is to allow the admin to set a timeout even
> > for the 2PC case. The timeout would be long enough for well-behaved
> > applications to complete 2PC operations, but not so long that misbehaving
> > applications' transactions hang indefinitely.
> >
> > Jun
> >
> > On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > Hi Jun,
> > >
> > > > 20A. One option is to make the API initTransactions(boolean
> enable2PC).
> > >
> > > We could do that.  I think there is a little bit of symmetry between
> the
> > > client and server that would get lost with this approach (server has
> > > enable2PC as config), but I don't really see a strong reason for
> > enable2PC
> > > to be a config vs. an argument for initTransactions.  But let's see if
> we
> > > find 20B to be a strong consideration for keeping a separate flag for
> > > keepPreparedTxn.
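> > >
> > > (Just to make the two shapes concrete -- a rough sketch, the comments are
> > > only illustrative:)
> > >
> > >   // As currently proposed: the 2PC intent comes from the client config
> > >   //   transaction.two.phase.commit.enable=true
> > >   producer.initTransactions(true /* keepPreparedTxn */);
> > >
> > >   // 20A alternative: the 2PC intent is passed explicitly to the call
> > >   producer.initTransactions(true /* enable2PC */);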
> > >
> > > > 20B. But realistically, we want Flink (and other apps) to have a
> single
> > > implementation
> > >
> > > That's correct and here's what I think can happen if we don't allow
> > > independent keepPreparedTxn:
> > >
> > > 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is
> > > used, which effectively implements keepPreparedTxn=true without our
> > > explicit support.
> > > 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
> > > either fall back to reflection or say we don't support this and require
> > > the Kafka cluster to be upgraded first.
> > > 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster -- this becomes
> > > interesting depending on whether the Kafka cluster authorizes 2PC or not:
> > >  3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything uses
> > > KIP-939 and there is no problem.
> > >  3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we can
> > > either fall back to reflection or use keepPreparedTxn=true even if 2PC is
> > > not enabled.
> > >
> > > It seems OK not to support case 2 (i.e., require a Kafka upgrade first);
> > > it shouldn't be an issue for cloud offerings, as cloud providers are
> > > likely to upgrade their Kafka to the latest versions.
> > >
> > > Case 3b seems important to support, though -- the latest version of
> > > everything should work at least as well as (and preferably better than)
> > > previous ones.  It's possible to downgrade to case 1, but that's probably
> > > not sustainable, as newer versions of Flink would also add other features
> > > that customers may want to take advantage of.
> > >
> > > If we enabled keepPreparedTxn=true even without 2PC, then we could
> enable
> > > case 3b without the need to fall back to reflection, so we could get
> rid
> > of
> > > reflection-based logic and just have a single implementation based on
> > > KIP-939.
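> > >
> > > (As a sketch of what that single implementation could look like, assuming
> > > the KIP-939 API shape and a hypothetical job-manager call; the same client
> > > code would run whether or not the cluster authorizes 2PC:)
> > >
> > >   producer.initTransactions(true);      // keepPreparedTxn=true, with or without 2PC
> > >   if (jobManagerDecisionIsCommit()) {   // hypothetical external decision
> > >       producer.commitTransaction();
> > >   } else {
> > >       producer.abortTransaction();
> > >   }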
> > >
> > > > 32. My suggestion is to change
> > >
> > > Let me think about it and I'll come back to this.
> > >
> > > -Artem
> > >
> > > On Wed, Feb 21, 2024 at 3:40 PM Jun Rao <j...@confluent.io.invalid>
> > wrote:
> > >
> > > > Hi, Artem,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 20A. One option is to make the API initTransactions(boolean
> enable2PC).
> > > > Then, it's clear from the code whether 2PC related logic should be
> > added.
> > > >
> > > > 20B. But realistically, we want Flink (and other apps) to have a
> single
> > > > implementation of the 2PC logic, not two different implementations,
> > > right?
> > > >
> > > > 32. My suggestion is to
> > > > change
> > > >
> > >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > > > to something like:
> > > >   Metric Name: active-transaction-open-time-max
> > > >   Type:        Max
> > > >   Group:       transaction-coordinator-metrics
> > > >   Tags:        none
> > > >   Description: The max time a currently-open transaction has been open
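> > > >
> > > > (For reference, roughly how such a kafka metric is registered -- just a
> > > > sketch using the org.apache.kafka.common.metrics classes, the actual
> > > > coordinator code may differ:)
> > > >
> > > >   Metrics metrics = new Metrics();
> > > >   Sensor sensor = metrics.sensor("active-transactions");
> > > >   sensor.add(metrics.metricName("active-transaction-open-time-max",
> > > >                                 "transaction-coordinator-metrics",
> > > >                                 "The max time a currently-open transaction has been open"),
> > > >              new Max());
> > > >   // on each sample: sensor.record(openTimeMs);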
> > > >
> > > > Jun
> > > >
> > > > On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > > 20A.  This only takes care of the abort case. The application
> still
> > > > needs
> > > > > to be changed to handle the commit case properly
> > > > >
> > > > > My point here is that, looking at the initTransactions() call, it's not
> > > > > clear what the semantics are.  Say I'm doing a code review: I cannot tell
> > > > > whether the code is correct -- if the config (something that's
> > > > > theoretically not known at the time of code review) is going to enable
> > > > > 2PC, then the correct code should look one way; otherwise it would need
> > > > > to look different.  Also, if code is written with initTransactions()
> > > > > without an explicit abort and then for whatever reason gets used with
> > > > > 2PC enabled (it could be a library in a bigger product), it'll start
> > > > > breaking in a non-intuitive way.
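> > > > >
> > > > > (To illustrate the review problem: if the flag is dropped and the
> > > > > behavior is driven purely by the config, the same code means two
> > > > > different things -- a sketch:)
> > > > >
> > > > >   producer.initTransactions();
> > > > >   // transaction.two.phase.commit.enable=false: any old transaction was
> > > > >   // aborted, we start from a clean state.
> > > > >   // transaction.two.phase.commit.enable=true: a prepared transaction may
> > > > >   // still be open here, so correct code needs an explicit abort/complete
> > > > >   // step that this version doesn't have.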
> > > > >
> > > > > > 20B. Hmm, if the admin disables 2PC, there is likely a reason
> > behind
> > > > that
> > > > >
> > > > > That's true, but reality may be more complicated.  Say a user wants to
> > > > > run a self-managed Flink with Confluent cloud.  The Confluent cloud admin
> > > > > may not be comfortable enabling 2PC for general user accounts that use
> > > > > services not managed by Confluent (the same way Confluent doesn't allow
> > > > > increasing the max transaction timeout for general user accounts).  Right
> > > > > now, self-managed Flink works because it uses reflection; if it moves to
> > > > > the public APIs provided by KIP-939, it'll break.
> > > > >
> > > > > > 32. Ok. That's the kafka metric. In that case, the metric name
> has
> > a
> > > > > group and a name. There is no type and no package name.
> > > > >
> > > > > Is this a suggestion to change or confirmation that the current
> logic
> > > is
> > > > > ok?  I just copied an existing metric but can change if needed.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Tue, Feb 20, 2024 at 11:25 AM Jun Rao <j...@confluent.io.invalid
> >
> > > > wrote:
> > > > >
> > > > > > Hi, Artem,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > 20. "Say if an application
> > > > > > currently uses initTransactions() to achieve the current
> semantics,
> > > it
> > > > > > would need to be rewritten to use initTransactions() + abort to
> > > achieve
> > > > > the
> > > > > > same semantics if the config is changed. "
> > > > > >
> > > > > > This only takes care of the abort case. The application still
> needs
> > > to
> > > > be
> > > > > > changed to handle the commit case properly
> > > > > > if transaction.two.phase.commit.enable is set to true.
> > > > > >
> > > > > > "Even when KIP-939 is implemented,
> > > > > > there would be situations when 2PC is disabled by the admin (e.g.
> > > Kafka
> > > > > > service providers may be reluctant to enable 2PC for Flink
> services
> > > > that
> > > > > > users host themselves), so we either have to perpetuate the
> > > > > > reflection-based implementation in Flink or enable
> > > keepPreparedTxn=true
> > > > > > without 2PC."
> > > > > >
> > > > > > Hmm, if the admin disables 2PC, there is likely a reason behind
> > > that. I
> > > > > am
> > > > > > not sure that we should provide an API to encourage the
> application
> > > to
> > > > > > circumvent that.
> > > > > >
> > > > > > 32. Ok. That's the kafka metric. In that case, the metric name
> has
> > a
> > > > > group
> > > > > > and a name. There is no type and no package name.
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Thank you for your questions.
> > > > > > >
> > > > > > > > 20. So to abort a prepared transaction after the producer
> > start,
> > > we
> > > > > > could
> > > > > > > use ...
> > > > > > >
> > > > > > > I agree, initTransactions(true) + abort would accomplish the behavior
> > > > > > > of initTransactions(false), so we could technically have fewer ways to
> > > > > > > achieve the same thing, which is generally valuable.  I wonder, though,
> > > > > > > whether that would be intuitive from the application perspective.  Say
> > > > > > > an application currently uses initTransactions() to achieve the current
> > > > > > > semantics; it would need to be rewritten to use initTransactions() +
> > > > > > > abort to achieve the same semantics if the config is changed.  I think
> > > > > > > this could create subtle confusion, as the config change is generally
> > > > > > > decoupled from changes to the application implementation.
> > > > > > >
> > > > > > > >  The use case mentioned for keepPreparedTxn=true without 2PC
> > > > doesn't
> > > > > > seem
> > > > > > > very important
> > > > > > >
> > > > > > > I agree, it's not a strict requirement.  It is, however, a
> > missing
> > > > > option
> > > > > > > in the public API, so currently Flink has to use reflection to
> > > > emulate
> > > > > > this
> > > > > > > functionality without 2PC support.   Even when KIP-939 is
> > > > implemented,
> > > > > > > there would be situations when 2PC is disabled by the admin
> (e.g.
> > > > Kafka
> > > > > > > service providers may be reluctant to enable 2PC for Flink
> > services
> > > > > that
> > > > > > > users host themselves), so we either have to perpetuate the
> > > > > > > reflection-based implementation in Flink or enable
> > > > keepPreparedTxn=true
> > > > > > > without 2PC.
> > > > > > >
> > > > > > > > 32.
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > > > > > >
> > > > > > > I just followed the existing metric implementation example
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
> > > > > > > ,
> > > > > > > which maps to
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
> > > > > > >
> > > > > > > > 33. "If the value is 'true' then the corresponding field is
> set
> > > > > > >
> > > > > > > That's correct.  Updated the KIP.
> > > > > > >
> > > > > > > -Artem
> > > > > > >
> > > > > > > On Wed, Feb 7, 2024 at 10:06 AM Jun Rao
> <j...@confluent.io.invalid
> > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Artem,
> > > > > > > >
> > > > > > > > Thanks for the reply.
> > > > > > > >
> > > > > > > > 20. So to abort a prepared transaction after producer start,
> we
> > > > could
> > > > > > use
> > > > > > > > either
> > > > > > > >   producer.initTransactions(false)
> > > > > > > > or
> > > > > > > >   producer.initTransactions(true)
> > > > > > > >   producer.abortTransaction
> > > > > > > > Could we just always use the latter API? If we do this, we
> > could
> > > > > > > > potentially eliminate the keepPreparedTxn flag in
> > > > initTransactions().
> > > > > > > After
> > > > > > > > the initTransactions() call, the outstanding txn is always
> > > > preserved
> > > > > if
> > > > > > > 2pc
> > > > > > > > is enabled and aborted if 2pc is disabled. The use case
> > mentioned
> > > > for
> > > > > > > > keepPreparedTxn=true without 2PC doesn't seem very important.
> > If
> > > we
> > > > > > could
> > > > > > > > do that, it seems that we have (1) less redundant and simpler
> > > APIs;
> > > > > (2)
> > > > > > > > more symmetric syntax for aborting/committing a prepared txn
> > > after
> > > > > > > producer
> > > > > > > > restart.
> > > > > > > >
> > > > > > > > 32.
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > > > > > > > Is this a Yammer or kafka metric? The former uses the camel
> > case
> > > > for
> > > > > > name
> > > > > > > > and type. The latter uses the hyphen notation, but doesn't
> have
> > > the
> > > > > > type
> > > > > > > > attribute.
> > > > > > > >
> > > > > > > > 33. "If the value is 'true' then the corresponding field is
> set
> > > in
> > > > > the
> > > > > > > > InitProducerIdRequest and the KafkaProducer object is set
> into
> > a
> > > > > state
> > > > > > > > which only allows calling .commitTransaction or
> > > .abortTransaction."
> > > > > > > > We should also allow .completeTransaction, right?
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > > 20. For Flink usage, it seems that the APIs used to abort
> > and
> > > > > > commit
> > > > > > > a
> > > > > > > > > prepared txn are not symmetric.
> > > > > > > > >
> > > > > > > > > For Flink, it is expected that Flink would call .commitTransaction
> > > > > > > > > or .abortTransaction directly; it wouldn't need to deal with
> > > > > > > > > PreparedTxnState.  The outcome is actually determined by Flink's job
> > > > > > > > > manager, not by a comparison of PreparedTxnState.  So for Flink, if
> > > > > > > > > the Kafka sink crashes and restarts, there are 2 cases:
> > > > > > > > >
> > > > > > > > > 1. The transaction is not prepared.  In that case, just call
> > > > > > > > > producer.initTransactions(false) and then start transactions as needed.
> > > > > > > > > 2. The transaction is prepared.  In that case, call
> > > > > > > > > producer.initTransactions(true) and wait for the decision from the job
> > > > > > > > > manager.  Note that it's not a given that the transaction will get
> > > > > > > > > committed; the decision could also be an abort (see the sketch below).
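> > > > > > > > >
> > > > > > > > > (Roughly, as a sketch -- the job-manager interaction is pseudo-code:)
> > > > > > > > >
> > > > > > > > >   if (!flinkStateSaysPrepared()) {
> > > > > > > > >       producer.initTransactions(false);  // nothing prepared: abort leftovers, start clean
> > > > > > > > >   } else {
> > > > > > > > >       producer.initTransactions(true);   // keep the prepared transaction
> > > > > > > > >       if (jobManagerSaysCommit())        // decision comes from the job manager
> > > > > > > > >           producer.commitTransaction();
> > > > > > > > >       else
> > > > > > > > >           producer.abortTransaction();
> > > > > > > > >   }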
> > > > > > > > >
> > > > > > > > >  > 21. transaction.max.timeout.ms could in theory be
> > MAX_INT.
> > > > > > Perhaps
> > > > > > > we
> > > > > > > > > could use a negative timeout in the record to indicate 2PC?
> > > > > > > > >
> > > > > > > > > -1 sounds good, updated.
> > > > > > > > >
> > > > > > > > > > 30. The KIP has two different APIs to abort an ongoing
> txn.
> > > Do
> > > > we
> > > > > > > need
> > > > > > > > > both?
> > > > > > > > >
> > > > > > > > > I think of producer.initTransactions() as the implementation behind
> > > > > > > > > adminClient.forceTerminateTransaction(transactionalId).
> > > > > > > > >
> > > > > > > > > > 31. "This would flush all the pending messages and
> > transition
> > > > the
> > > > > > > > > producer
> > > > > > > > >
> > > > > > > > > Updated the KIP to clarify that IllegalStateException will
> be
> > > > > thrown.
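> > > > > > > > >
> > > > > > > > > (I.e., roughly:)
> > > > > > > > >
> > > > > > > > >   producer.beginTransaction();
> > > > > > > > >   producer.send(record1);
> > > > > > > > >   producer.prepareTransaction();  // flushes pending messages, transaction is now prepared
> > > > > > > > >   producer.send(record2);         // throws IllegalStateException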
> > > > > > > > >
> > > > > > > > > -Artem
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Feb 5, 2024 at 2:22 PM Jun Rao
> > > <j...@confluent.io.invalid
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Artem,
> > > > > > > > > >
> > > > > > > > > > Thanks for the reply.
> > > > > > > > > >
> > > > > > > > > > 20. For Flink usage, it seems that the APIs used to abort
> > and
> > > > > > commit
> > > > > > > a
> > > > > > > > > > prepared txn are not symmetric.
> > > > > > > > > > To abort, the app will just call
> > > > > > > > > >   producer.initTransactions(false)
> > > > > > > > > >
> > > > > > > > > > To commit, the app needs to call
> > > > > > > > > >   producer.initTransactions(true)
> > > > > > > > > >   producer.completeTransaction(preparedTxnState)
> > > > > > > > > >
> > > > > > > > > > Will this be a concern? For the dual-writer usage, both
> > > > > > abort/commit
> > > > > > > > use
> > > > > > > > > > the same API.
> > > > > > > > > >
> > > > > > > > > > 21. transaction.max.timeout.ms could in theory be
> MAX_INT.
> > > > > Perhaps
> > > > > > > we
> > > > > > > > > > could
> > > > > > > > > > use a negative timeout in the record to indicate 2PC?
> > > > > > > > > >
> > > > > > > > > > 30. The KIP has two different APIs to abort an ongoing
> txn.
> > > Do
> > > > we
> > > > > > > need
> > > > > > > > > > both?
> > > > > > > > > >   producer.initTransactions(false)
> > > > > > > > > >   adminClient.forceTerminateTransaction(transactionalId)
> > > > > > > > > >
> > > > > > > > > > 31. "This would flush all the pending messages and
> > transition
> > > > the
> > > > > > > > > producer
> > > > > > > > > > into a mode where only .commitTransaction,
> > .abortTransaction,
> > > > or
> > > > > > > > > > .completeTransaction could be called.  If the call is
> > > > successful
> > > > > > (all
> > > > > > > > > > messages successfully got flushed to all partitions) the
> > > > > > transaction
> > > > > > > is
> > > > > > > > > > prepared."
> > > > > > > > > >  If the producer calls send() in that state, what
> exception
> > > > will
> > > > > > the
> > > > > > > > > caller
> > > > > > > > > > receive?
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
> > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Jun,
> > > > > > > > > > >
> > > > > > > > > > > >  Then, should we change the following in the example
> to
> > > use
> > > > > > > > > > > InitProducerId(true) instead?
> > > > > > > > > > >
> > > > > > > > > > > We could. I just thought that it's good to make the
> > example
> > > > > > > > > > self-contained
> > > > > > > > > > > by starting from a clean state.
> > > > > > > > > > >
> > > > > > > > > > > > Also, could Flink just follow the dual-write recipe?
> > > > > > > > > > >
> > > > > > > > > > > I think it would bring some unnecessary logic to Flink
> > (or
> > > > any
> > > > > > > other
> > > > > > > > > > system
> > > > > > > > > > > that already has a transaction coordinator and just
> wants
> > > to
> > > > > > drive
> > > > > > > > > Kafka
> > > > > > > > > > to
> > > > > > > > > > > the desired state).  We could discuss it with Flink
> > folks,
> > > > the
> > > > > > > > current
> > > > > > > > > > > proposal was developed in collaboration with them.
> > > > > > > > > > >
> > > > > > > > > > > > 21. Could a non 2pc user explicitly set the
> > > > > > TransactionTimeoutMs
> > > > > > > to
> > > > > > > > > > > Integer.MAX_VALUE?
> > > > > > > > > > >
> > > > > > > > > > > The server would reject this for regular transactions,
> it
> > > > only
> > > > > > > > accepts
> > > > > > > > > > > values that are <= *transaction.max.timeout.ms
> > > > > > > > > > > <http://transaction.max.timeout.ms> *(a broker
> config).
> > > > > > > > > > >
> > > > > > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator
> > expects
> > > > the
> > > > > > > > endTxn
> > > > > > > > > > > request to use the ongoing pid. ...
> > > > > > > > > > >
> > > > > > > > > > > Without 2PC there is no case where the pid could change
> > > > between
> > > > > > > > > starting
> > > > > > > > > > a
> > > > > > > > > > > transaction and endTxn (InitProducerId would abort any
> > > > ongoing
> > > > > > > > > > > transaction).  With 2PC there is now a case where there
> > > could
> > > > > be
> > > > > > > > > > > InitProducerId that can change the pid without aborting
> > the
> > > > > > > > > transaction,
> > > > > > > > > > so
> > > > > > > > > > > we need to handle that.  I wouldn't say that the flow
> is
> > > > > > different,
> > > > > > > > but
> > > > > > > > > > > it's rather extended to handle new cases.  The main
> > > principle
> > > > > is
> > > > > > > > still
> > > > > > > > > > the
> > > > > > > > > > > same -- for all operations we use the latest
> > "operational"
> > > > pid
> > > > > > and
> > > > > > > > > epoch
> > > > > > > > > > > known to the client, this way we guarantee that we can
> > > fence
> > > > > > > zombie /
> > > > > > > > > > split
> > > > > > > > > > > brain clients by disrupting the "latest known" pid +
> > epoch
> > > > > > > > progression.
> > > > > > > > > > >
> > > > > > > > > > > > 25. "We send out markers using the original ongoing
> > > > > transaction
> > > > > > > > > > > ProducerId and ProducerEpoch" ...
> > > > > > > > > > >
> > > > > > > > > > > Updated.
> > > > > > > > > > >
> > > > > > > > > > > -Artem
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao
> > > > > <j...@confluent.io.invalid
> > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, Artem,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the reply.
> > > > > > > > > > > >
> > > > > > > > > > > > 20. So for the dual-write recipe, we should always
> call
> > > > > > > > > > > > InitProducerId(keepPreparedTxn=true) from the
> producer?
> > > > Then,
> > > > > > > > should
> > > > > > > > > we
> > > > > > > > > > > > change the following in the example to use
> > > > > InitProducerId(true)
> > > > > > > > > > instead?
> > > > > > > > > > > > 1. InitProducerId(false); TC STATE: Empty,
> > ProducerId=42,
> > > > > > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1,
> > > NextProducerId=-1,
> > > > > > > > > > > > NextProducerEpoch=-1; RESPONSE ProducerId=42,
> > > Epoch=MAX-1,
> > > > > > > > > > > > OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> > > > > > > > > > > > Also, could Flink just follow the dual-write recipe?
> > It's
> > > > > > simpler
> > > > > > > > if
> > > > > > > > > > > there
> > > > > > > > > > > > is one way to solve the 2pc issue.
> > > > > > > > > > > >
> > > > > > > > > > > > 21. Could a non 2pc user explicitly set the
> > > > > > TransactionTimeoutMs
> > > > > > > to
> > > > > > > > > > > > Integer.MAX_VALUE?
> > > > > > > > > > > >
> > > > > > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator
> > expects
> > > > the
> > > > > > > > endTxn
> > > > > > > > > > > > request to use the ongoing pid. With 2pc, the
> > coordinator
> > > > now
> > > > > > > > expects
> > > > > > > > > > the
> > > > > > > > > > > > endTxn request to use the next pid. So, the flow is
> > > > > different,
> > > > > > > > right?
> > > > > > > > > > > >
> > > > > > > > > > > > 25. "We send out markers using the original ongoing
> > > > > transaction
> > > > > > > > > > > ProducerId
> > > > > > > > > > > > and ProducerEpoch"
> > > > > > > > > > > > We should use ProducerEpoch + 1 in the marker, right?
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
> > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 20.  I am a bit confused by how we set
> > > keepPreparedTxn.
> > > > > > ...
> > > > > > > > > > > > >
> > > > > > > > > > > > > keepPreparedTxn=true informs the transaction
> > > coordinator
> > > > > that
> > > > > > > it
> > > > > > > > > > should
> > > > > > > > > > > > > keep the ongoing transaction, if any.  If the
> > > > > > > > > keepPreparedTxn=false,
> > > > > > > > > > > then
> > > > > > > > > > > > > any ongoing transaction is aborted (this is exactly
> > the
> > > > > > current
> > > > > > > > > > > > behavior).
> > > > > > > > > > > > > enable2Pc is a separate argument that is controlled
> > by
> > > > the
> > > > > > > > > > > > > *transaction.two.phase.commit.enable *setting on
> the
> > > > > client.
> > > > > > > > > > > > >
> > > > > > > > > > > > > To start 2PC, the client just needs to set
> > > > > > > > > > > > > *transaction.two.phase.commit.enable*=true in the
> > > config.
> > > > > > Then
> > > > > > > > if
> > > > > > > > > > the
> > > > > > > > > > > > > client knows the status of the transaction upfront
> > (in
> > > > the
> > > > > > case
> > > > > > > > of
> > > > > > > > > > > Flink,
> > > > > > > > > > > > > Flink keeps the knowledge if the transaction is
> > > prepared
> > > > in
> > > > > > its
> > > > > > > > own
> > > > > > > > > > > > store,
> > > > > > > > > > > > > so it always knows upfront), it can set
> > keepPreparedTxn
> > > > > > > > > accordingly,
> > > > > > > > > > > then
> > > > > > > > > > > > > if the transaction was prepared, it'll be ready for
> > the
> > > > > > client
> > > > > > > to
> > > > > > > > > > > > complete
> > > > > > > > > > > > > the appropriate action; if the client doesn't have
> a
> > > > > > knowledge
> > > > > > > > that
> > > > > > > > > > the
> > > > > > > > > > > > > transaction is prepared, keepPreparedTxn is going
> to
> > be
> > > > > > false,
> > > > > > > in
> > > > > > > > > > which
> > > > > > > > > > > > > case we'll get to a clean state (the same way we do
> > > > today).
> > > > > > > > > > > > >
> > > > > > > > > > > > > For the dual-write recipe, the client doesn't know
> > > > upfront
> > > > > if
> > > > > > > the
> > > > > > > > > > > > > transaction is prepared, this information is
> > implicitly
> > > > > > encoded
> > > > > > > > > > > > > PreparedTxnState value that can be used to resolve
> > the
> > > > > > > > transaction
> > > > > > > > > > > state.
> > > > > > > > > > > > > In that case, keepPreparedTxn should always be
> true,
> > > > > because
> > > > > > we
> > > > > > > > > don't
> > > > > > > > > > > > know
> > > > > > > > > > > > > upfront and we don't want to accidentally abort a
> > > > committed
> > > > > > > > > > > transaction.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The forceTerminateTransaction call can just use
> > > > > > > > > > keepPreparedTxn=false,
> > > > > > > > > > > it
> > > > > > > > > > > > > actually doesn't matter if it sets Enable2Pc flag.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 21. TransactionLogValue: Do we need some field to
> > > > > identify
> > > > > > > > > whether
> > > > > > > > > > > this
> > > > > > > > > > > > > is written for 2PC so that ongoing txn is never
> auto
> > > > > aborted?
> > > > > > > > > > > > >
> > > > > > > > > > > > > The TransactionTimeoutMs would be set to
> > > > Integer.MAX_VALUE
> > > > > if
> > > > > > > 2PC
> > > > > > > > > was
> > > > > > > > > > > > > enabled.  I've added a note to the KIP about this.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 22
> > > > > > > > > > > > >
> > > > > > > > > > > > > You're right it's a typo.  I fixed it as well as
> > step 9
> > > > > > > (REQUEST:
> > > > > > > > > > > > > ProducerId=73, ProducerEpoch=MAX).
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by
> a
> > > > config
> > > > > > > while
> > > > > > > > > > > > > KeepPreparedTxn is from an API param ...
> > > > > > > > > > > > >
> > > > > > > > > > > > > The intent to use 2PC doesn't change from
> transaction
> > > to
> > > > > > > > > transaction,
> > > > > > > > > > > but
> > > > > > > > > > > > > the intent to keep prepared txn may change from
> > > > transaction
> > > > > > to
> > > > > > > > > > > > > transaction.  In dual-write recipes the distinction
> > is
> > > > not
> > > > > > > clear,
> > > > > > > > > but
> > > > > > > > > > > for
> > > > > > > > > > > > > use cases where keepPreparedTxn value is known
> > upfront
> > > > > (e.g.
> > > > > > > > Flink)
> > > > > > > > > > > it's
> > > > > > > > > > > > > more prominent.  E.g. a Flink's Kafka sink operator
> > > could
> > > > > be
> > > > > > > > > deployed
> > > > > > > > > > > > with
> > > > > > > > > > > > > *transaction.two.phase.commit.enable*=true
> hardcoded
> > in
> > > > the
> > > > > > > > image,
> > > > > > > > > > but
> > > > > > > > > > > > > keepPreparedTxn cannot be hardcoded in the image,
> > > because
> > > > > it
> > > > > > > > > depends
> > > > > > > > > > on
> > > > > > > > > > > > the
> > > > > > > > > > > > > job manager's state.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 24
> > > > > > > > > > > > >
> > > > > > > > > > > > > The flow is actually going to be the same way as it
> > is
> > > > now
> > > > > --
> > > > > > > the
> > > > > > > > > > > "main"
> > > > > > > > > > > > > producer id + epoch needs to be used in all
> > operations
> > > to
> > > > > > > prevent
> > > > > > > > > > > fencing
> > > > > > > > > > > > > (it's sort of a common "header" in all RPC calls
> that
> > > > > follow
> > > > > > > the
> > > > > > > > > same
> > > > > > > > > > > > > rules).  The ongoing txn info is just additional
> info
> > > for
> > > > > > > making
> > > > > > > > a
> > > > > > > > > > > > commit /
> > > > > > > > > > > > > abort decision based on the PreparedTxnState from
> the
> > > DB.
> > > > > > > > > > > > >
> > > > > > > > > > > > > --Artem
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao
> > > > > > > > <j...@confluent.io.invalid
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi, Artem,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the reply. A few more comments.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 20. I am a bit confused by how we set
> > > keepPreparedTxn.
> > > > > From
> > > > > > > the
> > > > > > > > > > KIP,
> > > > > > > > > > > I
> > > > > > > > > > > > > got
> > > > > > > > > > > > > > the following (1) to start 2pc, we call
> > > > > > > > > > > > > > InitProducerId(keepPreparedTxn=false); (2) when
> the
> > > > > > producer
> > > > > > > > > fails
> > > > > > > > > > > and
> > > > > > > > > > > > > > needs to do recovery, it calls
> > > > > > > > > > InitProducerId(keepPreparedTxn=true);
> > > > > > > > > > > > (3)
> > > > > > > > > > > > > > Admin.forceTerminateTransaction() calls
> > > > > > > > > > > > > > InitProducerId(keepPreparedTxn=false).
> > > > > > > > > > > > > > 20.1 In (1), when a producer calls
> > > > InitProducerId(false)
> > > > > > with
> > > > > > > > 2pc
> > > > > > > > > > > > > enabled,
> > > > > > > > > > > > > > and there is an ongoing txn, should the server
> > return
> > > > an
> > > > > > > error
> > > > > > > > to
> > > > > > > > > > the
> > > > > > > > > > > > > > InitProducerId request? If so, what would be the
> > > error
> > > > > > code?
> > > > > > > > > > > > > > 20.2 How do we distinguish between (1) and (3)?
> > It's
> > > > the
> > > > > > same
> > > > > > > > API
> > > > > > > > > > > call
> > > > > > > > > > > > > but
> > > > > > > > > > > > > > (1) doesn't abort ongoing txn and (2) does.
> > > > > > > > > > > > > > 20.3 The usage in (1) seems unintuitive. 2pc
> > implies
> > > > > > keeping
> > > > > > > > the
> > > > > > > > > > > > ongoing
> > > > > > > > > > > > > > txn. So, setting keepPreparedTxn to false to
> start
> > > 2pc
> > > > > > seems
> > > > > > > > > > counter
> > > > > > > > > > > > > > intuitive.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 21. TransactionLogValue: Do we need some field to
> > > > > identify
> > > > > > > > > whether
> > > > > > > > > > > this
> > > > > > > > > > > > > is
> > > > > > > > > > > > > > written for 2PC so that ongoing txn is never auto
> > > > > aborted?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 22. "8. InitProducerId(true); TC STATE: Ongoing,
> > > > > > > ProducerId=42,
> > > > > > > > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1,
> > > > > NextProducerId=73,
> > > > > > > > > > > > > > NextProducerEpoch=MAX; RESPONSE ProducerId=73,
> > > > > Epoch=MAX-1,
> > > > > > > > > > > > > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
> > > > > > > > > > > > > > It seems in the above example, Epoch in RESPONSE
> > > should
> > > > > be
> > > > > > > MAX
> > > > > > > > to
> > > > > > > > > > > match
> > > > > > > > > > > > > > NextProducerEpoch?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by
> a
> > > > config
> > > > > > > > > > > > > > while KeepPreparedTxn is from an API param.
> Should
> > we
> > > > > make
> > > > > > > them
> > > > > > > > > > more
> > > > > > > > > > > > > > consistent since they seem related?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 24. "9. Commit; REQUEST: ProducerId=73,
> > > > > > ProducerEpoch=MAX-1;
> > > > > > > TC
> > > > > > > > > > > STATE:
> > > > > > > > > > > > > > PrepareCommit, ProducerId=42, ProducerEpoch=MAX,
> > > > > > > > > PrevProducerId=73,
> > > > > > > > > > > > > > NextProducerId=85, NextProducerEpoch=0; RESPONSE
> > > > > > > ProducerId=85,
> > > > > > > > > > > > Epoch=0,
> > > > > > > > > > > > > > When a commit request is sent, it uses the latest
> > > > > > ProducerId
> > > > > > > > and
> > > > > > > > > > > > > > ProducerEpoch."
> > > > > > > > > > > > > > The step where we use the next produceId to
> commit
> > an
> > > > old
> > > > > > txn
> > > > > > > > > > works,
> > > > > > > > > > > > but
> > > > > > > > > > > > > > can be confusing. It's going to be hard for
> people
> > > > > > > implementing
> > > > > > > > > > this
> > > > > > > > > > > > new
> > > > > > > > > > > > > > client protocol to figure out when to use the
> > current
> > > > or
> > > > > > the
> > > > > > > > new
> > > > > > > > > > > > > producerId
> > > > > > > > > > > > > > in the EndTxnRequest. One potential way to
> improve
> > > this
> > > > > is
> > > > > > to
> > > > > > > > > > extend
> > > > > > > > > > > > > > EndTxnRequest with a new field like
> > > > > expectedNextProducerId.
> > > > > > > > Then
> > > > > > > > > we
> > > > > > > > > > > can
> > > > > > > > > > > > > > always use the old produceId in the existing
> field,
> > > but
> > > > > set
> > > > > > > > > > > > > > expectedNextProducerId to bypass the fencing
> logic
> > > when
> > > > > > > needed.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
> > > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you for the comments.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 10. For the two new fields in Enable2Pc and
> > > > > > > KeepPreparedTxn
> > > > > > > > > ...
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I added a note that all combinations are valid.
> > > > > > > > > Enable2Pc=false
> > > > > > > > > > &
> > > > > > > > > > > > > > > KeepPreparedTxn=true could be potentially
> useful
> > > for
> > > > > > > backward
> > > > > > > > > > > > > > compatibility
> > > > > > > > > > > > > > > with Flink, when the new version of Flink that
> > > > > implements
> > > > > > > > > KIP-319
> > > > > > > > > > > > tries
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > work with a cluster that doesn't authorize 2PC.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 11.  InitProducerIdResponse: If there is no
> > > ongoing
> > > > > > txn,
> > > > > > > > what
> > > > > > > > > > > will
> > > > > > > > > > > > > > > OngoingTxnProducerId and OngoingTxnEpoch be
> set?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I added a note that they will be set to -1.
> The
> > > > client
> > > > > > > then
> > > > > > > > > will
> > > > > > > > > > > > know
> > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > there is no ongoing txn and
> .completeTransaction
> > > > > becomes
> > > > > > a
> > > > > > > > > no-op
> > > > > > > > > > > (but
> > > > > > > > > > > > > > still
> > > > > > > > > > > > > > > required before .send is enabled).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 12. ListTransactionsRequest related changes:
> It
> > > > seems
> > > > > > > those
> > > > > > > > > are
> > > > > > > > > > > > > already
> > > > > > > > > > > > > > > covered by KIP-994?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Removed from this KIP.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 13. TransactionalLogValue ...
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This is now updated to work on top of KIP-890.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 14. "Note that the (producerId, epoch) pair
> > that
> > > > > > > > corresponds
> > > > > > > > > to
> > > > > > > > > > > the
> > > > > > > > > > > > > > > ongoing transaction ...
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This is now updated to work on top of KIP-890.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 15. active-transaction-total-time-max : ...
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Updated.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 16. "transaction.two.phase.commit.enable The
> > > > default
> > > > > > > would
> > > > > > > > be
> > > > > > > > > > > > > ‘false’.
> > > > > > > > > > > > > > > If it’s ‘false’, 2PC functionality is disabled
> > even
> > > > if
> > > > > > the
> > > > > > > > ACL
> > > > > > > > > is
> > > > > > > > > > > set
> > > > > > > > > > > > > ...
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Disabling 2PC effectively removes all
> > authorization
> > > > to
> > > > > > use
> > > > > > > > it,
> > > > > > > > > > > hence
> > > > > > > > > > > > I
> > > > > > > > > > > > > > > thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED
> > would
> > > > be
> > > > > > > > > > appropriate.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Do you suggest using a different error code for
> > 2PC
> > > > > > > > > authorization
> > > > > > > > > > > vs
> > > > > > > > > > > > > some
> > > > > > > > > > > > > > > other authorization (e.g.
> > > > > > > > > > > TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED)
> > > > > > > > > > > > > or a
> > > > > > > > > > > > > > > different code for disabled vs. unauthorised
> > (e.g.
> > > > > > > > > > > > > > > TWO_PHASE_COMMIT_DISABLED) or both?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 17. completeTransaction(). We expect this to
> be
> > > > only
> > > > > > used
> > > > > > > > > > during
> > > > > > > > > > > > > > > recovery.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It can also be used if, say, a commit to the
> > > database
> > > > > > fails
> > > > > > > > and
> > > > > > > > > > the
> > > > > > > > > > > > > > result
> > > > > > > > > > > > > > > is inconclusive, e.g.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. Begin DB transaction
> > > > > > > > > > > > > > > 2. Begin Kafka transaction
> > > > > > > > > > > > > > > 3. Prepare Kafka transaction
> > > > > > > > > > > > > > > 4. Commit DB transaction
> > > > > > > > > > > > > > > 5. The DB commit fails, figure out the state of
> > the
> > > > > > > > transaction
> > > > > > > > > > by
> > > > > > > > > > > > > > reading
> > > > > > > > > > > > > > > the PreparedTxnState from DB
> > > > > > > > > > > > > > > 6. Complete Kafka transaction with the
> > > > > PreparedTxnState.
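> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > (In code, roughly -- a sketch assuming prepareTransaction() returns the
> > > > > > > > > > > > > > > PreparedTxnState that gets stored in the DB, and using a hypothetical
> > > > > > > > > > > > > > > db helper:)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >   db.beginTransaction();                                      // 1
> > > > > > > > > > > > > > >   producer.beginTransaction();                                // 2
> > > > > > > > > > > > > > >   producer.send(kafkaRecord);
> > > > > > > > > > > > > > >   PreparedTxnState prepared = producer.prepareTransaction();  // 3
> > > > > > > > > > > > > > >   db.save(prepared);
> > > > > > > > > > > > > > >   db.commitTransaction();                                     // 4
> > > > > > > > > > > > > > >   // 5: commit outcome inconclusive -- read back what (if anything) was committed
> > > > > > > > > > > > > > >   PreparedTxnState stored = db.readPreparedTxnState();
> > > > > > > > > > > > > > >   producer.completeTransaction(stored);                       // 6: resolves based on the stored state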
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 18. "either prepareTransaction was called or
> > > > > > > > > > > initTransaction(true)
> > > > > > > > > > > > > was
> > > > > > > > > > > > > > > called": "either" should be "neither"?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Updated.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 19. Since InitProducerId always bumps up the
> > > epoch,
> > > > > it
> > > > > > > > > creates
> > > > > > > > > > a
> > > > > > > > > > > > > > > situation ...
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > InitProducerId only bumps the producer epoch,
> the
> > > > > ongoing
> > > > > > > > > > > transaction
> > > > > > > > > > > > > > epoch
> > > > > > > > > > > > > > > stays the same, no matter how many times the
> > > > > > InitProducerId
> > > > > > > > is
> > > > > > > > > > > called
> > > > > > > > > > > > > > > before the transaction is completed.
> Eventually
> > > the
> > > > > > epoch
> > > > > > > > may
> > > > > > > > > > > > > overflow,
> > > > > > > > > > > > > > > and then a new producer id would be allocated,
> > but
> > > > the
> > > > > > > > ongoing
> > > > > > > > > > > > > > transaction
> > > > > > > > > > > > > > > producer id would stay the same.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I've added a couple examples in the KIP (
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges
> > > > > > > > > > > > > > > )
> > > > > > > > > > > > > > > that walk through some scenarios and show how
> the
> > > > state
> > > > > > is
> > > > > > > > > > changed.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -Artem
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao
> > > > > > > > > <j...@confluent.io.invalid
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi, Artem,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the KIP. A few comments below.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 10. For the two new fields in Enable2Pc and
> > > > > > > KeepPreparedTxn
> > > > > > > > > in
> > > > > > > > > > > > > > > > InitProducerId, it would be useful to
> document
> > a
> > > > bit
> > > > > > more
> > > > > > > > > > detail
> > > > > > > > > > > on
> > > > > > > > > > > > > > what
> > > > > > > > > > > > > > > > values are set under what cases. For example,
> > are
> > > > all
> > > > > > > four
> > > > > > > > > > > > > combinations
> > > > > > > > > > > > > > > > valid?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 11.  InitProducerIdResponse: If there is no
> > > ongoing
> > > > > > txn,
> > > > > > > > what
> > > > > > > > > > > will
> > > > > > > > > > > > > > > > OngoingTxnProducerId and OngoingTxnEpoch be
> > set?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 12. ListTransactionsRequest related changes:
> It
> > > > seems
> > > > > > > those
> > > > > > > > > are
> > > > > > > > > > > > > already
> > > > > > > > > > > > > > > > covered by KIP-994?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 13. TransactionalLogValue: Could we name
> > > > > > > > > TransactionProducerId
> > > > > > > > > > > and
> > > > > > > > > > > > > > > > ProducerId better? It's not clear from the
> name
> > > > which
> > > > > > is
> > > > > > > > for
> > > > > > > > > > > which.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 14. "Note that the (producerId, epoch) pair
> > that
> > > > > > > > corresponds
> > > > > > > > > to
> > > > > > > > > > > the
> > > > > > > > > > > > > > > ongoing
> > > > > > > > > > > > > > > > transaction is going to be written instead of
> > the
> > > > > > > existing
> > > > > > > > > > > > ProducerId
> > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > ProducerEpoch fields (which are renamed to
> > > reflect
> > > > > the
> > > > > > > > > > semantics)
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > > support downgrade.": I am a bit confused on
> > that.
> > > > Are
> > > > > > we
> > > > > > > > > > writing
> > > > > > > > > > > > > > > different
> > > > > > > > > > > > > > > > values to the existing fields? Then, we can't
> > > > > > downgrade,
> > > > > > > > > right?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 15. active-transaction-total-time-max : Would
> > > > > > > > > > > > > > > > active-transaction-open-time-max be more
> > > intuitive?
> > > > > > Also,
> > > > > > > > > could
> > > > > > > > > > > we
> > > > > > > > > > > > > > > include
> > > > > > > > > > > > > > > > the full name (group, tags, etc)?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 16. "transaction.two.phase.commit.enable The
> > > > default
> > > > > > > would
> > > > > > > > be
> > > > > > > > > > > > > ‘false’.
> > > > > > > > > > > > > > > If
> > > > > > > > > > > > > > > > it’s ‘false’, 2PC functionality is disabled
> > even
> > > if
> > > > > the
> > > > > > > ACL
> > > > > > > > > is
> > > > > > > > > > > set,
> > > > > > > > > > > > > > > clients
> > > > > > > > > > > > > > > > that attempt to use this functionality would
> > > > receive
> > > > > > > > > > > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
> > > > > > > > > > > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems
> > > > > unintuitive
> > > > > > > for
> > > > > > > > > the
> > > > > > > > > > > > > client
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > understand what the actual cause is.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 17. completeTransaction(). We expect this to
> be
> > > > only
> > > > > > used
> > > > > > > > > > during
> > > > > > > > > > > > > > > recovery.
> > > > > > > > > > > > > > > > Could we document this clearly? Could we
> > prevent
> > > it
> > > > > > from
> > > > > > > > > being
> > > > > > > > > > > used
> > > > > > > > > > > > > > > > incorrectly (e.g. throw an exception if the
> > > > producer
> > > > > > has
> > > > > > > > > called
> > > > > > > > > > > > other
> > > > > > > > > > > > > > > > methods like send())?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 18. "either prepareTransaction was called or
> > > > > > > > > > > initTransaction(true)
> > > > > > > > > > > > > was
> > > > > > > > > > > > > > > > called": "either" should be "neither"?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 19. Since InitProducerId always bumps up the
> > > epoch,
> > > > > it
> > > > > > > > > creates
> > > > > > > > > > a
> > > > > > > > > > > > > > > situation
> > > > > > > > > > > > > > > > where there could be multiple outstanding
> txns.
> > > The
> > > > > > > > following
> > > > > > > > > > is
> > > > > > > > > > > an
> > > > > > > > > > > > > > > example
> > > > > > > > > > > > > > > > of a potential problem during recovery.
> > > > > > > > > > > > > > > >    The last txn epoch in the external store
> is
> > 41
> > > > > when
> > > > > > > the
> > > > > > > > > app
> > > > > > > > > > > > dies.
> > > > > > > > > > > > > > > >    Instance1 is created for recovery.
> > > > > > > > > > > > > > > >      1. (instance1)
> > > > > > InitProducerId(keepPreparedTxn=true),
> > > > > > > > > > > epoch=42,
> > > > > > > > > > > > > > > > ongoingEpoch=41
> > > > > > > > > > > > > > > >      2. (instance1) dies before
> completeTxn(41)
> > > can
> > > > > be
> > > > > > > > > called.
> > > > > > > > > > > > > > > >    Instance2 is created for recovery.
> > > > > > > > > > > > > > > >      3. (instance2)
> > > > > > InitProducerId(keepPreparedTxn=true),
> > > > > > > > > > > epoch=43,
> > > > > > > > > > > > > > > > ongoingEpoch=42
> > > > > > > > > > > > > > > >      4. (instance2) completeTxn(41) => abort
> > > > > > > > > > > > > > > >    The first problem is that 41 now is
> aborted
> > > when
> > > > > it
> > > > > > > > should
> > > > > > > > > > be
> > > > > > > > > > > > > > > committed.
> > > > > > > > > > > > > > > > The second one is that it's not clear who
> could
> > > > abort
> > > > > > > epoch
> > > > > > > > > 42,
> > > > > > > > > > > > which
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > still open.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan
> > > > > > > > > > > > > > > <jols...@confluent.io.invalid
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hey Artem,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks for the updates. I think what you
> say
> > > > makes
> > > > > > > > sense. I
> > > > > > > > > > > just
> > > > > > > > > > > > > > > updated
> > > > > > > > > > > > > > > > my
> > > > > > > > > > > > > > > > > KIP so I want to reconcile some of the
> > changes
> > > we
> > > > > > made
> > > > > > > > > > > especially
> > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > respect to the TransactionLogValue.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Firstly, I believe tagged fields require a
> > > > default
> > > > > > > value
> > > > > > > > so
> > > > > > > > > > > that
> > > > > > > > > > > > if
> > > > > > > > > > > > > > > they
> > > > > > > > > > > > > > > > > are not filled, we return the default (and
> > know
> > > > > that
> > > > > > > they
> > > > > > > > > > were
> > > > > > > > > > > > > > empty).
> > > > > > > > > > > > > > > > For
> > > > > > > > > > > > > > > > > my KIP, I proposed the default for producer
> > ID
> > > > > tagged
> > > > > > > > > fields
> > > > > > > > > > > > should
> > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > -1.
> > > > > > > > > > > > > > > > > I was wondering if we could update the KIP
> to
> > > > > include
> > > > > > > the
> > > > > > > > > > > default
> > > > > > > > > > > > > > > values
> > > > > > > > > > > > > > > > > for producer ID and epoch.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Next, I noticed we decided to rename the
> > > fields.
> > > > I
> > > > > > > guess
> > > > > > > > > that
> > > > > > > > > > > the
> > > > > > > > > > > > > > field
> > > > > > > > > > > > > > > > > "NextProducerId" in my KIP correlates to
> > > > > "ProducerId"
> > > > > > > in
> > > > > > > > > this
> > > > > > > > > > > > KIP.
> > > > > > > > > > > > > Is
> > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > correct? So we would have
> > > "TransactionProducerId"
> > > > > for
> > > > > > > the
> > > > > > > > > > > > > non-tagged
> > > > > > > > > > > > > > > > field
> > > > > > > > > > > > > > > > > and have "ProducerId" (NextProducerId) and
> > > > > > > > "PrevProducerId"
> > > > > > > > > > as
> > > > > > > > > > > > > tagged
> > > > > > > > > > > > > > > > > fields the final version after KIP-890 and
> > > > KIP-936
> > > > > > are
> > > > > > > > > > > > implemented.
> > > > > > > > > > > > > > Is
> > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > correct? I think the tags will need
> updating,
> > > but
> > > > > > that
> > > > > > > is
> > > > > > > > > > > > trivial.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The final question I had was with respect
> to
> > > > > storing
> > > > > > > the
> > > > > > > > > new
> > > > > > > > > > > > epoch.
> > > > > > > > > > > > > > In
> > > > > > > > > > > > > > > > > KIP-890 part 2 (epoch bumps) I think we
> > > concluded
> > > > > > that
> > > > > > > we
> > > > > > > > > > don't
> > > > > > > > > > > > > need
> > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > store the epoch since we can interpret the
> > > > previous
> > > > > > > epoch
> > > > > > > > > > based
> > > > > > > > > > > > on
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > producer ID. But here we could call the
> > > > > > InitProducerId
> > > > > > > > > > multiple
> > > > > > > > > > > > > times
> > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > > we only want the producer with the correct
> > > epoch
> > > > to
> > > > > > be
> > > > > > > > able
> > > > > > > > > > to
> > > > > > > > > > > > > commit
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > transaction. Is that the correct reasoning
> > for
> > > > why
> > > > > we
> > > > > > > > need
> > > > > > > > > > > epoch
> > > > > > > > > > > > > here
> > > > > > > > > > > > > > > but
> > > > > > > > > > > > > > > > > not the Prepare/Commit state.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Justine
> > > > > > > > > > > > > > > > >
> On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> Hi Justine,
>
> After thinking a bit about supporting atomic dual writes for Kafka +
> NoSQL database, I came to a conclusion that we do need to bump the
> epoch even with InitProducerId(keepPreparedTxn=true). As I described in
> my previous email, we wouldn't need to bump the epoch to protect from
> zombies, so that reasoning is still true. But we cannot protect from
> split-brain scenarios when two or more instances of a producer with the
> same transactional id try to produce at the same time. The dual-write
> example for SQL databases
> (https://github.com/apache/kafka/pull/14231/files) doesn't have a
> split-brain problem because execution is protected by the update lock
> on the transaction state record; however, NoSQL databases may not have
> this protection (I'll write an example for NoSQL database dual-write
> soon).
>
> In a nutshell, here is an example of a split-brain scenario:
>
>    1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
>    2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
>    3. (instance1) CommitTxn, epoch bumped to 43
>    4. (instance2) CommitTxn, this is considered a retry, so it got
>       epoch 43 as well
>    5. (instance1) Produce messageA w/sequence 1
>    6. (instance2) Produce messageB w/sequence 1, this is considered a
>       duplicate
>    7. (instance2) Produce messageC w/sequence 2
>    8. (instance1) Produce messageD w/sequence 2, this is considered a
>       duplicate
>
> Now if either of those commits the transaction, it would have a mix of
> messages from the two instances (messageA and messageC). With the
> proper epoch bump, instance1 would get fenced at step 3.
>
> In order to update the epoch in InitProducerId(keepPreparedTxn=true) we
> need to preserve the ongoing transaction's epoch (and producerId, if
> the epoch overflows), because we'd need to make a correct decision when
> we compare the PreparedTxnState that we read from the database with the
> (producerId, epoch) of the ongoing transaction.
>
> I've updated the KIP with the following:
>
>    - Ongoing transaction now has 2 (producerId, epoch) pairs -- one
>      pair describes the ongoing transaction, the other pair describes
>      the expected epoch for operations on this transactional id
>    - InitProducerIdResponse now returns 2 (producerId, epoch) pairs
>    - TransactionalLogValue now has 2 (producerId, epoch) pairs, the new
>      values added as tagged fields, so it's easy to downgrade
>    - Added a note about downgrade in the Compatibility section
>    - Added a rejected alternative
>
> -Artem
>
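To make the recovery decision above concrete, here is a minimal Java sketch of comparing the prepared state saved in the external database against the (producerId, epoch) reported for the ongoing Kafka transaction. The PreparedTxnState shape and the method name are assumptions for illustration; the exact KIP-939 client API may differ.

    // Minimal sketch, not the KIP's final API: decide commit vs. abort by
    // comparing what the external database recorded at prepare time with
    // the (producerId, epoch) pair of the ongoing Kafka transaction.
    final class PreparedTxnCheck {
        record PreparedTxnState(long producerId, short epoch) {}

        static boolean shouldCommitKafkaTxn(PreparedTxnState storedInDb,
                                            long ongoingProducerId,
                                            short ongoingEpoch) {
            // Commit only if the database recorded a prepare entry that matches
            // the transaction still open in Kafka; any mismatch means the
            // prepared decision belongs to an older incarnation, so abort.
            return storedInDb != null
                && storedInDb.producerId() == ongoingProducerId
                && storedInDb.epoch() == ongoingEpoch;
        }
    }
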
> On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <alivsh...@confluent.io>
> wrote:
>
> Hi Justine,
>
> Thank you for the questions.  Currently (pre-KIP-939) we always bump
> the epoch on InitProducerId and abort an ongoing transaction (if any).
> I expect this behavior will continue with KIP-890 as well.
>
> With KIP-939 we need to support the case when the ongoing transaction
> needs to be preserved when keepPreparedTxn=true.  Bumping epoch without
> aborting or committing a transaction is tricky because epoch is a short
> value and it's easy to overflow.  Currently, the overflow case is
> handled by aborting the ongoing transaction, which would send out
> transaction markers with epoch=Short.MAX_VALUE to the partition
> leaders, which would fence off any messages with the producer id that
> started the transaction (they would have epoch that is less than
> Short.MAX_VALUE).  Then it is safe to allocate a new producer id and
> use it in new transactions.
>
> We could say that maybe when keepPreparedTxn=true we bump epoch unless
> it leads to overflow, and don't bump epoch in the overflow case.  I
> don't think it's a good solution because if it's not safe to keep the
> same epoch when keepPreparedTxn=true, then we must handle the epoch
> overflow case as well.  So either we should convince ourselves that
> it's safe to keep the epoch and do it in the general case, or we always
> bump the epoch and handle the overflow.
>
> With KIP-890, we bump the epoch on every transaction commit / abort.
> This guarantees that even if InitProducerId(keepPreparedTxn=true)
> doesn't increment epoch on the ongoing transaction, the client will
> have to call commit or abort to finish the transaction and will
> increment the epoch (and handle epoch overflow, if needed).  If the
> ongoing transaction was in a bad state and had some zombies waiting to
> arrive, the abort operation would fence them because with KIP-890 every
> abort would bump the epoch.
>
> We could also look at this from the following perspective.  With
> KIP-890, zombies won't be able to cross transaction boundaries; each
> transaction completion creates a boundary and any activity in the past
> gets confined in the boundary.  Then data in any partition would look
> like this:
>
> 1. message1, epoch=42
> 2. message2, epoch=42
> 3. message3, epoch=42
> 4. marker (commit or abort), epoch=43
>
> Now if we inject steps 3a and 3b like this:
>
> 1. message1, epoch=42
> 2. message2, epoch=42
> 3. message3, epoch=42
> 3a. crash
> 3b. InitProducerId(keepPreparedTxn=true)
> 4. marker (commit or abort), epoch=43
>
> The invariant still holds even with steps 3a and 3b -- whatever
> activity was in the past will get confined in the past with mandatory
> abort / commit that must follow InitProducerId(keepPreparedTxn=true).
>
> So KIP-890 provides the proper isolation between transactions, so
> injecting crash + InitProducerId(keepPreparedTxn=true) into the
> transaction sequence is safe from the zombie protection perspective.
>
> That said, I'm still thinking about it and looking for cases that might
> break because we don't bump epoch when
> InitProducerId(keepPreparedTxn=true), if such cases exist, we'll need
> to develop the logic to handle epoch overflow for ongoing transactions.
>
> -Artem
>
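As an illustration of the overflow concern discussed above, here is a small sketch of the "bump the epoch unless it would overflow, otherwise move to a new producer id" bookkeeping. The types and the allocator are assumptions made for the example, not coordinator code.

    import java.util.function.LongSupplier;

    final class EpochBump {
        record ProducerIdAndEpoch(long producerId, short epoch) {}

        // Sketch only: compute the pair to use for the next operation. When the
        // short-valued epoch is about to overflow, switch to a freshly allocated
        // producer id with epoch 0, while the old pair keeps describing the
        // ongoing transaction.
        static ProducerIdAndEpoch next(ProducerIdAndEpoch current,
                                       LongSupplier newProducerIds) {
            if (current.epoch() < Short.MAX_VALUE - 1) {
                return new ProducerIdAndEpoch(current.producerId(),
                                              (short) (current.epoch() + 1));
            }
            return new ProducerIdAndEpoch(newProducerIds.getAsLong(), (short) 0);
        }
    }
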
> On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
> <jols...@confluent.io.invalid> wrote:
>
> Hey Artem,
>
> Thanks for the KIP. I had a question about epoch bumping.
>
> Previously when we send an InitProducerId request on Producer startup,
> we bump the epoch and abort the transaction. Is it correct to assume
> that we will still bump the epoch, but just not abort the transaction?
> If we still bump the epoch in this case, how does this interact with
> KIP-890 where we also bump the epoch on every transaction? (I think
> this means that we may skip epochs and the data itself will all have
> the same epoch)
>
> I may have follow ups depending on the answer to this. :)
>
> Thanks,
> Justine
>
> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> Hi Alex,
>
> Thank you for your questions.
>
> > the purpose of having broker-level
> > transaction.two.phase.commit.enable
>
> The thinking is that 2PC is a bit of an advanced construct so enabling
> 2PC in a Kafka cluster should be an explicit decision.  If it is set to
> 'false' InitProducerId (and initTransactions) would return
> TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
>
> > WDYT about adding an AdminClient method that returns the state of
> > transaction.two.phase.commit.enable
>
> I wonder if the client could just try to use 2PC and then handle the
> error (e.g. if it needs to fall back to ordinary transactions).  This
> way it could uniformly handle cases when Kafka cluster doesn't support
> 2PC completely and cases when 2PC is restricted to certain users.  We
> could also expose this config in describeConfigs, if the fallback
> approach doesn't work for some scenarios.
>
> -Artem
>
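The "just try 2PC and handle the error" approach could look roughly like this on the client side. The transaction.two.phase.commit.enable producer property is the name proposed in this KIP, not an existing config; everything else uses the current producer API.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
    import org.apache.kafka.common.serialization.StringSerializer;

    final class TwoPhaseCommitFallback {
        static KafkaProducer<String, String> create(String bootstrap, String txnId) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put("transaction.two.phase.commit.enable", "true"); // proposed in KIP-939

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            try {
                producer.initTransactions();   // would fail here if 2PC is not allowed
                return producer;
            } catch (TransactionalIdAuthorizationException e) {
                // 2PC disabled on the cluster or this principal lacks the new ACL:
                // fall back to ordinary transactions with the regular timeout.
                producer.close();
                props.remove("transaction.two.phase.commit.enable");
                KafkaProducer<String, String> fallback = new KafkaProducer<>(props);
                fallback.initTransactions();
                return fallback;
            }
        }
    }
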
> On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
> <asorokou...@confluent.io.invalid> wrote:
>
> Hi Artem,
>
> Thanks for publishing this KIP!
>
> Can you please clarify the purpose of having broker-level
> transaction.two.phase.commit.enable config in addition to the new ACL?
> If the brokers are configured with
> transaction.two.phase.commit.enable=false, at what point will a client
> configured with transaction.two.phase.commit.enable=true fail? Will it
> happen at KafkaProducer#initTransactions?
>
> WDYT about adding an AdminClient method that returns the state of
> transaction.two.phase.commit.enable? This way, clients would know in
> advance if 2PC is enabled on the brokers.
>
> Best,
> Alex
>
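For what it's worth, reading a broker config from a client is already possible with the existing Admin API, so the "know in advance" option could look like the sketch below. The config name is the one proposed in this KIP; the Admin calls themselves exist today.

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    final class TwoPhaseCommitProbe {
        static boolean isEnabledOnBroker(String bootstrap, String brokerId) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            try (Admin admin = Admin.create(props)) {
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
                Config config = admin.describeConfigs(List.of(broker)).all().get().get(broker);
                // Config name below is the KIP's proposal; brokers that don't have
                // it will simply return no entry.
                ConfigEntry entry = config.get("transaction.two.phase.commit.enable");
                return entry != null && "true".equals(entry.value());
            }
        }
    }
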
> On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> Other than supporting multiplexing transactional streams on a single
> producer, I don't see how to improve it.
>
> On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> Hi Roger,
>
> Thank you for summarizing the cons.  I agree and I'm curious what would
> be the alternatives to solve these problems better and if they can be
> incorporated into this proposal (or built independently in addition to
> or on top of this proposal).  E.g. one potential extension we discussed
> earlier in the thread could be multiplexing logical transactional
> "streams" with a single producer.
>
> -Artem
>
> On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> Thanks.  I like that you're moving Kafka toward supporting this
> dual-write pattern.  Each use case needs to consider the tradeoffs.
> You already summarized the pros very well in the KIP.  I would
> summarize the cons as follows:
>
> - you sacrifice availability - each write requires both DB and Kafka to
>   be available, so I think your overall application availability is
>   (1 - p(DB is unavailable)) * (1 - p(Kafka is unavailable)).
> - latency will be higher and throughput lower - each write requires
>   both writes to DB and Kafka while holding an exclusive lock in DB.
> - you need to create a producer per unit of concurrency in your app
>   which has some overhead in the app and Kafka side (number of
>   connections, poor batching).  I assume the producers would need to be
>   configured for low latency (linger.ms=0)
> - there's some complexity in managing stable transactional ids for each
>   producer/concurrency unit in your application.  With k8s deployment,
>   you may need to switch to something like a StatefulSet that gives
>   each pod a stable identity across restarts.  On top of that pod
>   identity which you can use as a prefix, you then assign unique
>   transactional ids to each concurrency unit (thread/goroutine).
>
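The last bullet above is straightforward to express with today's producer API; a small sketch with placeholder names (the HOSTNAME env var standing in for a stable StatefulSet pod identity, sizes and bootstrap address assumed):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    final class ProducerPerConcurrencyUnit {
        static List<KafkaProducer<String, String>> create(String bootstrap, int concurrency) {
            // Stable pod identity (e.g. a StatefulSet hostname) used as the prefix;
            // each thread / concurrency slot gets its own producer and transactional id.
            String podId = System.getenv().getOrDefault("HOSTNAME", "app-0");
            List<KafkaProducer<String, String>> producers = new ArrayList<>(concurrency);
            for (int i = 0; i < concurrency; i++) {
                Properties p = new Properties();
                p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
                p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                p.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // low latency, as noted above
                p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, podId + "-txn-" + i); // stable across restarts
                producers.add(new KafkaProducer<>(p));
            }
            return producers;
        }
    }
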
> On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> Hi Roger,
>
> Thank you for the feedback.  You make a very good point that we also
> discussed internally.  Adding support for multiple concurrent
> transactions in one producer could be valuable but it seems to be a
> fairly large and independent change that would deserve a separate KIP.
> If such support is added we could modify 2PC functionality to
> incorporate that.
>
> > Maybe not too bad but a bit of pain to manage these ids inside each
> > process and across all application processes.
>
> I'm not sure if supporting multiple transactions in one producer would
> make id management simpler: we'd need to store a piece of data per
> transaction, so whether it's N producers with a single transaction or N
> transactions with a single producer, it's still roughly the same amount
> of data to manage.  In fact, managing transactional ids (current
> proposal) might be easier, because the id is controlled by the
> application and it knows how to complete the transaction after crash /
> restart; while a TID would be generated by Kafka and that would create
> a question of starting Kafka transaction, but not saving its TID and
> then crashing, then figuring out which transactions to abort and etc.
>
> > 2) creating a separate producer for each concurrency slot in the
> > application
>
> This is a very valid concern.  Maybe we'd need to have some
> multiplexing of transactional logical "streams" over the same
> connection.  Seems like a separate KIP, though.
>
> > Otherwise, it seems you're left with single-threaded model per
> > application process?
>
> That's a fair assessment.  Not necessarily exactly single-threaded per
> application, but a single producer per thread model (i.e. an
> application could have a pool of threads + producers to increase
> concurrency).
>
> -Artem
>
> On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> Artem,
>
> Thanks for the reply.
>
> If I understand correctly, Kafka does not support concurrent
> transactions from the same producer (transactional id).  I think this
> means that applications that want to support in-process concurrency
> (say thread-level concurrency with row-level DB locking) would need to
> manage separate transactional ids and producers per thread and then
> store txn state accordingly.  The potential usability downsides I see
> are
> 1) managing a set of transactional ids for each application process
> that scales up to its max concurrency.  Maybe not too bad but a bit of
> pain to manage these ids inside each process and across all application
> processes.
> 2) creating a separate producer for each concurrency slot in the
> application - this could create a lot more producers and resultant
> connections to Kafka than the typical model of a single producer per
> process.
>
> Otherwise, it seems you're left with single-threaded model per
> application process?
>
> Thanks,
>
> Roger
>
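To illustrate the thread-level concurrency with row-level DB locking mentioned above, here is a schematic dual-write flow per worker thread. JDBC is used only as an example, the table, topic and commit ordering are placeholders, and the KIP's prepare/complete steps are only hinted at in the comments.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    final class DualWriteWorker {
        // Schematic only: each worker thread owns one producer / transactional id
        // and holds a row-level lock for the duration of the dual write.
        static void write(Connection db, KafkaProducer<String, String> producer,
                          String orderId, String payload) throws SQLException {
            producer.beginTransaction();
            try {
                db.setAutoCommit(false);
                try (PreparedStatement lock =
                         db.prepareStatement("SELECT id FROM orders WHERE id = ? FOR UPDATE")) {
                    lock.setString(1, orderId);
                    lock.executeQuery();  // row-level lock held until commit/rollback
                }
                producer.send(new ProducerRecord<>("orders", orderId, payload));

                // With KIP-939 the Kafka transaction would be prepared here and its
                // state saved in the same DB transaction before the two commits.
                db.commit();
                producer.commitTransaction();
            } catch (Exception e) {
                db.rollback();
                producer.abortTransaction();
                throw new IllegalStateException("dual write failed", e);
            }
        }
    }
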
> On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> Hi Roger, Arjun,
>
> Thank you for the questions.
>
> > It looks like the application must have stable transactional ids
> > over time?
>
> The transactional id should uniquely identify a producer instance and
> needs to be stable across the restarts.  If the transactional id is not
> stable across restarts, then zombie messages from a previous
> incarnation of the producer may violate atomicity.  If there are 2
> producer instances concurrently producing data with the same
> transactional id, they are going to constantly fence each other and
> most likely make little or no progress.
>
> The name might be a little bit confusing as it may be mistaken for a
> transaction id / TID that uniquely identifies every transaction.  The
> name and the semantics were defined in the original
> exactly-once-semantics (EoS) proposal
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging)
> and KIP-939 just builds on top of that.
>
> > I'm curious to understand what happens if the producer dies, and
> > does not come up and recover the pending transaction within the
> > transaction timeout interval.
>
> If the producer / application never comes back, the transaction will
> remain in prepared (a.k.a. "in-doubt") state until an operator
> forcefully terminates the transaction.  That's why there is a new ACL
> defined in this proposal -- this functionality should only be provided
> to applications that implement proper recovery logic.
>
> -Artem
>
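The fencing behavior described above is what applications see today as ProducerFencedException; a short sketch of the only reasonable reaction to it (topic name and record are placeholders):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.ProducerFencedException;

    final class FencingHandling {
        static void produceOne(KafkaProducer<String, String> producer, String key, String value) {
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("orders", key, value));
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // Another incarnation with the same transactional.id bumped the epoch.
                // This instance is now a zombie: do not retry, just close the producer.
                producer.close();
            }
        }
    }
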
> On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com>
> wrote:
>
> Hello Artem,
>
> Thanks for the KIP.
>
> I have the same question as Roger on concurrent writes, and an
> additional one on consumer behavior.  Typically, transactions will time
> out if not committed within some time interval.  With the proposed
> changes in this KIP, consumers cannot consume past the ongoing
> transaction.  I'm curious to understand what happens if the producer
> dies, and does not come up and recover the pending transaction within
> the transaction timeout interval.  Or are we saying that when used in
> this 2PC context, we should configure these transaction timeouts to
> very large durations?
>
> Thanks in advance!
>
> Best,
> Arjun
>
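For context on the timeout question, the knobs that exist today (independent of this KIP) are the producer-side transaction.timeout.ms and the broker-side transaction.max.timeout.ms cap (15 minutes by default); a client value above the broker cap is rejected when transactions are initialized. A sketch of raising the client-side value:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    final class LongTxnTimeout {
        static Properties withLongTxnTimeout(Properties base) {
            Properties props = new Properties();
            props.putAll(base);
            // 10 minutes; anything above the broker's transaction.max.timeout.ms
            // is rejected when the producer initializes transactions.
            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, Integer.toString(10 * 60 * 1000));
            return props;
        }
    }
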
> > > > > > > > > > > > > > > > > > >> > > > > > > > > > On Mon, Aug 21, 2023
> > at
> > > > > > 1:06 PM
> > > > > > > > > Roger
> > > > > > > > > > > > > Hoover <
> > > > > > > > > > > > > > > > > > >> > > > > > roger.hoo...@gmail.com
> > > > > > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > > > >> > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > >> > > > > > > > > >
> > Hi Artem,
> >
> > Thanks for writing this KIP. Can you clarify the requirements a bit
> > more for managing transaction state? It looks like the application
> > must have stable transactional ids over time? What is the granularity
> > of those ids and producers? Say the application is a multi-threaded
> > Java web server, can/should all the concurrent threads share a
> > transactional id and producer? That doesn't seem right to me unless
> > the application is using global DB locks that serialize all requests.
> > Instead, if the application uses row-level DB locks, there could be
> > multiple, concurrent, independent txns happening in the same JVM, so
> > it seems like the granularity of managing transactional ids and txn
> > state needs to line up with the granularity of the DB locking.
> >
> > Does that make sense or am I misunderstanding?
> >
> > Thanks,
> >
> > Roger
> >
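A minimal sketch of the granularity Roger describes, assuming one transactional producer per worker thread (the class name and the "webapp-txn-" id prefix are invented for the example): each concurrent unit of work gets its own producer with its own stable transactional id, so Kafka transactions line up with row-level DB transactions instead of being serialized behind a single shared producer.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class WorkerProducers {
        // One producer per worker thread, never shared, so each in-flight
        // DB transaction has a matching, independent Kafka transaction.
        static KafkaProducer<String, String> producerForWorker(int workerIndex) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Stable across restarts: worker N always reuses the same id,
            // so a restarted instance can resolve its own pending
            // transaction instead of abandoning it.
            props.put("transactional.id", "webapp-txn-" + workerIndex);
            KafkaProducer<String, String> producer = new KafkaProducer<>(
                props, new StringSerializer(), new StringSerializer());
            producer.initTransactions();
            return producer;
        }
    }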
> > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> > > Hello,
> > >
> > > This is a discussion thread for
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > .
> > >
> > > The KIP proposes extending Kafka transaction support (that already
> > > uses 2PC under the hood) to enable atomicity of dual writes to Kafka
> > > and an external database, and helps to fix a long-standing Flink
> > > issue.
> > >
> > > An example of code that uses the dual-write recipe with JDBC, and
> > > should work for most SQL databases, is here:
> > > https://github.com/apache/kafka/pull/14231
> > >
> > > The FLIP for the sister fix in Flink is here:
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > >
> > > -Artem
> > >
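For readers who do not want to open the PR, a rough outline of the dual-write recipe it implements is sketched below. The prepareTransaction()/completeTransaction(PreparedTxnState) producer calls are the additions proposed in KIP-939 and may differ in the final API; the kafka_txn_state table, the app_id value, and the string serialization of the prepared state are invented for the sketch.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    // PreparedTxnState is part of the KIP-939 proposal; its package is
    // assumed here and is not part of any released kafka-clients API.
    import org.apache.kafka.clients.producer.PreparedTxnState;

    public class DualWriteSketch {
        static void dualWrite(KafkaProducer<String, String> producer,
                              Connection db,
                              ProducerRecord<String, String> record,
                              String businessUpdateSql) throws Exception {
            producer.beginTransaction();
            producer.send(record);

            // Phase 1: prepare the Kafka transaction; the broker now waits
            // for an explicit decision rather than a timeout.
            PreparedTxnState prepared = producer.prepareTransaction();

            db.setAutoCommit(false);
            try (PreparedStatement update = db.prepareStatement(businessUpdateSql);
                 PreparedStatement save = db.prepareStatement(
                         "UPDATE kafka_txn_state SET state = ? WHERE app_id = ?")) {
                update.executeUpdate();
                save.setString(1, prepared.toString()); // persisted for recovery
                save.setString(2, "dual-write-app-1");
                save.executeUpdate();
                db.commit(); // the DB commit is the single point of decision
            } catch (Exception e) {
                db.rollback();
                producer.abortTransaction();
                throw e;
            }

            // Phase 2: the DB committed, so complete the prepared Kafka side.
            producer.completeTransaction(prepared);
        }
    }

On restart, the same recipe would reopen the producer while keeping the prepared transaction, read the stored state back from kafka_txn_state, and pass it to completeTransaction() to commit or abort the in-doubt transaction; the exact recovery call (an initTransactions variant that keeps the prepared transaction) is likewise part of the KIP-939 proposal rather than a released API.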