Hi, Artem, Thanks for the reply.
For 3b, it would be useful to understand the reason why an admin doesn't authorize 2PC for self-hosted Flink. Is the main reason that 2PC has an unbounded timeout that could lead to unbounded outstanding transactions? If so, another way to address that is to allow the admin to set a timeout even for the 2PC case. The timeout would be long enough for well-behaved applications to complete their 2PC operations, but short enough that misbehaving applications' transactions don't hang indefinitely.

Jun

On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote: > Hi Jun, > > > 20A. One option is to make the API initTransactions(boolean enable2PC). > > We could do that. I think there is a little bit of symmetry between the > client and server that would get lost with this approach (server has > enable2PC as config), but I don't really see a strong reason for enable2PC > to be a config vs. an argument for initTransactions. But let's see if we > find 20B to be a strong consideration for keeping a separate flag for > keepPreparedTxn. > > > 20B. But realistically, we want Flink (and other apps) to have a single > implementation > > That's correct and here's what I think can happen if we don't allow > independent keepPreparedTxn: > > 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is > used, which effectively implements keepPreparedTxn=true without our > explicit support. > 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can > either fall back to reflection or we just say we don't support this, have > to upgrade Kafka cluster first. > 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes > interesting depending on whether the Kafka cluster authorizes 2PC or not: > 3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything uses > KIP-939 and there is no problem > 3b. 
Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we can > either fallback to reflection or use keepPreparedTxn=true even if 2PC is > not enabled. > > It seems to be ok to not support case 2 (i.e. require Kafka upgrade first), > it shouldn't be an issue for cloud offerings as cloud providers are likely > to upgrade their Kafka to the latest versions. > > The case 3b seems to be important to support, though -- the latest version > of everything should work at least as well (and preferably better) than > previous ones. It's possible to downgrade to case 1, but it's probably not > sustainable as newer versions of Flink would also add other features that > the customers may want to take advantage of. > > If we enabled keepPreparedTxn=true even without 2PC, then we could enable > case 3b without the need to fall back to reflection, so we could get rid of > reflection-based logic and just have a single implementation based on > KIP-939. > > > 32. My suggestion is to change > > Let me think about it and I'll come back to this. > > -Artem > > On Wed, Feb 21, 2024 at 3:40 PM Jun Rao <j...@confluent.io.invalid> wrote: > > > Hi, Artem, > > > > Thanks for the reply. > > > > 20A. One option is to make the API initTransactions(boolean enable2PC). > > Then, it's clear from the code whether 2PC related logic should be added. > > > > 20B. But realistically, we want Flink (and other apps) to have a single > > implementation of the 2PC logic, not two different implementations, > right? > > > > 32. My suggestion is to > > change > > > kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max > > to sth like > > Metric Name Type Group > > Tags Description > > active-transaction-open-time-max Max transaction-coordinator-metrics > > none The max time a currently-open transaction has been open > > > > Jun > > > > On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits > > <alivsh...@confluent.io.invalid> wrote: > > > > > Hi Jun, > > > > > > > 20A. 
This only takes care of the abort case. The application still > > needs > > > to be changed to handle the commit case properly > > > > > > My point here is that looking at the initTransactions() call it's not > > clear > > > what the semantics are. Say I'm doing code review, I cannot say if the > > code > > > is correct or not -- if the config (that's something that's > > > theoretically not known at the time of code review) is going to enable > > 2PC, > > > then the correct code should look one way, otherwise it would need to > > look > > > differently. Also, say if code is written with InitTransaction() > without > > > explicit abort and then for whatever reason the code would get used > with > > > 2PC enabled (could be a library in a bigger product) it'll start > breaking > > > in a non-intuitive way. > > > > > > > 20B. Hmm, if the admin disables 2PC, there is likely a reason behind > > that > > > > > > That's true, but reality may be more complicated. Say a user wants to > > run > > > a self-managed Flink with Confluent cloud. Confluent cloud admin may > not > > > be comfortable enabling 2PC to general user accounts that use services > > not > > > managed by Confluent (the same way Confluent doesn't allow increasing > max > > > transaction timeout for general user accounts). Right now, > self-managed > > > Flink works because it uses reflection, if it moves to use public APIs > > > provided by KIP-939 it'll break. > > > > > > > 32. Ok. That's the kafka metric. In that case, the metric name has a > > > group and a name. There is no type and no package name. > > > > > > Is this a suggestion to change or confirmation that the current logic > is > > > ok? I just copied an existing metric but can change if needed. > > > > > > -Artem > > > > > > On Tue, Feb 20, 2024 at 11:25 AM Jun Rao <j...@confluent.io.invalid> > > wrote: > > > > > > > Hi, Artem, > > > > > > > > Thanks for the reply. > > > > > > > > 20. 
"Say if an application > > > > currently uses initTransactions() to achieve the current semantics, > it > > > > would need to be rewritten to use initTransactions() + abort to > achieve > > > the > > > > same semantics if the config is changed. " > > > > > > > > This only takes care of the abort case. The application still needs > to > > be > > > > changed to handle the commit case properly > > > > if transaction.two.phase.commit.enable is set to true. > > > > > > > > "Even when KIP-939 is implemented, > > > > there would be situations when 2PC is disabled by the admin (e.g. > Kafka > > > > service providers may be reluctant to enable 2PC for Flink services > > that > > > > users host themselves), so we either have to perpetuate the > > > > reflection-based implementation in Flink or enable > keepPreparedTxn=true > > > > without 2PC." > > > > > > > > Hmm, if the admin disables 2PC, there is likely a reason behind > that. I > > > am > > > > not sure that we should provide an API to encourage the application > to > > > > circumvent that. > > > > > > > > 32. Ok. That's the kafka metric. In that case, the metric name has a > > > group > > > > and a name. There is no type and no package name. > > > > > > > > Jun > > > > > > > > > > > > On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > Hi Jun, > > > > > > > > > > Thank you for your questions. > > > > > > > > > > > 20. So to abort a prepared transaction after the producer start, > we > > > > could > > > > > use ... > > > > > > > > > > I agree, initTransaction(true) + abort would accomplish the > behavior > > of > > > > > initTransactions(false), so we could technically have fewer ways to > > > > achieve > > > > > the same thing, which is generally valuable. I wonder, though, if > > that > > > > > would be intuitive from the application perspective. 
Say if an > > > > application > > > > > currently uses initTransactions() to achieve the current semantics, > > it > > > > > would need to be rewritten to use initTransactions() + abort to > > achieve > > > > the > > > > > same semantics if the config is changed. I think this could create > > > > > subtle confusion, as the config change is generally decoupled from > > > > changing > > > > > application implementation. > > > > > > > > > > > The use case mentioned for keepPreparedTxn=true without 2PC > > doesn't > > > > seem > > > > > very important > > > > > > > > > > I agree, it's not a strict requirement. It is, however, a missing > > > option > > > > > in the public API, so currently Flink has to use reflection to > > emulate > > > > this > > > > > functionality without 2PC support. Even when KIP-939 is > > implemented, > > > > > there would be situations when 2PC is disabled by the admin (e.g. > > Kafka > > > > > service providers may be reluctant to enable 2PC for Flink services > > > that > > > > > users host themselves), so we either have to perpetuate the > > > > > reflection-based implementation in Flink or enable > > keepPreparedTxn=true > > > > > without 2PC. > > > > > > > > > > > 32. > > > > > > > > > > > > > > > > > > > > kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max > > > > > > > > > > I just followed the existing metric implementation example > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95 > > > > > , > > > > > which maps to > > > > > > > > > > > > > > > > > > > > kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max. > > > > > > > > > > > 33. "If the value is 'true' then the corresponding field is set > > > > > > > > > > That's correct. Updated the KIP. 
> > > > > > > > > > -Artem > > > > > > > > > > On Wed, Feb 7, 2024 at 10:06 AM Jun Rao <j...@confluent.io.invalid> > > > > wrote: > > > > > > > > > > > Hi, Artem, > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > 20. So to abort a prepared transaction after producer start, we > > could > > > > use > > > > > > either > > > > > > producer.initTransactions(false) > > > > > > or > > > > > > producer.initTransactions(true) > > > > > > producer.abortTransaction > > > > > > Could we just always use the latter API? If we do this, we could > > > > > > potentially eliminate the keepPreparedTxn flag in > > initTransactions(). > > > > > After > > > > > > the initTransactions() call, the outstanding txn is always > > preserved > > > if > > > > > 2pc > > > > > > is enabled and aborted if 2pc is disabled. The use case mentioned > > for > > > > > > keepPreparedTxn=true without 2PC doesn't seem very important. If > we > > > > could > > > > > > do that, it seems that we have (1) less redundant and simpler > APIs; > > > (2) > > > > > > more symmetric syntax for aborting/committing a prepared txn > after > > > > > producer > > > > > > restart. > > > > > > > > > > > > 32. > > > > > > > > > > > > > > > > > > > > > > > > > > > kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max > > > > > > Is this a Yammer or kafka metric? The former uses the camel case > > for > > > > name > > > > > > and type. The latter uses the hyphen notation, but doesn't have > the > > > > type > > > > > > attribute. > > > > > > > > > > > > 33. "If the value is 'true' then the corresponding field is set > in > > > the > > > > > > InitProducerIdRequest and the KafkaProducer object is set into a > > > state > > > > > > which only allows calling .commitTransaction or > .abortTransaction." > > > > > > We should also allow .completeTransaction, right? 
> > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > 20. For Flink usage, it seems that the APIs used to abort and > > > > commit > > > > > a > > > > > > > prepared txn are not symmetric. > > > > > > > > > > > > > > For Flink it is expected that Flink would call > .commitTransaction > > > or > > > > > > > .abortTransaction directly, it wouldn't need to deal with > > > > > > PreparedTxnState, > > > > > > > the outcome is actually determined by the Flink's job manager, > > not > > > by > > > > > > > comparison of PreparedTxnState. So for Flink, if the Kafka > sync > > > > > crashes > > > > > > > and restarts there are 2 cases: > > > > > > > > > > > > > > 1. Transaction is not prepared. In that case just call > > > > > > > producer.initTransactions(false) and then can start > transactions > > as > > > > > > needed. > > > > > > > 2. Transaction is prepared. In that case call > > > > > > > producer.initTransactions(true) and wait for the decision from > > the > > > > job > > > > > > > manager. Note that it's not given that the transaction will > get > > > > > > committed, > > > > > > > the decision could also be an abort. > > > > > > > > > > > > > > > 21. transaction.max.timeout.ms could in theory be MAX_INT. > > > > Perhaps > > > > > we > > > > > > > could use a negative timeout in the record to indicate 2PC? > > > > > > > > > > > > > > -1 sounds good, updated. > > > > > > > > > > > > > > > 30. The KIP has two different APIs to abort an ongoing txn. > Do > > we > > > > > need > > > > > > > both? > > > > > > > > > > > > > > I think of producer.initTransactions() to be an implementation > > for > > > > > > > adminClient.forceTerminateTransaction(transactionalId). > > > > > > > > > > > > > > > 31. 
"This would flush all the pending messages and transition > > the > > > > > > > producer > > > > > > > > > > > > > > Updated the KIP to clarify that IllegalStateException will be > > > thrown. > > > > > > > > > > > > > > -Artem > > > > > > > > > > > > > > > > > > > > > On Mon, Feb 5, 2024 at 2:22 PM Jun Rao > <j...@confluent.io.invalid > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi, Artem, > > > > > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > > > > > 20. For Flink usage, it seems that the APIs used to abort and > > > > commit > > > > > a > > > > > > > > prepared txn are not symmetric. > > > > > > > > To abort, the app will just call > > > > > > > > producer.initTransactions(false) > > > > > > > > > > > > > > > > To commit, the app needs to call > > > > > > > > producer.initTransactions(true) > > > > > > > > producer.completeTransaction(preparedTxnState) > > > > > > > > > > > > > > > > Will this be a concern? For the dual-writer usage, both > > > > abort/commit > > > > > > use > > > > > > > > the same API. > > > > > > > > > > > > > > > > 21. transaction.max.timeout.ms could in theory be MAX_INT. > > > Perhaps > > > > > we > > > > > > > > could > > > > > > > > use a negative timeout in the record to indicate 2PC? > > > > > > > > > > > > > > > > 30. The KIP has two different APIs to abort an ongoing txn. > Do > > we > > > > > need > > > > > > > > both? > > > > > > > > producer.initTransactions(false) > > > > > > > > adminClient.forceTerminateTransaction(transactionalId) > > > > > > > > > > > > > > > > 31. "This would flush all the pending messages and transition > > the > > > > > > > producer > > > > > > > > into a mode where only .commitTransaction, .abortTransaction, > > or > > > > > > > > .completeTransaction could be called. If the call is > > successful > > > > (all > > > > > > > > messages successfully got flushed to all partitions) the > > > > transaction > > > > > is > > > > > > > > prepared." 
> > > > > > > > If the producer calls send() in that state, what exception > > will > > > > the > > > > > > > caller > > > > > > > > receive? > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > > Then, should we change the following in the example to > use > > > > > > > > > InitProducerId(true) instead? > > > > > > > > > > > > > > > > > > We could. I just thought that it's good to make the example > > > > > > > > self-contained > > > > > > > > > by starting from a clean state. > > > > > > > > > > > > > > > > > > > Also, could Flink just follow the dual-write recipe? > > > > > > > > > > > > > > > > > > I think it would bring some unnecessary logic to Flink (or > > any > > > > > other > > > > > > > > system > > > > > > > > > that already has a transaction coordinator and just wants > to > > > > drive > > > > > > > Kafka > > > > > > > > to > > > > > > > > > the desired state). We could discuss it with Flink folks, > > the > > > > > > current > > > > > > > > > proposal was developed in collaboration with them. > > > > > > > > > > > > > > > > > > > 21. Could a non 2pc user explicitly set the > > > > TransactionTimeoutMs > > > > > to > > > > > > > > > Integer.MAX_VALUE? > > > > > > > > > > > > > > > > > > The server would reject this for regular transactions, it > > only > > > > > > accepts > > > > > > > > > values that are <= *transaction.max.timeout.ms > > > > > > > > > <http://transaction.max.timeout.ms> *(a broker config). > > > > > > > > > > > > > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects > > the > > > > > > endTxn > > > > > > > > > request to use the ongoing pid. ... 
> > > > > > > > > > > > > > > > > > Without 2PC there is no case where the pid could change > > between > > > > > > > starting > > > > > > > > a > > > > > > > > > transaction and endTxn (InitProducerId would abort any > > ongoing > > > > > > > > > transaction). WIth 2PC there is now a case where there > could > > > be > > > > > > > > > InitProducerId that can change the pid without aborting the > > > > > > > transaction, > > > > > > > > so > > > > > > > > > we need to handle that. I wouldn't say that the flow is > > > > different, > > > > > > but > > > > > > > > > it's rather extended to handle new cases. The main > principle > > > is > > > > > > still > > > > > > > > the > > > > > > > > > same -- for all operations we use the latest "operational" > > pid > > > > and > > > > > > > epoch > > > > > > > > > known to the client, this way we guarantee that we can > fence > > > > > zombie / > > > > > > > > split > > > > > > > > > brain clients by disrupting the "latest known" pid + epoch > > > > > > progression. > > > > > > > > > > > > > > > > > > > 25. "We send out markers using the original ongoing > > > transaction > > > > > > > > > ProducerId and ProducerEpoch" ... > > > > > > > > > > > > > > > > > > Updated. > > > > > > > > > > > > > > > > > > -Artem > > > > > > > > > > > > > > > > > > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao > > > <j...@confluent.io.invalid > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi, Artem, > > > > > > > > > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > > > > > > > > > 20. So for the dual-write recipe, we should always call > > > > > > > > > > InitProducerId(keepPreparedTxn=true) from the producer? > > Then, > > > > > > should > > > > > > > we > > > > > > > > > > change the following in the example to use > > > InitProducerId(true) > > > > > > > > instead? > > > > > > > > > > 1. 
InitProducerId(false); TC STATE: Empty, ProducerId=42, > > > > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1, > NextProducerId=-1, > > > > > > > > > > NextProducerEpoch=-1; RESPONSE ProducerId=42, > Epoch=MAX-1, > > > > > > > > > > OngoingTxnProducerId=-1, OngoingTxnEpoch=-1. > > > > > > > > > > Also, could Flink just follow the dual-write recipe? It's > > > > simpler > > > > > > if > > > > > > > > > there > > > > > > > > > > is one way to solve the 2pc issue. > > > > > > > > > > > > > > > > > > > > 21. Could a non 2pc user explicitly set the > > > > TransactionTimeoutMs > > > > > to > > > > > > > > > > Integer.MAX_VALUE? > > > > > > > > > > > > > > > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects > > the > > > > > > endTxn > > > > > > > > > > request to use the ongoing pid. With 2pc, the coordinator > > now > > > > > > expects > > > > > > > > the > > > > > > > > > > endTxn request to use the next pid. So, the flow is > > > different, > > > > > > right? > > > > > > > > > > > > > > > > > > > > 25. "We send out markers using the original ongoing > > > transaction > > > > > > > > > ProducerId > > > > > > > > > > and ProducerEpoch" > > > > > > > > > > We should use ProducerEpoch + 1 in the marker, right? > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > > > > > > 20. I am a bit confused by how we set > keepPreparedTxn. > > > > ... > > > > > > > > > > > > > > > > > > > > > > keepPreparedTxn=true informs the transaction > coordinator > > > that > > > > > it > > > > > > > > should > > > > > > > > > > > keep the ongoing transaction, if any. 
If the > > > > > > > keepPreparedTxn=false, > > > > > > > > > then > > > > > > > > > > > any ongoing transaction is aborted (this is exactly the > > > > current > > > > > > > > > > behavior). > > > > > > > > > > > enable2Pc is a separate argument that is controlled by > > the > > > > > > > > > > > *transaction.two.phase.commit.enable *setting on the > > > client. > > > > > > > > > > > > > > > > > > > > > > To start 2PC, the client just needs to set > > > > > > > > > > > *transaction.two.phase.commit.enable*=true in the > config. > > > > Then > > > > > > if > > > > > > > > the > > > > > > > > > > > client knows the status of the transaction upfront (in > > the > > > > case > > > > > > of > > > > > > > > > Flink, > > > > > > > > > > > Flink keeps the knowledge if the transaction is > prepared > > in > > > > its > > > > > > own > > > > > > > > > > store, > > > > > > > > > > > so it always knows upfront), it can set keepPreparedTxn > > > > > > > accordingly, > > > > > > > > > then > > > > > > > > > > > if the transaction was prepared, it'll be ready for the > > > > client > > > > > to > > > > > > > > > > complete > > > > > > > > > > > the appropriate action; if the client doesn't have a > > > > knowledge > > > > > > that > > > > > > > > the > > > > > > > > > > > transaction is prepared, keepPreparedTxn is going to be > > > > false, > > > > > in > > > > > > > > which > > > > > > > > > > > case we'll get to a clean state (the same way we do > > today). > > > > > > > > > > > > > > > > > > > > > > For the dual-write recipe, the client doesn't know > > upfront > > > if > > > > > the > > > > > > > > > > > transaction is prepared, this information is implicitly > > > > encoded > > > > > > > > > > > PreparedTxnState value that can be used to resolve the > > > > > > transaction > > > > > > > > > state. 
> > > > > > > > > > > In that case, keepPreparedTxn should always be true, > > > because > > > > we > > > > > > > don't > > > > > > > > > > know > > > > > > > > > > > upfront and we don't want to accidentally abort a > > committed > > > > > > > > > transaction. > > > > > > > > > > > > > > > > > > > > > > The forceTerminateTransaction call can just use > > > > > > > > keepPreparedTxn=false, > > > > > > > > > it > > > > > > > > > > > actually doesn't matter if it sets Enable2Pc flag. > > > > > > > > > > > > > > > > > > > > > > > 21. TransactionLogValue: Do we need some field to > > > identify > > > > > > > whether > > > > > > > > > this > > > > > > > > > > > is written for 2PC so that ongoing txn is never auto > > > aborted? > > > > > > > > > > > > > > > > > > > > > > The TransactionTimeoutMs would be set to > > Integer.MAX_VALUE > > > if > > > > > 2PC > > > > > > > was > > > > > > > > > > > enabled. I've added a note to the KIP about this. > > > > > > > > > > > > > > > > > > > > > > > 22 > > > > > > > > > > > > > > > > > > > > > > You're right it's a typo. I fixed it as well as step 9 > > > > > (REQUEST: > > > > > > > > > > > ProducerId=73, ProducerEpoch=MAX). > > > > > > > > > > > > > > > > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a > > config > > > > > while > > > > > > > > > > > KeepPreparedTxn is from an API param ... > > > > > > > > > > > > > > > > > > > > > > The intent to use 2PC doesn't change from transaction > to > > > > > > > transaction, > > > > > > > > > but > > > > > > > > > > > the intent to keep prepared txn may change from > > transaction > > > > to > > > > > > > > > > > transaction. In dual-write recipes the distinction is > > not > > > > > clear, > > > > > > > but > > > > > > > > > for > > > > > > > > > > > use cases where keepPreparedTxn value is known upfront > > > (e.g. > > > > > > Flink) > > > > > > > > > it's > > > > > > > > > > > more prominent. E.g. 
a Flink's Kafka sink operator > could > > > be > > > > > > > deployed > > > > > > > > > > with > > > > > > > > > > > *transaction.two.phase.commit.enable*=true hardcoded in > > the > > > > > > image, > > > > > > > > but > > > > > > > > > > > keepPreparedTxn cannot be hardcoded in the image, > because > > > it > > > > > > > depends > > > > > > > > on > > > > > > > > > > the > > > > > > > > > > > job manager's state. > > > > > > > > > > > > > > > > > > > > > > > 24 > > > > > > > > > > > > > > > > > > > > > > The flow is actually going to be the same way as it is > > now > > > -- > > > > > the > > > > > > > > > "main" > > > > > > > > > > > producer id + epoch needs to be used in all operations > to > > > > > prevent > > > > > > > > > fencing > > > > > > > > > > > (it's sort of a common "header" in all RPC calls that > > > follow > > > > > the > > > > > > > same > > > > > > > > > > > rules). The ongoing txn info is just additional info > for > > > > > making > > > > > > a > > > > > > > > > > commit / > > > > > > > > > > > abort decision based on the PreparedTxnState from the > DB. > > > > > > > > > > > > > > > > > > > > > > --Artem > > > > > > > > > > > > > > > > > > > > > > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao > > > > > > <j...@confluent.io.invalid > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi, Artem, > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the reply. A few more comments. > > > > > > > > > > > > > > > > > > > > > > > > 20. I am a bit confused by how we set > keepPreparedTxn. 
> > > From > > > > > the > > > > > > > > KIP, > > > > > > > > > I > > > > > > > > > > > got > > > > > > > > > > > > the following (1) to start 2pc, we call > > > > > > > > > > > > InitProducerId(keepPreparedTxn=false); (2) when the > > > > producer > > > > > > > fails > > > > > > > > > and > > > > > > > > > > > > needs to do recovery, it calls > > > > > > > > InitProducerId(keepPreparedTxn=true); > > > > > > > > > > (3) > > > > > > > > > > > > Admin.forceTerminateTransaction() calls > > > > > > > > > > > > InitProducerId(keepPreparedTxn=false). > > > > > > > > > > > > 20.1 In (1), when a producer calls > > InitProducerId(false) > > > > with > > > > > > 2pc > > > > > > > > > > > enabled, > > > > > > > > > > > > and there is an ongoing txn, should the server return > > an > > > > > error > > > > > > to > > > > > > > > the > > > > > > > > > > > > InitProducerId request? If so, what would be the > error > > > > code? > > > > > > > > > > > > 20.2 How do we distinguish between (1) and (3)? It's > > the > > > > same > > > > > > API > > > > > > > > > call > > > > > > > > > > > but > > > > > > > > > > > > (1) doesn't abort ongoing txn and (2) does. > > > > > > > > > > > > 20.3 The usage in (1) seems unintuitive. 2pc implies > > > > keeping > > > > > > the > > > > > > > > > > ongoing > > > > > > > > > > > > txn. So, setting keepPreparedTxn to false to start > 2pc > > > > seems > > > > > > > > counter > > > > > > > > > > > > intuitive. > > > > > > > > > > > > > > > > > > > > > > > > 21. TransactionLogValue: Do we need some field to > > > identify > > > > > > > whether > > > > > > > > > this > > > > > > > > > > > is > > > > > > > > > > > > written for 2PC so that ongoing txn is never auto > > > aborted? > > > > > > > > > > > > > > > > > > > > > > > > 22. "8. 
InitProducerId(true); TC STATE: Ongoing, > > > > > ProducerId=42, > > > > > > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1, > > > NextProducerId=73, > > > > > > > > > > > > NextProducerEpoch=MAX; RESPONSE ProducerId=73, > > > Epoch=MAX-1, > > > > > > > > > > > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1" > > > > > > > > > > > > It seems in the above example, Epoch in RESPONSE > should > > > be > > > > > MAX > > > > > > to > > > > > > > > > match > > > > > > > > > > > > NextProducerEpoch? > > > > > > > > > > > > > > > > > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a > > config > > > > > > > > > > > > while KeepPreparedTxn is from an API param. Should we > > > make > > > > > them > > > > > > > > more > > > > > > > > > > > > consistent since they seem related? > > > > > > > > > > > > > > > > > > > > > > > > 24. "9. Commit; REQUEST: ProducerId=73, > > > > ProducerEpoch=MAX-1; > > > > > TC > > > > > > > > > STATE: > > > > > > > > > > > > PrepareCommit, ProducerId=42, ProducerEpoch=MAX, > > > > > > > PrevProducerId=73, > > > > > > > > > > > > NextProducerId=85, NextProducerEpoch=0; RESPONSE > > > > > ProducerId=85, > > > > > > > > > > Epoch=0, > > > > > > > > > > > > When a commit request is sent, it uses the latest > > > > ProducerId > > > > > > and > > > > > > > > > > > > ProducerEpoch." > > > > > > > > > > > > The step where we use the next produceId to commit an > > old > > > > txn > > > > > > > > works, > > > > > > > > > > but > > > > > > > > > > > > can be confusing. It's going to be hard for people > > > > > implementing > > > > > > > > this > > > > > > > > > > new > > > > > > > > > > > > client protocol to figure out when to use the current > > or > > > > the > > > > > > new > > > > > > > > > > > producerId > > > > > > > > > > > > in the EndTxnRequest. One potential way to improve > this > > > is > > > > to > > > > > > > > extend > > > > > > > > > > > > EndTxnRequest with a new field like > > > expectedNextProducerId. 
> > > > > > Then > > > > > > > we > > > > > > > > > can > > > > > > > > > > > > always use the old produceId in the existing field, > but > > > set > > > > > > > > > > > > expectedNextProducerId to bypass the fencing logic > when > > > > > needed. > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > > > > > > > > > Thank you for the comments. > > > > > > > > > > > > > > > > > > > > > > > > > > > 10. For the two new fields in Enable2Pc and > > > > > KeepPreparedTxn > > > > > > > ... > > > > > > > > > > > > > > > > > > > > > > > > > > I added a note that all combinations are valid. > > > > > > > Enable2Pc=false > > > > > > > > & > > > > > > > > > > > > > KeepPreparedTxn=true could be potentially useful > for > > > > > backward > > > > > > > > > > > > compatibility > > > > > > > > > > > > > with Flink, when the new version of Flink that > > > implements > > > > > > > KIP-319 > > > > > > > > > > tries > > > > > > > > > > > > to > > > > > > > > > > > > > work with a cluster that doesn't authorize 2PC. > > > > > > > > > > > > > > > > > > > > > > > > > > > 11. InitProducerIdResponse: If there is no > ongoing > > > > txn, > > > > > > what > > > > > > > > > will > > > > > > > > > > > > > OngoingTxnProducerId and OngoingTxnEpoch be set? > > > > > > > > > > > > > > > > > > > > > > > > > > I added a note that they will be set to -1. 
The > > client > > > > > then > > > > > > > will > > > > > > > > > > know > > > > > > > > > > > > that > > > > > > > > > > > > > there is no ongoing txn and .completeTransaction > > > becomes > > > > a > > > > > > > no-op > > > > > > > > > (but > > > > > > > > > > > > still > > > > > > > > > > > > > required before .send is enabled). > > > > > > > > > > > > > > > > > > > > > > > > > > > 12. ListTransactionsRequest related changes: It > > seems > > > > > those > > > > > > > are > > > > > > > > > > > already > > > > > > > > > > > > > covered by KIP-994? > > > > > > > > > > > > > > > > > > > > > > > > > > Removed from this KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > 13. TransactionalLogValue ... > > > > > > > > > > > > > > > > > > > > > > > > > > This is now updated to work on top of KIP-890. > > > > > > > > > > > > > > > > > > > > > > > > > > > 14. "Note that the (producerId, epoch) pair that > > > > > > corresponds > > > > > > > to > > > > > > > > > the > > > > > > > > > > > > > ongoing transaction ... > > > > > > > > > > > > > > > > > > > > > > > > > > This is now updated to work on top of KIP-890. > > > > > > > > > > > > > > > > > > > > > > > > > > > 15. active-transaction-total-time-max : ... > > > > > > > > > > > > > > > > > > > > > > > > > > Updated. > > > > > > > > > > > > > > > > > > > > > > > > > > > 16. "transaction.two.phase.commit.enable The > > default > > > > > would > > > > > > be > > > > > > > > > > > ‘false’. > > > > > > > > > > > > > If it’s ‘false’, 2PC functionality is disabled even > > if > > > > the > > > > > > ACL > > > > > > > is > > > > > > > > > set > > > > > > > > > > > ... > > > > > > > > > > > > > > > > > > > > > > > > > > Disabling 2PC effectively removes all authorization > > to > > > > use > > > > > > it, > > > > > > > > > hence > > > > > > > > > > I > > > > > > > > > > > > > thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would > > be > > > > > > > > appropriate. 
Do you suggest using a different error code for 2PC authorization vs. some other authorization (e.g. TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED), or a different code for disabled vs. unauthorized (e.g. TWO_PHASE_COMMIT_DISABLED), or both?

> 17. completeTransaction(). We expect this to be only used during recovery.

It can also be used if, say, a commit to the database fails and the result is inconclusive, e.g.:

1. Begin DB transaction
2. Begin Kafka transaction
3. Prepare Kafka transaction
4. Commit DB transaction
5. The DB commit fails, figure out the state of the transaction by reading the PreparedTxnState from DB
6. Complete Kafka transaction with the PreparedTxnState.

> 18. "either prepareTransaction was called or initTransaction(true) was called": "either" should be "neither"?

Updated.
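The recovery decision in steps 5-6 can be sketched as a minimal Python simulation (hypothetical names; in the KIP the comparison happens inside the client's completeTransaction, and PreparedTxnState is modeled here as just a (producerId, epoch) pair the application persisted in the database):

```python
# Sketch of the recovery decision above (illustrative, not the real client
# API): after an inconclusive DB commit, compare the PreparedTxnState read
# back from the database with the ongoing Kafka transaction's identity.

from dataclasses import dataclass

@dataclass(frozen=True)
class PreparedTxnState:
    producer_id: int
    epoch: int

def complete_transaction(db_state, ongoing_producer_id, ongoing_epoch):
    """Decide the outcome of a prepared Kafka transaction during recovery.

    If the state stored in the DB matches the ongoing transaction, the DB
    commit happened first, so the Kafka transaction must be committed;
    otherwise the DB commit never landed and the txn must be aborted.
    """
    if (db_state.producer_id, db_state.epoch) == (ongoing_producer_id, ongoing_epoch):
        return "commit"
    return "abort"

# DB commit went through before the failure: states match -> commit.
assert complete_transaction(PreparedTxnState(7, 41), 7, 41) == "commit"
# DB commit never happened: stale state in the DB -> abort.
assert complete_transaction(PreparedTxnState(7, 40), 7, 41) == "abort"
```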
> 19. Since InitProducerId always bumps up the epoch, it creates a situation ...

InitProducerId only bumps the producer epoch; the ongoing transaction epoch stays the same, no matter how many times InitProducerId is called before the transaction is completed. Eventually the epoch may overflow, and then a new producer id would be allocated, but the ongoing transaction's producer id would stay the same.

I've added a couple of examples in the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges) that walk through some scenarios and show how the state is changed.

-Artem

On Fri, Dec 8, 2023 at 6:04 PM Jun Rao <j...@confluent.io.invalid> wrote:

Hi, Artem,

Thanks for the KIP. A few comments below.
10. For the two new fields Enable2Pc and KeepPreparedTxn in InitProducerId, it would be useful to document in a bit more detail what values are set under what cases. For example, are all four combinations valid?

11. InitProducerIdResponse: If there is no ongoing txn, what will OngoingTxnProducerId and OngoingTxnEpoch be set to?

12. ListTransactionsRequest related changes: It seems those are already covered by KIP-994?

13. TransactionalLogValue: Could we name TransactionProducerId and ProducerId better? It's not clear from the names which is which.

14. "Note that the (producerId, epoch) pair that corresponds to the ongoing transaction is going to be written instead of the existing ProducerId and ProducerEpoch fields (which are renamed to reflect the semantics) to support downgrade.": I am a bit confused by that. Are we writing different values to the existing fields? Then we can't downgrade, right?
15. active-transaction-total-time-max: Would active-transaction-open-time-max be more intuitive? Also, could we include the full name (group, tags, etc.)?

16. "transaction.two.phase.commit.enable The default would be 'false'. If it's 'false', 2PC functionality is disabled even if the ACL is set, clients that attempt to use this functionality would receive TRANSACTIONAL_ID_AUTHORIZATION_FAILED error." TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the client to understand what the actual cause is.

17. completeTransaction(). We expect this to be only used during recovery. Could we document this clearly? Could we prevent it from being used incorrectly (e.g. throw an exception if the producer has called other methods like send())?

18. "either prepareTransaction was called or initTransaction(true) was called": "either" should be "neither"?
19. Since InitProducerId always bumps up the epoch, it creates a situation where there could be multiple outstanding txns. The following is an example of a potential problem during recovery.

The last txn epoch in the external store is 41 when the app dies. Instance1 is created for recovery.

1. (instance1) InitProducerId(keepPreparedTxn=true), epoch=42, ongoingEpoch=41
2. (instance1) dies before completeTxn(41) can be called.

Instance2 is created for recovery.

3. (instance2) InitProducerId(keepPreparedTxn=true), epoch=43, ongoingEpoch=42
4. (instance2) completeTxn(41) => abort

The first problem is that 41 is now aborted when it should be committed. The second is that it's not clear who could abort epoch 42, which is still open.

Jun

On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Artem,

Thanks for the updates.
I think what you say makes sense. I just updated my KIP, so I want to reconcile some of the changes we made, especially with respect to the TransactionLogValue.

Firstly, I believe tagged fields require a default value so that if they are not filled, we return the default (and know that they were empty). For my KIP, I proposed that the default for producer ID tagged fields should be -1. I was wondering if we could update the KIP to include the default values for producer ID and epoch.

Next, I noticed we decided to rename the fields. I guess the field "NextProducerId" in my KIP correlates to "ProducerId" in this KIP. Is that correct?
So we would have "TransactionProducerId" for the non-tagged field and "ProducerId" (NextProducerId) and "PrevProducerId" as tagged fields in the final version after KIP-890 and KIP-936 are implemented. Is this correct? I think the tags will need updating, but that is trivial.

The final question I had was with respect to storing the new epoch. In KIP-890 part 2 (epoch bumps) I think we concluded that we don't need to store the epoch, since we can interpret the previous epoch based on the producer ID. But here we could call InitProducerId multiple times, and we only want the producer with the correct epoch to be able to commit the transaction. Is that the correct reasoning for why we need the epoch here but not in the Prepare/Commit state?

Thanks,
Justine
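The point about tagged-field defaults can be shown with a minimal sketch (hypothetical field names, not the actual TransactionLogValue schema): a record written before the new fields existed simply omits them, and the reader must get back a sentinel that is distinguishable from any real producer id or epoch.

```python
# Minimal sketch of why tagged fields need an explicit default like -1:
# tagged fields are optional on the wire, so a record written by an older
# broker omits them, and the decoder substitutes the default.

NO_PRODUCER_ID = -1
NO_EPOCH = -1

def decode(record):
    # Absent tagged field -> default, so readers can tell "not set" apart
    # from any valid (non-negative) producer id / epoch.
    return (record.get("NextProducerId", NO_PRODUCER_ID),
            record.get("NextEpoch", NO_EPOCH))

old_record = {"TransactionProducerId": 7}  # written before the new fields
new_record = {"TransactionProducerId": 7, "NextProducerId": 8, "NextEpoch": 0}

assert decode(old_record) == (NO_PRODUCER_ID, NO_EPOCH)
assert decode(new_record) == (8, 0)
```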
On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Justine,

After thinking a bit about supporting atomic dual writes for Kafka + NoSQL databases, I came to the conclusion that we do need to bump the epoch even with InitProducerId(keepPreparedTxn=true). As I described in my previous email, we wouldn't need to bump the epoch to protect from zombies, and that reasoning is still true. But we cannot protect from split-brain scenarios when two or more instances of a producer with the same transactional id try to produce at the same time.
The dual-write example for SQL databases (https://github.com/apache/kafka/pull/14231/files) doesn't have a split-brain problem because execution is protected by the update lock on the transaction state record; however, NoSQL databases may not have this protection (I'll write an example for NoSQL database dual-write soon).

In a nutshell, here is an example of a split-brain scenario:

1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
3. (instance1) CommitTxn, epoch bumped to 43
4. (instance2) CommitTxn, this is considered a retry, so it got epoch 43 as well
5. (instance1) Produce messageA w/sequence 1
6. (instance2) Produce messageB w/sequence 1, this is considered a duplicate
7. (instance2) Produce messageC w/sequence 2
8. (instance1) Produce messageD w/sequence 2, this is considered a duplicate

Now if either of those commits the transaction, it would have a mix of messages from the two instances (messageA and messageC). With the proper epoch bump, instance1 would get fenced at step 3.

In order to update the epoch in InitProducerId(keepPreparedTxn=true), we need to preserve the ongoing transaction's epoch (and producerId, if the epoch overflows), because we'd need to make a correct decision when we compare the PreparedTxnState that we read from the database with the (producerId, epoch) of the ongoing transaction.
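The split-brain sequence above can be simulated with a toy model of broker-side deduplication and fencing (an illustration of the reasoning, not actual broker code; real brokers track sequences per producer id, which is simplified away here):

```python
# Toy simulation of the split-brain example: produce requests are
# deduplicated by (epoch, sequence), so when both instances end up on the
# same epoch the committed transaction mixes messages from both. With a
# proper epoch bump, the stale instance is fenced instead.

class Partition:
    def __init__(self, epoch):
        self.epoch = epoch      # current producer epoch on the broker
        self.last_seq = 0       # highest sequence accepted at this epoch
        self.messages = []

    def produce(self, epoch, seq, msg):
        if epoch < self.epoch:
            return "fenced"     # stale producer epoch, request rejected
        if seq <= self.last_seq:
            return "duplicate"  # sequence already seen, silently dropped
        self.last_seq = seq
        self.messages.append(msg)
        return "ok"

# Without a bump, both instances share epoch 43 after steps 3-4 above.
p = Partition(epoch=43)
p.produce(43, 1, "messageA")  # instance1, accepted
p.produce(43, 1, "messageB")  # instance2, considered a duplicate
p.produce(43, 2, "messageC")  # instance2, accepted
p.produce(43, 2, "messageD")  # instance1, considered a duplicate
assert p.messages == ["messageA", "messageC"]  # a mix of both instances!

# With a proper bump, instance1 (still on epoch 42) is fenced.
fenced = Partition(epoch=43)
assert fenced.produce(42, 1, "messageA") == "fenced"
```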
I've updated the KIP with the following:

- The ongoing transaction now has 2 (producerId, epoch) pairs -- one pair describes the ongoing transaction, the other pair describes the expected epoch for operations on this transactional id
- InitProducerIdResponse now returns 2 (producerId, epoch) pairs
- TransactionalLogValue now has 2 (producerId, epoch) pairs, with the new values added as tagged fields, so it's easy to downgrade
- Added a note about downgrade in the Compatibility section
- Added a rejected alternative

-Artem

On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <alivsh...@confluent.io> wrote:

Hi Justine,

Thank you for the questions. Currently (pre-KIP-939) we always bump the epoch on InitProducerId and abort an ongoing transaction (if any).
I expect this behavior will continue with KIP-890 as well.

With KIP-939 we need to support the case when the ongoing transaction needs to be preserved with keepPreparedTxn=true. Bumping the epoch without aborting or committing a transaction is tricky, because the epoch is a short value and it's easy to overflow. Currently, the overflow case is handled by aborting the ongoing transaction, which sends out transaction markers with epoch=Short.MAX_VALUE to the partition leaders; these fence off any messages with the producer id that started the transaction (they would have an epoch that is less than Short.MAX_VALUE). Then it is safe to allocate a new producer id and use it in new transactions.
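The overflow handling described above can be sketched like this (a simplified model; the function name and return shape are made up for illustration):

```python
# Sketch of epoch-overflow handling: the epoch is a 16-bit (Java short)
# value. When a bump would overflow, the ongoing transaction is aborted
# with markers at epoch=Short.MAX_VALUE (which fences every message with
# a lower epoch), and a fresh producer id starts over at epoch 0.

SHORT_MAX = 32767  # Java Short.MAX_VALUE

def bump_epoch(producer_id, epoch, next_producer_id):
    """Return (producer_id, new_epoch, epoch_written_to_markers)."""
    if epoch + 1 >= SHORT_MAX:
        # Overflow: abort markers go out at MAX_VALUE to fence everything
        # below, then a newly allocated producer id continues at epoch 0.
        return next_producer_id, 0, SHORT_MAX
    return producer_id, epoch + 1, epoch + 1

# Normal bump: same producer id, epoch incremented.
assert bump_epoch(7, 42, 8) == (7, 43, 43)
# Overflow: new producer id at epoch 0, markers at Short.MAX_VALUE.
assert bump_epoch(7, SHORT_MAX - 1, 8) == (8, 0, SHORT_MAX)
```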
We could say that when keepPreparedTxn=true we bump the epoch unless it leads to overflow, and don't bump the epoch in the overflow case. I don't think that's a good solution, because if it's not safe to keep the same epoch when keepPreparedTxn=true, then we must handle the epoch overflow case as well. So either we should convince ourselves that it's safe to keep the epoch and do it in the general case, or we always bump the epoch and handle the overflow.

With KIP-890, we bump the epoch on every transaction commit / abort.
This guarantees that even if InitProducerId(keepPreparedTxn=true) doesn't increment the epoch on the ongoing transaction, the client will have to call commit or abort to finish the transaction, which will increment the epoch (and handle epoch overflow, if needed). If the ongoing transaction was in a bad state and had some zombies waiting to arrive, the abort operation would fence them, because with KIP-890 every abort bumps the epoch.

We could also look at this from the following perspective. With KIP-890, zombies won't be able to cross transaction boundaries; each transaction completion creates a boundary, and any activity in the past gets confined within the boundary. Then data in any partition would look like this:
1. message1, epoch=42
2. message2, epoch=42
3. message3, epoch=42
4. marker (commit or abort), epoch=43

Now if we inject steps 3a and 3b like this:

1. message1, epoch=42
2. message2, epoch=42
3. message3, epoch=42
3a. crash
3b. InitProducerId(keepPreparedTxn=true)
4. marker (commit or abort), epoch=43

The invariant still holds even with steps 3a and 3b -- whatever activity was in the past gets confined to the past by the mandatory abort / commit that must follow InitProducerId(keepPreparedTxn=true).

So KIP-890 provides proper isolation between transactions, and injecting crash + InitProducerId(keepPreparedTxn=true) into the transaction sequence is safe from the zombie protection perspective.
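The boundary invariant can be illustrated with a toy log model (not actual broker logic): once the commit/abort marker with the bumped epoch is written, any late "zombie" message carrying the pre-marker epoch is rejected and cannot leak into the next transaction.

```python
# Toy model of the transaction-boundary invariant: an entry is accepted
# only if its epoch is at least the highest epoch already in the log
# (markers carry the bumped epoch, so they close the boundary).

def append(log, entry):
    kind, epoch = entry
    current = max((e for _, e in log), default=0)
    if epoch < current:
        return False  # fenced: stale epoch from before the last boundary
    log.append(entry)
    return True

log = []
append(log, ("message1", 42))
append(log, ("message2", 42))
append(log, ("message3", 42))
append(log, ("marker", 43))            # commit/abort closes the boundary
assert append(log, ("zombie", 42)) is False   # confined to the past
assert append(log, ("message4", 43)) is True  # next transaction proceeds
```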
That said, I'm still thinking about it and looking for cases that might break because we don't bump the epoch on InitProducerId(keepPreparedTxn=true); if such cases exist, we'll need to develop the logic to handle epoch overflow for ongoing transactions.

-Artem

On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Artem,

Thanks for the KIP. I had a question about epoch bumping.

Previously when we send an InitProducerId request on producer startup, we bump the epoch and abort the transaction. Is it correct to assume that we will still bump the epoch, but just not abort the transaction?
If we still bump the epoch in this case, how does this interact with KIP-890, where we also bump the epoch on every transaction? (I think this means that we may skip epochs and the data itself will all have the same epoch.)

I may have follow-ups depending on the answer to this. :)

Thanks,
Justine

On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Alex,

Thank you for your questions.

> the purpose of having broker-level transaction.two.phase.commit.enable

The thinking is that 2PC is a bit of an advanced construct, so enabling 2PC in a Kafka cluster should be an explicit decision.
If it is set to 'false', InitProducerId (and initTransactions) would return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.

> WDYT about adding an AdminClient method that returns the state of transaction.two.phase.commit.enable

I wonder if the client could just try to use 2PC and then handle the error (e.g. if it needs to fall back to ordinary transactions). This way it could uniformly handle both the case when the Kafka cluster doesn't support 2PC at all and the case when 2PC is restricted to certain users. We could also expose this config in describeConfigs if the fallback approach doesn't work for some scenarios.
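The "just try and handle the error" approach can be sketched as follows (a hypothetical wrapper, not a real Kafka API; a real client would catch the corresponding authorization exception thrown by initTransactions):

```python
# Sketch of the client-side fallback suggested above: attempt to init with
# 2PC enabled, and on TRANSACTIONAL_ID_AUTHORIZATION_FAILED degrade to
# ordinary transactions instead of probing broker configs up front.

class TransactionalIdAuthorizationError(Exception):
    """Stands in for the TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."""

def init_transactions(two_phase_commit, broker_allows_2pc):
    # Simulated broker behavior: reject 2PC when disabled/unauthorized.
    if two_phase_commit and not broker_allows_2pc:
        raise TransactionalIdAuthorizationError()
    return "2pc" if two_phase_commit else "ordinary"

def init_with_fallback(broker_allows_2pc):
    try:
        return init_transactions(True, broker_allows_2pc)
    except TransactionalIdAuthorizationError:
        # 2PC is disabled or not authorized for this principal;
        # fall back to ordinary transactions.
        return init_transactions(False, broker_allows_2pc)

assert init_with_fallback(broker_allows_2pc=True) == "2pc"
assert init_with_fallback(broker_allows_2pc=False) == "ordinary"
```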
-Artem

On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hi Artem,

Thanks for publishing this KIP!

Can you please clarify the purpose of having the broker-level transaction.two.phase.commit.enable config in addition to the new ACL? If the brokers are configured with transaction.two.phase.commit.enable=false, at what point will a client configured with transaction.two.phase.commit.enable=true fail? Will it happen at KafkaProducer#initTransactions?

WDYT about adding an AdminClient method that returns the state of transaction.two.phase.commit.enable? This way, clients would know in advance if 2PC is enabled on the brokers.

Best,
Alex

On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com> wrote:

Other than supporting multiplexing transactional streams on a single producer, I don't see how to improve it.

On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Roger,

Thank you for summarizing the cons.
I agree, and I'm curious what the alternatives would be to solve these problems better and whether they can be incorporated into this proposal (or built independently in addition to or on top of this proposal). E.g. one potential extension we discussed earlier in the thread could be multiplexing logical transactional "streams" with a single producer.

-Artem

On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com> wrote:

Thanks. I like that you're moving Kafka toward supporting this dual-write pattern. Each use case needs to consider the tradeoffs. You already summarized the pros very well in the KIP. I would summarize the cons as follows:

- you sacrifice availability - each write requires both DB and Kafka to be available, so your overall application availability is (1 - p(DB is unavailable)) * (1 - p(Kafka is unavailable)).
- latency will be higher and throughput lower - each write requires writes to both DB and Kafka while holding an exclusive lock in DB.
- you need to create a producer per unit of concurrency in your app, which has some overhead on the app and Kafka side (number of connections, poor batching). I assume the producers would need to be configured for low latency (linger.ms=0).
- there's some complexity in managing stable transactional ids for each producer/concurrency unit in your application. With a k8s deployment, you may need to switch to something like a StatefulSet that gives each pod a stable identity across restarts. On top of that pod identity, which you can use as a prefix, you then assign unique transactional ids to each concurrency unit (thread/goroutine).

On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Roger,

Thank you for the feedback. You make a very good point that we also discussed internally.
Adding support for multiple concurrent transactions in one producer could be valuable, but it seems to be a fairly large and independent change that would deserve a separate KIP. If such support is added, we could modify 2PC functionality to incorporate that.

> Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.

I'm not sure if supporting multiple transactions in one producer would make id management simpler: we'd need to store a piece of data per transaction, so whether it's N producers with a single transaction each or N transactions with a single producer, it's still roughly the same amount of data to manage. In fact, managing transactional ids (current proposal) might be easier, because the id is controlled by the application and it knows how to complete the transaction after a crash / restart; a TID would be generated by Kafka, and that would create the question of starting a Kafka transaction but crashing before saving its TID, then figuring out which transactions to abort, etc.

> 2) creating a separate producer for each concurrency slot in the application

This is a very valid concern. Maybe we'd need to have some multiplexing of transactional logical "streams" over the same connection. Seems like a separate KIP, though.

> Otherwise, it seems you're left with single-threaded model per application process?

That's a fair assessment. Not necessarily exactly single-threaded per application, but a single producer per thread model (i.e. an application could have a pool of threads + producers to increase concurrency).

-Artem

On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com> wrote:

Artem,

Thanks for the reply.

If I understand correctly, Kafka does not support concurrent transactions from the same producer (transactional id).
I think this means that applications that want to support in-process concurrency (say, thread-level concurrency with row-level DB locking) would need to manage separate transactional ids and producers per thread and then store txn state accordingly. The potential usability downsides I see are:

1) managing a set of transactional ids for each application process that scales up to its max concurrency. Maybe not too bad, but a bit of pain to manage these ids inside each process and across all application processes.
2) creating a separate producer for each concurrency slot in the application - this could create a lot more producers and resultant connections to Kafka than the typical model of a single producer per process.

Otherwise, it seems you're left with a single-threaded model per application process?
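One way to keep such per-slot ids stable, along the lines discussed in this thread (a stable pod identity as a prefix plus a per-thread/slot suffix), might look like the sketch below. The naming scheme is purely illustrative, not anything mandated by the KIP.

```python
def transactional_id(app, pod, slot):
    """Build a stable transactional id from a stable pod identity
    (e.g. a k8s StatefulSet ordinal) plus a per-thread/slot suffix.
    This naming convention is an illustrative assumption."""
    return f"{app}-{pod}-{slot}"

# Each pod owns a fixed set of ids that survives restarts, so a restarted
# process knows exactly which transactional ids it must reclaim/recover.
ids = [transactional_id("payments", "orders-0", slot) for slot in range(4)]
print(ids)
# -> ['payments-orders-0-0', 'payments-orders-0-1',
#     'payments-orders-0-2', 'payments-orders-0-3']
```

Because the id set is a pure function of (app, pod identity, slot count), there is no extra coordination state to manage across restarts.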
Thanks,

Roger

On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Roger, Arjun,

Thank you for the questions.

> It looks like the application must have stable transactional ids over time?

The transactional id should uniquely identify a producer instance and needs to be stable across restarts. If the transactional id is not stable across restarts, then zombie messages from a previous incarnation of the producer may violate atomicity. If there are 2 producer instances concurrently producing data with the same transactional id, they are going to constantly fence each other and most likely make little or no progress.
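The fencing behavior described here comes from the producer epoch: each initialization of a transactional id bumps its epoch, and writes carrying an older epoch are rejected. A toy model of that rule (illustrative only; the real logic lives in Kafka's transaction coordinator):

```python
class Coordinator:
    """Toy transaction coordinator: tracks the latest epoch per transactional id."""
    def __init__(self):
        self.epochs = {}

    def init_producer_id(self, txn_id):
        # Each (re)initialization bumps the epoch, fencing older instances.
        self.epochs[txn_id] = self.epochs.get(txn_id, -1) + 1
        return self.epochs[txn_id]

    def write(self, txn_id, epoch):
        if epoch < self.epochs[txn_id]:
            return "FENCED"  # zombie from a previous incarnation
        return "OK"

coord = Coordinator()
old = coord.init_producer_id("app-pod0-0")  # first incarnation
new = coord.init_producer_id("app-pod0-0")  # restart (or concurrent instance)
print(coord.write("app-pod0-0", new))  # -> OK
print(coord.write("app-pod0-0", old))  # -> FENCED
```

This is why two live instances sharing one transactional id make little progress: each re-initialization invalidates the other's epoch.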
The name might be a little bit confusing, as it may be mistaken for a transaction id / TID that uniquely identifies every transaction. The name and the semantics were defined in the original exactly-once-semantics (EoS) proposal (https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging) and KIP-939 just builds on top of that.

> I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval.

If the producer / application never comes back, the transaction will remain in prepared (a.k.a. "in-doubt") state until an operator forcefully terminates the transaction.
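The lifecycle described in this answer can be modeled as a small state machine. This is an illustration of the discussion only (event names are invented, and forceful termination is modeled as an abort), not the coordinator's actual implementation:

```python
# Toy model of a 2PC transaction's lifecycle as discussed in this thread:
# a prepared ("in-doubt") transaction does not time out; it waits for either
# the recovered application or an operator to resolve it.
TRANSITIONS = {
    ("ongoing", "prepare"): "prepared",             # 2PC phase 1
    ("prepared", "commit"): "committed",            # application recovers, commits
    ("prepared", "abort"): "aborted",               # application recovers, aborts
    ("prepared", "operator_terminate"): "aborted",  # forceful termination
}

def step(state, event):
    # Unknown events (e.g. a timeout while prepared) leave the state unchanged.
    return TRANSITIONS.get((state, event), state)

s = step("ongoing", "prepare")
print(s)                              # -> prepared
print(step(s, "timeout"))             # -> prepared (no timeout once prepared)
print(step(s, "operator_terminate"))  # -> aborted
```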
That's why a new ACL is defined in this proposal -- this functionality should only be provided to applications that implement proper recovery logic.

-Artem

On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com> wrote:

Hello Artem,

Thanks for the KIP.

I have the same question as Roger on concurrent writes, and an additional one on consumer behavior. Typically, transactions will time out if not committed within some time interval. With the proposed changes in this KIP, consumers cannot consume past the ongoing transaction. I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval. Or are we saying that when used in this 2PC context, we should configure these transaction timeouts to very large durations?

Thanks in advance!

Best,
Arjun

On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com> wrote:

Hi Artem,

Thanks for writing this KIP. Can you clarify the requirements a bit more for managing transaction state?
> > It looks like the application must have stable transactional ids over
> > time? What is the granularity of those ids and producers? Say the
> > application is a multi-threaded Java web server, can/should all the
> > concurrent threads share a transactional id and producer? That doesn't
> > seem right to me unless the application is using global DB locks that
> > serialize all requests.
> > Instead, if the application uses row-level DB locks, there could be
> > multiple, concurrent, independent txns happening in the same JVM, so it
> > seems like the granularity of managing transactional ids and txn state
> > needs to line up with the granularity of the DB locking.
> >
> > Does that make sense or am I misunderstanding?
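Roger's granularity point can be made concrete with a simple naming scheme: each concurrent writer gets its own stable `transactional.id` (and its own producer), so independent transactions neither fence each other nor share recovery state. This is an illustrative sketch only; the names `appId` and `deriveTransactionalId` are hypothetical, not from the KIP or Flink:

```java
public class TransactionalIdScheme {
    // One producer, and therefore one transactional.id, per concurrent
    // writer: each writer's prepared transaction can then be recovered
    // independently after a crash.
    static String deriveTransactionalId(String appId, int writerIndex) {
        // Stable across restarts as long as writerIndex assignment is
        // stable (similar in spirit to Flink keying producers by subtask).
        return appId + "-txn-" + writerIndex;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(deriveTransactionalId("payments-sink", i));
        }
    }
}
```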
> > Thanks,
> >
> > Roger
> >
> > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > Hello,
> > >
> > > This is a discussion thread for
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > The KIP proposes extending Kafka transaction support (that already
> > > uses 2PC under the hood) to enable atomicity of dual writes to Kafka
> > > and an external database, and helps to fix a long-standing Flink
> > > issue.
> > >
> > > An example of code that uses the dual write recipe with JDBC and
> > > should work for most SQL databases is here:
> > > https://github.com/apache/kafka/pull/14231
> > > The FLIP for the sister fix in Flink is here:
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > >
> > > -Artem
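The dual-write recipe referenced above boils down to an ordering discipline: produce and prepare on the Kafka side first, make the commit decision in the database (recording the prepared transaction state as part of the DB commit), then complete the Kafka transaction. The sketch below models only that ordering with in-memory stand-ins for the Kafka producer and the database; none of it is the actual code from the linked PR:

```java
import java.util.ArrayList;
import java.util.List;

public class DualWriteSketch {
    // Event log standing in for real Kafka and DB calls; the point
    // here is the commit ordering, not the I/O.
    static final List<String> log = new ArrayList<>();

    static void produce(String record) { log.add("kafka:produce:" + record); }
    static void prepareKafkaTxn()      { log.add("kafka:prepare"); }      // durable, not yet visible
    static void commitDb(String state) { log.add("db:commit:" + state); } // the decision point
    static void commitKafkaTxn()       { log.add("kafka:commit"); }       // records become visible

    public static void main(String[] args) {
        produce("order-42");
        prepareKafkaTxn();          // phase 1: Kafka side is prepared
        commitDb("preparedTxn=42"); // DB commit decides the outcome atomically
        commitKafkaTxn();           // phase 2: finish; after a crash, recovery
                                    // would read the DB state to decide
                                    // whether to commit or abort
        System.out.println(String.join(",", log));
    }
}
```

If the process dies between the DB commit and the Kafka commit, the decision is already durable in the database, which is what makes the dual write atomic from the application's point of view.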