I ran a few more tests and was able to find a case where there is no data
loss. Here is how the two tests differ.
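
For context, the job in both tests is essentially a Kafka source feeding
session windows whose output is written back to Kafka, with checkpointing
enabled. The sketch below only shows the shape of that pipeline; the topics,
group id, key selector, session gap, aggregation, and connector version (0.9
here) are placeholders rather than our actual code.

// Minimal sketch of the kind of job under test -- placeholder values, not our exact code.
import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SessionWindowKafkaJob {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");   // placeholder broker
        props.setProperty("group.id", "session-window-test");     // placeholder group id

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds

        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer09<>("input-topic", new SimpleStringSchema(), props));

        FlinkKafkaProducer09<String> producer =
                new FlinkKafkaProducer09<>("output-topic", new SimpleStringSchema(), props);

        input
                // placeholder key: first comma-separated field of the record
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value.split(",", 2)[0];
                    }
                })
                // session windows with a placeholder 10-minute gap
                // (the real job may use event-time sessions instead)
                .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
                // placeholder aggregation: concatenate the records of a session
                .reduce((a, b) -> a + "|" + b)
                .addSink(producer);

        env.execute("kafka-session-window-test");
    }
}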

*The case where data loss is observed:*

1) Kafka source receives data. (The session window trigger hasn't fired yet.)

2) Bring all Kafka brokers down. 

3) Flink triggers a checkpoint, which completes successfully (checkpoint 7).
        The Kafka source task commits the correct offsets. I am not sure what
        was checkpointed for the session window task.

        Relevant logs:
        2017-06-01 20:21:48,901 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver TriggerCheckpoint 7@1496348508893 for b93b267f087865c245babeb259c76593.
        2017-06-01 20:21:49,011 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 7@1496348508893 for b93b267f087865c245babeb259c76593.
        2017-06-01 20:21:49,011 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 7@1496348508893 for b4a5c72b52779ab9b2b093b85b8b20c9.
        
4) The session window trigger fires, and the windows are merged and evaluated.

5) Flink attempts to send data to Kafka but fails because the brokers are down.

6) Flink restarts the job and restores state from checkpoint 7. However,
there is no state available in the checkpoint for the session windows.
        Otherwise we would have seen the following in the logs, right?
                HeapKeyedStateBackend - Initializing heap keyed state backend from snapshot
                HeapKeyedStateBackend - Restoring snapshot from state handles
                
        Relevant logs:

        JobManager logs:
        2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.

        TaskManager logs:
        2017-06-01 20:22:57,349 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
                

*The case where data loss is not observed:*

1) Kafka source receives data.

2) Bring all Kafka brokers down.

3) The session window trigger fires, and the windows are merged.

4) Flink triggers a checkpoint, which completes successfully (checkpoint 12).
        The Kafka source task commits the correct offsets. I am not sure what
        was checkpointed for the session window task.

        Relevant logs:
        2017-06-02 18:53:52,953 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver TriggerCheckpoint 12@1496429632943 for 138cd5e5c8065174e8e326fbb66ac4cd.
        2017-06-02 18:53:53,151 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 12@1496429632943 for 138cd5e5c8065174e8e326fbb66ac4cd.
        2017-06-02 18:53:53,151 DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 12@1496429632943 for dcfb670859a7fbf4ddb90c70ed5344dd.
        
5) Session windows are evaluated.

6) Flink attempts to send data to Kafka but fails because the brokers are down.

7) Flink restarts the job and restores state from checkpoint 12. State for
the session windows is available in the checkpoint, and the task is restored
with it.

        Relevant logs:

        JobManager logs:
        2017-06-02 18:54:19,826 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 12 @ 1496429632943 for ea51350d9d689b2b09ab8fd2fe0f6454.

        TaskManager logs:
        2017-06-02 18:54:23,582 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend from snapshot.
        2017-06-02 18:54:23,583 DEBUG org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Restoring snapshot from state handles:
        [KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127},
        offsets=[3587, 3597, 3607, 3617, 3627, 3637, 3647, 3657, 3667, 3685, 3695, 3705, 3715, 3725, 3735, 3745, 3755, 3765, 3775, 3785,
        3795, 3813, 3823, 3841, 3851, 3861, 3871, 3881, 3891, 3901, 3911, 3921, 3931, 3941, 3951, 3961, 3971, 3981, 3991, 4009, 4019, 4029,
        15215, 15225, 15235, 15245, 15255, 15265, 15275, 15285, 15295, 16187, 16197, 16207, 16217, 16227, 16237, 16247, 16257,
        17149, 17159, 17169, 17179, 17189, 18081, 18091, 18101, 18111, 18121, 18131, 18141, 18151, 18161, 18171, 18181, 18191, 18201,
        19093, 19985, 19995, 20005, 20015, 20025, 20043, 20061, 20071, 20081, 20099, 20109, 20119, 20129, 20139, 20149, 20159, 20169,
        20179, 20189, 20199, 20209, 20219, 20229, 20239, 20249, 20259, 20269, 20287, 20297, 20307, 20317, 20327, 20337, 20347,
        33039, 33049, 33059, 33069, 34700, 34710, 34720, 34730, 34740, 34750, 34760, 34770, 34780, 34790, 34800, 34810]},
        data=File State: hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/ea51350d9d689b2b09ab8fd2fe0f6454/chk-12/8373feed-7494-4b43-98ae-e1c4214b7890 [34820 bytes]}].
        

Assuming that I am interpreting the logs correctly, I can think of three
possible explanations for the data loss observed in the first case:

1) I am missing some Flink setting. (See the settings sketch right after this
list.)

2) Flink thought it checkpointed the window data successfully, but didn't.
(We're using Cloudera Hadoop, but haven't built Flink against the Cloudera
Hadoop binaries.)

3) Flink is not set up to checkpoint the data in session windows before they
are merged?
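
Regarding point 1, below is a sketch of the settings I plan to double-check
on our side. It is not our exact code: the HDFS path is a placeholder, and
"env" and "producer" refer to the 0.9-connector pipeline sketch near the top
of this mail. The two producer flags control whether asynchronous send
failures fail the sink (and so trigger a restart) and whether pending records
are flushed before a checkpoint completes.

        // Additional imports assumed here:
        //   org.apache.flink.streaming.api.CheckpointingMode
        //   org.apache.flink.runtime.state.filesystem.FsStateBackend

        // Checkpointing mode and interval.
        env.enableCheckpointing(10_000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Keep checkpoints on HDFS via an explicit state backend (placeholder namenode address).
        env.setStateBackend(new FsStateBackend(
                "hdfs://namenode:8020/user/harmony_user/flink-checkpoint"));

        // Kafka 0.9 producer (FlinkKafkaProducerBase) flags:
        // fail the sink on asynchronous send errors instead of only logging them,
        producer.setLogFailuresOnly(false);
        // and wait for in-flight records to be acknowledged before a checkpoint completes.
        producer.setFlushOnCheckpoint(true);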

I have attached the log files for the successful run with this post.

Please let us know what you guys think. Thanks for your patience. 

jobManagerNoDataLoss.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/jobManagerNoDataLoss.log>
   
tmOneNoDataLoss.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/tmOneNoDataLoss.log>
   
tmTwoNoDataLoss.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/tmTwoNoDataLoss.log>
  






