Hi Ninad,

Unfortunately, I don't think the provided logs shed any light here.
It does complain about:

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

but I'm not sure whether that is related to Flink not being built against the Cloudera binaries. Could you provide info on how exactly you're verifying the lost messages?

Cheers,
Gordon

On 1 June 2017 at 11:14:17 PM, ninad (nni...@gmail.com) wrote:

Thanks Gordon and Kostas.

Gordon: "When a failure occurs in the job, Flink uses the last completed checkpoint to restart the job. In the case of the Flink Kafka producer, this essentially makes sure that records which did not make it into Kafka and caused the last run to fail are reprocessed and sent to Kafka again."

This is exactly what we were expecting. Thanks for confirming. However, we still do not see the messages in Kafka. All the Kafka properties are as expected:

Replication: 3
Min ISR: 2
acks: all

We also tried this with Flink 1.2.1. We haven't yet tested this with the standalone configuration; we will test it to see if the result is different. That said, we're running this on a Cloudera YARN/Hadoop cluster, but we haven't built Flink against the Cloudera binaries. The logs certainly don't indicate that being the problem.

Please let us know how we can troubleshoot this. I have attached the JobManager and TaskManager log files for reference. Relevant excerpts from the log files:

*Job Manager*

2017-06-01 20:22:44,499 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
2017-06-01 20:22:44,530 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job event-filter (510a7a83f509adace6704e7f2caa0b75).
2017-06-01 20:22:44,534 INFO org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter - Delaying retry of job execution for 10000 ms ...
2017-06-01 20:22:48,233 DEBUG org.apache.flink.runtime.metrics.dump.MetricDumpSerialization - Failed to serialize gauge.
2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Source: Custom Source (1/1) for new execution.
2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) for new execution.
2017-06-01 20:22:54,535 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RESTARTING to CREATED.
2017-06-01 20:22:54,536 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.
2017-06-01 20:22:54,591 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state CREATED to RUNNING.
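Regarding Gordon's question about how the lost messages are being verified: one minimal way to check this from the Kafka side is with the CLI tools shipped with the broker. This is a sketch only — the broker and ZooKeeper addresses (localhost:9092, localhost:2181) are hypothetical placeholders; adjust them to the actual cluster.

```shell
# Count the records actually present in the sink topic (assumes one
# record per line; adjust --bootstrap-server to the real broker list).
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic topic.http.stream.event.processor \
  --from-beginning \
  --timeout-ms 10000 | wc -l

# Confirm the replication factor and any topic-level
# min.insync.replicas override (a broker-level default would instead
# be set in server.properties).
bin/kafka-topics.sh --describe \
  --zookeeper localhost:2181 \
  --topic topic.http.stream.event.processor
```

Comparing the consumed count against the number of records fed into the Flink source gives a concrete measure of loss, independent of the application's own accounting.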
*Task Manager 1*

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS
2017-06-01 20:22:44,426 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).
2017-06-01 20:22:44,427 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}

*Task Manager 2*

2017-06-01 20:22:54,741 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Custom Source (1/1) (8ee2c8a628968bc3f8006f0740bb8ad1): Initialized ResultPartition 8d68b9c00d6a329d70ee2bf1ed320318@8ee2c8a628968bc3f8006f0740bb8ad1 [PIPELINED, 1 subpartitions, 1 pending references]
2017-06-01 20:22:54,760 INFO org.apache.flink.yarn.YarnTaskManager - Received task Source: Custom Source (1/1)
2017-06-01 20:27:30,388 WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {topic.event.filter=LEADER_NOT_AVAILABLE}

Attachments:
jobManager.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/jobManager.log>
taskManager.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/taskManager.log>
taskManager.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/taskManager.log>

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13443.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
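For context on the NOT_ENOUGH_REPLICAS errors in the Task Manager logs: with acks=all, a partition leader rejects produce requests whenever the in-sync replica set has shrunk below min.insync.replicas (here 2 out of a replication factor of 3). A toy model of that rule — an illustration only, not Kafka's actual broker code:

```python
def produce_allowed(acks: str, in_sync_replicas: int, min_insync_replicas: int) -> bool:
    """Toy model (hypothetical helper, not Kafka source) of the broker-side
    check behind the NOT_ENOUGH_REPLICAS produce error."""
    if acks != "all":
        # acks=0 and acks=1 do not enforce the min-ISR constraint.
        return True
    return in_sync_replicas >= min_insync_replicas

# The thread's setup: replication factor 3, min ISR 2, acks=all.
assert produce_allowed("all", 3, 2)      # healthy: all replicas in sync
assert produce_allowed("all", 2, 2)      # one replica lagging is still fine
assert not produce_allowed("all", 1, 2)  # two replicas out of sync -> NOT_ENOUGH_REPLICAS
```

This suggests checking broker health during the failure window: if two of the three replicas were out of sync at 20:22:44, the error is expected broker behavior rather than a Flink producer bug.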