Hi Artem, I totally agree that a timeout for the 2PC case is a bad idea. It does abandon the 2PC guarantee.
Thanks, Andrew > On 28 Feb 2024, at 00:44, Artem Livshits <alivsh...@confluent.io.INVALID> wrote: > > Hi Jun, > > Thank you for the discussion. > >> For 3b, it would be useful to understand the reason why an admin doesn't authorize 2PC for self-hosted Flink > > I think the nuance here is that for cloud, there is a cloud admin (operator) and there is a cluster admin (who, for example, could manage ACLs on topics, etc.). The 2PC functionality can affect cloud operations, because a long running transaction can block the last stable offset and prevent compaction or data tiering. In a multi-tenant environment, a long running transaction that involves consumer offsets may affect data that is shared by multiple tenants (Flink transactions don't use consumer offsets, so this is not an issue for Flink, but we'd need a separate ACL or some other way to express this permission if we wanted to go in that direction). > > For that reason, I expect 2PC to be controlled by the cloud operator, and it just may not be scalable for the cloud operator to manage all potential interactions required to resolve in-doubt transactions (communicate to the end users, etc.). In general, we make no assumptions about Kafka applications -- they may come and go, they may abandon transactional ids and generate new ones. For 2PC we need to make sure that the application is highly available and wouldn't easily abandon an open transaction in Kafka. > >> If so, another way to address that is to allow the admin to set a timeout even for the 2PC case. > > This effectively abandons the 2PC guarantee because it creates a case for Kafka to unilaterally make an automatic decision on a prepared transaction. I think it's fundamental for 2PC to abandon this ability and wait for the external coordinator for the decision; after all, the coordinator may legitimately be unavailable for an arbitrary amount of time. Also, we already have a timeout on regular Kafka transactions; having another "special" timeout could be confusing, and a large enough timeout could still produce the undesirable effects for cloud operations (so we kind of get the worst of both options -- we don't provide guarantees and still have an impact on operations). > > -Artem > > On Fri, Feb 23, 2024 at 8:55 AM Jun Rao <j...@confluent.io.invalid> wrote: > >> Hi, Artem, >> >> Thanks for the reply. >> >> For 3b, it would be useful to understand the reason why an admin doesn't authorize 2PC for self-hosted Flink. Is the main reason that 2PC has an unbounded timeout that could lead to unbounded outstanding transactions? If so, another way to address that is to allow the admin to set a timeout even for the 2PC case. The timeout would be long enough for well-behaved applications to complete 2PC operations, but not so long that misbehaving applications' transactions hang indefinitely. >> >> Jun >> >> On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote: >> >>> Hi Jun, >>> >>>> 20A. One option is to make the API initTransactions(boolean enable2PC). >>> >>> We could do that. I think there is a little bit of symmetry between the client and server that would get lost with this approach (the server has enable2PC as a config), but I don't really see a strong reason for enable2PC to be a config vs. an argument for initTransactions. But let's see if we find 20B to be a strong consideration for keeping a separate flag for keepPreparedTxn. >>> >>>> 20B.
But realistically, we want Flink (and other apps) to have a single implementation >>> >>> That's correct and here's what I think can happen if we don't allow independent keepPreparedTxn: >>> >>> 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is used, which effectively implements keepPreparedTxn=true without our explicit support. >>> 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can either fall back to reflection or we just say we don't support this and the Kafka cluster has to be upgraded first. >>> 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster -- this becomes interesting depending on whether the Kafka cluster authorizes 2PC or not: >>> 3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything uses KIP-939 and there is no problem. >>> 3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we can either fall back to reflection or use keepPreparedTxn=true even if 2PC is not enabled. >>> >>> It seems to be ok to not support case 2 (i.e. require the Kafka upgrade first); it shouldn't be an issue for cloud offerings as cloud providers are likely to upgrade their Kafka to the latest versions. >>> >>> Case 3b seems to be important to support, though -- the latest version of everything should work at least as well as (and preferably better than) previous ones. It's possible to downgrade to case 1, but it's probably not sustainable as newer versions of Flink would also add other features that the customers may want to take advantage of. >>> >>> If we enabled keepPreparedTxn=true even without 2PC, then we could enable case 3b without the need to fall back to reflection, so we could get rid of the reflection-based logic and just have a single implementation based on KIP-939. >>> >>>> 32. My suggestion is to change >>> >>> Let me think about it and I'll come back to this. >>> >>> -Artem >>> >>> On Wed, Feb 21, 2024 at 3:40 PM Jun Rao <j...@confluent.io.invalid> wrote: >>> >>>> Hi, Artem, >>>> >>>> Thanks for the reply. >>>> >>>> 20A. One option is to make the API initTransactions(boolean enable2PC). Then, it's clear from the code whether 2PC related logic should be added. >>>> >>>> 20B. But realistically, we want Flink (and other apps) to have a single implementation of the 2PC logic, not two different implementations, right? >>>> >>>> 32. My suggestion is to change kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max to something like: Metric Name: active-transaction-open-time-max; Type: Max; Group: transaction-coordinator-metrics; Tags: none; Description: The max time a currently-open transaction has been open. >>>> >>>> Jun >>>> >>>> On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits >>>> <alivsh...@confluent.io.invalid> wrote: >>>>> >>>>> Hi Jun, >>>>> >>>>>> 20A. This only takes care of the abort case. The application still needs to be changed to handle the commit case properly >>>>> >>>>> My point here is that looking at the initTransactions() call it's not clear what the semantics are. Say I'm doing a code review: I cannot say whether the code is correct or not -- if the config (that's something that's theoretically not known at the time of code review) is going to enable 2PC, then the correct code should look one way, otherwise it would need to look differently.
Also, say if code is written with initTransactions() without an explicit abort, and then for whatever reason the code gets used with 2PC enabled (it could be a library in a bigger product) -- it'll start breaking in a non-intuitive way. >>>>> >>>>>> 20B. Hmm, if the admin disables 2PC, there is likely a reason behind that >>>>> >>>>> That's true, but reality may be more complicated. Say a user wants to run a self-managed Flink with Confluent cloud. The Confluent cloud admin may not be comfortable enabling 2PC for general user accounts that use services not managed by Confluent (the same way Confluent doesn't allow increasing the max transaction timeout for general user accounts). Right now, self-managed Flink works because it uses reflection; if it moves to the public APIs provided by KIP-939, it'll break. >>>>> >>>>>> 32. Ok. That's the kafka metric. In that case, the metric name has a group and a name. There is no type and no package name. >>>>> >>>>> Is this a suggestion to change or confirmation that the current logic is ok? I just copied an existing metric but can change if needed. >>>>> >>>>> -Artem >>>>> >>>>> On Tue, Feb 20, 2024 at 11:25 AM Jun Rao <j...@confluent.io.invalid> wrote: >>>>> >>>>>> Hi, Artem, >>>>>> >>>>>> Thanks for the reply. >>>>>> >>>>>> 20. "Say if an application currently uses initTransactions() to achieve the current semantics, it would need to be rewritten to use initTransactions() + abort to achieve the same semantics if the config is changed." >>>>>> >>>>>> This only takes care of the abort case. The application still needs to be changed to handle the commit case properly if transaction.two.phase.commit.enable is set to true. >>>>>> >>>>>> "Even when KIP-939 is implemented, there would be situations when 2PC is disabled by the admin (e.g. Kafka service providers may be reluctant to enable 2PC for Flink services that users host themselves), so we either have to perpetuate the reflection-based implementation in Flink or enable keepPreparedTxn=true without 2PC." >>>>>> >>>>>> Hmm, if the admin disables 2PC, there is likely a reason behind that. I am not sure that we should provide an API to encourage the application to circumvent that. >>>>>> >>>>>> 32. Ok. That's the kafka metric. In that case, the metric name has a group and a name. There is no type and no package name. >>>>>> >>>>>> Jun >>>>>> >>>>>> On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits >>>>>> <alivsh...@confluent.io.invalid> wrote: >>>>>> >>>>>>> Hi Jun, >>>>>>> >>>>>>> Thank you for your questions. >>>>>>> >>>>>>>> 20. So to abort a prepared transaction after the producer start, we could use ... >>>>>>> >>>>>>> I agree, initTransactions(true) + abort would accomplish the behavior of initTransactions(false), so we could technically have fewer ways to achieve the same thing, which is generally valuable. I wonder, though, if that would be intuitive from the application perspective. Say if an application currently uses initTransactions() to achieve the current semantics, it would need to be rewritten to use initTransactions() + abort to achieve the same semantics if the config is changed. I think this could
I think this could >> create >>>>>>> subtle confusion, as the config change is generally decoupled >> from >>>>>> changing >>>>>>> application implementation. >>>>>>> >>>>>>>> The use case mentioned for keepPreparedTxn=true without 2PC >>>> doesn't >>>>>> seem >>>>>>> very important >>>>>>> >>>>>>> I agree, it's not a strict requirement. It is, however, a >> missing >>>>> option >>>>>>> in the public API, so currently Flink has to use reflection to >>>> emulate >>>>>> this >>>>>>> functionality without 2PC support. Even when KIP-939 is >>>> implemented, >>>>>>> there would be situations when 2PC is disabled by the admin (e.g. >>>> Kafka >>>>>>> service providers may be reluctant to enable 2PC for Flink >> services >>>>> that >>>>>>> users host themselves), so we either have to perpetuate the >>>>>>> reflection-based implementation in Flink or enable >>>> keepPreparedTxn=true >>>>>>> without 2PC. >>>>>>> >>>>>>>> 32. >>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max >>>>>>> >>>>>>> I just followed the existing metric implementation example >>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95 >>>>>>> , >>>>>>> which maps to >>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max. >>>>>>> >>>>>>>> 33. "If the value is 'true' then the corresponding field is set >>>>>>> >>>>>>> That's correct. Updated the KIP. >>>>>>> >>>>>>> -Artem >>>>>>> >>>>>>> On Wed, Feb 7, 2024 at 10:06 AM Jun Rao <j...@confluent.io.invalid >>> >>>>>> wrote: >>>>>>> >>>>>>>> Hi, Artem, >>>>>>>> >>>>>>>> Thanks for the reply. >>>>>>>> >>>>>>>> 20. So to abort a prepared transaction after producer start, we >>>> could >>>>>> use >>>>>>>> either >>>>>>>> producer.initTransactions(false) >>>>>>>> or >>>>>>>> producer.initTransactions(true) >>>>>>>> producer.abortTransaction >>>>>>>> Could we just always use the latter API? If we do this, we >> could >>>>>>>> potentially eliminate the keepPreparedTxn flag in >>>> initTransactions(). >>>>>>> After >>>>>>>> the initTransactions() call, the outstanding txn is always >>>> preserved >>>>> if >>>>>>> 2pc >>>>>>>> is enabled and aborted if 2pc is disabled. The use case >> mentioned >>>> for >>>>>>>> keepPreparedTxn=true without 2PC doesn't seem very important. >> If >>> we >>>>>> could >>>>>>>> do that, it seems that we have (1) less redundant and simpler >>> APIs; >>>>> (2) >>>>>>>> more symmetric syntax for aborting/committing a prepared txn >>> after >>>>>>> producer >>>>>>>> restart. >>>>>>>> >>>>>>>> 32. >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max >>>>>>>> Is this a Yammer or kafka metric? The former uses the camel >> case >>>> for >>>>>> name >>>>>>>> and type. The latter uses the hyphen notation, but doesn't have >>> the >>>>>> type >>>>>>>> attribute. >>>>>>>> >>>>>>>> 33. "If the value is 'true' then the corresponding field is set >>> in >>>>> the >>>>>>>> InitProducerIdRequest and the KafkaProducer object is set into >> a >>>>> state >>>>>>>> which only allows calling .commitTransaction or >>> .abortTransaction." >>>>>>>> We should also allow .completeTransaction, right? 
>>>>>>>> >>>>>>>> Jun >>>>>>>> >>>>>>>> On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits >>>>>>>> <alivsh...@confluent.io.invalid> wrote: >>>>>>>> >>>>>>>>> Hi Jun, >>>>>>>>> >>>>>>>>>> 20. For Flink usage, it seems that the APIs used to abort and commit a prepared txn are not symmetric. >>>>>>>>> >>>>>>>>> For Flink it is expected that Flink would call .commitTransaction or .abortTransaction directly; it wouldn't need to deal with PreparedTxnState. The outcome is actually determined by Flink's job manager, not by a comparison of PreparedTxnState. So for Flink, if the Kafka sink crashes and restarts, there are 2 cases: >>>>>>>>> >>>>>>>>> 1. Transaction is not prepared. In that case, just call producer.initTransactions(false), and then it can start transactions as needed. >>>>>>>>> 2. Transaction is prepared. In that case, call producer.initTransactions(true) and wait for the decision from the job manager. Note that it's not a given that the transaction will get committed; the decision could also be an abort. >>>>>>>>> >>>>>>>>>> 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we could use a negative timeout in the record to indicate 2PC? >>>>>>>>> >>>>>>>>> -1 sounds good, updated. >>>>>>>>> >>>>>>>>>> 30. The KIP has two different APIs to abort an ongoing txn. Do we need both? >>>>>>>>> >>>>>>>>> I think of producer.initTransactions() as the implementation behind adminClient.forceTerminateTransaction(transactionalId). >>>>>>>>> >>>>>>>>>> 31. "This would flush all the pending messages and transition the producer >>>>>>>>> >>>>>>>>> Updated the KIP to clarify that IllegalStateException will be thrown. >>>>>>>>> >>>>>>>>> -Artem >>>>>>>>> >>>>>>>>> On Mon, Feb 5, 2024 at 2:22 PM Jun Rao <j...@confluent.io.invalid> wrote: >>>>>>>>> >>>>>>>>>> Hi, Artem, >>>>>>>>>> >>>>>>>>>> Thanks for the reply. >>>>>>>>>> >>>>>>>>>> 20. For Flink usage, it seems that the APIs used to abort and commit a prepared txn are not symmetric. >>>>>>>>>> To abort, the app will just call >>>>>>>>>> producer.initTransactions(false) >>>>>>>>>> >>>>>>>>>> To commit, the app needs to call >>>>>>>>>> producer.initTransactions(true) >>>>>>>>>> producer.completeTransaction(preparedTxnState) >>>>>>>>>> >>>>>>>>>> Will this be a concern? For the dual-writer usage, both abort/commit use the same API. >>>>>>>>>> >>>>>>>>>> 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we could use a negative timeout in the record to indicate 2PC? >>>>>>>>>> >>>>>>>>>> 30. The KIP has two different APIs to abort an ongoing txn. Do we need both? >>>>>>>>>> producer.initTransactions(false) >>>>>>>>>> adminClient.forceTerminateTransaction(transactionalId) >>>>>>>>>> >>>>>>>>>> 31. "This would flush all the pending messages and transition the producer into a mode where only .commitTransaction, .abortTransaction, or .completeTransaction could be called. If the call is successful (all messages successfully got flushed to all partitions) the transaction is prepared."
>>>>>>>>>> If the producer calls send() in that state, what exception will the caller receive? >>>>>>>>>> >>>>>>>>>> Jun >>>>>>>>>> >>>>>>>>>> On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits >>>>>>>>>> <alivsh...@confluent.io.invalid> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Jun, >>>>>>>>>>> >>>>>>>>>>>> Then, should we change the following in the example to use InitProducerId(true) instead? >>>>>>>>>>> >>>>>>>>>>> We could. I just thought that it's good to make the example self-contained by starting from a clean state. >>>>>>>>>>> >>>>>>>>>>>> Also, could Flink just follow the dual-write recipe? >>>>>>>>>>> >>>>>>>>>>> I think it would bring some unnecessary logic to Flink (or any other system that already has a transaction coordinator and just wants to drive Kafka to the desired state). We could discuss it with Flink folks; the current proposal was developed in collaboration with them. >>>>>>>>>>> >>>>>>>>>>>> 21. Could a non-2pc user explicitly set the TransactionTimeoutMs to Integer.MAX_VALUE? >>>>>>>>>>> >>>>>>>>>>> The server would reject this for regular transactions; it only accepts values that are <= *transaction.max.timeout.ms* (a broker config). >>>>>>>>>>> >>>>>>>>>>>> 24. Hmm, in KIP-890, without 2pc, the coordinator expects the endTxn request to use the ongoing pid. ... >>>>>>>>>>> >>>>>>>>>>> Without 2PC there is no case where the pid could change between starting a transaction and endTxn (InitProducerId would abort any ongoing transaction). With 2PC there is now a case where an InitProducerId call can change the pid without aborting the transaction, so we need to handle that. I wouldn't say that the flow is different; it's rather extended to handle new cases. The main principle is still the same -- for all operations we use the latest "operational" pid and epoch known to the client; this way we guarantee that we can fence zombie / split-brain clients by disrupting the "latest known" pid + epoch progression. >>>>>>>>>>> >>>>>>>>>>>> 25. "We send out markers using the original ongoing transaction ProducerId and ProducerEpoch" ... >>>>>>>>>>> >>>>>>>>>>> Updated. >>>>>>>>>>> >>>>>>>>>>> -Artem >>>>>>>>>>> >>>>>>>>>>> On Mon, Jan 29, 2024 at 4:57 PM Jun Rao <j...@confluent.io.invalid> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Hi, Artem, >>>>>>>>>>>> >>>>>>>>>>>> Thanks for the reply. >>>>>>>>>>>> >>>>>>>>>>>> 20. So for the dual-write recipe, we should always call InitProducerId(keepPreparedTxn=true) from the producer? Then, should we change the following in the example to use InitProducerId(true) instead? >>>>>>>>>>>> 1. InitProducerId(false); TC STATE: Empty, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1, NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1, OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
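To make the Flink-style flow discussed above concrete, here is a minimal sketch of the recovery logic, assuming the initTransactions(boolean keepPreparedTxn) overload proposed in the KIP; the Decision enum, recoverDecision(), and producerConfigs are hypothetical stand-ins for the external coordinator's (e.g. the Flink job manager's) state:

    import org.apache.kafka.clients.producer.KafkaProducer;

    enum Decision { NONE, COMMIT, ABORT }  // hypothetical coordinator outcome

    // Sketch: recovery when the commit/abort outcome is tracked externally.
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerConfigs);
    Decision decision = recoverDecision();  // placeholder: ask the coordinator
    if (decision == Decision.NONE) {
        // Case 1: nothing was prepared -- abort anything in-flight, start clean.
        producer.initTransactions(false);
    } else {
        // Case 2: a transaction was prepared -- keep it and apply the
        // coordinator's decision (which may legitimately be an abort).
        producer.initTransactions(true);
        if (decision == Decision.COMMIT) {
            producer.commitTransaction();
        } else {
            producer.abortTransaction();
        }
    }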
Also, could Flink just follow the dual-write recipe? It's simpler if there is one way to solve the 2pc issue. >>>>>>>>>>>> >>>>>>>>>>>> 21. Could a non-2pc user explicitly set the TransactionTimeoutMs to Integer.MAX_VALUE? >>>>>>>>>>>> >>>>>>>>>>>> 24. Hmm, in KIP-890, without 2pc, the coordinator expects the endTxn request to use the ongoing pid. With 2pc, the coordinator now expects the endTxn request to use the next pid. So, the flow is different, right? >>>>>>>>>>>> >>>>>>>>>>>> 25. "We send out markers using the original ongoing transaction ProducerId and ProducerEpoch" >>>>>>>>>>>> We should use ProducerEpoch + 1 in the marker, right? >>>>>>>>>>>> >>>>>>>>>>>> Jun >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits >>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Jun, >>>>>>>>>>>>> >>>>>>>>>>>>>> 20. I am a bit confused by how we set keepPreparedTxn. ... >>>>>>>>>>>>> >>>>>>>>>>>>> keepPreparedTxn=true informs the transaction coordinator that it should keep the ongoing transaction, if any. If keepPreparedTxn=false, then any ongoing transaction is aborted (this is exactly the current behavior). enable2Pc is a separate argument that is controlled by the *transaction.two.phase.commit.enable* setting on the client. >>>>>>>>>>>>> >>>>>>>>>>>>> To start 2PC, the client just needs to set *transaction.two.phase.commit.enable*=true in the config. Then if the client knows the status of the transaction upfront (in the case of Flink, Flink keeps the knowledge of whether the transaction is prepared in its own store, so it always knows upfront), it can set keepPreparedTxn accordingly; then if the transaction was prepared, it'll be ready for the client to complete the appropriate action; if the client doesn't have knowledge that the transaction is prepared, keepPreparedTxn is going to be false, in which case we'll get to a clean state (the same way we do today). >>>>>>>>>>>>> >>>>>>>>>>>>> For the dual-write recipe, the client doesn't know upfront if the transaction is prepared; this information is implicitly encoded in the PreparedTxnState value that can be used to resolve the transaction state. In that case, keepPreparedTxn should always be true, because we don't know upfront and we don't want to accidentally abort a committed transaction. >>>>>>>>>>>>> >>>>>>>>>>>>> The forceTerminateTransaction call can just use keepPreparedTxn=false; it actually doesn't matter whether it sets the Enable2Pc flag. >>>>>>>>>>>>> >>>>>>>>>>>>>> 21.
TransactionLogValue: Do we need some field to identify whether this is written for 2PC so that the ongoing txn is never auto-aborted? >>>>>>>>>>>>> >>>>>>>>>>>>> The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was enabled. I've added a note to the KIP about this. >>>>>>>>>>>>> >>>>>>>>>>>>>> 22 >>>>>>>>>>>>> >>>>>>>>>>>>> You're right, it's a typo. I fixed it as well as step 9 (REQUEST: ProducerId=73, ProducerEpoch=MAX). >>>>>>>>>>>>> >>>>>>>>>>>>>> 23. It's a bit weird that Enable2Pc is driven by a config while KeepPreparedTxn is from an API param ... >>>>>>>>>>>>> >>>>>>>>>>>>> The intent to use 2PC doesn't change from transaction to transaction, but the intent to keep a prepared txn may change from transaction to transaction. In dual-write recipes the distinction is not clear, but for use cases where the keepPreparedTxn value is known upfront (e.g. Flink) it's more prominent. E.g. a Flink Kafka sink operator could be deployed with *transaction.two.phase.commit.enable*=true hardcoded in the image, but keepPreparedTxn cannot be hardcoded in the image, because it depends on the job manager's state. >>>>>>>>>>>>> >>>>>>>>>>>>>> 24 >>>>>>>>>>>>> >>>>>>>>>>>>> The flow is actually going to work the same way as it does now -- the "main" producer id + epoch needs to be used in all operations to prevent fencing (it's sort of a common "header" in all RPC calls that follow the same rules). The ongoing txn info is just additional info for making a commit / abort decision based on the PreparedTxnState from the DB. >>>>>>>>>>>>> >>>>>>>>>>>>> --Artem >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Jan 25, 2024 at 11:05 AM Jun Rao <j...@confluent.io.invalid> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi, Artem, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for the reply. A few more comments. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 20. I am a bit confused by how we set keepPreparedTxn. From the KIP, I got the following: (1) to start 2pc, we call InitProducerId(keepPreparedTxn=false); (2) when the producer fails and needs to do recovery, it calls InitProducerId(keepPreparedTxn=true); (3) Admin.forceTerminateTransaction() calls InitProducerId(keepPreparedTxn=false). >>>>>>>>>>>>>> 20.1 In (1), when a producer calls InitProducerId(false) with 2pc enabled, and there is an ongoing txn, should the server return an error to the InitProducerId request? If so, what would be the error code? >>>>>>>>>>>>>> 20.2 How do we distinguish between (1) and (3)? It's the same API call but (1) doesn't abort the ongoing txn and (3) does.
>>>>>>>>>>>>>> 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the ongoing txn. So, setting keepPreparedTxn to false to start 2pc seems counterintuitive. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 21. TransactionLogValue: Do we need some field to identify whether this is written for 2PC so that the ongoing txn is never auto-aborted? >>>>>>>>>>>>>> >>>>>>>>>>>>>> 22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42, ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73, NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1, OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1" >>>>>>>>>>>>>> It seems in the above example, Epoch in RESPONSE should be MAX to match NextProducerEpoch? >>>>>>>>>>>>>> >>>>>>>>>>>>>> 23. It's a bit weird that Enable2Pc is driven by a config while KeepPreparedTxn is from an API param. Should we make them more consistent since they seem related? >>>>>>>>>>>>>> >>>>>>>>>>>>>> 24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73, NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0, When a commit request is sent, it uses the latest ProducerId and ProducerEpoch." >>>>>>>>>>>>>> The step where we use the next producerId to commit an old txn works, but can be confusing. It's going to be hard for people implementing this new client protocol to figure out when to use the current or the new producerId in the EndTxnRequest. One potential way to improve this is to extend EndTxnRequest with a new field like expectedNextProducerId. Then we can always use the old producerId in the existing field, but set expectedNextProducerId to bypass the fencing logic when needed. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Jun >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits >>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Jun, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thank you for the comments. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 10. For the two new fields in Enable2Pc and KeepPreparedTxn ... >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I added a note that all combinations are valid. Enable2Pc=false & KeepPreparedTxn=true could be potentially useful for backward compatibility with Flink, when the new version of Flink that implements KIP-939 tries to work with a cluster that doesn't authorize 2PC. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 11. InitProducerIdResponse: If there is no ongoing txn, what will OngoingTxnProducerId and OngoingTxnEpoch be set to?
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I added a note that they will be set to -1. The >>>> client >>>>>>> then >>>>>>>>> will >>>>>>>>>>>> know >>>>>>>>>>>>>> that >>>>>>>>>>>>>>> there is no ongoing txn and .completeTransaction >>>>> becomes >>>>>> a >>>>>>>>> no-op >>>>>>>>>>> (but >>>>>>>>>>>>>> still >>>>>>>>>>>>>>> required before .send is enabled). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 12. ListTransactionsRequest related changes: It >>>> seems >>>>>>> those >>>>>>>>> are >>>>>>>>>>>>> already >>>>>>>>>>>>>>> covered by KIP-994? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Removed from this KIP. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 13. TransactionalLogValue ... >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This is now updated to work on top of KIP-890. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 14. "Note that the (producerId, epoch) pair >> that >>>>>>>> corresponds >>>>>>>>> to >>>>>>>>>>> the >>>>>>>>>>>>>>> ongoing transaction ... >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This is now updated to work on top of KIP-890. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 15. active-transaction-total-time-max : ... >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Updated. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 16. "transaction.two.phase.commit.enable The >>>> default >>>>>>> would >>>>>>>> be >>>>>>>>>>>>> ‘false’. >>>>>>>>>>>>>>> If it’s ‘false’, 2PC functionality is disabled >> even >>>> if >>>>>> the >>>>>>>> ACL >>>>>>>>> is >>>>>>>>>>> set >>>>>>>>>>>>> ... >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Disabling 2PC effectively removes all >> authorization >>>> to >>>>>> use >>>>>>>> it, >>>>>>>>>>> hence >>>>>>>>>>>> I >>>>>>>>>>>>>>> thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED >> would >>>> be >>>>>>>>>> appropriate. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Do you suggest using a different error code for >> 2PC >>>>>>>>> authorization >>>>>>>>>>> vs >>>>>>>>>>>>> some >>>>>>>>>>>>>>> other authorization (e.g. >>>>>>>>>>> TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED) >>>>>>>>>>>>> or a >>>>>>>>>>>>>>> different code for disabled vs. unauthorised >> (e.g. >>>>>>>>>>>>>>> TWO_PHASE_COMMIT_DISABLED) or both? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 17. completeTransaction(). We expect this to be >>>> only >>>>>> used >>>>>>>>>> during >>>>>>>>>>>>>>> recovery. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> It can also be used if, say, a commit to the >>> database >>>>>> fails >>>>>>>> and >>>>>>>>>> the >>>>>>>>>>>>>> result >>>>>>>>>>>>>>> is inconclusive, e.g. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1. Begin DB transaction >>>>>>>>>>>>>>> 2. Begin Kafka transaction >>>>>>>>>>>>>>> 3. Prepare Kafka transaction >>>>>>>>>>>>>>> 4. Commit DB transaction >>>>>>>>>>>>>>> 5. The DB commit fails, figure out the state of >> the >>>>>>>> transaction >>>>>>>>>> by >>>>>>>>>>>>>> reading >>>>>>>>>>>>>>> the PreparedTxnState from DB >>>>>>>>>>>>>>> 6. Complete Kafka transaction with the >>>>> PreparedTxnState. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 18. "either prepareTransaction was called or >>>>>>>>>>> initTransaction(true) >>>>>>>>>>>>> was >>>>>>>>>>>>>>> called": "either" should be "neither"? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Updated. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 19. Since InitProducerId always bumps up the >>> epoch, >>>>> it >>>>>>>>> creates >>>>>>>>>> a >>>>>>>>>>>>>>> situation ... >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> InitProducerId only bumps the producer epoch, the >>>>> ongoing >>>>>>>>>>> transaction >>>>>>>>>>>>>> epoch >>>>>>>>>>>>>>> stays the same, no matter how many times the >>>>>> InitProducerId >>>>>>>> is >>>>>>>>>>> called >>>>>>>>>>>>>>> before the transaction is completed. 
Eventually >>> the >>>>>> epoch >>>>>>>> may >>>>>>>>>>>>> overflow, >>>>>>>>>>>>>>> and then a new producer id would be allocated, >> but >>>> the >>>>>>>> ongoing >>>>>>>>>>>>>> transaction >>>>>>>>>>>>>>> producer id would stay the same. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I've added a couple examples in the KIP ( >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges >>>>>>>>>>>>>>> ) >>>>>>>>>>>>>>> that walk through some scenarios and show how the >>>> state >>>>>> is >>>>>>>>>> changed. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Artem >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, Dec 8, 2023 at 6:04 PM Jun Rao >>>>>>>>> <j...@confluent.io.invalid >>>>>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi, Artem, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for the KIP. A few comments below. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 10. For the two new fields in Enable2Pc and >>>>>>> KeepPreparedTxn >>>>>>>>> in >>>>>>>>>>>>>>>> InitProducerId, it would be useful to document >> a >>>> bit >>>>>> more >>>>>>>>>> detail >>>>>>>>>>> on >>>>>>>>>>>>>> what >>>>>>>>>>>>>>>> values are set under what cases. For example, >> are >>>> all >>>>>>> four >>>>>>>>>>>>> combinations >>>>>>>>>>>>>>>> valid? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 11. InitProducerIdResponse: If there is no >>> ongoing >>>>>> txn, >>>>>>>> what >>>>>>>>>>> will >>>>>>>>>>>>>>>> OngoingTxnProducerId and OngoingTxnEpoch be >> set? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 12. ListTransactionsRequest related changes: It >>>> seems >>>>>>> those >>>>>>>>> are >>>>>>>>>>>>> already >>>>>>>>>>>>>>>> covered by KIP-994? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 13. TransactionalLogValue: Could we name >>>>>>>>> TransactionProducerId >>>>>>>>>>> and >>>>>>>>>>>>>>>> ProducerId better? It's not clear from the name >>>> which >>>>>> is >>>>>>>> for >>>>>>>>>>> which. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 14. "Note that the (producerId, epoch) pair >> that >>>>>>>> corresponds >>>>>>>>> to >>>>>>>>>>> the >>>>>>>>>>>>>>> ongoing >>>>>>>>>>>>>>>> transaction is going to be written instead of >> the >>>>>>> existing >>>>>>>>>>>> ProducerId >>>>>>>>>>>>>> and >>>>>>>>>>>>>>>> ProducerEpoch fields (which are renamed to >>> reflect >>>>> the >>>>>>>>>> semantics) >>>>>>>>>>>> to >>>>>>>>>>>>>>>> support downgrade.": I am a bit confused on >> that. >>>> Are >>>>>> we >>>>>>>>>> writing >>>>>>>>>>>>>>> different >>>>>>>>>>>>>>>> values to the existing fields? Then, we can't >>>>>> downgrade, >>>>>>>>> right? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 15. active-transaction-total-time-max : Would >>>>>>>>>>>>>>>> active-transaction-open-time-max be more >>> intuitive? >>>>>> Also, >>>>>>>>> could >>>>>>>>>>> we >>>>>>>>>>>>>>> include >>>>>>>>>>>>>>>> the full name (group, tags, etc)? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 16. "transaction.two.phase.commit.enable The >>>> default >>>>>>> would >>>>>>>> be >>>>>>>>>>>>> ‘false’. >>>>>>>>>>>>>>> If >>>>>>>>>>>>>>>> it’s ‘false’, 2PC functionality is disabled >> even >>> if >>>>> the >>>>>>> ACL >>>>>>>>> is >>>>>>>>>>> set, >>>>>>>>>>>>>>> clients >>>>>>>>>>>>>>>> that attempt to use this functionality would >>>> receive >>>>>>>>>>>>>>>> TRANSACTIONAL_ID_AUTHORIZATION_FAILED error." 
>>>>>>>>>>>>>>>> TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems >>>>> unintuitive >>>>>>> for >>>>>>>>> the >>>>>>>>>>>>> client >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> understand what the actual cause is. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 17. completeTransaction(). We expect this to be >>>> only >>>>>> used >>>>>>>>>> during >>>>>>>>>>>>>>> recovery. >>>>>>>>>>>>>>>> Could we document this clearly? Could we >> prevent >>> it >>>>>> from >>>>>>>>> being >>>>>>>>>>> used >>>>>>>>>>>>>>>> incorrectly (e.g. throw an exception if the >>>> producer >>>>>> has >>>>>>>>> called >>>>>>>>>>>> other >>>>>>>>>>>>>>>> methods like send())? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 18. "either prepareTransaction was called or >>>>>>>>>>> initTransaction(true) >>>>>>>>>>>>> was >>>>>>>>>>>>>>>> called": "either" should be "neither"? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 19. Since InitProducerId always bumps up the >>> epoch, >>>>> it >>>>>>>>> creates >>>>>>>>>> a >>>>>>>>>>>>>>> situation >>>>>>>>>>>>>>>> where there could be multiple outstanding txns. >>> The >>>>>>>> following >>>>>>>>>> is >>>>>>>>>>> an >>>>>>>>>>>>>>> example >>>>>>>>>>>>>>>> of a potential problem during recovery. >>>>>>>>>>>>>>>> The last txn epoch in the external store is >> 41 >>>>> when >>>>>>> the >>>>>>>>> app >>>>>>>>>>>> dies. >>>>>>>>>>>>>>>> Instance1 is created for recovery. >>>>>>>>>>>>>>>> 1. (instance1) >>>>>> InitProducerId(keepPreparedTxn=true), >>>>>>>>>>> epoch=42, >>>>>>>>>>>>>>>> ongoingEpoch=41 >>>>>>>>>>>>>>>> 2. (instance1) dies before completeTxn(41) >>> can >>>>> be >>>>>>>>> called. >>>>>>>>>>>>>>>> Instance2 is created for recovery. >>>>>>>>>>>>>>>> 3. (instance2) >>>>>> InitProducerId(keepPreparedTxn=true), >>>>>>>>>>> epoch=43, >>>>>>>>>>>>>>>> ongoingEpoch=42 >>>>>>>>>>>>>>>> 4. (instance2) completeTxn(41) => abort >>>>>>>>>>>>>>>> The first problem is that 41 now is aborted >>> when >>>>> it >>>>>>>> should >>>>>>>>>> be >>>>>>>>>>>>>>> committed. >>>>>>>>>>>>>>>> The second one is that it's not clear who could >>>> abort >>>>>>> epoch >>>>>>>>> 42, >>>>>>>>>>>> which >>>>>>>>>>>>>> is >>>>>>>>>>>>>>>> still open. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Jun >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan >>>>>>>>>>>>>>> <jols...@confluent.io.invalid >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hey Artem, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks for the updates. I think what you say >>>> makes >>>>>>>> sense. I >>>>>>>>>>> just >>>>>>>>>>>>>>> updated >>>>>>>>>>>>>>>> my >>>>>>>>>>>>>>>>> KIP so I want to reconcile some of the >> changes >>> we >>>>>> made >>>>>>>>>>> especially >>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>> respect to the TransactionLogValue. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Firstly, I believe tagged fields require a >>>> default >>>>>>> value >>>>>>>> so >>>>>>>>>>> that >>>>>>>>>>>> if >>>>>>>>>>>>>>> they >>>>>>>>>>>>>>>>> are not filled, we return the default (and >> know >>>>> that >>>>>>> they >>>>>>>>>> were >>>>>>>>>>>>>> empty). >>>>>>>>>>>>>>>> For >>>>>>>>>>>>>>>>> my KIP, I proposed the default for producer >> ID >>>>> tagged >>>>>>>>> fields >>>>>>>>>>>> should >>>>>>>>>>>>>> be >>>>>>>>>>>>>>>> -1. >>>>>>>>>>>>>>>>> I was wondering if we could update the KIP to >>>>> include >>>>>>> the >>>>>>>>>>> default >>>>>>>>>>>>>>> values >>>>>>>>>>>>>>>>> for producer ID and epoch. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Next, I noticed we decided to rename the >>> fields. 
>>>> I guess that the field "NextProducerId" in my KIP correlates to "ProducerId" in this KIP. Is that correct? So we would have "TransactionProducerId" for the non-tagged field and have "ProducerId" (NextProducerId) and "PrevProducerId" as tagged fields in the final version after KIP-890 and KIP-936 are implemented. Is this correct? I think the tags will need updating, but that is trivial. >>>>>>>>> >>>>>>>>> The final question I had was with respect to storing the new epoch. In KIP-890 part 2 (epoch bumps) I think we concluded that we don't need to store the epoch since we can interpret the previous epoch based on the producer ID. But here we could call the InitProducerId multiple times and we only want the producer with the correct epoch to be able to commit the transaction. Is that the correct reasoning for why we need the epoch here but not the Prepare/Commit state? >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Justine >>>>>>>>> >>>>>>>>> On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits >>>>>>>>> <alivsh...@confluent.io.invalid> wrote: >>>>>>>>> >>>>>>>>>> Hi Justine, >>>>>>>>>> >>>>>>>>>> After thinking a bit about supporting atomic dual writes for Kafka + a NoSQL database, I came to the conclusion that we do need to bump the epoch even with InitProducerId(keepPreparedTxn=true). As I described in my previous email, we wouldn't need to bump the epoch to protect from zombies, so that reasoning is still true. But we cannot protect from split-brain scenarios when two or more instances of a producer with the same transactional id try to produce at the same time.
The >> dual-write >>>>>> example >>>>>>>> for >>>>>>>>>> SQL >>>>>>>>>>>>>>> databases >>>>>>>>>>>>>>>> ( >>>>>>>>>>>>>>>>>> >>>> https://github.com/apache/kafka/pull/14231/files >>>>> ) >>>>>>>>> doesn't >>>>>>>>>>>> have a >>>>>>>>>>>>>>>>>> split-brain problem because execution is >>>>> protected >>>>>> by >>>>>>>> the >>>>>>>>>>>> update >>>>>>>>>>>>>> lock >>>>>>>>>>>>>>>> on >>>>>>>>>>>>>>>>>> the transaction state record; however NoSQL >>>>>> databases >>>>>>>> may >>>>>>>>>> not >>>>>>>>>>>>> have >>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>> protection (I'll write an example for NoSQL >>>>>> database >>>>>>>>>>> dual-write >>>>>>>>>>>>>>> soon). >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> In a nutshell, here is an example of a >>>>> split-brain >>>>>>>>>> scenario: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 1. (instance1) >>>>>>> InitProducerId(keepPreparedTxn=true), >>>>>>>>> got >>>>>>>>>>>>>> epoch=42 >>>>>>>>>>>>>>>>>> 2. (instance2) >>>>>>> InitProducerId(keepPreparedTxn=true), >>>>>>>>> got >>>>>>>>>>>>>> epoch=42 >>>>>>>>>>>>>>>>>> 3. (instance1) CommitTxn, epoch bumped >> to >>> 43 >>>>>>>>>>>>>>>>>> 4. (instance2) CommitTxn, this is >>>> considered a >>>>>>>> retry, >>>>>>>>> so >>>>>>>>>>> it >>>>>>>>>>>>> got >>>>>>>>>>>>>>>> epoch >>>>>>>>>>>>>>>>> 43 >>>>>>>>>>>>>>>>>> as well >>>>>>>>>>>>>>>>>> 5. (instance1) Produce messageA >>> w/sequence 1 >>>>>>>>>>>>>>>>>> 6. (instance2) Produce messageB >> w/sequence >>>> 1, >>>>>> this >>>>>>>> is >>>>>>>>>>>>>> considered a >>>>>>>>>>>>>>>>>> duplicate >>>>>>>>>>>>>>>>>> 7. (instance2) Produce messageC >>> w/sequence 2 >>>>>>>>>>>>>>>>>> 8. (instance1) Produce messageD >> w/sequence >>>> 2, >>>>>> this >>>>>>>> is >>>>>>>>>>>>>> considered a >>>>>>>>>>>>>>>>>> duplicate >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Now if either of those commit the >>> transaction, >>>> it >>>>>>> would >>>>>>>>>> have >>>>>>>>>>> a >>>>>>>>>>>>> mix >>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>> messages from the two instances (messageA >> and >>>>>>>> messageC). >>>>>>>>>>> With >>>>>>>>>>>>> the >>>>>>>>>>>>>>>> proper >>>>>>>>>>>>>>>>>> epoch bump, instance1 would get fenced at >>> step >>>> 3. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> In order to update epoch in >>>>>>>>>>>> InitProducerId(keepPreparedTxn=true) >>>>>>>>>>>>> we >>>>>>>>>>>>>>>> need >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> preserve the ongoing transaction's epoch >> (and >>>>>>>> producerId, >>>>>>>>>> if >>>>>>>>>>>> the >>>>>>>>>>>>>>> epoch >>>>>>>>>>>>>>>>>> overflows), because we'd need to make a >>> correct >>>>>>>> decision >>>>>>>>>> when >>>>>>>>>>>> we >>>>>>>>>>>>>>>> compare >>>>>>>>>>>>>>>>>> the PreparedTxnState that we read from the >>>>> database >>>>>>>> with >>>>>>>>>> the >>>>>>>>>>>>>>>> (producerId, >>>>>>>>>>>>>>>>>> epoch) of the ongoing transaction. 
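To make that comparison concrete, the dual-write recovery described above boils down to roughly the following sketch, assuming the API shape from the KIP (initTransactions(true) plus completeTransaction(PreparedTxnState)); readPreparedTxnStateFromDb() is a hypothetical application helper:

    // Sketch: dual-write recovery. keepPreparedTxn=true because the client
    // doesn't know upfront whether the DB transaction committed, and it must
    // not accidentally abort a committed Kafka transaction.
    producer.initTransactions(true);

    // The PreparedTxnState persisted in the DB as part of the DB commit; it
    // captures the prepared transaction's (producerId, epoch).
    PreparedTxnState dbState = readPreparedTxnStateFromDb();  // hypothetical

    // completeTransaction compares the given state against the ongoing
    // transaction's (producerId, epoch): commit on a match (the DB commit
    // happened), abort otherwise.
    producer.completeTransaction(dbState);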
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I've updated the KIP with the following: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> - Ongoing transaction now has 2 >>> (producerId, >>>>>>> epoch) >>>>>>>>>> pairs >>>>>>>>>>> -- >>>>>>>>>>>>> one >>>>>>>>>>>>>>>> pair >>>>>>>>>>>>>>>>>> describes the ongoing transaction, the >>> other >>>>>> pair >>>>>>>>>>> describes >>>>>>>>>>>>>>> expected >>>>>>>>>>>>>>>>>> epoch >>>>>>>>>>>>>>>>>> for operations on this transactional id >>>>>>>>>>>>>>>>>> - InitProducerIdResponse now returns 2 >>>>>>> (producerId, >>>>>>>>>> epoch) >>>>>>>>>>>>> pairs >>>>>>>>>>>>>>>>>> - TransactionalLogValue now has 2 >>>> (producerId, >>>>>>>> epoch) >>>>>>>>>>> pairs, >>>>>>>>>>>>> the >>>>>>>>>>>>>>> new >>>>>>>>>>>>>>>>>> values added as tagged fields, so it's >>> easy >>>> to >>>>>>>>> downgrade >>>>>>>>>>>>>>>>>> - Added a note about downgrade in the >>>>>>> Compatibility >>>>>>>>>>> section >>>>>>>>>>>>>>>>>> - Added a rejected alternative >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Artem >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Fri, Oct 6, 2023 at 5:16 PM Artem >>> Livshits < >>>>>>>>>>>>>>> alivsh...@confluent.io> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Justine, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thank you for the questions. Currently >>>>>>> (pre-KIP-939) >>>>>>>>> we >>>>>>>>>>>> always >>>>>>>>>>>>>>> bump >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> epoch on InitProducerId and abort an >>> ongoing >>>>>>>>> transaction >>>>>>>>>>> (if >>>>>>>>>>>>>>> any). I >>>>>>>>>>>>>>>>>>> expect this behavior will continue with >>>> KIP-890 >>>>>> as >>>>>>>>> well. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> With KIP-939 we need to support the case >>> when >>>>> the >>>>>>>>> ongoing >>>>>>>>>>>>>>> transaction >>>>>>>>>>>>>>>>>>> needs to be preserved when >>>>> keepPreparedTxn=true. >>>>>>>>> Bumping >>>>>>>>>>>> epoch >>>>>>>>>>>>>>>> without >>>>>>>>>>>>>>>>>>> aborting or committing a transaction is >>>> tricky >>>>>>>> because >>>>>>>>>>> epoch >>>>>>>>>>>>> is a >>>>>>>>>>>>>>>> short >>>>>>>>>>>>>>>>>>> value and it's easy to overflow. >>> Currently, >>>>> the >>>>>>>>> overflow >>>>>>>>>>>> case >>>>>>>>>>>>> is >>>>>>>>>>>>>>>>> handled >>>>>>>>>>>>>>>>>>> by aborting the ongoing transaction, >> which >>>>> would >>>>>>> send >>>>>>>>> out >>>>>>>>>>>>>>> transaction >>>>>>>>>>>>>>>>>>> markers with epoch=Short.MAX_VALUE to the >>>>>> partition >>>>>>>>>>> leaders, >>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>> fence off any messages with the producer >> id >>>>> that >>>>>>>>> started >>>>>>>>>>> the >>>>>>>>>>>>>>>>> transaction >>>>>>>>>>>>>>>>>>> (they would have epoch that is less than >>>>>>>>>> Short.MAX_VALUE). >>>>>>>>>>>>> Then >>>>>>>>>>>>>> it >>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>> safe >>>>>>>>>>>>>>>>>>> to allocate a new producer id and use it >> in >>>> new >>>>>>>>>>> transactions. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> We could say that maybe when >>>>> keepPreparedTxn=true >>>>>>> we >>>>>>>>> bump >>>>>>>>>>>> epoch >>>>>>>>>>>>>>>> unless >>>>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>> leads to overflow, and don't bump epoch >> in >>>> the >>>>>>>> overflow >>>>>>>>>>> case. 
>>>>>>>>>>>>> I >>>>>>>>>>>>>>>> don't >>>>>>>>>>>>>>>>>>> think it's a good solution because if >> it's >>>> not >>>>>> safe >>>>>>>> to >>>>>>>>>> keep >>>>>>>>>>>> the >>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>> epoch >>>>>>>>>>>>>>>>>>> when keepPreparedTxn=true, then we must >>>> handle >>>>>> the >>>>>>>>> epoch >>>>>>>>>>>>> overflow >>>>>>>>>>>>>>>> case >>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>> well. So either we should convince >>> ourselves >>>>>> that >>>>>>>> it's >>>>>>>>>>> safe >>>>>>>>>>>> to >>>>>>>>>>>>>>> keep >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> epoch and do it in the general case, or >> we >>>>> always >>>>>>>> bump >>>>>>>>>> the >>>>>>>>>>>>> epoch >>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> handle >>>>>>>>>>>>>>>>>>> the overflow. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> With KIP-890, we bump the epoch on every >>>>>>> transaction >>>>>>>>>>> commit / >>>>>>>>>>>>>>> abort. >>>>>>>>>>>>>>>>>> This >>>>>>>>>>>>>>>>>>> guarantees that even if >>>>>>>>>>> InitProducerId(keepPreparedTxn=true) >>>>>>>>>>>>>>> doesn't >>>>>>>>>>>>>>>>>>> increment epoch on the ongoing >> transaction, >>>> the >>>>>>>> client >>>>>>>>>> will >>>>>>>>>>>>> have >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> call >>>>>>>>>>>>>>>>>>> commit or abort to finish the transaction >>> and >>>>>> will >>>>>>>>>>> increment >>>>>>>>>>>>> the >>>>>>>>>>>>>>>> epoch >>>>>>>>>>>>>>>>>> (and >>>>>>>>>>>>>>>>>>> handle epoch overflow, if needed). If >> the >>>>>> ongoing >>>>>>>>>>>> transaction >>>>>>>>>>>>>> was >>>>>>>>>>>>>>>> in a >>>>>>>>>>>>>>>>>> bad >>>>>>>>>>>>>>>>>>> state and had some zombies waiting to >>> arrive, >>>>> the >>>>>>>> abort >>>>>>>>>>>>> operation >>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>> fence them because with KIP-890 every >> abort >>>>> would >>>>>>>> bump >>>>>>>>>> the >>>>>>>>>>>>> epoch. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> We could also look at this from the >>> following >>>>>>>>>> perspective. >>>>>>>>>>>>> With >>>>>>>>>>>>>>>>> KIP-890, >>>>>>>>>>>>>>>>>>> zombies won't be able to cross >> transaction >>>>>>>> boundaries; >>>>>>>>>> each >>>>>>>>>>>>>>>> transaction >>>>>>>>>>>>>>>>>>> completion creates a boundary and any >>>> activity >>>>> in >>>>>>> the >>>>>>>>>> past >>>>>>>>>>>> gets >>>>>>>>>>>>>>>>> confined >>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>> the boundary. Then data in any partition >>>> would >>>>>>> look >>>>>>>>> like >>>>>>>>>>>> this: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. message1, epoch=42 >>>>>>>>>>>>>>>>>>> 2. message2, epoch=42 >>>>>>>>>>>>>>>>>>> 3. message3, epoch=42 >>>>>>>>>>>>>>>>>>> 4. marker (commit or abort), epoch=43 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Now if we inject steps 3a and 3b like >> this: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. message1, epoch=42 >>>>>>>>>>>>>>>>>>> 2. message2, epoch=42 >>>>>>>>>>>>>>>>>>> 3. message3, epoch=42 >>>>>>>>>>>>>>>>>>> 3a. crash >>>>>>>>>>>>>>>>>>> 3b. InitProducerId(keepPreparedTxn=true) >>>>>>>>>>>>>>>>>>> 4. marker (commit or abort), epoch=43 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The invariant still holds even with steps >>> 3a >>>>> and >>>>>> 3b >>>>>>>> -- >>>>>>>>>>>> whatever >>>>>>>>>>>>>>>>> activity >>>>>>>>>>>>>>>>>>> was in the past will get confined in the >>> past >>>>>> with >>>>>>>>>>> mandatory >>>>>>>>>>>>>> abort >>>>>>>>>>>>>>> / >>>>>>>>>>>>>>>>>> commit >>>>>>>>>>>>>>>>>>> that must follow >>>>>>>> InitProducerId(keepPreparedTxn=true). 
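Expressed as client calls, the crash-and-recover sequence above would look roughly like this (a sketch assuming the KIP-939 API shape; record1..record3 are placeholders, and the epoch values in the comments mirror the example):

    producer.beginTransaction();
    producer.send(record1);              // 1. message1, written with epoch=42
    producer.send(record2);              // 2. message2, epoch=42
    producer.send(record3);              // 3. message3, epoch=42
    // 3a. crash
    // 3b. a new instance recovers without losing the prepared transaction:
    recovered.initTransactions(true);    // keepPreparedTxn=true, no auto-abort
    recovered.commitTransaction();       // 4. marker written with epoch=43;
                                         // the mandatory completion bumps the
                                         // epoch and confines any zombies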
>>>>>>>>>>>>>>>>>> So KIP-890 provides the proper isolation between transactions, so injecting crash + InitProducerId(keepPreparedTxn=true) into the transaction sequence is safe from the zombie protection perspective. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> That said, I'm still thinking about it and looking for cases that might break because we don't bump the epoch when InitProducerId(keepPreparedTxn=true) is called; if such cases exist, we'll need to develop the logic to handle epoch overflow for ongoing transactions. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> -Artem >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan >>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hey Artem, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I had a question about epoch bumping. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Previously, when we send an InitProducerId request on producer startup, we bump the epoch and abort the transaction. Is it correct to assume that we will still bump the epoch, but just not abort the transaction? >>>>>>>>>>>>>>>>>>>> If we still bump the epoch in this case, how does this interact with KIP-890 where we also bump the epoch on every transaction? (I think this means that we may skip epochs and the data itself will all have the same epoch.) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I may have follow-ups depending on the answer to this. :) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>> Justine >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits >>>>>>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi Alex, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thank you for your questions. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> the purpose of having broker-level transaction.two.phase.commit.enable >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> The thinking is that 2PC is a bit of an advanced construct, so enabling 2PC in a Kafka cluster should be an explicit decision. If it is set to 'false', InitProducerId (and initTransactions) would return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
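As an illustration of that behavior, a client opting into 2PC would hit the error at initTransactions(); here is a sketch, assuming the transaction.two.phase.commit.enable producer config from this KIP (TRANSACTIONAL_ID_AUTHORIZATION_FAILED surfaces as the existing TransactionalIdAuthorizationException in the Java client; the broker address and transactional id are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");              // placeholder
    props.put("transactional.id", "my-2pc-app");                // placeholder
    props.put("transaction.two.phase.commit.enable", "true");   // opt into 2PC
    KafkaProducer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    try {
        producer.initTransactions();
    } catch (TransactionalIdAuthorizationException e) {
        // 2PC is disabled or not authorized on this cluster: fall back to
        // ordinary transactions or fail fast.
    }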
> WDYT about adding an AdminClient method that returns the state of transaction.two.phase.commit.enable

I wonder if the client could just try to use 2PC and then handle the error (e.g. if it needs to fall back to ordinary transactions). This way it could uniformly handle cases when the Kafka cluster doesn't support 2PC at all and cases when 2PC is restricted to certain users. We could also expose this config in describeConfigs, if the fallback approach doesn't work for some scenarios.

-Artem
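For completeness, a sketch of the describeConfigs probe mentioned above, assuming the broker-level config name from this KIP and that the caller's principal is allowed to read broker configs:

    import java.util.Set;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    // Reads transaction.two.phase.commit.enable from one broker's config.
    static boolean is2pcEnabled(Admin admin, String brokerId) throws Exception {
        ConfigResource broker =
            new ConfigResource(ConfigResource.Type.BROKER, brokerId);
        Config config =
            admin.describeConfigs(Set.of(broker)).all().get().get(broker);
        return Boolean.parseBoolean(
            config.get("transaction.two.phase.commit.enable").value());
    }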
On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hi Artem,

Thanks for publishing this KIP!

Can you please clarify the purpose of having the broker-level transaction.two.phase.commit.enable config in addition to the new ACL? If the brokers are configured with transaction.two.phase.commit.enable=false, at what point will a client configured with transaction.two.phase.commit.enable=true fail? Will it happen at KafkaProducer#initTransactions?

WDYT about adding an AdminClient method that returns the state of transaction.two.phase.commit.enable? This way, clients would know in advance if 2PC is enabled on the brokers.

Best,
Alex

On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com> wrote:

Other than supporting multiplexing transactional streams on a single producer, I don't see how to improve it.

On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Roger,

Thank you for summarizing the cons. I agree, and I'm curious what would be the alternatives to solve these problems better and if they can be incorporated into this proposal (or built independently, in addition to or on top of this proposal). E.g. one potential extension we discussed earlier in the thread could be multiplexing logical transactional "streams" with a single producer.

-Artem

On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com> wrote:

Thanks. I like that you're moving Kafka toward supporting this dual-write pattern. Each use case needs to consider the tradeoffs. You already summarized the pros very well in the KIP. I would summarize the cons as follows:

- you sacrifice availability - each write requires both DB and Kafka to be available, so I think your overall application availability is (1 - p(DB is unavailable)) * (1 - p(Kafka is unavailable)).
- latency will be higher and throughput lower - each write requires both writes to DB and Kafka while holding an exclusive lock in DB.
- you need to create a producer per unit of concurrency in your app, which has some overhead in the app and on the Kafka side (number of connections, poor batching). I assume the producers would need to be configured for low latency (linger.ms=0).
- there's some complexity in managing stable transactional ids for each producer/concurrency unit in your application. With k8s deployment, you may need to switch to something like a StatefulSet that gives each pod a stable identity across restarts. On top of that pod identity, which you can use as a prefix, you then assign unique transactional ids to each concurrency unit (thread/goroutine) -- see the sketch after this list.
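A sketch of the id scheme in the last point, assuming the StatefulSet pod name is exposed to the container via the HOSTNAME environment variable (the helper and names are illustrative, not from the KIP):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    // Stable transactional id per concurrency unit: the pod name survives
    // restarts, the thread index distinguishes units within the process.
    static Properties transactionalProps(int threadIndex) {
        String podName = System.getenv("HOSTNAME");  // e.g. "orders-app-3"
        Properties props = new Properties();
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                  podName + "-txn-" + threadIndex);
        props.put(ProducerConfig.LINGER_MS_CONFIG, "0");  // low latency, as noted
        return props;
    }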
On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Roger,

Thank you for the feedback. You make a very good point that we also discussed internally. Adding support for multiple concurrent transactions in one producer could be valuable but it seems to be a fairly large and independent change that would deserve a separate KIP. If such support is added we could modify 2PC functionality to incorporate that.

> Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.
I'm not sure if supporting multiple transactions in one producer would make id management simpler: we'd need to store a piece of data per transaction, so whether it's N producers with a single transaction or N transactions with a single producer, it's still roughly the same amount of data to manage. In fact, managing transactional ids (current proposal) might be easier, because the id is controlled by the application and it knows how to complete the transaction after crash / restart; while a TID would be generated by Kafka, and that would create the question of starting a Kafka transaction but not saving its TID and then crashing, then figuring out which transactions to abort, etc.

> 2) creating a separate producer for each concurrency slot in the application

This is a very valid concern. Maybe we'd need to have some multiplexing of transactional logical "streams" over the same connection. Seems like a separate KIP, though.

> Otherwise, it seems you're left with single-threaded model per application process?

That's a fair assessment. Not necessarily exactly single-threaded per application, but a single producer per thread model (i.e. an application could have a pool of threads + producers to increase concurrency).
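To make the "pool of threads + producers" shape concrete, a rough sketch (class and method names are illustrative, not from the KIP):

    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import org.apache.kafka.clients.producer.KafkaProducer;

    // A fixed pool of transactional producers; each checked-out producer
    // owns exactly one in-flight transaction at a time.
    class ProducerPool {
        private final BlockingQueue<KafkaProducer<byte[], byte[]>> pool;

        ProducerPool(List<KafkaProducer<byte[], byte[]>> producers) {
            this.pool = new ArrayBlockingQueue<>(producers.size(), false, producers);
        }

        KafkaProducer<byte[], byte[]> acquire() throws InterruptedException {
            return pool.take();    // blocks while all producers are busy
        }

        void release(KafkaProducer<byte[], byte[]> producer) {
            pool.offer(producer);  // return after commit/abort completes
        }
    }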
-Artem

On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com> wrote:

Artem,

Thanks for the reply.

If I understand correctly, Kafka does not support concurrent transactions from the same producer (transactional id). I think this means that applications that want to support in-process concurrency (say thread-level concurrency with row-level DB locking) would need to manage separate transactional ids and producers per thread and then store txn state accordingly. The potential usability downsides I see are

1) managing a set of transactional ids for each application process that scales up to its max concurrency. Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.
2) creating a separate producer for each concurrency slot in the application - this could create a lot more producers and resultant connections to Kafka than the typical model of a single producer per process.

Otherwise, it seems you're left with single-threaded model per application process?
Thanks,

Roger

On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Roger, Arjun,

Thank you for the questions.

> It looks like the application must have stable transactional ids over time?

The transactional id should uniquely identify a producer instance and needs to be stable across restarts. If the transactional id is not stable across restarts, then zombie messages from a previous incarnation of the producer may violate atomicity. If there are 2 producer instances concurrently producing data with the same transactional id, they are going to constantly fence each other and most likely make little or no progress.

The name might be a little bit confusing as it may be mistaken for a transaction id / TID that uniquely identifies every transaction. The name and the semantics were defined in the original exactly-once-semantics (EoS) proposal (https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging) and KIP-939 just builds on top of that.
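The fencing behavior described above is observable with today's API; a minimal sketch, assuming props carries the usual serializer settings and a shared transactional.id:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.errors.ProducerFencedException;

    // Two producers sharing one transactional id: the second initTransactions()
    // bumps the epoch, so the first producer is fenced on its next call.
    static void demoFencing(Properties props) {
        KafkaProducer<byte[], byte[]> first = new KafkaProducer<>(props);
        first.initTransactions();
        first.beginTransaction();

        KafkaProducer<byte[], byte[]> second = new KafkaProducer<>(props);
        second.initTransactions();     // fences `first`

        try {
            first.commitTransaction(); // throws: this producer's epoch is stale
        } catch (ProducerFencedException e) {
            first.close();             // a fenced producer can only be closed
        }
        second.close();
    }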
> I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval.

If the producer / application never comes back, the transaction will remain in prepared (a.k.a. "in-doubt") state until an operator forcefully terminates the transaction. That's why a new ACL is defined in this proposal -- this functionality should only be provided to applications that implement proper recovery logic.

-Artem

On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com> wrote:

Hello Artem,

Thanks for the KIP.

I have the same question as Roger on concurrent writes, and an additional one on consumer behavior. Typically, transactions will time out if not committed within some time interval. With the proposed changes in this KIP, consumers cannot consume past the ongoing transaction. I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval.
Or are we saying that when used in this 2PC context, we should configure these transaction timeouts to very large durations?

Thanks in advance!

Best,
Arjun

On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com> wrote:

Hi Artem,

Thanks for writing this KIP. Can you clarify the requirements a bit more for managing transaction state? It looks like the application must have stable transactional ids over time? What is the granularity of those ids and producers? Say the application is a multi-threaded Java web server, can/should all the concurrent threads share a transactional id and producer? That doesn't seem right to me unless the application is using global DB locks that serialize all requests. Instead, if the application uses row-level DB locks, there could be multiple, concurrent, independent txns happening in the same JVM, so it seems like the granularity of managing transactional ids and txn state needs to line up with the granularity of the DB locking.
Does that make sense or am I misunderstanding?

Thanks,

Roger

On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hello,

This is a discussion thread for https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC.

The KIP proposes extending Kafka transaction support (that already uses 2PC under the hood) to enable atomicity of dual writes to Kafka and an external database, and helps to fix a long standing Flink issue.

An example of code that uses the dual write recipe with JDBC and should work for most SQL databases is here: https://github.com/apache/kafka/pull/14231.

The FLIP for the sister fix in Flink is here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

-Artem
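For readers skimming the thread, the dual-write recipe has roughly the following shape; the prepare step is this KIP's proposed addition (illustrative, not released API -- see the linked PR for the actual JDBC example):

    import java.sql.Connection;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Rough dual-write flow: Kafka is prepared first, the DB commit is the
    // decision point, and the Kafka transaction is completed afterwards.
    static void dualWrite(KafkaProducer<String, String> producer, Connection db)
            throws Exception {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("events", "key", "value"));
        // Proposed KIP-939 step: flush and prepare the Kafka transaction
        // without committing it (illustrative, not in released clients):
        // PreparedTxnState prepared = producer.prepareTransaction();

        db.setAutoCommit(false);
        // ... apply DB changes and persist the prepared state in the same DB txn ...
        db.commit();                   // the atomic decision point

        producer.commitTransaction();  // complete the Kafka side after DB commit
    }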