It's hacked in Apache Flink using reflection: https://github.com/apache/flink/blob/c7bf460b15ff1501f1d0ffa24ad5a074032bc503/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L138-L164 It would be nice to have this feature supported in the API.
Viliam On Thu, 6 Jun 2019 at 10:56, Viliam Durina <vil...@hazelcast.com> wrote: > > The aim of having transactional.id configured for the producer is, in > my understanding, to fence off a zombie producer and to proactively > abort its transactions to avoid the need to wait for a timeout. > > What I'm interested in doing is to be able to continue the > transaction. For example: > > producer.beginTransaction(); > producer.send(...); > // now the producer crashes > > // start a new producer with same transactional.id > producer = new KafkaProducer(...); > // this aborts the unfinished transaction of the previous producer > producer.initTransactions(); > > My gut feeling is that it should be technically possible, there's just > no API for that. Is there anything that prevents us from doing that? > > Why do I need this? The Kafka transaction in my case is a part of a > larger distributed transaction, which failed during the 2nd phase. The > transaction coordinator saved its state and was restarted and knows > that some participants might have committed and some not, therefore it > requires all participants to finish the commit from previous run. > > Viliam