Ha! Would you be able to share the code for that? If you don't acquire the 
"checkpoint lock" before writing from that thread, this would explain the exception.

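To illustrate the point about the checkpoint lock, here is a minimal plain-Java 
sketch of the pattern. It does not use any Flink classes: RecordingOutput, 
runOnce and the lock object are hypothetical stand-ins. In a real operator the 
lock would be the task's checkpoint lock, the one ContinuousFileReaderOperator 
holds while emitting from its reader thread. The assumption is that the task 
thread emits latency markers through the same output, so a side thread that 
writes without the lock can interleave with it.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointLockSketch {

    /** Hypothetical stand-in for an operator output that is not thread-safe. */
    static final class RecordingOutput {
        private final List<String> events = new ArrayList<>();

        void collect(String record) {
            events.add("record:" + record);
        }

        void emitLatencyMarker(long id) {
            events.add("marker:" + id);
        }

        List<String> snapshot() {
            return new ArrayList<>(events);
        }
    }

    /** Emits records from a side thread and markers from the calling thread. */
    static List<String> runOnce(int records, int markers) {
        // Stand-in for the task's checkpoint lock (an assumption of this sketch).
        final Object checkpointLock = new Object();
        final RecordingOutput output = new RecordingOutput();

        // Side thread: every write to the output happens while holding the lock.
        Thread outputThread = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                synchronized (checkpointLock) {
                    output.collect(Integer.toString(i));
                }
            }
        }, "output-thread");
        outputThread.start();

        // "Task thread": emits markers through the same output, under the same lock.
        for (long m = 0; m < markers; m++) {
            synchronized (checkpointLock) {
                output.emitLatencyMarker(m);
            }
        }

        try {
            outputThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for output thread", e);
        }
        synchronized (checkpointLock) {
            return output.snapshot();
        }
    }

    public static void main(String[] args) {
        List<String> events = runOnce(1000, 1000);
        System.out.println("events emitted: " + events.size());
    }
}
```

Because both threads take the same lock around each write, a record and a 
marker are never emitted at the same moment. Removing the synchronized block in 
the side thread brings back the race that would explain the exception.
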
> On 23. Jul 2018, at 17:37, Gregory Fee <g...@lyft.com> wrote:
> 
> Hi Aljoscha! I am not using any async i/o. I do use a trick similar to 
> ContinuousFileReaderOperator 
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java>
>  where I use another thread to write to the output asynchronously though.
> 
> On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Hi Greg,
> 
> just making sure but is there any asynchrony in your user functions? Any 
> Async I/O operator maybe?
> 
> Best,
> Aljoscha
> 
>> On 20. Jul 2018, at 21:53, Gregory Fee <g...@lyft.com 
>> <mailto:g...@lyft.com>> wrote:
>> 
>> This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!
>> 
>> On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <dwysakow...@apache.org 
>> <mailto:dwysakow...@apache.org>> wrote:
>> Hi Gregory,
>> I think this is a Flink bug. Could you file a JIRA issue for it? Also, which 
>> version of Flink are you using?
>> Best,
>> Dawid
>> 
>> On Fri, 20 Jul 2018 at 04:34, vino yang <yanghua1...@gmail.com 
>> <mailto:yanghua1...@gmail.com>> wrote:
>> Hi Gregory,
>> 
>> This exception seems to be a bug; you can create an issue in JIRA. 
>> 
>> Thanks, vino.
>> 
>> 2018-07-20 10:28 GMT+08:00 Philip Doctor <philip.doc...@physiq.com 
>> <mailto:philip.doc...@physiq.com>>:
>> Oh, you were asking about the cast exception. I haven't seen that before; 
>> sorry to be off topic.
>> 
>> 
>> 
>> From: Philip Doctor <philip.doc...@physiq.com 
>> <mailto:philip.doc...@physiq.com>>
>> Sent: Thursday, July 19, 2018 9:27:15 PM
>> To: Gregory Fee; user
>> Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker 
>> cannot be cast to 
>> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>  
>> 
>> I'm just a Flink user, not an expert. I've seen that exception before. I 
>> have never seen it be the actual error; I usually see it when some other 
>> operator is throwing an uncaught exception and is busy dying. It seems to me 
>> that the prior operator throws this "Could not forward element to next 
>> operator" error because the next operator is already dead, but the job is 
>> dying asynchronously, so you get a cloud of errors that sort of surround the 
>> root cause. I'd read your logs a little further back.
>> 
>> From: Gregory Fee <g...@lyft.com <mailto:g...@lyft.com>>
>> Sent: Thursday, July 19, 2018 9:10:37 PM
>> To: user
>> Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker 
>> cannot be cast to 
>> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>  
>> Hello, I have a job running and I've gotten this error a few times. The job 
>> recovers from a checkpoint and seems to continue forward fine. Then the 
>> error will happen again sometime later, perhaps 1 hour. This looks like a 
>> Flink bug to me but I could use an expert opinion. Thanks!
>> 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>> at 
>> com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
>> Caused by: 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>> at 
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> at 
>> com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
>> at 
>> com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
>> at 
>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>> ... 5 more
>> Caused by: 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>> at 
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> at 
>> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>> at 
>> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>> at DataStreamSourceConversion$14.processElement(Unknown Source)
>> at 
>> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
>> at 
>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>> ... 14 more
>> Caused by: java.lang.RuntimeException: 
>> org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast 
>> to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>> at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>> at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
>> at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>> at 
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> at 
>> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>> at 
>> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>> at DataStreamCalcRule$37.processElement(Unknown Source)
>> at 
>> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>> at 
>> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>> at 
>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>> ... 25 more
>> Caused by: java.lang.ClassCastException: 
>> org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast 
>> to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>> at 
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>> at 
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>> at 
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>> at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>> ... 37 more
>> 
>> -- 
>> Gregory Fee
>> Engineer
>> 425.830.4734 <tel:+14258304734>
>>  <http://www.lyft.com/>
