Thanks, Andrew, for the careful read.
> (1) "Not much I can do with TransactionSession that I cannot do with 
> KafkaProducer today."
There are three things TransactionSession enables that KafkaProducer 
structurally cannot, and all three are genuinely needed:
1. Resuming a prepared transaction without reflection: Today Flink calls 
FlinkKafkaInternalProducer.resumeTransaction(producerId, epoch) [1], which 
works through reflection on TransactionManager internals ("Reflection; manually 
forces state into TransactionManager internals"). TransactionSession.resume(...) 
replaces that with a stable public API. KIP-939's prepareTransaction() / 
completeTransaction() are exactly the operations that need this; they live on 
KafkaProducer today even though they don't produce records.
2. Sharing one transaction identity across two distinct clients. A single 
(producerId, epoch, state) cannot today be held by both a KafkaProducer and a 
KafkaShareConsumer simultaneously, because the state is private to a 
KafkaProducer instance.
3. Custom 2PC coordinators: Today this requires holding a KafkaProducer (with 
its buffer pool, sender thread, serializers) purely to drive the transaction 
lifecycle for a workload that produces no records.
So the point is what TransactionSession can do without forcing the user to 
instantiate a KafkaProducer they don't otherwise need, or to reach into private 
implementation details.
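To make point 1 more concrete, here is a small in-memory model of a session that can be initialized, begun, prepared, resumed from a (producerId, epoch) pair, and committed or aborted without any producer object. It is a sketch under stated assumptions: ToyTransactionSession, SessionState, and all method names are hypothetical illustrations, not the KIP-1310 interface or any Kafka client API, and the producer id and epoch are passed in by the caller rather than obtained from a transaction coordinator.

```java
import java.util.EnumSet;

// Hypothetical client-side states; the real TransactionManager has a richer state machine.
enum SessionState { UNINITIALIZED, READY, IN_TRANSACTION, PREPARED }

// Hypothetical toy model, not the KIP-1310 API: it only tracks identity and local state.
final class ToyTransactionSession {
    private final String transactionalId;
    private long producerId = -1L;
    private short epoch = -1;
    private SessionState state = SessionState.UNINITIALIZED;

    ToyTransactionSession(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    // Stand-in for the InitProducerId round trip: the caller supplies id and epoch.
    void initialize(long producerId, short epoch) {
        require(EnumSet.of(SessionState.UNINITIALIZED));
        this.producerId = producerId;
        this.epoch = epoch;
        this.state = SessionState.READY;
    }

    // Public recovery path: rebuild a session for a transaction prepared elsewhere,
    // instead of forcing state into private fields through reflection.
    static ToyTransactionSession resume(String transactionalId, long producerId, short epoch) {
        ToyTransactionSession session = new ToyTransactionSession(transactionalId);
        session.producerId = producerId;
        session.epoch = epoch;
        session.state = SessionState.PREPARED;
        return session;
    }

    void beginTransaction() {
        require(EnumSet.of(SessionState.READY));
        state = SessionState.IN_TRANSACTION;
    }

    void prepareTransaction() {
        require(EnumSet.of(SessionState.IN_TRANSACTION));
        state = SessionState.PREPARED;
    }

    // Commit and abort both close an open or prepared transaction; the toy does not
    // contact a coordinator, so only the local state changes.
    void commit() { finish(); }

    void abort() { finish(); }

    private void finish() {
        require(EnumSet.of(SessionState.IN_TRANSACTION, SessionState.PREPARED));
        state = SessionState.READY;
    }

    private void require(EnumSet<SessionState> allowed) {
        if (!allowed.contains(state)) {
            throw new IllegalStateException(transactionalId + ": not allowed in state " + state);
        }
    }

    SessionState state() { return state; }
    long producerId() { return producerId; }
    short epoch() { return epoch; }
}
```

The design point is that resume(...) is an ordinary public factory: a coordinator that persisted (producerId, epoch) at prepare time can rebuild the session and finish the transaction, which is the step the Flink connector currently performs by forcing state into TransactionManager internals [1].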
------
> (2) "Two variants of transactional KafkaProducer."
The KIP currently keeps both paths and labels the producer-bound methods 
"convenience wrappers". We can document the producer-bound methods as the 
recommended API for simple produce-and-commit, and TransactionSession for 
advanced cases; later on, we can consider deprecating the producer-bound APIs.
The number of objects in a consume-transform-produce (CTP) loop is in fact 
unchanged: TransactionSession is essentially today's TransactionManager inside 
KafkaProducer, named and made public. Before KIP-1310: one Producer, one 
Consumer, one (private) TransactionManager. After KIP-1310: one Producer, one 
Consumer, one (public) TransactionSession. Same count; one of them is now 
correctly named.

---
> (3) "Consumer is still unaware of transactions; with TS there are even more 
>moving parts."
I agree with this, and I would even keep TransactionSession unbound from both 
the producer and the consumer.
We can treat consumer-awareness as a follow-up KIP (provisionally 
"Transaction-Aware KafkaConsumer") that adds something like 
KafkaConsumer.bindTransactionSession(session). After binding: the consumer 
refuses to commit independently, rolls its in-memory cursor back on 
abortTransaction(), and blocks poll() during commit/abort transitions. 
Such a binding needs a standalone public object to bind to; we cannot ask 
KafkaConsumer to bind to "a part of KafkaProducer's internal state".
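The binding semantics described above can be sketched with a toy, single-partition model. Everything here is hypothetical (ToySession, TxnListener, ToyBoundConsumer, and bindTransactionSession are illustrative names, not an existing or proposed Kafka API): committing the session advances the consumer's committed position, aborting it rewinds the in-memory cursor, and a bound consumer rejects commitSync(). The model is single-threaded, so it does not show poll() blocking during commit/abort transitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback a bound client registers with the session (illustrative only).
interface TxnListener {
    void onCommit();
    void onAbort();
}

// Hypothetical minimal session: tracks whether a transaction is open and notifies listeners.
final class ToySession {
    private boolean open;
    private final List<TxnListener> listeners = new ArrayList<>();

    void register(TxnListener listener) { listeners.add(listener); }

    void beginTransaction() {
        if (open) throw new IllegalStateException("transaction already open");
        open = true;
    }

    void commitTransaction() {
        requireOpen();
        open = false;
        listeners.forEach(TxnListener::onCommit);
    }

    void abortTransaction() {
        requireOpen();
        open = false;
        listeners.forEach(TxnListener::onAbort);
    }

    private void requireOpen() {
        if (!open) throw new IllegalStateException("no open transaction");
    }
}

// Hypothetical consumer over one in-memory partition; position is the next offset to read.
final class ToyBoundConsumer implements TxnListener {
    private final List<String> partition;
    private int position;          // in-memory cursor
    private int committedPosition; // cursor as of the last committed transaction
    private boolean bound;

    ToyBoundConsumer(List<String> partition) { this.partition = partition; }

    void bindTransactionSession(ToySession session) {
        session.register(this);
        bound = true;
    }

    // Returns the next record, or null when the partition is exhausted.
    String poll() {
        if (position >= partition.size()) return null;
        return partition.get(position++);
    }

    // Once bound, offsets move only with the session, so independent commits are refused.
    void commitSync() {
        if (bound) throw new IllegalStateException("bound consumer: commit through the session");
        committedPosition = position;
    }

    @Override public void onCommit() { committedPosition = position; }

    @Override public void onAbort() { position = committedPosition; } // rewind the cursor

    int position() { return position; }
}
```

Rewinding the cursor on abort is what would remove the need, raised in Andrew's note, to discard and re-create the consumer after a failed transaction.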
--
Ref:
[1] https://github.com/apache/flink-connector-kafka/blob/0caa1c9c826f93ac37fb4db454b81c1cf83b996c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java#L300

KIP: https://cwiki.apache.org/confluence/x/nJY8G 




Thanks,
Shekharrajak


 

    On Tuesday 21 April 2026 at 03:09:41 am GMT+5:30, Andrew Schofield 
<[email protected]> wrote:  
 
 Hi,
I've taken a good look at KIP-1310 and referred back to KIP-939 to refresh my 
memory. Thanks for putting in the effort to write the proposal.

While I don't like the fact that the transactional logic is locked inside 
KafkaProducer, it doesn't seem to me that the benefits of TransactionSession 
are really that great in practice. There's not much I can do using 
TransactionSession that I cannot do with KafkaProducer today. The main benefit 
is that if I want to use a consumer transactionally without sending any 
records, I no longer need to create a KafkaProducer. In these situations, the 
KafkaProducer is really just a wrapper for the transaction manager, so you've 
made that a bit clearer, but it's still very fiddly. I need to construct a 
TransactionSession and then a KafkaConsumer, and use them together in a very 
similar way as I have to use the KafkaProducer today.

The introduction of TransactionSession doesn't make the API any tidier. For 
example, there are now two variants of transactional KafkaProducer, either one 
that was instantiated with transactional.id configuration, or one that was 
instantiated with a TransactionSession. With the former, you can use 
KafkaProducer.commitTransaction(), while with the latter, you cannot.

The familiar usability problems of KafkaConsumer being unaware of transactions 
remain, meaning that if you use KafkaProducer to send offsets to a transaction 
and it goes wrong, you need to discard the KafkaConsumer because it needs to be 
re-initialized from the last committed offset. With TransactionSession, there 
are even more moving parts.

On reflection, it seems to me that a more straightforward way of adding EoS 
support to share groups would be preferable. That is a big piece of work in 
itself.

Thanks,
Andrew

On 2026/04/07 15:00:44 Shekhar Rajak wrote:
> 
> Hi everyone,
> 
> I’ve published KIP-1310: General Transaction Session and would like to open 
> the floor for discussion.
> 
> Historically, Kafka’s transaction logic has been locked inside KafkaProducer. 
> This worked well for simple "write-and-commit" patterns, but as the ecosystem 
> matures, this monolithic design has become a hurdle. We now have multiple 
> entities that need to participate in or complete transactions without the 
> heavy baggage of a full KafkaProducer (record batching, serializers, sender 
> threads, etc.): Flink and other external coordinators, share group consumers 
> (KIP-1289), and Kafka Connect.
> 
> KIP-1310 extracts transaction identity and lifecycle management into a 
> first-class TransactionSession client.
> 
> Proposal:
> 
>  - New TransactionSession Class: A lightweight, thread-safe object for identity 
>    (producerId/epoch) and lifecycle (initialize, beginTransaction, commit).
>  - Reflection-Free Recovery: Replaces reflection workarounds with a public 
>    TransactionSession.resume() API for external coordinators.
>  - Decoupled Architecture: Allows KafkaProducer and KafkaShareConsumer to share 
>    a single transaction identity for atomic "Exactly-Once" Kafka-to-Kafka 
>    pipelines (KIP-1302).
>  - Automatic Heartbeats: Provides a dedicated home for the background 
>    transaction heartbeat (KAFKA-20381), independent of the producer's 
>    data-sending loop.
> 
> 
> This change is fully backward compatible and introduces no wire protocol 
> changes; it simply aligns the client API with what the Kafka protocol already 
> supports.
> 
> I’m looking forward to your feedback on the proposed interfaces and the 
> refactoring of TransactionManager.
> KIP Link: https://cwiki.apache.org/confluence/x/nJY8G 
> Best regards,
> Shekhar
> https://github.com/Shekharrajak
