Hi Stefan,

One Kafka node was down, but I want it to restart automatically. How can I solve this?
Thanks

On Wed, Dec 20, 2017 at 4:24 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi,
>
> did you see that the problem starts from a Kafka exception "Failed to send
> data to Kafka: This server is not the leader for that topic-partition."? Is
> it possible that you had a network issue and the producer could not find
> the leader broker?
>
> Best,
> Stefan
>
> > On 20.12.2017 at 10:57, Shivam Sharma <28shivamsha...@gmail.com> wrote:
> >
> > Hi,
> >
> > I am always facing this issue with my Flink job on YARN.
> > Basically, I am reading data from Kafka, transforming it, and writing it
> > back to Kafka.
> >
> > My build.sbt is:
> >
> > val flinkVersion = "1.3.2"
> > val flinkKafkaConnect = "0.10.2"
> >
> > libraryDependencies ++= Seq(
> >   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
> >   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
> >   "org.apache.flink" %% "flink-table" % flinkVersion,
> >   "org.json4s" %% "json4s-native" % "3.5.3",
> >   "org.json4s" %% "json4s-jackson" % "3.5.3"
> > )
> >
> > *Note: One of the nodes in our Kafka cluster goes down.*
> >
> > java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >     at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
> >     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
> >     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
> >     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
> >     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
> >     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
> >     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
> >     ... 7 more
> > Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> >     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> >     at DataStreamCalcRule$127.processElement(Unknown Source)
> >     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
> >     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
> >     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >     ... 23 more
> > Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >     ... 35 more
> > Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
> >     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
> >     at scala.collection.immutable.List.foreach(List.scala:381)
> >     at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
> >     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >     ... 41 more
> > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
> >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >     ... 52 more
> > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > 2017-12-20 05:42:16,008 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.
> > java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> >     [same stack trace as above, repeated in the log]
> >
> > Thanks
> >
> > --
> > Shivam Sharma
> > Data Engineer @ Goibibo
> > Indian Institute Of Information Technology, Design and Manufacturing Jabalpur
> > Mobile No- (+91) 8882114744
> > Email:- 28shivamsha...@gmail.com
> > LinkedIn:- https://www.linkedin.com/in/28shivamsharma

--
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma
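
For anyone following this thread with the same question, here is a rough configuration sketch, not a confirmed fix. It assumes that "it" above means the Flink job rather than the Kafka broker, that the job is built with the Flink 1.3.2 Scala API from the build.sbt above, and that the Properties object is what gets passed to the FlinkKafkaProducer010 constructor. The object name, the broker addresses, and all numeric values are hypothetical placeholders. The idea is twofold: let the Kafka producer retry sends that fail with retriable errors such as NotLeaderForPartitionException, and give the job a restart strategy so that it comes back by itself if a failure still gets through.

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical object name; only the configuration calls matter here.
object RestartConfigSketch {

  // Producer settings intended for the FlinkKafkaProducer010 constructor.
  def producerProperties(brokers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    // Retry failed sends so the producer can refresh its metadata and find
    // the new partition leader. The values are illustrative assumptions.
    props.setProperty("retries", "5")
    props.setProperty("retry.backoff.ms", "1000")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 60 s so a restarted job resumes from saved state.
    env.enableCheckpointing(60000L)

    // Restart the job up to 3 times, waiting 10 s between attempts.
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))

    // Placeholder broker list (assumption); use the real cluster addresses.
    val props = producerProperties("broker1:9092,broker2:9092")

    // Build the source, the transformations and the Kafka sink with `props`
    // here, then call env.execute(...).
  }
}
```

Producer retries can reorder records within a partition unless the number of in-flight requests per connection is limited, so whether this is acceptable depends on the job. The restart strategy can also be set cluster-wide in flink-conf.yaml instead of in code.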