Hi Justine,

After thinking a bit about supporting atomic dual writes for Kafka + NoSQL databases, I came to the conclusion that we do need to bump the epoch even with InitProducerId(keepPreparedTxn=true). As I described in my previous email, we don't need to bump the epoch to protect from zombies, and that reasoning still holds. But without a bump we cannot protect from split-brain scenarios, where two or more instances of a producer with the same transactional id try to produce at the same time. The dual-write example for SQL databases (https://github.com/apache/kafka/pull/14231/files) doesn't have a split-brain problem because execution is protected by the update lock on the transaction state record; however, NoSQL databases may not have this protection (I'll write an example for NoSQL database dual-write soon).
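To make the update-lock point concrete, here is a minimal JDBC sketch of that protection. The table and column names are made up for illustration and are not taken from the PR; the point is only that SELECT ... FOR UPDATE serializes instances that share a transactional id:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    final class DualWriteSketch {
        // Holds a row lock on the transaction state record for the duration
        // of the dual-write critical section, so two producer instances that
        // share a transactional id cannot interleave.
        static void dualWrite(Connection db, String transactionalId) throws Exception {
            db.setAutoCommit(false);
            try (PreparedStatement stmt = db.prepareStatement(
                    "SELECT prepared_txn_state FROM txn_state WHERE txn_id = ? FOR UPDATE")) {
                stmt.setString(1, transactionalId);
                try (ResultSet rs = stmt.executeQuery()) {
                    // compare the stored PreparedTxnState with the ongoing Kafka
                    // transaction, produce, prepare, and write the new state here,
                    // all while the row lock is held
                }
            }
            db.commit(); // releases the lock
        }
    }

A NoSQL store without an equivalent lock lets two instances run this section concurrently, which is exactly the split-brain scenario below.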
In a nutshell, here is an example of a split-brain scenario:

1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
3. (instance1) CommitTxn, epoch bumped to 43
4. (instance2) CommitTxn, this is considered a retry, so it got epoch 43 as well
5. (instance1) Produce messageA w/sequence 1
6. (instance2) Produce messageB w/sequence 1, this is considered a duplicate
7. (instance2) Produce messageC w/sequence 2
8. (instance1) Produce messageD w/sequence 2, this is considered a duplicate

Now if either of these instances commits the transaction, it would contain a mix of messages from the two instances (messageA and messageC). With a proper epoch bump, instance1 would get fenced at step 3.

In order to update the epoch in InitProducerId(keepPreparedTxn=true), we need to preserve the ongoing transaction's epoch (and producerId, if the epoch overflows), because we need to make a correct decision when we compare the PreparedTxnState that we read from the database with the (producerId, epoch) of the ongoing transaction.

I've updated the KIP with the following:

- Ongoing transaction now has 2 (producerId, epoch) pairs -- one pair describes the ongoing transaction, the other pair describes the expected epoch for operations on this transactional id
- InitProducerIdResponse now returns 2 (producerId, epoch) pairs
- TransactionalLogValue now has 2 (producerId, epoch) pairs; the new values are added as tagged fields, so it's easy to downgrade
- Added a note about downgrade in the Compatibility section
- Added a rejected alternative

-Artem

On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <alivsh...@confluent.io> wrote:

> Hi Justine,
>
> Thank you for the questions. Currently (pre-KIP-939) we always bump the epoch on InitProducerId and abort an ongoing transaction (if any). I expect this behavior will continue with KIP-890 as well.
>
> With KIP-939 we need to support the case when the ongoing transaction needs to be preserved when keepPreparedTxn=true. Bumping the epoch without aborting or committing a transaction is tricky, because the epoch is a short value and it's easy to overflow. Currently, the overflow case is handled by aborting the ongoing transaction, which sends out transaction markers with epoch=Short.MAX_VALUE to the partition leaders; these fence off any messages with the producer id that started the transaction (they would have an epoch that is less than Short.MAX_VALUE). Then it is safe to allocate a new producer id and use it in new transactions.
>
> We could say that when keepPreparedTxn=true we bump the epoch unless it leads to overflow, and don't bump the epoch in the overflow case. I don't think that's a good solution, because if it's not safe to keep the same epoch when keepPreparedTxn=true, then we must handle the epoch overflow case as well. So either we convince ourselves that it's safe to keep the epoch and do it in the general case, or we always bump the epoch and handle the overflow.
>
> With KIP-890, we bump the epoch on every transaction commit / abort. This guarantees that even if InitProducerId(keepPreparedTxn=true) doesn't increment the epoch on the ongoing transaction, the client will have to call commit or abort to finish the transaction and will increment the epoch (and handle epoch overflow, if needed). If the ongoing transaction was in a bad state and had some zombies waiting to arrive, the abort operation would fence them, because with KIP-890 every abort bumps the epoch.
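To illustrate the overflow handling described above, here is a minimal Java sketch; the names are illustrative, not actual coordinator code:

    import java.util.function.LongSupplier;

    final class EpochBumpSketch {
        record ProducerIdAndEpoch(long producerId, short epoch) {}

        // Epoch is a short; epoch=Short.MAX_VALUE goes into the abort markers
        // that fence the old producer id, so a bump that would overflow
        // allocates a brand-new producer id instead of wrapping around.
        static ProducerIdAndEpoch bump(ProducerIdAndEpoch current, LongSupplier newProducerId) {
            if (current.epoch() >= Short.MAX_VALUE - 1) {
                // old id gets fenced; continue under a fresh id at epoch 0
                return new ProducerIdAndEpoch(newProducerId.getAsLong(), (short) 0);
            }
            return new ProducerIdAndEpoch(current.producerId(), (short) (current.epoch() + 1));
        }
    }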
> We could also look at this from the following perspective. With KIP-890, zombies won't be able to cross transaction boundaries; each transaction completion creates a boundary, and any activity in the past gets confined in the boundary. Then data in any partition would look like this:
>
> 1. message1, epoch=42
> 2. message2, epoch=42
> 3. message3, epoch=42
> 4. marker (commit or abort), epoch=43
>
> Now if we inject steps 3a and 3b like this:
>
> 1. message1, epoch=42
> 2. message2, epoch=42
> 3. message3, epoch=42
> 3a. crash
> 3b. InitProducerId(keepPreparedTxn=true)
> 4. marker (commit or abort), epoch=43
>
> The invariant still holds even with steps 3a and 3b -- whatever activity was in the past will get confined in the past by the mandatory abort / commit that must follow InitProducerId(keepPreparedTxn=true).
>
> KIP-890 thus provides proper isolation between transactions, so injecting crash + InitProducerId(keepPreparedTxn=true) into the transaction sequence is safe from the zombie protection perspective.
>
> That said, I'm still thinking about it and looking for cases that might break because we don't bump the epoch on InitProducerId(keepPreparedTxn=true); if such cases exist, we'll need to develop the logic to handle epoch overflow for ongoing transactions.
>
> -Artem
>
> On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan <jols...@confluent.io.invalid> wrote:
>
>> Hey Artem,
>>
>> Thanks for the KIP. I had a question about epoch bumping.
>>
>> Previously when we send an InitProducerId request on producer startup, we bump the epoch and abort the transaction. Is it correct to assume that we will still bump the epoch, but just not abort the transaction?
>> If we still bump the epoch in this case, how does this interact with KIP-890, where we also bump the epoch on every transaction? (I think this means that we may skip epochs and the data itself will all have the same epoch.)
>>
>> I may have follow ups depending on the answer to this. :)
>>
>> Thanks,
>> Justine
>>
>> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>>
>> > Hi Alex,
>> >
>> > Thank you for your questions.
>> >
>> > > the purpose of having broker-level transaction.two.phase.commit.enable
>> >
>> > The thinking is that 2PC is a bit of an advanced construct, so enabling 2PC in a Kafka cluster should be an explicit decision. If it is set to 'false', InitProducerId (and initTransactions) would return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
>> >
>> > > WDYT about adding an AdminClient method that returns the state of transaction.two.phase.commit.enable
>> >
>> > I wonder if the client could just try to use 2PC and then handle the error (e.g. if it needs to fall back to ordinary transactions). This way it could uniformly handle the case when the Kafka cluster doesn't support 2PC completely and the case when 2PC is restricted to certain users. We could also expose this config in describeConfigs, if the fallback approach doesn't work for some scenarios.
>> >
>> > -Artem
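A minimal sketch of that try-and-fall-back approach from the client's side, assuming the producer config name proposed in the KIP and that the authorization error surfaces from initTransactions (the exact exception the client sees may differ):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;

    final class TwoPhaseCommitFallback {
        // base must already carry serializers, bootstrap.servers, and
        // transactional.id; we only toggle the proposed 2PC flag here.
        static KafkaProducer<byte[], byte[]> create(Properties base) {
            Properties props = new Properties();
            props.putAll(base);
            props.put("transaction.two.phase.commit.enable", "true"); // proposed in KIP-939
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            try {
                producer.initTransactions();
                return producer; // cluster allows 2PC for this principal
            } catch (TransactionalIdAuthorizationException e) {
                // 2PC disabled on the cluster or not authorized: fall back
                producer.close();
                props.put("transaction.two.phase.commit.enable", "false");
                KafkaProducer<byte[], byte[]> fallback = new KafkaProducer<>(props);
                fallback.initTransactions(); // ordinary transactions
                return fallback;
            }
        }
    }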
>> > On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:
>> >
>> > > Hi Artem,
>> > >
>> > > Thanks for publishing this KIP!
>> > >
>> > > Can you please clarify the purpose of having the broker-level transaction.two.phase.commit.enable config in addition to the new ACL? If the brokers are configured with transaction.two.phase.commit.enable=false, at what point will a client configured with transaction.two.phase.commit.enable=true fail? Will it happen at KafkaProducer#initTransactions?
>> > >
>> > > WDYT about adding an AdminClient method that returns the state of transaction.two.phase.commit.enable? This way, clients would know in advance if 2PC is enabled on the brokers.
>> > >
>> > > Best,
>> > > Alex
>> > >
>> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > >
>> > > > Other than supporting multiplexing transactional streams on a single producer, I don't see how to improve it.
>> > > >
>> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>> > > >
>> > > > > Hi Roger,
>> > > > >
>> > > > > Thank you for summarizing the cons. I agree, and I'm curious what the alternatives would be to solve these problems better, and whether they could be incorporated into this proposal (or built independently, in addition to or on top of it). E.g. one potential extension we discussed earlier in the thread could be multiplexing logical transactional "streams" with a single producer.
>> > > > >
>> > > > > -Artem
>> > > > >
>> > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > > > >
>> > > > > > Thanks. I like that you're moving Kafka toward supporting this dual-write pattern. Each use case needs to consider the tradeoffs. You already summarized the pros very well in the KIP. I would summarize the cons as follows:
>> > > > > >
>> > > > > > - you sacrifice availability - each write requires both the DB and Kafka to be available, so your overall application availability is p(DB is available) * p(Kafka is available), i.e. lower than either alone.
>> > > > > > - latency will be higher and throughput lower - each write requires writes to both the DB and Kafka while holding an exclusive lock in the DB.
>> > > > > > - you need to create a producer per unit of concurrency in your app, which has some overhead on the app and Kafka side (number of connections, poor batching). I assume the producers would need to be configured for low latency (linger.ms=0).
>> > > > > > - there's some complexity in managing stable transactional ids for each producer/concurrency unit in your application. With k8s deployment, you may need to switch to something like a StatefulSet that gives each pod a stable identity across restarts. On top of that pod identity, which you can use as a prefix, you then assign unique transactional ids to each concurrency unit (thread/goroutine).
>> > > > > >
>> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>> > > > > >
>> > > > > > > Hi Roger,
>> > > > > > >
>> > > > > > > Thank you for the feedback. You make a very good point that we also discussed internally.
>> > > > > > > Adding support for multiple concurrent transactions in one producer could be valuable, but it seems to be a fairly large and independent change that would deserve a separate KIP. If such support is added, we could modify the 2PC functionality to incorporate it.
>> > > > > > >
>> > > > > > > > Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.
>> > > > > > >
>> > > > > > > I'm not sure supporting multiple transactions in one producer would make id management simpler: we'd need to store a piece of data per transaction, so whether it's N producers with a single transaction each or N transactions with a single producer, it's still roughly the same amount of data to manage. In fact, managing transactional ids (current proposal) might be easier, because the id is controlled by the application, and the application knows how to complete the transaction after a crash / restart; a TID would be generated by Kafka, which raises the question of what happens when an application starts a Kafka transaction but crashes before saving the TID, and then has to figure out which transactions to abort, and so on.
>> > > > > > >
>> > > > > > > > 2) creating a separate producer for each concurrency slot in the application
>> > > > > > >
>> > > > > > > This is a very valid concern. Maybe we'd need to have some multiplexing of transactional logical "streams" over the same connection. Seems like a separate KIP, though.
>> > > > > > >
>> > > > > > > > Otherwise, it seems you're left with single-threaded model per application process?
>> > > > > > >
>> > > > > > > That's a fair assessment. Not necessarily exactly single-threaded per application, but a single-producer-per-thread model (i.e. an application could have a pool of threads + producers to increase concurrency).
>> > > > > > >
>> > > > > > > -Artem
>> > > > > > >
>> > > > > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Artem,
>> > > > > > > >
>> > > > > > > > Thanks for the reply.
>> > > > > > > >
>> > > > > > > > If I understand correctly, Kafka does not support concurrent transactions from the same producer (transactional id). I think this means that applications that want to support in-process concurrency (say thread-level concurrency with row-level DB locking) would need to manage separate transactional ids and producers per thread and then store txn state accordingly. The potential usability downsides I see are
>> > > > > > > > 1) managing a set of transactional ids for each application process that scales up to its max concurrency. Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.
>> > > > > > > > 2) creating a separate producer for each concurrency slot in the application - this could create a lot more producers and resultant connections to Kafka than the typical model of a single producer per process.
>> > > > > > > >
>> > > > > > > > Otherwise, it seems you're left with a single-threaded model per application process?
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > >
>> > > > > > > > Roger
>> > > > > > > >
>> > > > > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > >
>> > > > > > > > > Hi Roger, Arjun,
>> > > > > > > > >
>> > > > > > > > > Thank you for the questions.
>> > > > > > > > >
>> > > > > > > > > > It looks like the application must have stable transactional ids over time?
>> > > > > > > > >
>> > > > > > > > > The transactional id should uniquely identify a producer instance and needs to be stable across restarts. If the transactional id is not stable across restarts, then zombie messages from a previous incarnation of the producer may violate atomicity. If there are 2 producer instances concurrently producing data with the same transactional id, they are going to constantly fence each other and most likely make little or no progress.
>> > > > > > > > >
>> > > > > > > > > The name might be a little bit confusing, as it may be mistaken for a transaction id / TID that uniquely identifies every transaction. The name and the semantics were defined in the original exactly-once-semantics (EoS) proposal (https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging) and KIP-939 just builds on top of that.
>> > > > > > > > >
>> > > > > > > > > > I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval.
>> > > > > > > > >
>> > > > > > > > > If the producer / application never comes back, the transaction will remain in prepared (a.k.a. "in-doubt") state until an operator forcefully terminates the transaction. That's why a new ACL is defined in this proposal -- this functionality should only be provided to applications that implement proper recovery logic.
>> > > > > > > > >
>> > > > > > > > > -Artem
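For reference, such recovery logic would look roughly like this sketch, using the producer API names proposed in the KIP (initTransactions(keepPreparedTxn), PreparedTxnState, completeTransaction); readPreparedTxnStateFromDb is a hypothetical helper standing in for however the application persists that state:

    // On startup, keep any prepared (in-doubt) transaction instead of
    // aborting it (keepPreparedTxn=true, as proposed in the KIP).
    producer.initTransactions(true);

    // Hypothetical helper: reads the PreparedTxnState that was written to
    // the database as part of the database commit.
    PreparedTxnState stateFromDb = readPreparedTxnStateFromDb(db, transactionalId);

    // Per the KIP, completeTransaction compares the given state with the
    // ongoing transaction: commit on a match, abort otherwise.
    producer.completeTransaction(stateFromDb);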
>> > > > > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hello Artem,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for the KIP.
>> > > > > > > > > >
>> > > > > > > > > > I have the same question as Roger on concurrent writes, and an additional one on consumer behavior. Typically, transactions will time out if not committed within some time interval. With the proposed changes in this KIP, consumers cannot consume past the ongoing transaction. I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval. Or are we saying that when used in this 2PC context, we should configure these transaction timeouts to very large durations?
>> > > > > > > > > >
>> > > > > > > > > > Thanks in advance!
>> > > > > > > > > >
>> > > > > > > > > > Best,
>> > > > > > > > > > Arjun
>> > > > > > > > > >
>> > > > > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Artem,
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks for writing this KIP. Can you clarify the requirements a bit more for managing transaction state? It looks like the application must have stable transactional ids over time? What is the granularity of those ids and producers? Say the application is a multi-threaded Java web server; can/should all the concurrent threads share a transactional id and producer? That doesn't seem right to me unless the application is using global DB locks that serialize all requests. Instead, if the application uses row-level DB locks, there could be multiple, concurrent, independent txns happening in the same JVM, so it seems like the granularity of managing transactional ids and txn state needs to line up with the granularity of the DB locking.
>> > > > > > > > > > >
>> > > > > > > > > > > Does that make sense or am I misunderstanding?
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > >
>> > > > > > > > > > > Roger
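To make the granularity discussion concrete, here is a sketch of the producer-per-concurrency-slot model described in the replies above; the pod-name prefix follows the StatefulSet idea mentioned earlier in the thread, and everything beyond the standard producer configs is illustrative:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    final class ProducerPoolSketch {
        // One producer per concurrency slot; the transactional id is derived
        // from a stable pod/process identity plus the slot index, so it
        // survives restarts (podName is assumed stable, e.g. from a StatefulSet).
        static List<KafkaProducer<byte[], byte[]>> createPool(String podName, int slots) {
            List<KafkaProducer<byte[], byte[]>> pool = new ArrayList<>(slots);
            for (int slot = 0; slot < slots; slot++) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092"); // illustrative
                props.put("key.serializer",
                        "org.apache.kafka.common.serialization.ByteArraySerializer");
                props.put("value.serializer",
                        "org.apache.kafka.common.serialization.ByteArraySerializer");
                props.put("transactional.id", podName + "-slot-" + slot); // stable id
                props.put("linger.ms", "0"); // low latency, as noted above
                KafkaProducer<byte[], byte[]> p = new KafkaProducer<>(props);
                p.initTransactions(); // each slot runs its own transactions
                pool.add(p);
            }
            return pool;
        }
    }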
>> > > > > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hello,
>> > > > > > > > > > > >
>> > > > > > > > > > > > This is a discussion thread for https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The KIP proposes extending Kafka transaction support (which already uses 2PC under the hood) to enable atomicity of dual writes to Kafka and an external database, and helps to fix a long-standing Flink issue.
>> > > > > > > > > > > >
>> > > > > > > > > > > > An example of code that uses the dual-write recipe with JDBC and should work for most SQL databases is here: https://github.com/apache/kafka/pull/14231.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The FLIP for the sister fix in Flink is here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>> > > > > > > > > > > >
>> > > > > > > > > > > > -Artem