After reading about FlinkKafkaProducer011 and 2PC function in FLINK, I know, when snapshotState(), preCommit currentTransaction. add <currentTransaction, newTransaction> to the State. when Checkpoint done and notifyCheckpointComplete(), producer will commit currentTransaction to brokers. when initializeState(), restore from State. commit currentTransaction and abort newTransaction. And I have one question, what happens if program fails after notifyCheckpointComplete() done? As my opinion, when it recovers, it will re-commit what has committed which results duplicate.