​Hi ,

I am always facing this issue with Flink job on yarn.
Basically I am reading data from kafka, transforming it & putting in kafka
only.

My build.sbt is:

val flinkVersion = "1.3.2"
val flinkKafkaConnect = "0.10.2"

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
    "org.apache.flink" %% "flink-table" % flinkVersion,
    "org.json4s" %% "json4s-native" % "3.5.3",
    "org.json4s" %% "json4s-jackson" % "3.5.3"

)

*Note: One of the node in our kafka Cluster ​goes down.*


java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.
java:289)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.
java:173)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.
java:108)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.
java:188)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.
java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.
java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:869)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
java:51)
at
org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.
scala:58)
at
org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.
scala:75)
at
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.
scala:65)
at
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.
scala:36)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.
java:44)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.
java:597)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.
java:504)
at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.
java:275)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.
java:107)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.
java:946)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.
java:286)
... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:869)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
java:51)
at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.
scala:37)
at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.
scala:28)
at DataStreamCalcRule$127.processElement(Unknown Source)
at
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.
scala:67)
at
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.
scala:35)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.
java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:528)
... 23 more
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:869)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.
java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:528)
... 35 more
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:869)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
java:51)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.
scala:622)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.
scala:622)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.
scala:622)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.
java:50)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:528)
... 41 more
Caused by: java.lang.Exception: Failed to send data to Kafka: This server
is not the leader for that topic-partition.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.
java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
.invokeInternal(FlinkKafkaProducer010.java:302)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
.invoke(FlinkKafkaProducer010.java:407)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.
java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:528)
... 52 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException:
This server is not the leader for that topic-partition.
2017-12-20 05:42:16,008 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING
to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.
java:289)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.
java:173)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.
java:108)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.
java:188)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.
java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.
java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:869)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
java:51)
at
org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.
scala:58)
at
org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.
scala:75)
at
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.
scala:65)
at
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.
scala:36)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.
java:44)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.
java:597)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.
java:504)
at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.
java:275)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.
java:107)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.
java:946)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.
java:286)
... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:869)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
java:51)
at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.
scala:37)
at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.
scala:28)
at DataStreamCalcRule$127.processElement(Unknown Source)
at
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.
scala:67)
at
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.
scala:35)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.
java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:528)
... 23 more
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:869)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.
java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:528)
... 35 more
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:530)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.
java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.
java:869)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.
java:51)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.
scala:622)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.
scala:622)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.
scala:622)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.
java:50)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:528)
... 41 more
Caused by: java.lang.Exception: Failed to send data to Kafka: This server
is not the leader for that topic-partition.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.
java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
.invokeInternal(FlinkKafkaProducer010.java:302)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
.invoke(FlinkKafkaProducer010.java:407)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.
java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.
java:528)
... 52 more


Thanks

-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
<https://www.linkedin.com/in/28shivamsharma>*

Reply via email to