Ha! Would you be able to share the code for that? If you don't acquire the "checkpoint lock" before writing, that would explain the exception.
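To make the checkpoint-lock point concrete, here is a minimal, self-contained sketch. It is not Flink code, and it assumes the `BetterBuffer` output thread works roughly like the reader thread in ContinuousFileReaderOperator. The task thread keeps emitting its own elements (here, latency markers) while a second thread emits buffered records into the same output. `SharedOutput`, `Rec`, `Marker` and the plain `checkpointLock` object are hypothetical stand-ins. `SharedOutput` mimics an output that reuses one mutable slot, which is one plausible way a LatencyMarker could end up where a StreamRecord is expected. In a real operator the lock would come from Flink rather than `new Object()`. If I remember the 1.4 API correctly, ContinuousFileReaderOperator obtains it via `getContainingTask().getCheckpointLock()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch only: every thread that writes to the output must hold the same lock. */
public class CheckpointLockSketch {

    // Hypothetical stand-ins for Flink's StreamRecord and LatencyMarker.
    static class Element {}
    static final class Rec extends Element {
        final String value;
        Rec(String value) { this.value = value; }
    }
    static final class Marker extends Element {
        final long ts;
        Marker(long ts) { this.ts = ts; }
    }

    // Hypothetical output that reuses one mutable slot, so "write slot"
    // and "read slot" must never interleave across threads.
    static final class SharedOutput {
        private Element slot;
        final List<String> emitted = new ArrayList<>();

        void emitRecord(Rec r) {
            slot = r;
            // Without a lock, another thread could overwrite slot here and this
            // cast would throw a ClassCastException, much like the trace below.
            emitted.add(((Rec) slot).value);
        }

        void emitMarker(Marker m) {
            slot = m;
            emitted.add("marker@" + ((Marker) slot).ts);
        }
    }

    static final Rec POISON = new Rec("<end>");

    /** Runs an async output thread alongside the "task" thread. */
    static SharedOutput run(int records, int markers) throws InterruptedException {
        final Object checkpointLock = new Object(); // hypothetical; Flink provides the real lock
        final SharedOutput out = new SharedOutput();
        final BlockingQueue<Rec> buffer = new ArrayBlockingQueue<>(64);

        Thread outputThread = new Thread(() -> {
            try {
                while (true) {
                    Rec r = buffer.take();
                    if (r == POISON) {
                        return;
                    }
                    synchronized (checkpointLock) { // the crucial part
                        out.emitRecord(r);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        outputThread.start();

        // The "task" thread buffers records and emits markers itself.
        for (int i = 0; i < Math.max(records, markers); i++) {
            if (i < records) {
                buffer.put(new Rec("r" + i));
            }
            if (i < markers) {
                synchronized (checkpointLock) {
                    out.emitMarker(new Marker(i));
                }
            }
        }
        buffer.put(POISON);
        outputThread.join(); // join gives a happens-before edge for reading out.emitted
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        SharedOutput out = run(1000, 1000);
        System.out.println("emitted " + out.emitted.size() + " elements");
    }
}
```

What matters is that every call into the output, from either thread, happens inside `synchronized (checkpointLock)`. Dropping that block in the output thread allows the slot to be overwritten between the write and the cast.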
> On 23. Jul 2018, at 17:37, Gregory Fee <g...@lyft.com> wrote:
>
> Hi Aljoscha! I am not using any async I/O. I do use a trick similar to
> ContinuousFileReaderOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java>,
> where I use another thread to write to the output asynchronously, though.
>
> On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Greg,
>
> just making sure, but is there any asynchrony in your user functions? Any
> Async I/O operator maybe?
>
> Best,
> Aljoscha
>
>> On 20. Jul 2018, at 21:53, Gregory Fee <g...@lyft.com> wrote:
>>
>> This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!
>>
>> On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> Hi Gregory,
>> I think it is a Flink bug. Could you file a JIRA for it? Also, which
>> version of Flink are you using?
>> Best,
>> Dawid
>>
>> On Fri, 20 Jul 2018 at 04:34, vino yang <yanghua1...@gmail.com> wrote:
>> Hi Gregory,
>>
>> This exception seems to be a bug; you can create an issue in JIRA.
>>
>> Thanks, vino.
>>
>> 2018-07-20 10:28 GMT+08:00 Philip Doctor <philip.doc...@physiq.com>:
>> Oh, you were asking about the cast exception. I haven't seen that before;
>> sorry to be off topic.
>>
>> From: Philip Doctor <philip.doc...@physiq.com>
>> Sent: Thursday, July 19, 2018 9:27:15 PM
>> To: Gregory Fee; user
>> Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>> cannot be cast to
>> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>
>> I'm just a Flink user, not an expert. I've seen that exception before. I
>> have never seen it be the actual error; I usually see it when some other
>> operator is throwing an uncaught exception and is busy dying. It seems to me
>> that the prior operator throws this error ("Can't forward to the next
>> operator") because the next operator is already dead, but the job is busy
>> dying asynchronously, so you get a cloud of errors that sort of surround the
>> root cause. I'd read your logs a little further back.
>>
>> From: Gregory Fee <g...@lyft.com>
>> Sent: Thursday, July 19, 2018 9:10:37 PM
>> To: user
>> Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>> cannot be cast to
>> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>
>> Hello, I have a job running and I've gotten this error a few times. The job
>> recovers from a checkpoint and seems to continue forward fine. Then the
>> error happens again some time later, perhaps an hour later. This looks like a
>> Flink bug to me, but I could use an expert opinion. Thanks!
>>
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>     at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
>>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>     ... 5 more
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>     at DataStreamSourceConversion$14.processElement(Unknown Source)
>>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>     ... 14 more
>> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>     at DataStreamCalcRule$37.processElement(Unknown Source)
>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>     ... 25 more
>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>>     ... 37 more
>>
>> --
>> Gregory Fee
>> Engineer
>> 425.830.4734
>> <http://www.lyft.com/>