Hi Amara,

Could you elaborate a bit more on your job? How are you producing the 
4200 events into Kafka? Is that a separate process from the consuming job?
Do note that sending data to a Kafka topic currently only has at-least-once 
delivery guarantees, so if you're writing to the Kafka topic as part of the 
failing job, it is likely that there will be duplicates in the topic.

Also, how are you verifying exactly-once? As I explained, Kafka producing 
has only at-least-once delivery guarantees, so checking for topic-to-topic 
duplicates is currently not a valid way to verify this. To verify this 
properly, I would suggest adding a stateful operator that counts the number 
of events it has processed, and registering that count as Flink managed 
state. That count should always be 4200 after cancelling and restarting the 
job from savepoints.
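To illustrate why that check works, here is a minimal, self-contained toy sketch (plain Java, not the actual Flink API — the class, method names, and checkpoint interval are all made up for illustration). The key point it models is that a Flink checkpoint captures the operator's count and the source's read offset atomically, so on recovery both roll back together and each event contributes to the count exactly once, even though some events are reprocessed:

```java
// Toy model of exactly-once state recovery (NOT the real Flink API):
// the count (operator state) and offset (source position) are
// snapshotted together, and on failure both roll back together.
public class ExactlyOnceCountSketch {

    static final int TOTAL_EVENTS = 4200;

    static long runJob() {
        long count = 0;              // operator state: events counted
        int offset = 0;              // source state: next event to read

        long checkpointedCount = 0;  // last completed "checkpoint"
        int checkpointedOffset = 0;

        boolean failedOnce = false;

        while (offset < TOTAL_EVENTS) {
            // Take a "checkpoint" every 500 events: snapshot count
            // and offset atomically, as one consistent state.
            if (offset % 500 == 0) {
                checkpointedCount = count;
                checkpointedOffset = offset;
            }

            // Simulate a TaskManager crash partway through the stream.
            if (!failedOnce && offset == 2345) {
                failedOnce = true;
                // Recovery: roll BOTH the count and the read position
                // back to the last checkpoint. Events after the
                // checkpoint are replayed, but their earlier effect on
                // the count was discarded, so none is counted twice.
                count = checkpointedCount;
                offset = checkpointedOffset;
                continue;
            }

            count++;   // process the event at `offset`
            offset++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Despite the crash and replay, the count is exactly 4200.
        System.out.println("final count = " + runJob());
    }
}
```

If the job's state guarantees held, the restored count always ends at exactly 4200; a count above 4200 would indicate state that was not rolled back consistently with the source position.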

How you're creating the savepoint, and whether or not it's externalized, is 
irrelevant here; the exactly-once state guarantees should still hold.

Cheers,
Gordon

On 24 May 2017 at 11:05:49 PM, F.Amara (fath...@wso2.com) wrote:

Hi all,  

I'm working with Flink 1.2.0, Kafka 0.10.0.1 and Hadoop 2.7.3.  

I have a Flink highly available cluster that reads data from a Kafka  
producer and processes it within the cluster. I randomly kill a Task  
Manager to introduce failure. A restart strategy is configured, and the  
cluster does restart processing after a slight delay, which is expected.  
But when I check the output after the final processing is done, I see  
duplicates (when sending 4200 events with a 40ms delay between them, I  
observed 56 duplicates). As mentioned in [1], I have configured  
externalized checkpoints but still observe duplicates.  

Even when I tested (cancelled the job and restarted it) using manual  
savepoints, 2 or 3 duplicates appeared!  

Can someone explain how I can use the savepoint created through  
externalized checkpoints to make the application resume processing exactly  
from where it left off? I need the application to automatically read the  
savepoint details and recover from that point onwards, rather than doing it  
manually.  
Or are regular savepoints capable of serving the same functionality  
automatically?  

[1]  
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html
  

Thanks,  
Amara  


