[ https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699131#comment-17699131 ]
Tzu-Li (Gordon) Tai edited comment on FLINK-31363 at 3/10/23 10:36 PM:
-----------------------------------------------------------------------

Looking at the KafkaProducer code, the {{TransactionManager}} keeps a {{transactionStarted}} flag that is only set when a record has actually been sent to the transaction.

On {{commitTransaction()}} / {{abortTransaction()}} API calls on the Java client, if the flag is false, then the client won't actually send an {{EndTxnRequest}} to the brokers.

So:
{code:java}
producer.beginTransaction();
producer.commitTransaction();

// or

producer.beginTransaction();
producer.abortTransaction();
{code}
the above doesn't throw in normal continuous execution. It only throws if there was job downtime between the {{beginTransaction()}} call and the commit/abort call (because the flag would have been cleared).

So - I think the correct way to fix this is that we need to additionally persist the {{transactionStarted}} flag in Flink checkpoints as transaction metadata, and then set that appropriately when creating the recovery producer at restore time.


was (Author: tzulitai):
Looking at the KafkaProducer code, the {{TransactionManager}} keeps a {{transactionStarted}} flag that is only set when a record has actually been sent to the transaction.

On {{commitTransaction()}} / {{abortTransaction()}} API calls on the Java client, if the flag is false, then the client won't actually send an {{EndTxnRequest}} to the brokers.

So:
{code:java}
producer.beginTransaction();
producer.commitTransaction();

// or

producer.beginTransaction();
producer.abortTransaction();
{code}
the above doesn't throw in normal continuous execution. It only throws if there was job downtime between the {{beginTransaction()}} call and the commit/abort call.

So - I think the correct way to fix this is that we need to additionally persist the {{transactionStarted}} flag in Flink checkpoints as transaction metadata, and then set that appropriately when creating the recovery producer at restore time.

> KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
> --------------------------------------------------------------------
>
>                 Key: FLINK-31363
>                 URL: https://issues.apache.org/jira/browse/FLINK-31363
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.0, 1.16.1, 1.18.0
>            Reporter: lightzhao
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2023-03-08-10-54-51-410.png
>
>
> When KafkaSink starts Exactly once and no data is written to the topic during
> a checkpoint, the transaction commit exception is triggered, with the
> following exception.
> [Transiting to fatal error state due to
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.]
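
For illustration only, the behaviour described in the comment above can be modelled with a small toy, independent of the real Kafka client. Everything here is hypothetical: {{ToyBroker}}, {{ToyProducer}} and {{commitAfterRestore}} are made-up names, {{IllegalStateException}} stands in for {{InvalidTxnStateException}}, and the sketch assumes that a recovery producer with no persisted flag treats the transaction as started. It is a sketch of the proposed idea, not the actual fix.

```java
import java.util.HashSet;
import java.util.Set;

public class TxnFlagSketch {

    /** Toy broker: it only knows transactions that received at least one record. */
    static final class ToyBroker {
        private final Set<String> ongoing = new HashSet<>();

        void addRecord(String txnId) {
            ongoing.add(txnId);
        }

        /** Stand-in for handling an end-transaction request. */
        void endTxn(String txnId) {
            if (!ongoing.remove(txnId)) {
                // Stand-in for InvalidTxnStateException in this toy model.
                throw new IllegalStateException("invalid txn state for " + txnId);
            }
        }
    }

    /** Toy client that holds the client-side flag described in the comment. */
    static final class ToyProducer {
        private final ToyBroker broker;
        private final String txnId;
        private boolean transactionStarted;

        ToyProducer(ToyBroker broker, String txnId, boolean transactionStarted) {
            this.broker = broker;
            this.txnId = txnId;
            this.transactionStarted = transactionStarted;
        }

        /** Sending a record is what sets the flag. */
        void send() {
            broker.addRecord(txnId);
            transactionStarted = true;
        }

        boolean transactionStarted() {
            return transactionStarted;
        }

        /** Returns true only if an end-transaction request was actually sent. */
        boolean commit() {
            if (!transactionStarted) {
                return false; // nothing was written, so nothing is sent to the broker
            }
            broker.endTxn(txnId);
            transactionStarted = false;
            return true;
        }
    }

    /** Simulates checkpoint, downtime and restore; returns "committed", "skipped" or "failed". */
    public static String commitAfterRestore(boolean recordsWritten, boolean persistFlag) {
        ToyBroker broker = new ToyBroker();
        ToyProducer original = new ToyProducer(broker, "txn-1", false);
        if (recordsWritten) {
            original.send();
        }
        // The flag value that would be stored as transaction metadata in the checkpoint.
        boolean checkpointedFlag = original.transactionStarted();

        // ASSUMPTION: without the persisted flag, the recovery producer treats the
        // transaction as started and therefore always sends the end-transaction request.
        boolean restoredFlag = persistFlag ? checkpointedFlag : true;
        ToyProducer recovery = new ToyProducer(broker, "txn-1", restoredFlag);
        try {
            return recovery.commit() ? "committed" : "skipped";
        } catch (IllegalStateException e) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        System.out.println(commitAfterRestore(false, false)); // failed
        System.out.println(commitAfterRestore(false, true));  // skipped
        System.out.println(commitAfterRestore(true, false));  // committed
        System.out.println(commitAfterRestore(true, true));   // committed
    }
}
```

In this toy model only the empty-transaction case changes when the flag is persisted: the commit is skipped instead of failing, which matches the symptom in the issue description (no data written during a checkpoint).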