Hi Ninad,

Unfortunately I don’t think the provided logs shed any light here.

It does complain about:

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

I'm not sure whether this may be related to the job not being built against the Cloudera binaries.

Could you provide details on how exactly you're verifying that the messages are lost?

Cheers,
Gordon

On 1 June 2017 at 11:14:17 PM, ninad (nni...@gmail.com) wrote:

Thanks Gordon and Kostas.  

Gordon,  

"When a failure occurs in the job, Flink uses the last completed checkpoint  
to restart the job. In the case of the Flink Kafka producer, this  
essentially makes sure that records which did not make it into Kafka and  
caused the last run to fail are reprocessed and sent to Kafka again."  

This is exactly what we were expecting. Thanks for confirming. However, we  
still do not see messages in Kafka.  
All the Kafka properties are as expected:  

Replication: 3  
Min ISR: 2  
acks: all  
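For reference, the producer-side portion of the settings above could be sketched as follows. This is a minimal illustration, not the actual job code: the bootstrap address and retry count are placeholders, and the topic-level settings (replication factor 3, min.insync.replicas=2) live on the broker side, not in this config.

```java
import java.util.Properties;

public class KafkaProducerConfig {

    // Builds producer properties matching the setup described above.
    // With acks=all, the broker rejects writes with NOT_ENOUGH_REPLICAS
    // whenever fewer than min.insync.replicas replicas are in sync.
    static Properties producerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder address
        props.setProperty("acks", "all");       // wait for all in-sync replicas to acknowledge
        props.setProperty("retries", "100000"); // large retry budget, as seen in the logs
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties();
        System.out.println("acks=" + props.getProperty("acks"));
        System.out.println("retries=" + props.getProperty("retries"));
    }
}
```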

We also tried this with Flink 1.2.1. We haven't yet tested with a standalone configuration; we will try that to see if the result is different.  

That being said, we're running this on a Cloudera YARN/Hadoop cluster, but we haven't built Flink against the Cloudera binaries. The logs certainly don't indicate that being the problem.  

Please let us know how we can troubleshoot this.  

I have attached the JobManager and TaskManager log files for reference.  

Relevant entries from the log files:  

*Job Manager*  

2017-06-01 20:22:44,499 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.

2017-06-01 20:22:44,530 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job event-filter (510a7a83f509adace6704e7f2caa0b75).
2017-06-01 20:22:44,534 INFO org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter - Delaying retry of job execution for 10000 ms ...
2017-06-01 20:22:48,233 DEBUG org.apache.flink.runtime.metrics.dump.MetricDumpSerialization - Failed to serialize gauge.

2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Source: Custom Source (1/1) for new execution.
2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) for new execution.
2017-06-01 20:22:54,535 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RESTARTING to CREATED.
2017-06-01 20:22:54,536 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.
2017-06-01 20:22:54,591 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state CREATED to RUNNING.


*Task Manager 1*  

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

2017-06-01 20:22:44,426 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).

2017-06-01 20:22:44,427 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}

*Task Manager 2*

jobManager.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/jobManager.log>
taskManager.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/taskManager.log>
  
2017-06-01 20:22:54,741 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Custom Source (1/1) (8ee2c8a628968bc3f8006f0740bb8ad1): Initialized ResultPartition 8d68b9c00d6a329d70ee2bf1ed320318@8ee2c8a628968bc3f8006f0740bb8ad1 [PIPELINED, 1 subpartitions, 1 pending references]
2017-06-01 20:22:54,760 INFO org.apache.flink.yarn.YarnTaskManager - Received task Source: Custom Source (1/1)

2017-06-01 20:27:30,388 WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {topic.event.filter=LEADER_NOT_AVAILABLE}




--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13443.html
  
Sent from the Apache Flink User Mailing List archive at Nabble.com.
