Hi Greg,

just making sure, but is there any asynchrony in your user functions? Any Async I/O operator, maybe?
Best,
Aljoscha

> On 20. Jul 2018, at 21:53, Gregory Fee <g...@lyft.com> wrote:
>
> This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!
>
> On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hi Gregory,
> I think it is some Flink bug. Could you file a JIRA for it? Also, which version of Flink are you using?
> Best,
> Dawid
>
> On Fri, 20 Jul 2018 at 04:34, vino yang <yanghua1...@gmail.com> wrote:
> Hi Gregory,
>
> This exception seems to be a bug; you can create an issue in JIRA.
>
> Thanks, vino.
>
> 2018-07-20 10:28 GMT+08:00 Philip Doctor <philip.doc...@physiq.com>:
> Oh, you were asking about the cast exception. I haven't seen that before, sorry to be off topic.
>
> From: Philip Doctor <philip.doc...@physiq.com>
> Sent: Thursday, July 19, 2018 9:27:15 PM
> To: Gregory Fee; user
> Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>
> I'm just a Flink user, not an expert. I've seen that exception before. I have never seen it be the actual error; I usually see it when some other operator is throwing an uncaught exception and is busy dying. It seems to me that the prior operator throws this error, "Can't forward to the next operator". Why? Because the next operator is already dead, but the job is busy dying asynchronously, so you get a cloud of errors that sort of surrounds the root cause. I'd read your logs a little further back.
>
> From: Gregory Fee <g...@lyft.com>
> Sent: Thursday, July 19, 2018 9:10:37 PM
> To: user
> Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>
> Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     ... 5 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>     at DataStreamSourceConversion$14.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     ... 14 more
> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>     at DataStreamCalcRule$37.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     ... 25 more
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>     ... 37 more
>
> --
> Gregory Fee
> Engineer
> 425.830.4734
> <http://www.lyft.com/>