Hi, I keep running into this issue with a Flink job on YARN. The job reads data from Kafka, transforms it, and writes the result back to Kafka.
My build.sbt is:

    val flinkVersion = "1.3.2"
    val flinkKafkaConnect = "0.10.2"

    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
      "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
      "org.apache.flink" %% "flink-table" % flinkVersion,
      "org.json4s" %% "json4s-native" % "3.5.3",
      "org.json4s" %% "json4s-jackson" % "3.5.3"
    )

*Note: One of the nodes in our Kafka cluster goes down.*

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
    at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
    at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
    at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
    at DataStreamCalcRule$127.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
    at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 23 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 35 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 41 more
Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 52 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

2017-12-20 05:42:16,008 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.

Thanks
--
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma
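
One possible angle on the NotLeaderForPartitionException in the trace above: Kafka raises it as a retriable error when partition leadership moves, for example after a broker goes down, and the Kafka 0.10.x producer defaults to retries=0, so a single failed send can surface as a job failure. The sketch below builds producer properties with retries enabled. The object name, the broker addresses, and every numeric value are assumptions for illustration, not settings taken from this job.

```scala
import java.util.Properties

object ProducerRetryConfig {
  // Hypothetical broker list; replace with the real cluster addresses.
  val brokers = "broker1:9092,broker2:9092,broker3:9092"

  // Builds Kafka producer properties that retry retriable send errors
  // (such as NotLeaderForPartitionException) instead of failing at once.
  def producerProps(): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    // Assumed value: retry a failed send up to 5 times (0.10.x default is 0).
    props.setProperty("retries", "5")
    // Assumed value: wait 1 s between retries so a new leader can be elected.
    props.setProperty("retry.backoff.ms", "1000")
    // Wait for all in-sync replicas to acknowledge each record.
    props.setProperty("acks", "all")
    // One in-flight request per connection keeps record order across retries.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props
  }

  def main(args: Array[String]): Unit = {
    val props = producerProps()
    println(s"retries=${props.getProperty("retries")}")
  }
}
```

The resulting Properties object could then be passed as the producer configuration when constructing FlinkKafkaProducer010. Whether retries alone are enough likely depends on how long leader election takes in the cluster, so the retry count and backoff would need tuning against the real failover time.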