David Morávek created FLINK-23896:
-------------------------------------

             Summary: The new KafkaSink drops data if job fails between 
checkpoint and transaction commit.
                 Key: FLINK-23896
                 URL: https://issues.apache.org/jira/browse/FLINK-23896
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: David Morávek
             Fix For: 1.14.0


 * Any time a new *transactional producer* is started, [KafkaProducer#initTransactions()|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions--] must be called to obtain a new *ProducerId* from the *TransactionCoordinator* (a Kafka broker component).
 ** The *ProducerId* is increased every time a new producer with the same *TransactionalId* is registered.
 ** Registering a new ProducerId *FENCES* all prior ProducerIds and *ABORTS* all uncommitted transactions associated with them.
 * *KafkaCommitter* uses a separate producer that hacks into Kafka internals and resumes the transaction using the stored epoch and ProducerId, without registering a new *ProducerId* for itself. The committer commits the transaction using the *ProducerId* stored in the *KafkaCommittable* state.
 * If a *new producer is started before the transaction is committed* (this can happen in some failover scenarios), the ProducerId stored in the state is already *FENCED* and the commit fails with a *ProducerFencedException*. Because we currently ignore this exception (we just log a warning), we effectively *DROP* all uncommitted data from the previous checkpoint. See the sketch after this list.
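
For illustration, here is a minimal standalone sketch of the fencing behavior described above, using only the public producer API. The broker address, topic name, and TransactionalId are made up; this is not Flink code, just the raw Kafka mechanics:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencingDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "demo-transactional-id"); // same TransactionalId for both producers

        // First producer: writes into a transaction but never commits,
        // mimicking the state between a checkpoint and the commit.
        KafkaProducer<String, String> first = new KafkaProducer<>(props);
        first.initTransactions();
        first.beginTransaction();
        first.send(new ProducerRecord<>("demo-topic", "key", "uncommitted value"));
        first.flush();

        // Second producer with the same TransactionalId, e.g. started during
        // failover. Its initTransactions() call FENCES the first producer and
        // ABORTS the open transaction above.
        KafkaProducer<String, String> second = new KafkaProducer<>(props);
        second.initTransactions();

        try {
            first.commitTransaction(); // fails: the first producer is fenced
        } catch (ProducerFencedException e) {
            // Swallowing this exception (as the current committer does) means
            // the records written in the aborted transaction are silently lost.
            System.err.println("Commit failed, data dropped: " + e);
        }
    }
}
{code}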

Essentially, any job failure that happens between successfully taking a checkpoint and committing the transactions will trigger this behavior.
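
The problematic pattern in the committer reduces to the following (a hypothetical paraphrase with made-up names, not the actual KafkaCommitter code):

{code:java}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical committer fragment for illustration only.
class CommitterSketch {
    private static final Logger LOG = LoggerFactory.getLogger(CommitterSketch.class);

    void commit(Producer<byte[], byte[]> recoveredProducer, String transactionalId) {
        try {
            recoveredProducer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Current behavior: only a warning is logged, so the
            // checkpointed-but-uncommitted records are dropped silently.
            LOG.warn("Transaction {} was fenced, data lost", transactionalId, e);
            // Surfacing the failure instead (e.g. rethrowing) would at least
            // make the data loss visible and fail the job.
        }
    }
}
{code}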



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
