Ah yes! This started happening after I moved the write outside the lock.
I did that because I've run into situations where checkpoints seem to
stall indefinitely without making progress. My suspicion was that there
is a deadlock condition, but after putting more thought into it I haven't
been able to work out exactly how that would occur.
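
To make the pattern concrete, here is a minimal plain-Java sketch of what
emitting from a side thread under a shared lock could look like. It is not
the actual BetterBuffer code and it uses no Flink classes: checkpointLock,
downstream, LockedEmitSketch and all method names are hypothetical
stand-ins. The idea it illustrates is that the output thread waits for
input without holding the lock and takes the lock only around the emit
itself, so a snapshot taken under the same lock never interleaves with a
write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java model of an operator that emits from a side thread.
// Every name here is hypothetical; no Flink classes are used.
class LockedEmitSketch {
    // Stand-in for the lock that guards record processing and checkpoints.
    private final Object checkpointLock = new Object();
    // Stand-in for the downstream output, which is not thread-safe.
    private final List<String> downstream = new ArrayList<>();
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private static final String STOP = "__stop__";

    private final Thread outputThread = new Thread(this::drain, "output-thread");

    private void drain() {
        try {
            while (true) {
                // Wait for input WITHOUT holding the lock, so a snapshot
                // is never stuck behind an idle output thread.
                String next = buffer.take();
                if (STOP.equals(next)) {
                    return;
                }
                // Hold the lock only for the emit itself.
                synchronized (checkpointLock) {
                    downstream.add(next);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() {
        outputThread.start();
    }

    void offer(String record) {
        buffer.add(record);
    }

    // Models the task thread taking a snapshot under the same lock.
    List<String> snapshot() {
        synchronized (checkpointLock) {
            return new ArrayList<>(downstream);
        }
    }

    void stopAndJoin() {
        buffer.add(STOP);
        try {
            outputThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        LockedEmitSketch op = new LockedEmitSketch();
        op.start();
        for (int i = 0; i < 1000; i++) {
            op.offer("record-" + i);
        }
        op.snapshot(); // safe at any time: sees a consistent prefix
        op.stopAndJoin();
        System.out.println(op.snapshot().size() + " records emitted");
    }
}
```

In this model the only blocking call, buffer.take(), happens outside the
synchronized block, which is one way to keep the lock from being held
while the thread is idle. Whether that is related to the stalled
checkpoints I saw is an open question.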

On Mon, Jul 23, 2018 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Ha! Would you be able to share the code for that? If you don't acquire the
> "checkpoint lock" before writing, this would explain the exception.
>
> On 23. Jul 2018, at 17:37, Gregory Fee <g...@lyft.com> wrote:
>
> Hi Aljoscha! I am not using any Async I/O. I do, however, use a trick
> similar to ContinuousFileReaderOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java>
> where another thread writes to the output asynchronously.
>
> On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Greg,
>>
>> just making sure but is there any asynchrony in your user functions? Any
>> Async I/O operator maybe?
>>
>> Best,
>> Aljoscha
>>
>> On 20. Jul 2018, at 21:53, Gregory Fee <g...@lyft.com> wrote:
>>
>> This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!
>>
>> On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <dwysakow...@apache.org
>> > wrote:
>>
>>> Hi Gregory,
>>> I think it is a Flink bug. Could you file a JIRA issue for it? Also, which
>>> version of Flink are you using?
>>> Best,
>>> Dawid
>>>
>>> On Fri, 20 Jul 2018 at 04:34, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Gregory,
>>>>
>>>> This exception seems to be a bug; you can create an issue in JIRA.
>>>>
>>>> Thanks, vino.
>>>>
>>>> 2018-07-20 10:28 GMT+08:00 Philip Doctor <philip.doc...@physiq.com>:
>>>>
>>>>> Oh, you were asking about the cast exception. I haven't seen that
>>>>> before; sorry to be off topic.
>>>>>
>>>>>
>>>>>
>>>>> ------------------------------
>>>>> *From:* Philip Doctor <philip.doc...@physiq.com>
>>>>> *Sent:* Thursday, July 19, 2018 9:27:15 PM
>>>>> *To:* Gregory Fee; user
>>>>> *Subject:* Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>>>>> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>>
>>>>>
>>>>> I'm just a Flink user, not an expert. I've seen that exception
>>>>> before, but I have never seen it be the actual error. I usually see it
>>>>> when some other operator is throwing an uncaught exception and busy
>>>>> dying. It seems to me that the prior operator throws this "Can't forward
>>>>> to the next operator" error because the next operator is already dead,
>>>>> while the job is still busy dying asynchronously, so you get a cloud of
>>>>> errors that surround the root cause. I'd read your logs a little
>>>>> further back.
>>>>> ------------------------------
>>>>> *From:* Gregory Fee <g...@lyft.com>
>>>>> *Sent:* Thursday, July 19, 2018 9:10:37 PM
>>>>> *To:* user
>>>>> *Subject:* org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>>>>> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>>
>>>>> Hello, I have a job running and I've gotten this error a few times.
>>>>> The job recovers from a checkpoint and seems to continue fine. Then
>>>>> the error happens again sometime later, perhaps an hour. This looks
>>>>> like a Flink bug to me, but I could use an expert opinion. Thanks!
>>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>> at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>> at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
>>>>> at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
>>>>> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>> ... 5 more
>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>>> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>>> at DataStreamSourceConversion$14.processElement(Unknown Source)
>>>>> at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
>>>>> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>> ... 14 more
>>>>> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
>>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>>> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>>> at DataStreamCalcRule$37.processElement(Unknown Source)
>>>>> at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>>>>> at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>>>>> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>> ... 25 more
>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>> at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>>>>> at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>>>>> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>>>>> ... 37 more
>>>>>
>>>>> --
>>>>> *Gregory Fee*
>>>>> Engineer
>>>>> 425.830.4734 <+14258304734>
>>>>> [image: Lyft] <http://www.lyft.com/>
>>>>>
>>>>
>>>>


-- 
*Gregory Fee*
Engineer
425.830.4734 <+14258304734>
[image: Lyft] <http://www.lyft.com>
