Hi Amara,

Could you give a bit more detail about your job? How are you producing the 4200 events into Kafka? Is that a separate process from the consuming job? Note that writing to a Kafka topic currently provides only at-least-once delivery, so if you’re sending the data to the Kafka topic as part of the failing job, there will likely be duplicates in the topic.
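To make the distinction concrete, here is a minimal, hedged sketch in plain Python. It is a toy simulation, not Flink or Kafka code: the function `run_job` and its parameters (`checkpoint_interval`, `failure_prob`, `seed`) are hypothetical names chosen for illustration. It models a replayable source, an operator whose count is snapshotted together with the source offset, and a sink whose writes are never rolled back. On each simulated failure the offset and count are restored from the last snapshot and the source replays from there.

```python
import random


def run_job(num_events=4200, checkpoint_interval=100, failure_prob=0.002, seed=7):
    """Toy model (not Flink API): replayable source -> counting operator -> sink.

    Returns (final_count, sink_contents, number_of_failures).
    """
    rng = random.Random(seed)
    sink = []             # stands in for the output topic; writes are never undone
    offset, count = 0, 0  # live source position and operator state
    checkpoint = (0, 0)   # last consistent snapshot of (offset, count)
    failures = 0

    while offset < num_events:
        # Simulated task failure: restore offset and state from the snapshot,
        # then replay the source from the restored offset.
        if rng.random() < failure_prob:
            offset, count = checkpoint
            failures += 1
            continue

        sink.append(offset)  # emit the event id; replayed events are written again
        count += 1           # update the operator's state
        offset += 1

        # Periodic snapshot: offset and state are captured together.
        if offset % checkpoint_interval == 0:
            checkpoint = (offset, count)

    return count, sink, failures


if __name__ == "__main__":
    count, sink, failures = run_job()
    print("failures:", failures)
    print("operator count:", count)
    print("records in sink:", len(sink))
    print("duplicates in sink:", len(sink) - len(set(sink)))
```

In this model the operator count always ends at the number of source events, however many failures occur, while the sink can contain extra copies of any events that were emitted between the last snapshot and a failure. That is the sense in which state can be exactly-once while the output topic is only at-least-once.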
Also, how are you verifying exactly-once? As I explained, the Kafka producer only has at-least-once delivery guarantees, so checking for topic-to-topic duplicates is currently not a valid way to verify this. To verify it properly, I would suggest having a stateful operator that counts the number of events it has processed, and registering that count as Flink managed state. That count should always be 4200 after cancelling and restarting the job from savepoints. How you create the savepoint, and whether or not it’s externalized, is irrelevant here; the exactly-once state guarantees should still hold.

Cheers,
Gordon

On 24 May 2017 at 11:05:49 PM, F.Amara (fath...@wso2.com) wrote:

Hi all,

I'm working with Flink 1.2.0, Kafka 0.10.0.1 and Hadoop 2.7.3.

I have a Flink Highly Available cluster that reads data from a Kafka producer and processes it within the cluster. I randomly kill a Task Manager to introduce failure. A restart strategy is configured, and the cluster does restart processing after a slight delay, which is expected. But when I check the output after the final processing is done, I see duplicates (when sending 4200 events with a 40ms delay between them, I observed 56 duplicates). As mentioned in [1], I have configured ExternalizedCheckpoints but still observe duplicates. Even when I tested using manual savepoints (cancelled the job and restarted it), 2 or 3 duplicates appeared!

Can someone explain how I can use the savepoint created through ExternalizedCheckpoints to make the application start processing exactly from where it left off? I need the application to automatically read the savepoint details and recover from that point onwards, rather than doing it manually. Or are the usual Savepoints capable of serving the same functionality automatically?
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html

Thanks,
Amara

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Duplicated-data-when-using-Externalized-Checkpoints-in-a-Flink-Highly-Available-cluster-tp13301.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.