Hi,

at this point, this seems more like a Kafka question than a Flink question. I think you need to configure high availability for Kafka with ZooKeeper; you can probably find more about this here: https://www.cloudera.com/documentation/kafka/latest/topics/kafka_ha.html
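The quoted stack trace ends in `NotLeaderForPartitionException`, which the Kafka client treats as a retriable error. After a broker fails, partition leadership moves to another in-sync replica, and a producer that retries can refresh its metadata and resend. The Kafka 0.10 producer's `retries` setting defaults to 0, so with default settings a single such error can fail the sink, as happened here. The following is a minimal sketch, assuming the sink is a `FlinkKafkaProducer010` configured from a `java.util.Properties` object. The object and helper names (`KafkaProducerRetryConfig`, `producerProperties`), the broker addresses, and the concrete retry values are illustrative assumptions, not tested recommendations.

```scala
import java.util.Properties

// Hypothetical helper (not part of Flink or Kafka): producer settings that let
// the Kafka client retry transient errors such as NotLeaderForPartitionException
// after a broker failure, instead of failing the sink on the first error.
object KafkaProducerRetryConfig {

  def producerProperties(bootstrapServers: String): Properties = {
    val props = new Properties()
    // Brokers used for the initial metadata lookup (placeholder addresses).
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Retry retriable send errors; the Kafka 0.10 client defaults to 0 retries.
    props.setProperty("retries", "3")
    // Wait between retries so that a new partition leader can be elected.
    props.setProperty("retry.backoff.ms", "1000")
    // Allow only one unacknowledged request per connection so that
    // retried records cannot be reordered.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props
  }

  def main(args: Array[String]): Unit = {
    val props = producerProperties("broker1:9092,broker2:9092")
    println(props.getProperty("retries"))
  }
}
```

The resulting `Properties` would then be passed as the producer config, for example `new FlinkKafkaProducer010[String]("output-topic", new SimpleStringSchema(), props)`, where `output-topic` is a placeholder. Retries only help if the partition has another in-sync replica to take over, so they complement the broker-side high-availability setup rather than replace it. Separately, whether the Flink job itself restarts after a failure is governed by its restart strategy: in Flink 1.3, if checkpointing is not enabled, no restart is attempted by default; if checkpointing is enabled and no strategy is configured, a fixed-delay strategy with Integer.MAX_VALUE attempts is used.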
Best,
Stefan

> On 20.12.2017 at 12:06, Shivam Sharma <28shivamsha...@gmail.com> wrote:
>
> Hi Stefan,
>
> One Kafka node was down, but I want it to restart automatically. How can I
> solve this?
>
> Thanks
>
> On Wed, Dec 20, 2017 at 4:24 PM, Stefan Richter <s.rich...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> did you see that the problem starts from a Kafka exception "Failed to send
>> data to Kafka: This server is not the leader for that topic-partition."? Is
>> it possible that you had a network issue and the producer could not find
>> the leader broker?
>>
>> Best,
>> Stefan
>>
>>> On 20.12.2017 at 10:57, Shivam Sharma <28shivamsha...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I keep facing this issue with a Flink job on YARN.
>>> Basically, I am reading data from Kafka, transforming it, and writing it
>>> back to Kafka.
>>>
>>> My build.sbt is:
>>>
>>> val flinkVersion = "1.3.2"
>>> val flinkKafkaConnect = "0.10.2"
>>>
>>> libraryDependencies ++= Seq(
>>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>>   "org.apache.flink" %% "flink-table" % flinkVersion,
>>>   "org.json4s" %% "json4s-native" % "3.5.3",
>>>   "org.json4s" %% "json4s-jackson" % "3.5.3"
>>> )
>>>
>>> *Note: One of the nodes in our Kafka cluster went down.*
>>>
>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
>>>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
>>>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
>>>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
>>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>     ... 7 more
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>     at DataStreamCalcRule$127.processElement(Unknown Source)
>>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
>>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     ... 23 more
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     ... 35 more
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     ... 41 more
>>> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
>>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     ... 52 more
>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>> 2017-12-20 05:42:16,008 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.
>>>
>>> Thanks
>>>
>>> --
>>> Shivam Sharma
>>> Data Engineer @ Goibibo
>>> Indian Institute Of Information Technology, Design and Manufacturing
>>> Jabalpur
>>> Mobile No: (+91) 8882114744
>>> Email: 28shivamsha...@gmail.com
>>> LinkedIn: https://www.linkedin.com/in/28shivamsharma