David Morávek created FLINK-23896:
-------------------------------------

             Summary: The new KafkaSink drops data if job fails between checkpoint and transaction commit.
                 Key: FLINK-23896
                 URL: https://issues.apache.org/jira/browse/FLINK-23896
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: David Morávek
             Fix For: 1.14.0
* Any time a new *transactional producer* is started, [KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--] needs to be called in order to obtain a new *ProducerId* from the *TransactionCoordinator* (a Kafka broker component).
** The *ProducerId* is increased every time a new producer with the same *TransactionalId* is registered.
** Registering a new ProducerId *FENCES* all prior ProducerIds and *ABORTS* all uncommitted transactions associated with them (see the first sketch below).
* *KafkaCommitter* uses a separate producer that hacks into Kafka internals and resumes the transaction from the epoch and ProducerId, without actually obtaining a new ProducerId for itself. The committer uses the *ProducerId* stored in the *KafkaCommittable* state to commit the transaction (see the second sketch below).
* If a *new producer is started before the transaction is committed* (this can happen in some failover scenarios), the ProducerId stored in the state is already *FENCED* and the commit fails with a *ProducerFencedException*. Because we currently ignore this exception (we only log a warning), we effectively *DROP* all uncommitted data from the previous checkpoint.

Basically, any job failure that happens between successfully taking a checkpoint and committing the transactions will trigger this behavior.
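To make the fencing behavior concrete, here is a minimal sketch using the plain Kafka producer API (broker address, topic name, and TransactionalId are made up for the example): once a second producer registers the same *TransactionalId*, the first producer is fenced and its pending transaction can no longer be committed.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencingSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Both producers register the same TransactionalId.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-sink-0");

        KafkaProducer<String, String> first = new KafkaProducer<>(props);
        first.initTransactions();            // registers with the TransactionCoordinator
        first.beginTransaction();
        first.send(new ProducerRecord<>("output-topic", "key", "value"));

        // A second producer with the same TransactionalId shows up (e.g. after a restart).
        KafkaProducer<String, String> second = new KafkaProducer<>(props);
        second.initTransactions();           // FENCES `first` and ABORTS its open transaction

        try {
            first.commitTransaction();       // too late, `first` has been fenced
        } catch (ProducerFencedException e) {
            // The record sent by `first` is never committed.
        }

        first.close();
        second.close();
    }
}
{code}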
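The problematic commit path then looks roughly like the sketch below. *FlinkKafkaInternalProducer* and *KafkaCommittable* are stand-ins for the connector internals, not the exact Flink signatures; the point is that swallowing the *ProducerFencedException* turns a fenced commit into silent data loss.

{code:java}
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CommitterSketch {

    private static final Logger LOG = LoggerFactory.getLogger(CommitterSketch.class);

    void commit(KafkaCommittable committable, FlinkKafkaInternalProducer producer) {
        // Resume the pre-committed transaction with the ProducerId/epoch that was
        // captured in the checkpointed state (no new ProducerId is obtained).
        producer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
        try {
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // This is the problem: if another producer has registered the same
            // TransactionalId in the meantime, the stored ProducerId is fenced and the
            // commit silently fails -> the data of the previous checkpoint is dropped.
            LOG.warn("Transaction for {} was fenced; uncommitted data is lost.", committable, e);
        }
    }

    // Hypothetical stand-ins, only to keep the sketch self-contained.
    interface FlinkKafkaInternalProducer {
        void resumeTransaction(long producerId, short epoch);
        void commitTransaction();
    }

    interface KafkaCommittable {
        long getProducerId();
        short getEpoch();
    }
}
{code}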