Hi Roger,

Thank you for summarizing the cons. I agree, and I'm curious what alternatives could solve these problems better and whether they could be incorporated into this proposal (or built independently, in addition to or on top of it). E.g. one potential extension we discussed earlier in the thread could be multiplexing logical transactional "streams" with a single producer.
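For illustration only, such a multiplexed API could look roughly like the sketch below; the MultiplexingProducer and TransactionStream interfaces and all method names are hypothetical and are not part of KIP-939 or of any current Kafka API.

    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Purely hypothetical API sketch -- none of these types exist in Kafka today.
    public interface MultiplexingProducer extends AutoCloseable {
        // Each logical stream would behave like an independent transactional producer
        // while sharing the connections and batching of a single physical producer.
        TransactionStream newStream(String logicalStreamId);

        interface TransactionStream extends AutoCloseable {
            void beginTransaction();
            Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record);
            void commitTransaction();
            void abortTransaction();
        }
    }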
-Artem

On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com> wrote:

> Thanks. I like that you're moving Kafka toward supporting this dual-write pattern. Each use case needs to consider the tradeoffs. You already summarized the pros very well in the KIP. I would summarize the cons as follows:
>
> - you sacrifice availability - each write requires both the DB and Kafka to be available, so I think your overall application availability is roughly (1 - p(DB is unavailable)) * (1 - p(Kafka is unavailable)), i.e. lower than that of either system alone.
> - latency will be higher and throughput lower - each write requires writes to both the DB and Kafka while holding an exclusive lock in the DB.
> - you need to create a producer per unit of concurrency in your app, which has some overhead on the app and Kafka side (number of connections, poor batching). I assume the producers would need to be configured for low latency (linger.ms=0).
> - there's some complexity in managing stable transactional ids for each producer/concurrency unit in your application. With a k8s deployment, you may need to switch to something like a StatefulSet that gives each pod a stable identity across restarts. On top of that pod identity, which you can use as a prefix, you then assign unique transactional ids to each concurrency unit (thread/goroutine), e.g. as sketched below.
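A minimal sketch of that last point, assuming a StatefulSet where the pod name is exposed through the HOSTNAME environment variable; the id scheme, the bootstrap address, and the class/method names are placeholders, not part of any proposal.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerPerConcurrencyUnit {
        // Derive a stable transactional.id from the stable pod identity plus the
        // index of the concurrency unit (thread) within the process.
        static KafkaProducer<byte[], byte[]> createProducer(int threadIndex) {
            String podName = System.getenv("HOSTNAME"); // e.g. "my-app-3" in a StatefulSet
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, podName + "-txn-" + threadIndex);
            props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // low latency, as noted above
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            producer.initTransactions(); // fences any zombie with the same transactional.id
            return producer;
        }
    }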
> On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Roger,
> >
> > Thank you for the feedback. You make a very good point that we also discussed internally. Adding support for multiple concurrent transactions in one producer could be valuable, but it seems to be a fairly large and independent change that would deserve a separate KIP. If such support is added, we could modify the 2PC functionality to incorporate it.
> >
> > > Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.
> >
> > I'm not sure supporting multiple transactions in one producer would make id management simpler: we'd need to store a piece of data per transaction, so whether it's N producers with a single transaction or N transactions with a single producer, it's still roughly the same amount of data to manage. In fact, managing transactional ids (current proposal) might be easier, because the id is controlled by the application and it knows how to complete the transaction after a crash / restart; a TID, on the other hand, would be generated by Kafka, which raises the question of what happens if the application starts a Kafka transaction but crashes before saving its TID, and then has to figure out which transactions to abort, etc.
> >
> > > 2) creating a separate producer for each concurrency slot in the application
> >
> > This is a very valid concern. Maybe we'd need to have some multiplexing of transactional logical "streams" over the same connection. Seems like a separate KIP, though.
> >
> > > Otherwise, it seems you're left with single-threaded model per application process?
> >
> > That's a fair assessment. Not necessarily exactly single-threaded per application, but a single-producer-per-thread model (i.e. an application could have a pool of threads + producers to increase concurrency).
> >
> > -Artem
> >
> > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com> wrote:
> >
> > > Artem,
> > >
> > > Thanks for the reply.
> > >
> > > If I understand correctly, Kafka does not support concurrent transactions from the same producer (transactional id). I think this means that applications that want to support in-process concurrency (say thread-level concurrency with row-level DB locking) would need to manage separate transactional ids and producers per thread and then store txn state accordingly. The potential usability downsides I see are:
> > > 1) managing a set of transactional ids for each application process that scales up to its max concurrency. Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.
> > > 2) creating a separate producer for each concurrency slot in the application - this could create a lot more producers and resultant connections to Kafka than the typical model of a single producer per process.
> > >
> > > Otherwise, it seems you're left with a single-threaded model per application process?
> > >
> > > Thanks,
> > >
> > > Roger
> > >
> > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Roger, Arjun,
> > > >
> > > > Thank you for the questions.
> > > >
> > > > > It looks like the application must have stable transactional ids over time?
> > > >
> > > > The transactional id should uniquely identify a producer instance and needs to be stable across restarts. If the transactional id is not stable across restarts, then zombie messages from a previous incarnation of the producer may violate atomicity. If there are 2 producer instances concurrently producing data with the same transactional id, they are going to constantly fence each other and most likely make little or no progress.
> > > >
> > > > The name might be a little bit confusing as it may be mistaken for a transaction id / TID that uniquely identifies every transaction. The name and the semantics were defined in the original exactly-once semantics (EoS) proposal (https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging) and KIP-939 just builds on top of that.
> > > >
> > > > > I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval.
> > > >
> > > > If the producer / application never comes back, the transaction will remain in the prepared (a.k.a. "in-doubt") state until an operator forcefully terminates it. That's why a new ACL is defined in this proposal -- this functionality should only be provided to applications that implement proper recovery logic.
> > > >
> > > > -Artem
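For concreteness, a rough sketch of what such application recovery logic could look like, assuming the API shape proposed in KIP-939 (initTransactions(keepPreparedTxn), completeTransaction(PreparedTxnState)) and a hypothetical kafka_txn_state table in the application's database; class names, packages, and the PreparedTxnState string constructor are taken from the KIP text as assumptions and may differ from the final implementation.

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.PreparedTxnState; // proposed in KIP-939; package is an assumption

    public class TwoPcRecovery {
        // Sketch of application recovery after a crash/restart, against the KIP-939 API shape.
        public static void recover(KafkaProducer<byte[], byte[]> producer, Connection db) throws Exception {
            // Keep a transaction that was prepared before the crash instead of aborting it.
            producer.initTransactions(true); // keepPreparedTxn overload proposed in KIP-939

            // Load the prepared-transaction state the application saved in its DB commit.
            String saved = null;
            try (Statement st = db.createStatement();
                 ResultSet rs = st.executeQuery("SELECT state FROM kafka_txn_state WHERE id = 1")) {
                if (rs.next()) {
                    saved = rs.getString(1);
                }
            }

            // completeTransaction() should commit the prepared Kafka transaction if it matches
            // the saved state (i.e. the DB transaction committed) and abort it otherwise.
            producer.completeTransaction(new PreparedTxnState(saved)); // String constructor is an assumption
        }
    }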
> > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > >
> > > > > Hello Artem,
> > > > >
> > > > > Thanks for the KIP.
> > > > >
> > > > > I have the same question as Roger on concurrent writes, and an additional one on consumer behavior. Typically, transactions will time out if not committed within some time interval. With the proposed changes in this KIP, consumers cannot consume past the ongoing transaction. I'm curious to understand what happens if the producer dies and does not come up and recover the pending transaction within the transaction timeout interval. Or are we saying that when used in this 2PC context, we should configure these transaction timeouts to very large durations?
> > > > >
> > > > > Thanks in advance!
> > > > >
> > > > > Best,
> > > > > Arjun
> > > > >
> > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > > >
> > > > > > Hi Artem,
> > > > > >
> > > > > > Thanks for writing this KIP. Can you clarify the requirements a bit more for managing transaction state? It looks like the application must have stable transactional ids over time? What is the granularity of those ids and producers? Say the application is a multi-threaded Java web server: can/should all the concurrent threads share a transactional id and producer? That doesn't seem right to me unless the application is using global DB locks that serialize all requests. Instead, if the application uses row-level DB locks, there could be multiple, concurrent, independent txns happening in the same JVM, so it seems like the granularity of managing transactional ids and txn state needs to line up with the granularity of the DB locking.
> > > > > >
> > > > > > Does that make sense or am I misunderstanding?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Roger
> > > > > >
> > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > This is a discussion thread for https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC.
> > > > > > >
> > > > > > > The KIP proposes extending Kafka transaction support (that already uses 2PC under the hood) to enable atomicity of dual writes to Kafka and an external database, and helps to fix a long-standing Flink issue.
> > > > > > >
> > > > > > > An example of code that uses the dual-write recipe with JDBC and should work for most SQL databases is here: https://github.com/apache/kafka/pull/14231.
> > > > > > >
> > > > > > > The FLIP for the sister fix in Flink is here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > > > >
> > > > > > > -Artem
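And, for readers following along, a heavily simplified happy-path sketch of the dual-write recipe referenced above (see https://github.com/apache/kafka/pull/14231 for the real example). The prepareTransaction()/completeTransaction() calls follow the API shape proposed in KIP-939 and may differ in the final implementation; the kafka_txn_state table, its schema, and the serialized form of PreparedTxnState are assumptions.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.PreparedTxnState; // proposed in KIP-939; package is an assumption
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class DualWrite {
        // Happy-path sketch; error handling and the application's own SQL update are omitted.
        public static void dualWrite(KafkaProducer<byte[], byte[]> producer, Connection db,
                                     ProducerRecord<byte[], byte[]> record) throws Exception {
            producer.beginTransaction();
            producer.send(record);

            // Phase 1: prepare the Kafka transaction and record its state in the same DB
            // transaction as the application's own update; the DB commit is the decision point.
            PreparedTxnState prepared = producer.prepareTransaction(); // proposed in KIP-939
            try (PreparedStatement st =
                     db.prepareStatement("UPDATE kafka_txn_state SET state = ? WHERE id = 1")) {
                st.setString(1, prepared.toString()); // serialized form is an assumption
                st.executeUpdate();
            }
            db.commit(); // assumes autocommit is disabled

            // Phase 2: finish the Kafka side. If the process crashes before this line, the
            // recovery logic sketched earlier completes or aborts the prepared transaction.
            producer.completeTransaction(prepared);
        }
    }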