Hi, Sorry for previous mail it was sent by mistake.
Flink version: 1.3.2 EMR with latest yarn. java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) ... 7 more Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) ... 47 more My job is reading data from kafka and writing to another kafka topic. On Wed, Dec 13, 2017 at 12:54 PM, Shivam Sharma <28shivamsha...@gmail.com> wrote: > Hi, > > Flink version: 1.3.2 > > -- > Shivam Sharma > Data Engineer @ Goibibo > Indian Institute Of Information Technology, Design and Manufacturing > Jabalpur > Mobile No- (+91) 8882114744 > Email:- 28shivamsha...@gmail.com > LinkedIn:-*https://www.linkedin.com/in/28shivamsharma > <https://www.linkedin.com/in/28shivamsharma>* > -- Shivam Sharma Data Engineer @ Goibibo Indian Institute Of Information Technology, Design and Manufacturing Jabalpur Mobile No- (+91) 8882114744 Email:- 28shivamsha...@gmail.com LinkedIn:-*https://www.linkedin.com/in/28shivamsharma <https://www.linkedin.com/in/28shivamsharma>*