Kevin Tseng created FLINK-33545:
-----------------------------------

             Summary: KafkaSink implementation can cause dataloss during broker 
issue when not using EXACTLY_ONCE if there's any batching
                 Key: FLINK-33545
                 URL: https://issues.apache.org/jira/browse/FLINK-33545
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.18.0
            Reporter: Kevin Tseng


In the current implementation of KafkaSource and KafkaSink there are some 
assumption that were made:
 # KafkaSource completely relies on Checkpoint to manage and track its offset 
in *KafkaSourceReader<T>* class
 # KafkaSink in *KafkaWriter<IN>* class only performs catch-flush when 
*DeliveryGuarantee.EXACTLY_ONCE* is specified.

KafkaSource is assuming that checkpoint should be properly fenced and 
everything it had read up-til checkpoint being initiated will be processed or 
recorded by operators downstream, including the TwoPhaseCommiter such as 
*KafkaSink*

*KafkaSink* goes by the model of:

 
{code:java}
flush -> prepareCommit -> commit{code}
 

In a scenario that:
 * KafkaSource ingested records #1 to #100
 * KafkaSink only had chance to send records #1 to #96
 * with a batching interval of 5ms

when checkpoint has been initiated, flush will only confirm the sending of 
record #1 to #96.

This allows checkpoint to proceed as there's no error, and record #97 to 100 
will be batched after first flush.

Now, if broker goes down / has issue that caused the internal KafkaProducer to 
not be able to send out the record after a batch, and is on a constant 
retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
*WriterCallback* error handling will never be trigger until the next checkpoint 
flush.

This can be tested by creating a faulty Kafka cluster and run the following 
code:
{code:java}
try {
    for (int i = 0; i < 10; i++) {
        System.out.printf("sending record #%d\n", i);
        String data = UUID.randomUUID().toString();
        final ProducerRecord<String, String> record = new 
ProducerRecord<>(TOPIC, Integer.toString(i), data);
        producer.send(record, new CB(Integer.toString(i), data));
        Thread.sleep(10000); //sleep for 5 seconds
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    System.out.println("flushing");
    producer.flush();
    System.out.println("closing");
    producer.close();
}{code}
Once callback returns due to network timeout, it will cause Flink to restart 
from previously saved checkpoint (which recorded reading up to record #100), 
but KafkaWriter never sent record #97 to #100.

This will result in dataloss of record #97 to #100

Because KafkaWriter only catches error *after* callback, if callback is never 
invoked (due to broker issue) right after the first flush has taken place, 
those records are effectively gone unless someone decided to go back and look 
for it.

This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but is 
not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.

There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}. ** 

prepareCommit will produce a list of KafkaCommittable that corresponds to 
Transactional KafkaProducer to be committed. And a catch up flush will take 
place during *commit* step. Whether this was intentional or not, due to the 
fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
end of EXACTLY_ONCE actually ensured everything fenced in the current 
checkpoint will be sent to Kafka, or fail the checkpoint if not successful.

 

Due the above finding, I'm recommending one of the following fixes:
 # need to perform second flush for AT_LEAST_ONCE
 # or move flush to the end of the KafkaSink process.

I'm leaning towards 2nd option as it does not make sense to flush then do 
checkpoint, it should be right before checkpoint completes then we flush, given 
that's what commit is meant to do.

 

This issue: https://issues.apache.org/jira/browse/FLINK-31305 was supposed to 
fix this but it never really did.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to