Hi Jun,

> Then, should we change the following in the example to use InitProducerId(true) instead?
We could. I just thought that it's good to make the example self-contained by starting from a clean state.

> Also, could Flink just follow the dual-write recipe?

I think it would bring some unnecessary logic to Flink (or any other system that already has a transaction coordinator and just wants to drive Kafka to the desired state). We could discuss it with the Flink folks; the current proposal was developed in collaboration with them.

> 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to Integer.MAX_VALUE?

The server would reject this for regular transactions; it only accepts values that are <= *transaction.max.timeout.ms* (a broker config).

> 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn request to use the ongoing pid. ...

Without 2PC there is no case where the pid could change between starting a transaction and endTxn (InitProducerId would abort any ongoing transaction). With 2PC there is now a case where InitProducerId can change the pid without aborting the transaction, so we need to handle that. I wouldn't say that the flow is different; rather, it's extended to handle new cases. The main principle is still the same -- for all operations we use the latest "operational" pid and epoch known to the client; this way we guarantee that we can fence zombie / split-brain clients by disrupting the "latest known" pid + epoch progression.

> 25. "We send out markers using the original ongoing transaction ProducerId and ProducerEpoch" ...

Updated.

-Artem

On Mon, Jan 29, 2024 at 4:57 PM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20. So for the dual-write recipe, we should always call InitProducerId(keepPreparedTxn=true) from the producer? Then, should we change the following in the example to use InitProducerId(true) instead?
> 1. InitProducerId(false); TC STATE: Empty, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1, OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> Also, could Flink just follow the dual-write recipe? It's simpler if there is one way to solve the 2pc issue.
>
> 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to Integer.MAX_VALUE?
>
> 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn request to use the ongoing pid. With 2pc, the coordinator now expects the endTxn request to use the next pid. So, the flow is different, right?
>
> 25. "We send out markers using the original ongoing transaction ProducerId and ProducerEpoch"
> We should use ProducerEpoch + 1 in the marker, right?
>
> Jun
>
> On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > > 20. I am a bit confused by how we set keepPreparedTxn. ...
> >
> > keepPreparedTxn=true informs the transaction coordinator that it should keep the ongoing transaction, if any. If keepPreparedTxn=false, then any ongoing transaction is aborted (this is exactly the current behavior). enable2Pc is a separate argument that is controlled by the *transaction.two.phase.commit.enable* setting on the client.
> >
> > To start 2PC, the client just needs to set *transaction.two.phase.commit.enable*=true in the config.
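A minimal sketch of that client-side setup -- the new producer config plus the keepPreparedTxn flag on initTransactions. The boolean overload and the config name are the ones proposed in this KIP; everything else (bootstrap address, serializers, transactional id) is illustrative:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TwoPcProducerSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dual-write-app-1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Proposed in KIP-939; the broker must also have
        // transaction.two.phase.commit.enable=true and grant the new ACL.
        props.put("transaction.two.phase.commit.enable", "true");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // keepPreparedTxn=true: keep (rather than abort) a prepared transaction
            // that may still be in-doubt; the application resolves it afterwards.
            producer.initTransactions(true);
        }
    }
}
```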
Then if the > > client knows the status of the transaction upfront (in the case of Flink, > > Flink keeps the knowledge if the transaction is prepared in its own > store, > > so it always knows upfront), it can set keepPreparedTxn accordingly, then > > if the transaction was prepared, it'll be ready for the client to > complete > > the appropriate action; if the client doesn't have a knowledge that the > > transaction is prepared, keepPreparedTxn is going to be false, in which > > case we'll get to a clean state (the same way we do today). > > > > For the dual-write recipe, the client doesn't know upfront if the > > transaction is prepared, this information is implicitly encoded > > PreparedTxnState value that can be used to resolve the transaction state. > > In that case, keepPreparedTxn should always be true, because we don't > know > > upfront and we don't want to accidentally abort a committed transaction. > > > > The forceTerminateTransaction call can just use keepPreparedTxn=false, it > > actually doesn't matter if it sets Enable2Pc flag. > > > > > 21. TransactionLogValue: Do we need some field to identify whether this > > is written for 2PC so that ongoing txn is never auto aborted? > > > > The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was > > enabled. I've added a note to the KIP about this. > > > > > 22 > > > > You're right it's a typo. I fixed it as well as step 9 (REQUEST: > > ProducerId=73, ProducerEpoch=MAX). > > > > > 23. It's a bit weird that Enable2Pc is driven by a config while > > KeepPreparedTxn is from an API param ... > > > > The intent to use 2PC doesn't change from transaction to transaction, but > > the intent to keep prepared txn may change from transaction to > > transaction. In dual-write recipes the distinction is not clear, but for > > use cases where keepPreparedTxn value is known upfront (e.g. Flink) it's > > more prominent. E.g. a Flink's Kafka sink operator could be deployed > with > > *transaction.two.phase.commit.enable*=true hardcoded in the image, but > > keepPreparedTxn cannot be hardcoded in the image, because it depends on > the > > job manager's state. > > > > > 24 > > > > The flow is actually going to be the same way as it is now -- the "main" > > producer id + epoch needs to be used in all operations to prevent fencing > > (it's sort of a common "header" in all RPC calls that follow the same > > rules). The ongoing txn info is just additional info for making a > commit / > > abort decision based on the PreparedTxnState from the DB. > > > > --Artem > > > > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao <j...@confluent.io.invalid> > wrote: > > > > > Hi, Artem, > > > > > > Thanks for the reply. A few more comments. > > > > > > 20. I am a bit confused by how we set keepPreparedTxn. From the KIP, I > > got > > > the following (1) to start 2pc, we call > > > InitProducerId(keepPreparedTxn=false); (2) when the producer fails and > > > needs to do recovery, it calls InitProducerId(keepPreparedTxn=true); > (3) > > > Admin.forceTerminateTransaction() calls > > > InitProducerId(keepPreparedTxn=false). > > > 20.1 In (1), when a producer calls InitProducerId(false) with 2pc > > enabled, > > > and there is an ongoing txn, should the server return an error to the > > > InitProducerId request? If so, what would be the error code? > > > 20.2 How do we distinguish between (1) and (3)? It's the same API call > > but > > > (1) doesn't abort ongoing txn and (2) does. > > > 20.3 The usage in (1) seems unintuitive. 
2pc implies keeping the > ongoing > > > txn. So, setting keepPreparedTxn to false to start 2pc seems counter > > > intuitive. > > > > > > 21. TransactionLogValue: Do we need some field to identify whether this > > is > > > written for 2PC so that ongoing txn is never auto aborted? > > > > > > 22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, > > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, > > > NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1, > > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1" > > > It seems in the above example, Epoch in RESPONSE should be MAX to match > > > NextProducerEpoch? > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config > > > while KeepPreparedTxn is from an API param. Should we make them more > > > consistent since they seem related? > > > > > > 24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC STATE: > > > PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73, > > > NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, > Epoch=0, > > > When a commit request is sent, it uses the latest ProducerId and > > > ProducerEpoch." > > > The step where we use the next produceId to commit an old txn works, > but > > > can be confusing. It's going to be hard for people implementing this > new > > > client protocol to figure out when to use the current or the new > > producerId > > > in the EndTxnRequest. One potential way to improve this is to extend > > > EndTxnRequest with a new field like expectedNextProducerId. Then we can > > > always use the old produceId in the existing field, but set > > > expectedNextProducerId to bypass the fencing logic when needed. > > > > > > Thanks, > > > > > > Jun > > > > > > > > > > > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > Hi Jun, > > > > > > > > Thank you for the comments. > > > > > > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn ... > > > > > > > > I added a note that all combinations are valid. Enable2Pc=false & > > > > KeepPreparedTxn=true could be potentially useful for backward > > > compatibility > > > > with Flink, when the new version of Flink that implements KIP-319 > tries > > > to > > > > work with a cluster that doesn't authorize 2PC. > > > > > > > > > 11. InitProducerIdResponse: If there is no ongoing txn, what will > > > > OngoingTxnProducerId and OngoingTxnEpoch be set? > > > > > > > > I added a note that they will be set to -1. The client then will > know > > > that > > > > there is no ongoing txn and .completeTransaction becomes a no-op (but > > > still > > > > required before .send is enabled). > > > > > > > > > 12. ListTransactionsRequest related changes: It seems those are > > already > > > > covered by KIP-994? > > > > > > > > Removed from this KIP. > > > > > > > > > 13. TransactionalLogValue ... > > > > > > > > This is now updated to work on top of KIP-890. > > > > > > > > > 14. "Note that the (producerId, epoch) pair that corresponds to the > > > > ongoing transaction ... > > > > > > > > This is now updated to work on top of KIP-890. > > > > > > > > > 15. active-transaction-total-time-max : ... > > > > > > > > Updated. > > > > > > > > > 16. "transaction.two.phase.commit.enable The default would be > > ‘false’. > > > > If it’s ‘false’, 2PC functionality is disabled even if the ACL is set > > ... 
> > > > > > > > Disabling 2PC effectively removes all authorization to use it, hence > I > > > > thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be appropriate. > > > > > > > > Do you suggest using a different error code for 2PC authorization vs > > some > > > > other authorization (e.g. TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED) > > or a > > > > different code for disabled vs. unauthorised (e.g. > > > > TWO_PHASE_COMMIT_DISABLED) or both? > > > > > > > > > 17. completeTransaction(). We expect this to be only used during > > > > recovery. > > > > > > > > It can also be used if, say, a commit to the database fails and the > > > result > > > > is inconclusive, e.g. > > > > > > > > 1. Begin DB transaction > > > > 2. Begin Kafka transaction > > > > 3. Prepare Kafka transaction > > > > 4. Commit DB transaction > > > > 5. The DB commit fails, figure out the state of the transaction by > > > reading > > > > the PreparedTxnState from DB > > > > 6. Complete Kafka transaction with the PreparedTxnState. > > > > > > > > > 18. "either prepareTransaction was called or initTransaction(true) > > was > > > > called": "either" should be "neither"? > > > > > > > > Updated. > > > > > > > > > 19. Since InitProducerId always bumps up the epoch, it creates a > > > > situation ... > > > > > > > > InitProducerId only bumps the producer epoch, the ongoing transaction > > > epoch > > > > stays the same, no matter how many times the InitProducerId is called > > > > before the transaction is completed. Eventually the epoch may > > overflow, > > > > and then a new producer id would be allocated, but the ongoing > > > transaction > > > > producer id would stay the same. > > > > > > > > I've added a couple examples in the KIP ( > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges > > > > ) > > > > that walk through some scenarios and show how the state is changed. > > > > > > > > -Artem > > > > > > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao <j...@confluent.io.invalid> > > wrote: > > > > > > > > > Hi, Artem, > > > > > > > > > > Thanks for the KIP. A few comments below. > > > > > > > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn in > > > > > InitProducerId, it would be useful to document a bit more detail on > > > what > > > > > values are set under what cases. For example, are all four > > combinations > > > > > valid? > > > > > > > > > > 11. InitProducerIdResponse: If there is no ongoing txn, what will > > > > > OngoingTxnProducerId and OngoingTxnEpoch be set? > > > > > > > > > > 12. ListTransactionsRequest related changes: It seems those are > > already > > > > > covered by KIP-994? > > > > > > > > > > 13. TransactionalLogValue: Could we name TransactionProducerId and > > > > > ProducerId better? It's not clear from the name which is for which. > > > > > > > > > > 14. "Note that the (producerId, epoch) pair that corresponds to the > > > > ongoing > > > > > transaction is going to be written instead of the existing > ProducerId > > > and > > > > > ProducerEpoch fields (which are renamed to reflect the semantics) > to > > > > > support downgrade.": I am a bit confused on that. Are we writing > > > > different > > > > > values to the existing fields? Then, we can't downgrade, right? > > > > > > > > > > 15. active-transaction-total-time-max : Would > > > > > active-transaction-open-time-max be more intuitive? 
Also, could we > > > > include > > > > > the full name (group, tags, etc)? > > > > > > > > > > 16. "transaction.two.phase.commit.enable The default would be > > ‘false’. > > > > If > > > > > it’s ‘false’, 2PC functionality is disabled even if the ACL is set, > > > > clients > > > > > that attempt to use this functionality would receive > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED error." > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the > > client > > > to > > > > > understand what the actual cause is. > > > > > > > > > > 17. completeTransaction(). We expect this to be only used during > > > > recovery. > > > > > Could we document this clearly? Could we prevent it from being used > > > > > incorrectly (e.g. throw an exception if the producer has called > other > > > > > methods like send())? > > > > > > > > > > 18. "either prepareTransaction was called or initTransaction(true) > > was > > > > > called": "either" should be "neither"? > > > > > > > > > > 19. Since InitProducerId always bumps up the epoch, it creates a > > > > situation > > > > > where there could be multiple outstanding txns. The following is an > > > > example > > > > > of a potential problem during recovery. > > > > > The last txn epoch in the external store is 41 when the app > dies. > > > > > Instance1 is created for recovery. > > > > > 1. (instance1) InitProducerId(keepPreparedTxn=true), epoch=42, > > > > > ongoingEpoch=41 > > > > > 2. (instance1) dies before completeTxn(41) can be called. > > > > > Instance2 is created for recovery. > > > > > 3. (instance2) InitProducerId(keepPreparedTxn=true), epoch=43, > > > > > ongoingEpoch=42 > > > > > 4. (instance2) completeTxn(41) => abort > > > > > The first problem is that 41 now is aborted when it should be > > > > committed. > > > > > The second one is that it's not clear who could abort epoch 42, > which > > > is > > > > > still open. > > > > > > > > > > Jun > > > > > > > > > > > > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan > > > > <jols...@confluent.io.invalid > > > > > > > > > > > wrote: > > > > > > > > > > > Hey Artem, > > > > > > > > > > > > Thanks for the updates. I think what you say makes sense. I just > > > > updated > > > > > my > > > > > > KIP so I want to reconcile some of the changes we made especially > > > with > > > > > > respect to the TransactionLogValue. > > > > > > > > > > > > Firstly, I believe tagged fields require a default value so that > if > > > > they > > > > > > are not filled, we return the default (and know that they were > > > empty). > > > > > For > > > > > > my KIP, I proposed the default for producer ID tagged fields > should > > > be > > > > > -1. > > > > > > I was wondering if we could update the KIP to include the default > > > > values > > > > > > for producer ID and epoch. > > > > > > > > > > > > Next, I noticed we decided to rename the fields. I guess that the > > > field > > > > > > "NextProducerId" in my KIP correlates to "ProducerId" in this > KIP. > > Is > > > > > that > > > > > > correct? So we would have "TransactionProducerId" for the > > non-tagged > > > > > field > > > > > > and have "ProducerId" (NextProducerId) and "PrevProducerId" as > > tagged > > > > > > fields the final version after KIP-890 and KIP-936 are > implemented. > > > Is > > > > > this > > > > > > correct? I think the tags will need updating, but that is > trivial. > > > > > > > > > > > > The final question I had was with respect to storing the new > epoch. 
> > > In > > > > > > KIP-890 part 2 (epoch bumps) I think we concluded that we don't > > need > > > to > > > > > > store the epoch since we can interpret the previous epoch based > on > > > the > > > > > > producer ID. But here we could call the InitProducerId multiple > > times > > > > and > > > > > > we only want the producer with the correct epoch to be able to > > commit > > > > the > > > > > > transaction. Is that the correct reasoning for why we need epoch > > here > > > > but > > > > > > not the Prepare/Commit state. > > > > > > > > > > > > Thanks, > > > > > > Justine > > > > > > > > > > > > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > > > > > > > Hi Justine, > > > > > > > > > > > > > > After thinking a bit about supporting atomic dual writes for > > Kafka > > > + > > > > > > NoSQL > > > > > > > database, I came to a conclusion that we do need to bump the > > epoch > > > > even > > > > > > > with InitProducerId(keepPreparedTxn=true). As I described in > my > > > > > previous > > > > > > > email, we wouldn't need to bump the epoch to protect from > zombies > > > so > > > > > that > > > > > > > reasoning is still true. But we cannot protect from > split-brain > > > > > > scenarios > > > > > > > when two or more instances of a producer with the same > > > transactional > > > > id > > > > > > try > > > > > > > to produce at the same time. The dual-write example for SQL > > > > databases > > > > > ( > > > > > > > https://github.com/apache/kafka/pull/14231/files) doesn't > have a > > > > > > > split-brain problem because execution is protected by the > update > > > lock > > > > > on > > > > > > > the transaction state record; however NoSQL databases may not > > have > > > > this > > > > > > > protection (I'll write an example for NoSQL database dual-write > > > > soon). > > > > > > > > > > > > > > In a nutshell, here is an example of a split-brain scenario: > > > > > > > > > > > > > > 1. (instance1) InitProducerId(keepPreparedTxn=true), got > > > epoch=42 > > > > > > > 2. (instance2) InitProducerId(keepPreparedTxn=true), got > > > epoch=42 > > > > > > > 3. (instance1) CommitTxn, epoch bumped to 43 > > > > > > > 4. (instance2) CommitTxn, this is considered a retry, so it > > got > > > > > epoch > > > > > > 43 > > > > > > > as well > > > > > > > 5. (instance1) Produce messageA w/sequence 1 > > > > > > > 6. (instance2) Produce messageB w/sequence 1, this is > > > considered a > > > > > > > duplicate > > > > > > > 7. (instance2) Produce messageC w/sequence 2 > > > > > > > 8. (instance1) Produce messageD w/sequence 2, this is > > > considered a > > > > > > > duplicate > > > > > > > > > > > > > > Now if either of those commit the transaction, it would have a > > mix > > > of > > > > > > > messages from the two instances (messageA and messageC). With > > the > > > > > proper > > > > > > > epoch bump, instance1 would get fenced at step 3. > > > > > > > > > > > > > > In order to update epoch in > InitProducerId(keepPreparedTxn=true) > > we > > > > > need > > > > > > to > > > > > > > preserve the ongoing transaction's epoch (and producerId, if > the > > > > epoch > > > > > > > overflows), because we'd need to make a correct decision when > we > > > > > compare > > > > > > > the PreparedTxnState that we read from the database with the > > > > > (producerId, > > > > > > > epoch) of the ongoing transaction. 
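A rough sketch of that decision on the recovery path, assuming the initTransactions(boolean), completeTransaction(PreparedTxnState) and PreparedTxnState APIs proposed in the KIP (package location assumed); reading the stored state out of the database is left to a hypothetical application helper:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
// New class proposed in KIP-939; the package is assumed here.
import org.apache.kafka.clients.producer.PreparedTxnState;

public class DualWriteRecovery {
    /**
     * Resolve an in-doubt Kafka transaction after a crash. The database is the
     * system of record: the PreparedTxnState it stores tells us whether the
     * prepared Kafka transaction belongs to a committed database transaction.
     */
    static void recover(KafkaProducer<String, String> producer,
                        PreparedTxnState stateFromDb /* read from the app's DB */) {
        // keepPreparedTxn=true: keep the in-doubt transaction instead of aborting it.
        producer.initTransactions(true);

        // completeTransaction compares the (producerId, epoch) of the ongoing
        // transaction with the state recorded in the database and commits on a
        // match, aborts otherwise; it is a no-op if there is no ongoing transaction.
        producer.completeTransaction(stateFromDb);
    }
}
```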
> > > > > > > > > > > > > > I've updated the KIP with the following: > > > > > > > > > > > > > > - Ongoing transaction now has 2 (producerId, epoch) pairs -- > > one > > > > > pair > > > > > > > describes the ongoing transaction, the other pair describes > > > > expected > > > > > > > epoch > > > > > > > for operations on this transactional id > > > > > > > - InitProducerIdResponse now returns 2 (producerId, epoch) > > pairs > > > > > > > - TransactionalLogValue now has 2 (producerId, epoch) pairs, > > the > > > > new > > > > > > > values added as tagged fields, so it's easy to downgrade > > > > > > > - Added a note about downgrade in the Compatibility section > > > > > > > - Added a rejected alternative > > > > > > > > > > > > > > -Artem > > > > > > > > > > > > > > On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits < > > > > alivsh...@confluent.io> > > > > > > > wrote: > > > > > > > > > > > > > > > Hi Justine, > > > > > > > > > > > > > > > > Thank you for the questions. Currently (pre-KIP-939) we > always > > > > bump > > > > > > the > > > > > > > > epoch on InitProducerId and abort an ongoing transaction (if > > > > any). I > > > > > > > > expect this behavior will continue with KIP-890 as well. > > > > > > > > > > > > > > > > With KIP-939 we need to support the case when the ongoing > > > > transaction > > > > > > > > needs to be preserved when keepPreparedTxn=true. Bumping > epoch > > > > > without > > > > > > > > aborting or committing a transaction is tricky because epoch > > is a > > > > > short > > > > > > > > value and it's easy to overflow. Currently, the overflow > case > > is > > > > > > handled > > > > > > > > by aborting the ongoing transaction, which would send out > > > > transaction > > > > > > > > markers with epoch=Short.MAX_VALUE to the partition leaders, > > > which > > > > > > would > > > > > > > > fence off any messages with the producer id that started the > > > > > > transaction > > > > > > > > (they would have epoch that is less than Short.MAX_VALUE). > > Then > > > it > > > > > is > > > > > > > safe > > > > > > > > to allocate a new producer id and use it in new transactions. > > > > > > > > > > > > > > > > We could say that maybe when keepPreparedTxn=true we bump > epoch > > > > > unless > > > > > > it > > > > > > > > leads to overflow, and don't bump epoch in the overflow case. > > I > > > > > don't > > > > > > > > think it's a good solution because if it's not safe to keep > the > > > > same > > > > > > > epoch > > > > > > > > when keepPreparedTxn=true, then we must handle the epoch > > overflow > > > > > case > > > > > > as > > > > > > > > well. So either we should convince ourselves that it's safe > to > > > > keep > > > > > > the > > > > > > > > epoch and do it in the general case, or we always bump the > > epoch > > > > and > > > > > > > handle > > > > > > > > the overflow. > > > > > > > > > > > > > > > > With KIP-890, we bump the epoch on every transaction commit / > > > > abort. > > > > > > > This > > > > > > > > guarantees that even if InitProducerId(keepPreparedTxn=true) > > > > doesn't > > > > > > > > increment epoch on the ongoing transaction, the client will > > have > > > to > > > > > > call > > > > > > > > commit or abort to finish the transaction and will increment > > the > > > > > epoch > > > > > > > (and > > > > > > > > handle epoch overflow, if needed). 
If the ongoing > transaction > > > was > > > > > in a > > > > > > > bad > > > > > > > > state and had some zombies waiting to arrive, the abort > > operation > > > > > would > > > > > > > > fence them because with KIP-890 every abort would bump the > > epoch. > > > > > > > > > > > > > > > > We could also look at this from the following perspective. > > With > > > > > > KIP-890, > > > > > > > > zombies won't be able to cross transaction boundaries; each > > > > > transaction > > > > > > > > completion creates a boundary and any activity in the past > gets > > > > > > confined > > > > > > > in > > > > > > > > the boundary. Then data in any partition would look like > this: > > > > > > > > > > > > > > > > 1. message1, epoch=42 > > > > > > > > 2. message2, epoch=42 > > > > > > > > 3. message3, epoch=42 > > > > > > > > 4. marker (commit or abort), epoch=43 > > > > > > > > > > > > > > > > Now if we inject steps 3a and 3b like this: > > > > > > > > > > > > > > > > 1. message1, epoch=42 > > > > > > > > 2. message2, epoch=42 > > > > > > > > 3. message3, epoch=42 > > > > > > > > 3a. crash > > > > > > > > 3b. InitProducerId(keepPreparedTxn=true) > > > > > > > > 4. marker (commit or abort), epoch=43 > > > > > > > > > > > > > > > > The invariant still holds even with steps 3a and 3b -- > whatever > > > > > > activity > > > > > > > > was in the past will get confined in the past with mandatory > > > abort > > > > / > > > > > > > commit > > > > > > > > that must follow InitProducerId(keepPreparedTxn=true). > > > > > > > > > > > > > > > > So KIP-890 provides the proper isolation between > transactions, > > so > > > > > > > > injecting crash + InitProducerId(keepPreparedTxn=true) into > the > > > > > > > > transaction sequence is safe from the zombie protection > > > > perspective. > > > > > > > > > > > > > > > > That said, I'm still thinking about it and looking for cases > > that > > > > > might > > > > > > > > break because we don't bump epoch when > > > > > > > > InitProducerId(keepPreparedTxn=true), if such cases exist, > > we'll > > > > need > > > > > > to > > > > > > > > develop the logic to handle epoch overflow for ongoing > > > > transactions. > > > > > > > > > > > > > > > > -Artem > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan > > > > > > > > <jols...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > >> Hey Artem, > > > > > > > >> > > > > > > > >> Thanks for the KIP. I had a question about epoch bumping. > > > > > > > >> > > > > > > > >> Previously when we send an InitProducerId request on > Producer > > > > > startup, > > > > > > > we > > > > > > > >> bump the epoch and abort the transaction. Is it correct to > > > assume > > > > > that > > > > > > > we > > > > > > > >> will still bump the epoch, but just not abort the > transaction? > > > > > > > >> If we still bump the epoch in this case, how does this > > interact > > > > with > > > > > > > >> KIP-890 where we also bump the epoch on every transaction. > (I > > > > think > > > > > > this > > > > > > > >> means that we may skip epochs and the data itself will all > > have > > > > the > > > > > > same > > > > > > > >> epoch) > > > > > > > >> > > > > > > > >> I may have follow ups depending on the answer to this. 
:) > > > > > > > >> > > > > > > > >> Thanks, > > > > > > > >> Justine > > > > > > > >> > > > > > > > >> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits > > > > > > > >> <alivsh...@confluent.io.invalid> wrote: > > > > > > > >> > > > > > > > >> > Hi Alex, > > > > > > > >> > > > > > > > > >> > Thank you for your questions. > > > > > > > >> > > > > > > > > >> > > the purpose of having broker-level > > > > > > > transaction.two.phase.commit.enable > > > > > > > >> > > > > > > > > >> > The thinking is that 2PC is a bit of an advanced construct > > so > > > > > > enabling > > > > > > > >> 2PC > > > > > > > >> > in a Kafka cluster should be an explicit decision. If it > is > > > set > > > > > to > > > > > > > >> 'false' > > > > > > > >> > InitiProducerId (and initTransactions) would > > > > > > > >> > return TRANSACTIONAL_ID_AUTHORIZATION_FAILED. > > > > > > > >> > > > > > > > > >> > > WDYT about adding an AdminClient method that returns the > > > state > > > > > of > > > > > > > >> > transaction.two.phase.commit.enable > > > > > > > >> > > > > > > > > >> > I wonder if the client could just try to use 2PC and then > > > handle > > > > > the > > > > > > > >> error > > > > > > > >> > (e.g. if it needs to fall back to ordinary transactions). > > > This > > > > > way > > > > > > it > > > > > > > >> > could uniformly handle cases when Kafka cluster doesn't > > > support > > > > > 2PC > > > > > > > >> > completely and cases when 2PC is restricted to certain > > users. > > > > We > > > > > > > could > > > > > > > >> > also expose this config in describeConfigs, if the > fallback > > > > > approach > > > > > > > >> > doesn't work for some scenarios. > > > > > > > >> > > > > > > > > >> > -Artem > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov > > > > > > > >> > <asorokou...@confluent.io.invalid> wrote: > > > > > > > >> > > > > > > > > >> > > Hi Artem, > > > > > > > >> > > > > > > > > > >> > > Thanks for publishing this KIP! > > > > > > > >> > > > > > > > > > >> > > Can you please clarify the purpose of having > broker-level > > > > > > > >> > > transaction.two.phase.commit.enable config in addition > to > > > the > > > > > new > > > > > > > >> ACL? If > > > > > > > >> > > the brokers are configured with > > > > > > > >> > transaction.two.phase.commit.enable=false, > > > > > > > >> > > at what point will a client configured with > > > > > > > >> > > transaction.two.phase.commit.enable=true fail? Will it > > > happen > > > > at > > > > > > > >> > > KafkaProducer#initTransactions? > > > > > > > >> > > > > > > > > > >> > > WDYT about adding an AdminClient method that returns the > > > state > > > > > of > > > > > > t > > > > > > > >> > > ransaction.two.phase.commit.enable? This way, clients > > would > > > > know > > > > > > in > > > > > > > >> > advance > > > > > > > >> > > if 2PC is enabled on the brokers. > > > > > > > >> > > > > > > > > > >> > > Best, > > > > > > > >> > > Alex > > > > > > > >> > > > > > > > > > >> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover < > > > > > > > roger.hoo...@gmail.com> > > > > > > > >> > > wrote: > > > > > > > >> > > > > > > > > > >> > > > Other than supporting multiplexing transactional > streams > > > on > > > > a > > > > > > > single > > > > > > > >> > > > producer, I don't see how to improve it. 
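A sketch of the fallback idea discussed above -- try to initialize with 2PC and fall back to ordinary transactions if the cluster rejects it -- assuming the TRANSACTIONAL_ID_AUTHORIZATION_FAILED error surfaces on the client as TransactionalIdAuthorizationException:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;

public class TwoPcInitWithFallback {
    /** Returns true if 2PC is available, false if we fell back to ordinary transactions. */
    static boolean init(KafkaProducer<String, String> producerWith2pc,
                        KafkaProducer<String, String> producerWithout2pc) {
        try {
            // producerWith2pc is configured with transaction.two.phase.commit.enable=true;
            // a cluster with 2PC disabled (or without the new ACL) rejects InitProducerId.
            producerWith2pc.initTransactions(true);  // boolean overload proposed in KIP-939
            return true;
        } catch (TransactionalIdAuthorizationException e) {
            producerWithout2pc.initTransactions();   // ordinary transactions as a fallback
            return false;
        }
    }
}
```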
> > > > > > > >> > > > > > > > > > > >> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits > > > > > > > >> > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > >> > > > > > > > > > > >> > > > > Hi Roger, > > > > > > > >> > > > > > > > > > > > >> > > > > Thank you for summarizing the cons. I agree and I'm > > > > curious > > > > > > > what > > > > > > > >> > would > > > > > > > >> > > > be > > > > > > > >> > > > > the alternatives to solve these problems better and > if > > > > they > > > > > > can > > > > > > > be > > > > > > > >> > > > > incorporated into this proposal (or built > > independently > > > in > > > > > > > >> addition > > > > > > > >> > to > > > > > > > >> > > or > > > > > > > >> > > > > on top of this proposal). E.g. one potential > > extension > > > we > > > > > > > >> discussed > > > > > > > >> > > > > earlier in the thread could be multiplexing logical > > > > > > > transactional > > > > > > > >> > > > "streams" > > > > > > > >> > > > > with a single producer. > > > > > > > >> > > > > > > > > > > > >> > > > > -Artem > > > > > > > >> > > > > > > > > > > > >> > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover < > > > > > > > >> roger.hoo...@gmail.com > > > > > > > >> > > > > > > > > > >> > > > > wrote: > > > > > > > >> > > > > > > > > > > > >> > > > > > Thanks. I like that you're moving Kafka toward > > > > supporting > > > > > > > this > > > > > > > >> > > > > dual-write > > > > > > > >> > > > > > pattern. Each use case needs to consider the > > > tradeoffs. > > > > > > You > > > > > > > >> > already > > > > > > > >> > > > > > summarized the pros very well in the KIP. I would > > > > > summarize > > > > > > > the > > > > > > > >> > cons > > > > > > > >> > > > > > as follows: > > > > > > > >> > > > > > > > > > > > > >> > > > > > - you sacrifice availability - each write requires > > > both > > > > DB > > > > > > and > > > > > > > >> > Kafka > > > > > > > >> > > to > > > > > > > >> > > > > be > > > > > > > >> > > > > > available so I think your overall application > > > > availability > > > > > > is > > > > > > > 1 > > > > > > > >> - > > > > > > > >> > > p(DB > > > > > > > >> > > > is > > > > > > > >> > > > > > unavailable)*p(Kafka is unavailable). > > > > > > > >> > > > > > - latency will be higher and throughput lower - > each > > > > write > > > > > > > >> requires > > > > > > > >> > > > both > > > > > > > >> > > > > > writes to DB and Kafka while holding an exclusive > > lock > > > > in > > > > > > DB. > > > > > > > >> > > > > > - you need to create a producer per unit of > > > concurrency > > > > in > > > > > > > your > > > > > > > >> app > > > > > > > >> > > > which > > > > > > > >> > > > > > has some overhead in the app and Kafka side > (number > > of > > > > > > > >> connections, > > > > > > > >> > > > poor > > > > > > > >> > > > > > batching). I assume the producers would need to > be > > > > > > configured > > > > > > > >> for > > > > > > > >> > > low > > > > > > > >> > > > > > latency (linger.ms=0) > > > > > > > >> > > > > > - there's some complexity in managing stable > > > > transactional > > > > > > ids > > > > > > > >> for > > > > > > > >> > > each > > > > > > > >> > > > > > producer/concurrency unit in your application. > With > > > k8s > > > > > > > >> > deployment, > > > > > > > >> > > > you > > > > > > > >> > > > > > may need to switch to something like a StatefulSet > > > that > > > > > > gives > > > > > > > >> each > > > > > > > >> > > pod > > > > > > > >> > > > a > > > > > > > >> > > > > > stable identity across restarts. 
On top of that > pod > > > > > > identity > > > > > > > >> which > > > > > > > >> > > you > > > > > > > >> > > > > can > > > > > > > >> > > > > > use as a prefix, you then assign unique > > transactional > > > > ids > > > > > to > > > > > > > >> each > > > > > > > >> > > > > > concurrency unit (thread/goroutine). > > > > > > > >> > > > > > > > > > > > > >> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits > > > > > > > >> > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > >> > > > > > > > > > > > > >> > > > > > > Hi Roger, > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > Thank you for the feedback. You make a very > good > > > > point > > > > > > that > > > > > > > >> we > > > > > > > >> > > also > > > > > > > >> > > > > > > discussed internally. Adding support for > multiple > > > > > > > concurrent > > > > > > > >> > > > > > > transactions in one producer could be valuable > but > > > it > > > > > > seems > > > > > > > to > > > > > > > >> > be a > > > > > > > >> > > > > > fairly > > > > > > > >> > > > > > > large and independent change that would deserve > a > > > > > separate > > > > > > > >> KIP. > > > > > > > >> > If > > > > > > > >> > > > > such > > > > > > > >> > > > > > > support is added we could modify 2PC > functionality > > > to > > > > > > > >> incorporate > > > > > > > >> > > > that. > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > Maybe not too bad but a bit of pain to manage > > > these > > > > > ids > > > > > > > >> inside > > > > > > > >> > > each > > > > > > > >> > > > > > > process and across all application processes. > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > I'm not sure if supporting multiple transactions > > in > > > > one > > > > > > > >> producer > > > > > > > >> > > > would > > > > > > > >> > > > > > make > > > > > > > >> > > > > > > id management simpler: we'd need to store a > piece > > of > > > > > data > > > > > > > per > > > > > > > >> > > > > > transaction, > > > > > > > >> > > > > > > so whether it's N producers with a single > > > transaction > > > > > or N > > > > > > > >> > > > transactions > > > > > > > >> > > > > > > with a single producer, it's still roughly the > > same > > > > > amount > > > > > > > of > > > > > > > >> > data > > > > > > > >> > > to > > > > > > > >> > > > > > > manage. In fact, managing transactional ids > > > (current > > > > > > > >> proposal) > > > > > > > >> > > might > > > > > > > >> > > > > be > > > > > > > >> > > > > > > easier, because the id is controlled by the > > > > application > > > > > > and > > > > > > > it > > > > > > > >> > > knows > > > > > > > >> > > > > how > > > > > > > >> > > > > > to > > > > > > > >> > > > > > > complete the transaction after crash / restart; > > > while > > > > a > > > > > > TID > > > > > > > >> would > > > > > > > >> > > be > > > > > > > >> > > > > > > generated by Kafka and that would create a > > question > > > of > > > > > > > >> starting > > > > > > > >> > > Kafka > > > > > > > >> > > > > > > transaction, but not saving its TID and then > > > crashing, > > > > > > then > > > > > > > >> > > figuring > > > > > > > >> > > > > out > > > > > > > >> > > > > > > which transactions to abort and etc. > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > 2) creating a separate producer for each > > > concurrency > > > > > > slot > > > > > > > in > > > > > > > >> > the > > > > > > > >> > > > > > > application > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > This is a very valid concern. 
Maybe we'd need > to > > > have > > > > > > some > > > > > > > >> > > > > multiplexing > > > > > > > >> > > > > > of > > > > > > > >> > > > > > > transactional logical "streams" over the same > > > > > connection. > > > > > > > >> Seems > > > > > > > >> > > > like a > > > > > > > >> > > > > > > separate KIP, though. > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > Otherwise, it seems you're left with > > > single-threaded > > > > > > model > > > > > > > >> per > > > > > > > >> > > > > > > application process? > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > That's a fair assessment. Not necessarily > exactly > > > > > > > >> > single-threaded > > > > > > > >> > > > per > > > > > > > >> > > > > > > application, but a single producer per thread > > model > > > > > (i.e. > > > > > > an > > > > > > > >> > > > > application > > > > > > > >> > > > > > > could have a pool of threads + producers to > > increase > > > > > > > >> > concurrency). > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > -Artem > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover < > > > > > > > >> > > roger.hoo...@gmail.com > > > > > > > >> > > > > > > > > > > > >> > > > > > > wrote: > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > Artem, > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > Thanks for the reply. > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > If I understand correctly, Kafka does not > > support > > > > > > > concurrent > > > > > > > >> > > > > > transactions > > > > > > > >> > > > > > > > from the same producer (transactional id). I > > > think > > > > > this > > > > > > > >> means > > > > > > > >> > > that > > > > > > > >> > > > > > > > applications that want to support in-process > > > > > concurrency > > > > > > > >> (say > > > > > > > >> > > > > > > thread-level > > > > > > > >> > > > > > > > concurrency with row-level DB locking) would > > need > > > to > > > > > > > manage > > > > > > > >> > > > separate > > > > > > > >> > > > > > > > transactional ids and producers per thread and > > > then > > > > > > store > > > > > > > >> txn > > > > > > > >> > > state > > > > > > > >> > > > > > > > accordingly. The potential usability > > downsides I > > > > see > > > > > > are > > > > > > > >> > > > > > > > 1) managing a set of transactional ids for > each > > > > > > > application > > > > > > > >> > > process > > > > > > > >> > > > > > that > > > > > > > >> > > > > > > > scales up to it's max concurrency. Maybe not > > too > > > > bad > > > > > > but > > > > > > > a > > > > > > > >> bit > > > > > > > >> > > of > > > > > > > >> > > > > pain > > > > > > > >> > > > > > > to > > > > > > > >> > > > > > > > manage these ids inside each process and > across > > > all > > > > > > > >> application > > > > > > > >> > > > > > > processes. > > > > > > > >> > > > > > > > 2) creating a separate producer for each > > > concurrency > > > > > > slot > > > > > > > in > > > > > > > >> > the > > > > > > > >> > > > > > > > application - this could create a lot more > > > producers > > > > > and > > > > > > > >> > > resultant > > > > > > > >> > > > > > > > connections to Kafka than the typical model > of a > > > > > single > > > > > > > >> > producer > > > > > > > >> > > > per > > > > > > > >> > > > > > > > process. 
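A sketch of the id-management pattern being discussed -- a stable prefix (e.g. the StatefulSet pod name) plus one producer per concurrency slot; the environment variable name, pool size, and other settings are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerPool {
    /** One producer per concurrency slot, each with a stable transactional id. */
    static List<KafkaProducer<byte[], byte[]>> createPool(int slots) {
        // With a StatefulSet, the pod name is stable across restarts and can serve
        // as the transactional-id prefix (environment variable name assumed).
        String podId = System.getenv("POD_NAME");
        List<KafkaProducer<byte[], byte[]>> pool = new ArrayList<>(slots);
        for (int slot = 0; slot < slots; slot++) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, podId + "-slot-" + slot);
            props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // low latency, as noted above
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            pool.add(new KafkaProducer<>(props));
        }
        return pool;
    }
}
```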
> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > Otherwise, it seems you're left with > > > single-threaded > > > > > > model > > > > > > > >> per > > > > > > > >> > > > > > > application > > > > > > > >> > > > > > > > process? > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > Thanks, > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > Roger > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits > > > > > > > >> > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > Hi Roger, Arjun, > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > Thank you for the questions. > > > > > > > >> > > > > > > > > > It looks like the application must have > > stable > > > > > > > >> > transactional > > > > > > > >> > > > ids > > > > > > > >> > > > > > over > > > > > > > >> > > > > > > > > time? > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > The transactional id should uniquely > identify > > a > > > > > > producer > > > > > > > >> > > instance > > > > > > > >> > > > > and > > > > > > > >> > > > > > > > needs > > > > > > > >> > > > > > > > > to be stable across the restarts. If the > > > > > > transactional > > > > > > > >> id is > > > > > > > >> > > not > > > > > > > >> > > > > > > stable > > > > > > > >> > > > > > > > > across restarts, then zombie messages from a > > > > > previous > > > > > > > >> > > incarnation > > > > > > > >> > > > > of > > > > > > > >> > > > > > > the > > > > > > > >> > > > > > > > > producer may violate atomicity. If there > are > > 2 > > > > > > producer > > > > > > > >> > > > instances > > > > > > > >> > > > > > > > > concurrently producing data with the same > > > > > > transactional > > > > > > > >> id, > > > > > > > >> > > they > > > > > > > >> > > > > are > > > > > > > >> > > > > > > > going > > > > > > > >> > > > > > > > > to constantly fence each other and most > likely > > > > make > > > > > > > >> little or > > > > > > > >> > > no > > > > > > > >> > > > > > > > progress. > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > The name might be a little bit confusing as > it > > > may > > > > > be > > > > > > > >> > mistaken > > > > > > > >> > > > for > > > > > > > >> > > > > a > > > > > > > >> > > > > > > > > transaction id / TID that uniquely > identifies > > > > every > > > > > > > >> > > transaction. > > > > > > > >> > > > > The > > > > > > > >> > > > > > > > name > > > > > > > >> > > > > > > > > and the semantics were defined in the > original > > > > > > > >> > > > > exactly-once-semantics > > > > > > > >> > > > > > > > (EoS) > > > > > > > >> > > > > > > > > proposal ( > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging > > > > > > > >> > > > > > > > > ) > > > > > > > >> > > > > > > > > and KIP-939 just build on top of that. 
> > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > I'm curious to understand what happens if > > the > > > > > > producer > > > > > > > >> > dies, > > > > > > > >> > > > and > > > > > > > >> > > > > > does > > > > > > > >> > > > > > > > not > > > > > > > >> > > > > > > > > come up and recover the pending transaction > > > within > > > > > the > > > > > > > >> > > > transaction > > > > > > > >> > > > > > > > timeout > > > > > > > >> > > > > > > > > interval. > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > If the producer / application never comes > > back, > > > > the > > > > > > > >> > transaction > > > > > > > >> > > > > will > > > > > > > >> > > > > > > > remain > > > > > > > >> > > > > > > > > in prepared (a.k.a. "in-doubt") state until > an > > > > > > operator > > > > > > > >> > > > forcefully > > > > > > > >> > > > > > > > > terminates the transaction. That's why > there > > > is a > > > > > new > > > > > > > >> ACL is > > > > > > > >> > > > > defined > > > > > > > >> > > > > > > in > > > > > > > >> > > > > > > > > this proposal -- this functionality should > > only > > > > > > provided > > > > > > > >> to > > > > > > > >> > > > > > > applications > > > > > > > >> > > > > > > > > that implement proper recovery logic. > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > -Artem > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun > Satish > > < > > > > > > > >> > > > > > arjun.sat...@gmail.com> > > > > > > > >> > > > > > > > > wrote: > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > Hello Artem, > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > Thanks for the KIP. > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > I have the same question as Roger on > > > concurrent > > > > > > > writes, > > > > > > > >> and > > > > > > > >> > > an > > > > > > > >> > > > > > > > additional > > > > > > > >> > > > > > > > > > one on consumer behavior. Typically, > > > > transactions > > > > > > will > > > > > > > >> > > timeout > > > > > > > >> > > > if > > > > > > > >> > > > > > not > > > > > > > >> > > > > > > > > > committed within some time interval. With > > the > > > > > > proposed > > > > > > > >> > > changes > > > > > > > >> > > > in > > > > > > > >> > > > > > > this > > > > > > > >> > > > > > > > > KIP, > > > > > > > >> > > > > > > > > > consumers cannot consume past the ongoing > > > > > > transaction. > > > > > > > >> I'm > > > > > > > >> > > > > curious > > > > > > > >> > > > > > to > > > > > > > >> > > > > > > > > > understand what happens if the producer > > dies, > > > > and > > > > > > does > > > > > > > >> not > > > > > > > >> > > come > > > > > > > >> > > > > up > > > > > > > >> > > > > > > and > > > > > > > >> > > > > > > > > > recover the pending transaction within the > > > > > > transaction > > > > > > > >> > > timeout > > > > > > > >> > > > > > > > interval. > > > > > > > >> > > > > > > > > Or > > > > > > > >> > > > > > > > > > are we saying that when used in this 2PC > > > > context, > > > > > we > > > > > > > >> should > > > > > > > >> > > > > > configure > > > > > > > >> > > > > > > > > these > > > > > > > >> > > > > > > > > > transaction timeouts to very large > > durations? > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > Thanks in advance! 
> > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > Best, > > > > > > > >> > > > > > > > > > Arjun > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger > > Hoover < > > > > > > > >> > > > > > roger.hoo...@gmail.com > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > wrote: > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > Hi Artem, > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Thanks for writing this KIP. Can you > > > clarify > > > > > the > > > > > > > >> > > > requirements > > > > > > > >> > > > > a > > > > > > > >> > > > > > > bit > > > > > > > >> > > > > > > > > more > > > > > > > >> > > > > > > > > > > for managing transaction state? It > looks > > > like > > > > > the > > > > > > > >> > > > application > > > > > > > >> > > > > > must > > > > > > > >> > > > > > > > > have > > > > > > > >> > > > > > > > > > > stable transactional ids over time? > What > > > is > > > > > the > > > > > > > >> > > granularity > > > > > > > >> > > > > of > > > > > > > >> > > > > > > > those > > > > > > > >> > > > > > > > > > ids > > > > > > > >> > > > > > > > > > > and producers? Say the application is a > > > > > > > >> multi-threaded > > > > > > > >> > > Java > > > > > > > >> > > > > web > > > > > > > >> > > > > > > > > server, > > > > > > > >> > > > > > > > > > > can/should all the concurrent threads > > share > > > a > > > > > > > >> > transactional > > > > > > > >> > > > id > > > > > > > >> > > > > > and > > > > > > > >> > > > > > > > > > > producer? That doesn't seem right to me > > > > unless > > > > > > the > > > > > > > >> > > > application > > > > > > > >> > > > > > is > > > > > > > >> > > > > > > > > using > > > > > > > >> > > > > > > > > > > global DB locks that serialize all > > requests. > > > > > > > >> Instead, if > > > > > > > >> > > the > > > > > > > >> > > > > > > > > application > > > > > > > >> > > > > > > > > > > uses row-level DB locks, there could be > > > > > multiple, > > > > > > > >> > > concurrent, > > > > > > > >> > > > > > > > > independent > > > > > > > >> > > > > > > > > > > txns happening in the same JVM so it > seems > > > > like > > > > > > the > > > > > > > >> > > > granularity > > > > > > > >> > > > > > > > > managing > > > > > > > >> > > > > > > > > > > transactional ids and txn state needs to > > > line > > > > up > > > > > > > with > > > > > > > >> > > > > granularity > > > > > > > >> > > > > > > of > > > > > > > >> > > > > > > > > the > > > > > > > >> > > > > > > > > > DB > > > > > > > >> > > > > > > > > > > locking. > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Does that make sense or am I > > > misunderstanding? 
> > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Thanks, > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > Roger > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem > > > > Livshits > > > > > > > >> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote: > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > Hello, > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > This is a discussion thread for > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC > > > > > > > >> > > > > > > > > > > > . > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > The KIP proposes extending Kafka > > > transaction > > > > > > > support > > > > > > > >> > > (that > > > > > > > >> > > > > > > already > > > > > > > >> > > > > > > > > uses > > > > > > > >> > > > > > > > > > > 2PC > > > > > > > >> > > > > > > > > > > > under the hood) to enable atomicity of > > > dual > > > > > > writes > > > > > > > >> to > > > > > > > >> > > Kafka > > > > > > > >> > > > > and > > > > > > > >> > > > > > > an > > > > > > > >> > > > > > > > > > > external > > > > > > > >> > > > > > > > > > > > database, and helps to fix a long > > standing > > > > > Flink > > > > > > > >> issue. > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > An example of code that uses the dual > > > write > > > > > > recipe > > > > > > > >> with > > > > > > > >> > > > JDBC > > > > > > > >> > > > > > and > > > > > > > >> > > > > > > > > should > > > > > > > >> > > > > > > > > > > > work for most SQL databases is here > > > > > > > >> > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/14231. > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > The FLIP for the sister fix in Flink > is > > > here > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710 > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > -Artem > > > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >