Hi Gregory,

This exception looks like a bug. You can create an issue in JIRA.

Thanks, vino.

2018-07-20 10:28 GMT+08:00 Philip Doctor <philip.doc...@physiq.com>:

> Oh you were asking about the cast exception, I haven't seen that before,
> sorry to be off topic.
>
> ------------------------------
> *From:* Philip Doctor <philip.doc...@physiq.com>
> *Sent:* Thursday, July 19, 2018 9:27:15 PM
> *To:* Gregory Fee; user
> *Subject:* Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>
> I'm just a flink user, not an expert. I've seen that exception before. I
> have never seen it be the actual error, I usually see it when some other
> operator is throwing an uncaught exception and busy dying. It seems to me
> that the prior operator throws this error "Can't forward to the next
> operator" why? because the next operator's already dead, but the job is
> busy dying asynchronously, so you get a cloud of errors that sort of
> surround the root cause. I'd read your logs a little further back.
> ------------------------------
> *From:* Gregory Fee <g...@lyft.com>
> *Sent:* Thursday, July 19, 2018 9:10:37 PM
> *To:* user
> *Subject:* org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>
> Hello, I have a job running and I've gotten this error a few times. The
> job recovers from a checkpoint and seems to continue forward fine. Then the
> error will happen again sometime later, perhaps 1 hour. This looks like a
> Flink bug to me but I could use an expert opinion. Thanks!
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     ... 5 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>     at DataStreamSourceConversion$14.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     ... 14 more
> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>     at DataStreamCalcRule$37.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     ... 25 more
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>     ... 37 more
>
> --
> *Gregory Fee*
> Engineer
> 425.830.4734 <+14258304734>
> [image: Lyft] <http://www.lyft.com>