[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17791903#comment-17791903
 ] 

Kevin Tseng edited comment on FLINK-33545 at 12/1/23 5:16 AM:
--------------------------------------------------------------

Hi Mason,

thank you for the comment.

In the issue reported i made couple assumptions:
 # Flink Checkpoint does complete without any hindrance, causing future job 
failure not to recover from proper offset, leading to data loss
 # Broker has successfully acked record during first flush (with ack=-1 or 
ack=all) as by design, making it impossible for data loss to be originated from 
broker issue

{code:java}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?{code}
observation showed that first flush does complete successfully, which allowed 
checkpoint to proceed, job failure caused by Broker outage is observed and 
Flink will attempt to restore from the created checkpoint (the one that was 
successfully created)
{code:java}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would covers the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink.{code}
my understanding is based on the documentation

[https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/]

[https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/]

checkpoint barrier produced by the initial operator of a task manager 
(KafkaSource in this case) and flow along the graph within the same task 
manager, not every operator will generate a barrier.

In this scenario KafkaSource will generate a barrier at record 100 instead of 
between 96 and 97, which it will successfully complete as there's no issue 
committing read offset back to Broker at this point.

this is based on the precondition of everything before the barrier generated 
will be processed or tracked in the state, if anything breaks this 
precondition, there will be data loss.

Due to this, i believe the issue lies within the implementation of KafkaSink 
that did not fully honor the contract of checkpoint.
{code:java}
By any chance, does your KafkaSink serialization schema produce multiple 
records from one input record? {code}
data loss has been observed in 1-to-1 record streaming as well as 1-to-many 
scenarios.
{code:java}
The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it as exceeded the checkpoint timeout.{code}
As per observation, if broker became healthy within the producer timeout / 
checkpoint timeout period, there's no data loss observed.

If broker incurred network / unexpected failure for a prolong period of time, 
data loss will be observed from time to time.

the requirement for data loss to appear to be Flink failure induced restart.

 

My hypothesis after process of elimination, albeit with some uncertainty is 
that:

Based on the information illustrated in the implementation of prebarrier: 
https://issues.apache.org/jira/browse/FLINK-9428

Pre-Barrier will be invoked before barrier is emitted, but there's no 
mentioning of Pre-Barrier blocking other records from being processed 
simultaneously and I was not able to find any synchronize / blocking.

If the above is true and Flink failed right after checkpoint succeed, there 
will be data loss as KafkaSink does not do anything after Pre-Barrier phase 
except when being set as EXACTLY_ONCE

However, given that i'm not able to confirm some of the suspicion i have, I 
figured the safest way to perform flush is within the commit phase of Two Phase 
Commit, instead of pre-commit / prepareCommit phase, hence the adding flush to 
KafkaCommitter for AT_LEAST_ONCE.

 

Let me know if I have any misunderstand of how checkpointing, barrier handling, 
and 2PC works, or if there's any mistake in my assumption.

Thanks

 

 


was (Author: JIRAUSER299092):
Hi Mason,

thank you for the comment.

In the issue reported i made couple assumptions:
 # Flink Checkpoint does complete without any hindrance, causing future job 
failure not to recover from proper offset, leading to data loss
 # Broker has successfully acked record during first flush (with ack=-1 or 
ack=all) as by design, making it impossible for data loss to be originated from 
broker issue

 

 
{code:java}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?{code}
observation showed that first flush does complete successfully, which allowed 
checkpoint to proceed, job failure caused by Broker outage is observed and 
Flink will attempt to restore from the created checkpoint (the one that was 
successfully created)
{code:java}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would covers the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink.{code}
 

my understanding is based on the documentation

[https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/]

[https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/]

checkpoint barrier produced by the initial operator of a task manager 
(KafkaSource in this case) and flow along the graph within the same task 
manager, not every operator will generate a barrier.

In this scenario KafkaSource will generate a barrier at record 100 instead of 
between 96 and 97, which it will successfully complete as there's no issue 
committing read offset back to Broker at this point.

this is based on the precondition of everything before the barrier generated 
will be processed or tracked in the state, if anything breaks this 
precondition, there will be data loss.

Due to this, i believe the issue lies within the implementation of KafkaSink 
that did not fully honor the contract of checkpoint.

 

 
{code:java}
By any chance, does your KafkaSink serialization schema produce multiple 
records from one input record? {code}
data loss has been observed in 1-to-1 record streaming as well as 1-to-many 
scenarios.

 

 

 
{code:java}
The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it as exceeded the checkpoint timeout.{code}
 

As per observation, if broker became healthy within the producer timeout / 
checkpoint timeout period, there's no data loss observed.

If broker incurred network / unexpected failure for a prolong period of time, 
data loss will be observed from time to time.

the requirement for data loss to appear to be Flink failure induced restart.

 

My hypothesis after process of elimination, albeit with some uncertainty is 
that:

Based on the information illustrated in the implementation of prebarrier: 
https://issues.apache.org/jira/browse/FLINK-9428

Pre-Barrier will be invoked before barrier is emitted, but there's no 
mentioning of Pre-Barrier blocking other records from being processed 
simultaneously and I was not able to find any synchronize / blocking.

If the above is true and Flink failed right after checkpoint succeed, there 
will be data loss as KafkaSink does not do anything after Pre-Barrier phase 
except when being set as EXACTLY_ONCE

However, given that i'm not able to confirm some of the suspicion i have, I 
figured the safest way to perform flush is within the commit phase of Two Phase 
Commit, instead of pre-commit / prepareCommit phase, hence the adding flush to 
KafkaCommitter for AT_LEAST_ONCE.

 

Let me know if I have any misunderstand of how checkpointing, barrier handling, 
and 2PC works, or if there's any mistake in my assumption.

Thanks

 

 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33545
>                 URL: https://issues.apache.org/jira/browse/FLINK-33545
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.18.0
>            Reporter: Kevin Tseng
>            Assignee: Kevin Tseng
>            Priority: Major
>              Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader<T>* class
>  # KafkaSink in *KafkaWriter<IN>* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
>     for (int i = 0; i < 10; i++) {
>         System.out.printf("sending record #%d\n", i);
>         String data = UUID.randomUUID().toString();
>         final ProducerRecord<String, String> record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
>         producer.send(record, new CB(Integer.toString(i), data));
>         Thread.sleep(10000); //sleep for 10 seconds
>     }
> } catch (Exception e) {
>     e.printStackTrace();
> } finally {
>     System.out.println("flushing");
>     producer.flush();
>     System.out.println("closing");
>     producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of the KafkaSink process.
> I'm leaning towards 2nd option as it does not make sense to flush then do 
> checkpoint, it should be right before checkpoint completes then we flush, 
> given that's what commit is meant to do.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to