I ran a few more tests and found a case where there is no data loss. Here is how the two tests differ.
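For context, the shape of the job both tests exercise is roughly the following: a Kafka source feeding event-time session windows, whose results are written back to Kafka, with checkpointing to HDFS enabled. This is only a minimal sketch, not our actual code; the connector version (0.9), topic names, key extractor, 10-minute gap, and window function are all placeholders.

// Minimal sketch of the job shape (placeholders throughout -- see note above).
import java.util.Properties;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SessionWindowJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(10_000);                       // produces the "Checkpoint 7" / "Checkpoint 12" cycles below
        env.setStateBackend(new FsStateBackend(                // keyed window state is snapshotted here
                "hdfs:///user/harmony_user/flink-checkpoint"));

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        kafkaProps.setProperty("group.id", "session-window-job");     // placeholder

        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer09<String>("input-topic", new SimpleStringSchema(), kafkaProps));
        // (timestamp/watermark assignment omitted for brevity)

        DataStream<String> sessions = input
                .keyBy(value -> value.split(",")[0])                          // placeholder key extractor
                .window(EventTimeSessionWindows.withGap(Time.minutes(10)))    // placeholder gap
                .reduce((a, b) -> a + "|" + b);                               // placeholder window function

        sessions.addSink(
                new FlinkKafkaProducer09<String>("output-topic", new SimpleStringSchema(), kafkaProps));

        env.execute("session-window-job-sketch");
    }
}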
*The case where data loss is observed:*

1) Kafka source receives data. (The session window trigger has not fired yet.)
2) Bring all Kafka brokers down.
3) Flink triggers a checkpoint (checkpoint 7) and it completes successfully. The Kafka source task commits the correct offsets; I am not sure what was checkpointed for the session window task. Relevant logs:

   2017-06-01 20:21:48,901 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver TriggerCheckpoint 7@1496348508893 for b93b267f087865c245babeb259c76593.
   2017-06-01 20:21:49,011 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 7@1496348508893 for b93b267f087865c245babeb259c76593.
   2017-06-01 20:21:49,011 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 7@1496348508893 for b4a5c72b52779ab9b2b093b85b8b20c9.

4) The session window trigger fires; windows are merged and evaluated.
5) Flink attempts to send the data to Kafka but fails because the brokers are down.
6) Flink restarts the job and restores state from checkpoint 7. However, there appears to be no state for the session windows in the checkpoint; otherwise we would have seen the following in the logs, right? (The logging config that surfaces the second message is sketched below, after the logs.)

   HeapKeyedStateBackend - Initializing heap keyed state backend from snapshot
   HeapKeyedStateBackend - Restoring snapshot from state handles

   Relevant logs:

   JobManager:
   2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.

   TaskManager:
   2017-06-01 20:22:57,349 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
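A side note on how those restore messages show up at all: in the no-data-loss run below, "Initializing heap keyed state backend from snapshot" is logged at INFO, but "Restoring snapshot from state handles: ..." is logged at DEBUG, so it only appears when DEBUG logging is enabled for that logger. Assuming the stock conf/log4j.properties shipped with Flink, a line along these lines surfaces it:

# conf/log4j.properties (sketch) -- show the state-backend restore details,
# including the "Restoring snapshot from state handles" message, at DEBUG
log4j.logger.org.apache.flink.runtime.state.heap.HeapKeyedStateBackend=DEBUG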
*The case where data loss is not observed:*

1) Kafka source receives data.
2) Bring all Kafka brokers down.
3) The session window trigger fires and windows are merged.
4) Flink triggers a checkpoint (checkpoint 12) and it completes successfully. The Kafka source task commits the correct offsets; I am not sure what was checkpointed for the session window task. Relevant logs:

   2017-06-02 18:53:52,953 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver TriggerCheckpoint 12@1496429632943 for 138cd5e5c8065174e8e326fbb66ac4cd.
   2017-06-02 18:53:53,151 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 12@1496429632943 for 138cd5e5c8065174e8e326fbb66ac4cd.
   2017-06-02 18:53:53,151 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 12@1496429632943 for dcfb670859a7fbf4ddb90c70ed5344dd.

5) The session windows are evaluated.
6) Flink attempts to send the data to Kafka but fails because the brokers are down.
7) Flink restarts the job and restores state from checkpoint 12. State is available for the session window in the checkpoint, and the task is restored with that state. Relevant logs:

   JobManager:
   2017-06-02 18:54:19,826 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 12 @ 1496429632943 for ea51350d9d689b2b09ab8fd2fe0f6454.

   TaskManager:
   2017-06-02 18:54:23,582 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend from snapshot.
   2017-06-02 18:54:23,583 DEBUG org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Restoring snapshot from state handles: [KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, offsets=[3587, 3597, 3607, 3617, 3627, 3637, 3647, 3657, 3667, 3685, 3695, 3705, 3715, 3725, 3735, 3745, 3755, 3765, 3775, 3785, 3795, 3813, 3823, 3841, 3851, 3861, 3871, 3881, 3891, 3901, 3911, 3921, 3931, 3941, 3951, 3961, 3971, 3981, 3991, 4009, 4019, 4029, 15215, 15225, 15235, 15245, 15255, 15265, 15275, 15285, 15295, 16187, 16197, 16207, 16217, 16227, 16237, 16247, 16257, 17149, 17159, 17169, 17179, 17189, 18081, 18091, 18101, 18111, 18121, 18131, 18141, 18151, 18161, 18171, 18181, 18191, 18201, 19093, 19985, 19995, 20005, 20015, 20025, 20043, 20061, 20071, 20081, 20099, 20109, 20119, 20129, 20139, 20149, 20159, 20169, 20179, 20189, 20199, 20209, 20219, 20229, 20239, 20249, 20259, 20269, 20287, 20297, 20307, 20317, 20327, 20337, 20347, 33039, 33049, 33059, 33069, 34700, 34710, 34720, 34730, 34740, 34750, 34760, 34770, 34780, 34790, 34800, 34810]}, data=File State: hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/ea51350d9d689b2b09ab8fd2fe0f6454/chk-12/8373feed-7494-4b43-98ae-e1c4214b7890 [34820 bytes]}].

Assuming that I am interpreting the logs correctly, I can think of three possible explanations for why we observed data loss in the first case:

1) I am missing some Flink setting.
2) Flink thought it checkpointed the window data successfully, but did not. (We are using Cloudera Hadoop, but have not built Flink against the Cloudera Hadoop binaries.)
3) Flink is not set up to checkpoint the data in session windows before they are merged?

(One producer-side flag that may also be worth ruling out is sketched in the P.S. at the end of this message.)

I have attached the log files for the successful run with this post:

jobManagerNoDataLoss.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/jobManagerNoDataLoss.log>
tmOneNoDataLoss.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/tmOneNoDataLoss.log>
tmTwoNoDataLoss.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/tmTwoNoDataLoss.log>

Please let us know what you guys think. Thanks for your patience.
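P.S. Since this thread is about FlinkKafkaProducer data loss more generally: another common source of "the checkpoint succeeded but records are gone" is the producer-side flush flag. If the sink does not flush records still buffered in the KafkaProducer before a checkpoint is confirmed, those records are not covered by that checkpoint. In both tests above the failing step is the write to Kafka after restore, so this is probably orthogonal to the missing window state, but for completeness here is how the flag can be set. This is only a sketch, shown against the 0.9 connector as an assumption (the setters live on FlinkKafkaProducerBase, so the 0.8/0.10 producers expose them as well); "sessions" and "kafkaProps" reuse the names from the sketch at the top of this message.

// Sketch: make the Kafka sink flush buffered records before a checkpoint completes,
// and fail the sink on send errors instead of only logging them.
FlinkKafkaProducer09<String> producer =
        new FlinkKafkaProducer09<String>("output-topic", new SimpleStringSchema(), kafkaProps);
producer.setFlushOnCheckpoint(true);   // the sink's checkpoint does not complete while records are still buffered in the producer
producer.setLogFailuresOnly(false);    // propagate send failures so the job fails over instead of silently dropping records
sessions.addSink(producer);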