[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786907#comment-17786907
 ] 

Kevin Tseng commented on FLINK-33545:
-------------------------------------

Hi Tzu-Li,

Thanks for the response.

Below is my analysis of the issue; please correct me if I am wrong on any 
part.

 
{code:java}
this is where I'm a bit lost. A flush() call on a producer is blocking until 
all records are acked + after the blocking flush returns, we check if any of 
the records that were flushed resulted in an error in which case we fail the 
job and not complete the checkpoint.{code}
 

The problem with the first flush is that it happens before the actual snapshot 
is tracked, inside `SinkWriterOperator.prepareSnapshotPreBarrier`.
{code:java}
If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.{code}
AT_LEAST_ONCE and NONE currently behave the same in this regard, since 
neither generates a committable after this.

From this point until the actual snapshot is triggered in 
`KafkaSource.snapshotState` (records #1 ~ #100), there seems to be a race 
condition where some records may have been ingested but were not part of the 
first flush (records #1 ~ #96) and have now made it to the producer (#97 ~ #100).

Since the whole Flink process is concurrent, the flush only lets the internal 
KafkaProducer commit its current buffer, not necessarily all records that are 
still flowing through. And because Flink uses KafkaProducer asynchronously, it 
can't catch an error until KafkaProducer actually attempts to commit (flush).

There is only a short window of time between the first flush and the 
snapshotState of the KafkaSource (approx. 15 ms at most in the situations I 
have tested).
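
To make the window described above concrete, here is a toy model in plain Java. It is not Flink or Kafka code: `FlushWindowSketch`, `BufferingProducer`, and `recordsMissedByFirstFlush` are hypothetical names invented for illustration, and the model simply assumes that records can reach the producer after the first flush returns, which is the claim under discussion rather than an established fact.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink/Kafka code): which records does a single flush confirm? */
public class FlushWindowSketch {

    /** Hypothetical stand-in for a producer that buffers records until flushed. */
    static final class BufferingProducer {
        private final List<Integer> buffered = new ArrayList<>();
        private final List<Integer> acked = new ArrayList<>();

        /** Asynchronous send: the record is only buffered, not yet acknowledged. */
        void send(int record) {
            buffered.add(record);
        }

        /** Confirms everything buffered so far; later sends are not covered. */
        void flush() {
            acked.addAll(buffered);
            buffered.clear();
        }

        List<Integer> unacked() {
            return buffered;
        }
    }

    /** Returns the records sent after the pre-snapshot flush, i.e. still unconfirmed. */
    static List<Integer> recordsMissedByFirstFlush(int sentBeforeFlush, int sentTotal) {
        BufferingProducer producer = new BufferingProducer();
        for (int i = 1; i <= sentBeforeFlush; i++) {
            producer.send(i);
        }
        producer.flush(); // the first (and, for AT_LEAST_ONCE, only) flush
        for (int i = sentBeforeFlush + 1; i <= sentTotal; i++) {
            producer.send(i); // assumed to arrive inside the short window
        }
        return new ArrayList<>(producer.unacked());
    }

    public static void main(String[] args) {
        // Records #97 to #100 are buffered but were never covered by a flush.
        System.out.println(recordsMissedByFirstFlush(96, 100));
    }
}
```

With 96 records sent before the flush and 100 in total, the unconfirmed list contains records 97 through 100, matching the numbers used in this ticket.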

 
{code:java}
If I am understanding you correctly, as it sounds like in this ticket, that 
second flush actually is not a no-op?{code}
 

You are correct that the 2nd flush is a no-op, as commitTransaction will flush 
before committing, which makes it 3 flushes in total when EXACTLY_ONCE is set. 
But there is still a flush that ensures all records have been committed before 
the checkpoint completes.

The problem is that there is no 2nd flush for AT_LEAST_ONCE when the checkpoint 
is finalizing to ensure that no data is left in the buffer or still being sent 
by the internal KafkaProducer.

As a result, the currently triggered checkpoint succeeds, since it only 
requires the first flush to be successful.

I suppose the original design assumed that the read status of any non-flushed 
records can go into the next checkpoint. But if the broker is having issues at 
this point, the next checkpoint will not succeed, and Flink's restart from the 
previous successful checkpoint will kick in.

 
{code:java}
So, how could the KafkaWriter only flush records #1 to #96 and already proceed 
to process the checkpoint barrier? This can only ever happen with unaligned 
checkpointing, and even in that case, records #97 to #100 will be part of the 
channel state that will be replayed upon recovery.{code}
I'm assuming this refers to the channel state of KafkaSink or of the functional 
operator preceding it?

Doesn't this require KafkaSource to keep track of these uncommitted records, or 
the operators between KafkaSource and KafkaSink to keep these records in 
state? I ask because once a record reaches KafkaSink, the sink doesn't keep 
track of anything beyond the first flush.

 

I made the following modifications to test this scenario:
 # added a flag to track pending records in FlinkKafkaInternalProducer between 
the send & flush methods
 # changed prepareCommit to also check this flag, and to return an empty 
committable by default only if DeliveryGuarantee.NONE is set
 # in KafkaCommitter, I use the flag producer.isInTransaction to determine 
whether it should be a commitTransaction call or a flush call (since with 
changes #1 & #2, AT_LEAST_ONCE will also reach this segment)
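
The three modifications above could look roughly like the following sketch. It is a simplified, single-threaded model, not the actual patch: `PendingFlagSketch`, `TrackingProducer`, `Guarantee`, `needsCommittable`, and `commit` are hypothetical names, and the `EXACTLY_ONCE` branch (always producing a committable) is my assumption about the existing behavior, not something verified against the connector source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified model of the test modifications; all names here are hypothetical. */
public class PendingFlagSketch {

    /** Stand-in for DeliveryGuarantee. */
    enum Guarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

    /** Stand-in for a producer that remembers whether anything was sent since the last flush. */
    static final class TrackingProducer {
        private final AtomicBoolean pending = new AtomicBoolean(false);
        private boolean inTransaction;

        void beginTransaction() { inTransaction = true; }

        /** Modification #1: mark records as pending on send... */
        void send(String record) { pending.set(true); }

        /** ...and clear the mark once a flush has completed. */
        void flush() { pending.set(false); }

        /** Committing a transaction implies a flush, as noted earlier in this comment. */
        void commitTransaction() {
            flush();
            inTransaction = false;
        }

        boolean hasPendingRecords() { return pending.get(); }

        boolean isInTransaction() { return inTransaction; }
    }

    /** Modification #2: only NONE skips the committable unconditionally. */
    static boolean needsCommittable(Guarantee guarantee, TrackingProducer producer) {
        switch (guarantee) {
            case NONE:
                return false;
            case EXACTLY_ONCE:
                return true; // assumption: a transaction is always committed
            default:
                return producer.hasPendingRecords();
        }
    }

    /** Modification #3: commit the transaction if there is one, otherwise just flush. */
    static String commit(TrackingProducer producer) {
        if (producer.isInTransaction()) {
            producer.commitTransaction();
            return "commitTransaction";
        }
        producer.flush();
        return "flush";
    }

    public static void main(String[] args) {
        TrackingProducer producer = new TrackingProducer();
        producer.send("record-97"); // arrives after the first flush
        System.out.println(needsCommittable(Guarantee.AT_LEAST_ONCE, producer));
        System.out.println(commit(producer));
        System.out.println(needsCommittable(Guarantee.AT_LEAST_ONCE, producer));
    }
}
```

A real implementation would also have to handle a send racing with an in-progress flush, which this sketch ignores.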

In my testing, the
{code:java}
LOG.debug("Committing {} committables.", committables); {code}
debug line, which is supposed to be triggered only if there are records to be 
flushed/committed, does get triggered from time to time when there are broker 
stability issues, confirming my suspicion.

 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33545
>                 URL: https://issues.apache.org/jira/browse/FLINK-33545
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.18.0
>            Reporter: Kevin Tseng
>            Assignee: Kevin Tseng
>            Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink, some 
> assumptions were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader<T>* class
>  # KafkaSink in *KafkaWriter<IN>* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource assumes that the checkpoint is properly fenced and that 
> everything it has read up until the checkpoint is initiated will be processed 
> or recorded by operators downstream, including a TwoPhaseCommitter such as 
> *KafkaSink*.
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> When a checkpoint is initiated, flush will only confirm the sending of 
> records #1 to #96.
> This allows the checkpoint to proceed as there is no error, and records #97 
> to #100 will be batched after the first flush.
> Now, if the broker goes down or has an issue that prevents the internal 
> KafkaProducer from sending out the records after a batch, and the producer is 
> in a constant retry cycle (the default value of KafkaProducer retries is 
> Integer.MAX_VALUE), *WriterCallback* error handling will never be triggered 
> until the next checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and running the 
> following code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
>     for (int i = 0; i < 10; i++) {
>         System.out.printf("sending record #%d\n", i);
>         String data = UUID.randomUUID().toString();
>         final ProducerRecord<String, String> record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
>         producer.send(record, new CB(Integer.toString(i), data));
>         Thread.sleep(10000); //sleep for 10 seconds
>     }
> } catch (Exception e) {
>     e.printStackTrace();
> } finally {
>     System.out.println("flushing");
>     producer.flush();
>     System.out.println("closing");
>     producer.close();
> }{code}
> Once the callback returns due to a network timeout, it will cause Flink to 
> restart from the previously saved checkpoint (which recorded reading up to 
> record #100), but KafkaWriter never sent records #97 to #100.
> This results in data loss of records #97 to #100.
> Because KafkaWriter only catches errors *after* the callback, if the callback 
> is never invoked (due to a broker issue) right after the first flush has taken 
> place, those records are effectively gone unless someone decides to go back 
> and look for them.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> The process diverges in the case of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to the 
> transactional KafkaProducer to be committed, and a catch-up flush will take 
> place during the *commit* step. Whether this was intentional or not, because 
> flush is a blocking call, the second flush at the end of the EXACTLY_ONCE 
> process actually ensures that everything fenced in the current checkpoint is 
> sent to Kafka, or the checkpoint fails if it is not.
>  
> Due to the above finding, I'm recommending one of the following fixes:
>  # perform a second flush for AT_LEAST_ONCE
>  # or move the flush to the end of the KafkaSink process.
> I'm leaning towards the 2nd option, as it does not make sense to flush and 
> then do the checkpoint; the flush should happen right before the checkpoint 
> completes, given that's what commit is meant to do.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
