Apache Flink works around the missing API with a reflection hack:
https://github.com/apache/flink/blob/c7bf460b15ff1501f1d0ffa24ad5a074032bc503/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L138-L164
It would be nice to have this feature supported in the API.
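
As far as I can tell, the linked code remembers the producer id and epoch of the open transaction and, after a restart, injects them into a fresh producer instead of calling initTransactions(). Below is a toy, in-memory sketch of that idea. It is not the Kafka client API: ToyCoordinator, Session, ResumeSketch and every method name are hypothetical stand-ins, and the model assumes only that a restart via init bumps the epoch and aborts the open transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, NOT the Kafka client API. All names are hypothetical.
class ToyCoordinator {
    // Identity of a producer session: producer id plus epoch.
    static final class Session {
        final long producerId;
        final short epoch;
        Session(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private final Map<String, Session> current = new HashMap<>();
    private final Map<String, List<String>> pending = new HashMap<>();
    private final List<String> committed = new ArrayList<>();
    private long nextProducerId = 1000;

    // Models initTransactions(): bumps the epoch (fencing the old session)
    // and aborts whatever transaction was still open for this id.
    Session init(String txId) {
        Session old = current.get(txId);
        Session fresh = (old == null)
                ? new Session(nextProducerId++, (short) 0)
                : new Session(old.producerId, (short) (old.epoch + 1));
        current.put(txId, fresh);
        pending.remove(txId);
        return fresh;
    }

    void send(String txId, Session s, String record) {
        requireCurrent(txId, s);
        pending.computeIfAbsent(txId, k -> new ArrayList<>()).add(record);
    }

    void commit(String txId, Session s) {
        requireCurrent(txId, s);
        List<String> records = pending.remove(txId);
        if (records != null) {
            committed.addAll(records);
        }
    }

    List<String> committed() {
        return new ArrayList<>(committed);
    }

    // Rejects any session that is not the latest one for this id.
    private void requireCurrent(String txId, Session s) {
        Session cur = current.get(txId);
        if (cur == null || cur.producerId != s.producerId || cur.epoch != s.epoch) {
            throw new IllegalStateException("fenced: stale producer id/epoch");
        }
    }
}

class ResumeSketch {
    // Restart the way the public API allows today: init aborts the open transaction.
    static List<String> restartWithInit() {
        ToyCoordinator c = new ToyCoordinator();
        ToyCoordinator.Session s = c.init("tx-1");
        c.send("tx-1", s, "a");
        // ... producer crashes here ...
        ToyCoordinator.Session s2 = c.init("tx-1");
        c.commit("tx-1", s2);
        return c.committed();
    }

    // Restart by reusing the saved producer id and epoch, skipping init.
    static List<String> restartWithResume() {
        ToyCoordinator c = new ToyCoordinator();
        ToyCoordinator.Session s = c.init("tx-1");
        c.send("tx-1", s, "a");
        long savedProducerId = s.producerId; // persisted before the crash
        short savedEpoch = s.epoch;
        // ... producer crashes here ...
        ToyCoordinator.Session resumed = new ToyCoordinator.Session(savedProducerId, savedEpoch);
        c.commit("tx-1", resumed);
        return c.committed();
    }

    public static void main(String[] args) {
        System.out.println("restart with init:   " + restartWithInit());   // []
        System.out.println("restart with resume: " + restartWithResume()); // [a]
    }
}
```

In the toy model the record survives only when the restarted producer keeps the old id and epoch, which is what the second phase of an outer two-phase commit would need.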

Viliam

On Thu, 6 Jun 2019 at 10:56, Viliam Durina <vil...@hazelcast.com> wrote:
>
> The aim of having transactional.id configured for the producer is, in
> my understanding, to fence off a zombie producer and to proactively
> abort its transactions to avoid the need to wait for a timeout.
>
> What I'm interested in is being able to continue the
> transaction. For example:
>
> producer.beginTransaction();
> producer.send(...);
> // now the producer crashes
>
> // start a new producer with same transactional.id
> producer = new KafkaProducer(...);
> // this aborts the unfinished transaction of the previous producer
> producer.initTransactions();
>
> My gut feeling is that it should be technically possible; there's just
> no API for that. Is there anything that prevents us from doing that?
>
> Why do I need this? The Kafka transaction in my case is a part of a
> larger distributed transaction, which failed during the 2nd phase. The
> transaction coordinator saved its state, was restarted, and knows
> that some participants might have committed and some not. It therefore
> requires all participants to finish the commit from the previous run.
>
> Viliam
