Ah yes! This started happening after I moved the code that does the write outside the lock. I did that because I've run into some situations where checkpoints seem to stall indefinitely without making progress. My suspicion was that there is a deadlock condition, but after putting more thought into it I haven't been able to work out exactly how that would occur.
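A minimal plain-Java sketch of one way to keep the write under the lock without risking a stall. It assumes the task thread hands elements to the output thread through a bounded buffer. Suppose the task thread blocks on a full buffer while holding the checkpoint lock, which it does during element processing, and the output thread is waiting for that same lock to drain the buffer. Then neither can make progress. That is one possible (unconfirmed) cause of the stall described above. Waiting on the lock object itself with `wait()` releases the lock while the buffer is full. As far as I recall, Flink's own AsyncWaitOperator takes roughly this approach. `LockedEmitter`, `add`, and `close` are hypothetical names. `lock` stands in for the task's checkpoint lock and a plain `List` stands in for the operator's `Output`, so this is not Flink API code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: "lock" stands in for the task's checkpoint lock and
// "output" for the operator's Output, which must only be touched under the lock.
public class LockedEmitter {
    private final Object lock;
    private final List<String> output;                            // deliberately not thread-safe
    private final ArrayDeque<String> queue = new ArrayDeque<>();  // guarded by lock
    private final int capacity;
    private boolean closed = false;                               // guarded by lock
    private final Thread emitter;

    public LockedEmitter(Object lock, List<String> output, int capacity) {
        this.lock = lock;
        this.output = output;
        this.capacity = capacity;
        this.emitter = new Thread(this::runEmitter, "emitter");
        this.emitter.start();
    }

    // Called by the task thread, typically while it already holds the lock.
    // lock.wait() releases the lock while the buffer is full, so the emitter
    // thread can drain it instead of both threads blocking forever.
    public void add(String element) throws InterruptedException {
        synchronized (lock) { // re-entrant if the caller already holds it
            while (queue.size() >= capacity) {
                lock.wait();
            }
            queue.addLast(element);
            lock.notifyAll(); // wake the emitter if it is idle
        }
    }

    private void runEmitter() {
        try {
            while (true) {
                synchronized (lock) {
                    while (queue.isEmpty() && !closed) {
                        lock.wait();
                    }
                    if (queue.isEmpty()) {
                        return; // closed and fully drained
                    }
                    // Write while holding the lock, so no other thread can
                    // touch the output (e.g. to forward a latency marker) concurrently.
                    output.add(queue.pollFirst());
                    lock.notifyAll(); // wake a producer waiting for space
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Signals that no more input will arrive and waits until the buffer is drained.
    public void close() throws InterruptedException {
        synchronized (lock) {
            closed = true;
            lock.notifyAll();
        }
        emitter.join();
    }

    public static void main(String[] args) throws InterruptedException {
        Object checkpointLock = new Object();
        List<String> out = new ArrayList<>();
        LockedEmitter emitter = new LockedEmitter(checkpointLock, out, 2);
        for (int i = 0; i < 10; i++) {
            synchronized (checkpointLock) { // mimic the task thread holding the lock
                emitter.add("record-" + i);
            }
        }
        emitter.close();
        // prints "10 records emitted, last = record-9"
        System.out.println(out.size() + " records emitted, last = " + out.get(out.size() - 1));
    }
}
```

In an actual operator, `add` would be called from `processElement`, and the output thread would do its `output.collect(...)` inside `synchronized (lock)`, using the lock obtained from the containing task. In Flink 1.4, I believe `getContainingTask().getCheckpointLock()` is how ContinuousFileReaderOperator gets it, but that is worth checking against your version.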
On Mon, Jul 23, 2018 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Ha! Would you be able to share the code for that? If you don't acquire the
> "checkpoint lock" before writing, this would explain the exception.
>
> On 23. Jul 2018, at 17:37, Gregory Fee <g...@lyft.com> wrote:
>
> Hi Aljoscha! I am not using any async I/O. I do, however, use a trick similar to
> ContinuousFileReaderOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java>,
> where I use another thread to write to the output asynchronously.
>
> On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Greg,
>>
>> Just making sure, but is there any asynchrony in your user functions? An
>> Async I/O operator, maybe?
>>
>> Best,
>> Aljoscha
>>
>> On 20. Jul 2018, at 21:53, Gregory Fee <g...@lyft.com> wrote:
>>
>> This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!
>>
>> On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Gregory,
>>> I think it is a Flink bug. Could you file a JIRA issue for it? Also, which
>>> version of Flink are you using?
>>> Best,
>>> Dawid
>>>
>>> On Fri, 20 Jul 2018 at 04:34, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Gregory,
>>>>
>>>> This exception seems to be a bug; you can create an issue in JIRA.
>>>>
>>>> Thanks, vino.
>>>>
>>>> 2018-07-20 10:28 GMT+08:00 Philip Doctor <philip.doc...@physiq.com>:
>>>>
>>>>> Oh, you were asking about the cast exception. I haven't seen that
>>>>> before; sorry to be off topic.
>>>>>
>>>>> ------------------------------
>>>>> *From:* Philip Doctor <philip.doc...@physiq.com>
>>>>> *Sent:* Thursday, July 19, 2018 9:27:15 PM
>>>>> *To:* Gregory Fee; user
>>>>> *Subject:* Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>>>>> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>>
>>>>> I'm just a Flink user, not an expert. I've seen that exception before,
>>>>> but I have never seen it be the actual error. I usually see it when some
>>>>> other operator has thrown an uncaught exception and is busy dying. It
>>>>> seems to me that the upstream operator throws this error ("Can't forward
>>>>> to the next operator") because the next operator is already dead, but the
>>>>> job is dying asynchronously, so you get a cloud of errors surrounding the
>>>>> root cause. I'd read your logs a little further back.
>>>>> ------------------------------
>>>>> *From:* Gregory Fee <g...@lyft.com>
>>>>> *Sent:* Thursday, July 19, 2018 9:10:37 PM
>>>>> *To:* user
>>>>> *Subject:* org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>>>>> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>>
>>>>> Hello, I have a job running and I've gotten this error a few times.
>>>>> The job recovers from a checkpoint and seems to continue fine. Then the
>>>>> error happens again some time later, perhaps an hour. This looks like a
>>>>> Flink bug to me, but I could use an expert opinion. Thanks!
>>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>>     at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
>>>>>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
>>>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>>     ... 5 more
>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>>>     at DataStreamSourceConversion$14.processElement(Unknown Source)
>>>>>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
>>>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>>     ... 14 more
>>>>> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>>>     at DataStreamCalcRule$37.processElement(Unknown Source)
>>>>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>>>>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>>>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>>     ... 25 more
>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>>>>>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>>>>>     ... 37 more
>>>>>
>>>>> --
>>>>> *Gregory Fee*
>>>>> Engineer
>>>>> 425.830.4734
>>>>> Lyft <http://www.lyft.com/>
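The bottom of the trace fits the missing-lock explanation. In the Flink 1.4 sources as I recall them, `RecordWriterOutput` reuses a single serialization delegate for both records and latency markers. If that is right, a failure can arise in three steps:

1. The output thread puts a record into the delegate.
2. The task thread overwrites the delegate with a `LatencyMarker` before the record is sent.
3. The key-group partitioner casts the marker to `StreamRecord`.

The plain-Java sketch below replays that interleaving with hypothetical stand-in classes (`Record`, `Marker`, `Delegate`, `selectChannel`), not Flink's. It then shows that holding one shared lock around each set-and-emit step removes the failure.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for the Flink classes in the trace; not Flink API.
public class SharedDelegateRace {
    static final class Record {
        final String value;
        Record(String value) { this.value = value; }
    }

    static final class Marker { }

    // One mutable slot reused for every write, like a reusable serialization delegate.
    static final class Delegate {
        Object instance;
    }

    // Like a key-group partitioner: assumes the slot currently holds a Record.
    static int selectChannel(Delegate d, int numChannels) {
        Record r = (Record) d.instance; // ClassCastException if a Marker is there
        return Math.floorMod(r.value.hashCode(), numChannels);
    }

    // Replays, on one thread, the unlucky interleaving of two unsynchronized threads.
    static String replayUnlockedInterleaving() {
        Delegate shared = new Delegate();
        shared.instance = new Record("a"); // output thread: set its record
        shared.instance = new Marker();    // task thread: set a latency marker
        try {
            selectChannel(shared, 4);      // output thread: partition "its" record
            return "no error";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    private static final Object LOCK = new Object(); // stands in for the checkpoint lock

    static void emitRecord(Delegate d, Record r) {
        synchronized (LOCK) { // set-and-select is now atomic with respect to markers
            d.instance = r;
            selectChannel(d, 4);
        }
    }

    static void emitMarker(Delegate d) {
        synchronized (LOCK) {
            d.instance = new Marker(); // markers skip key-based partitioning in this sketch
        }
    }

    // Runs both writers concurrently through the lock; returns the number of cast failures.
    static int runLocked(int iterations) throws InterruptedException {
        Delegate shared = new Delegate();
        AtomicInteger failures = new AtomicInteger();
        Thread outputThread = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                try {
                    emitRecord(shared, new Record("r" + i));
                } catch (ClassCastException e) {
                    failures.incrementAndGet();
                }
            }
        });
        Thread taskThread = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                emitMarker(shared);
            }
        });
        outputThread.start();
        taskThread.start();
        outputThread.join();
        taskThread.join();
        return failures.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(replayUnlockedInterleaving()); // prints ClassCastException
        System.out.println(runLocked(100_000));           // prints 0
    }
}
```

The interleaving is replayed sequentially because a real two-thread race only shows up intermittently. That would match the report of the error appearing roughly once an hour rather than on every record.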