Hi Aljoscha,

yes, just as you guessed: without asynchronous checkpoints there has been no crash so far.

Regards,
Federico
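(For reference, a minimal sketch of how asynchronous snapshots can be switched off for the heap-based state backend in Flink 1.3; the checkpoint path and interval are placeholders, and this assumes the FsStateBackend constructor that takes an asynchronousSnapshots flag:)

    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The second constructor argument is the asynchronousSnapshots flag:
    // false forces synchronous snapshots of the heap keyed state.
    env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false))
    env.enableCheckpointing(10000) // checkpoint every 10 seconds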
2017-10-12 18:08 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi Federico,
>
> I'm guessing the job is still working without asynchronous snapshots? I'm
> very eager to figure out what is actually going wrong with asynchronous
> checkpoints.
>
> Best,
> Aljoscha
>
> On 2. Oct 2017, at 11:57, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
>
> As a follow-up: the Flink job currently has an uptime of almost 24 hours,
> with no failed checkpoints or restarts, whereas with async snapshots it
> would already have crashed 50 or so times.
>
> Regards,
> Federico
>
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <federico.dambro...@smartlab.ws>:
>
>> Thank you very much, Gordon.
>>
>> I'll try to run the job without asynchronous snapshots first thing.
>>
>> As for the Event data type: it's a case class with two fields: a String ID
>> and a composite case class (let's call it RealEvent) with three fields of
>> the following types: Information, a case class with String fields;
>> Coordinates, a nested case class with two Doubles; and InstantValues, with
>> three Integers and a DateTime. This DateTime field in InstantValues is the
>> one being evaluated in the maxBy (via the InstantValues and RealEvent
>> compareTo implementations, because dot notation is not working in Scala as
>> of 1.3.2, FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>),
>> and that was the reason I had to register the JodaDateTimeSerializer with
>> Kryo in the first place.
>>
>> Regards,
>> Federico
>>
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>>
>>> Hi,
>>>
>>> Thanks for the extra info, it was helpful (I'm not sure why your first
>>> logs didn't have the full trace, though).
>>>
>>> I spent some time digging through the error trace, and currently have
>>> some observations I would like to go through first:
>>>
>>> 1. It seems the ArrayIndexOutOfBoundsException was thrown while trying
>>> to access the state and make a copy (via serialization) in the
>>> CopyOnWriteStateTable.
>>> 2. The state that caused the exception seems to be the state of the
>>> reducing window function (i.e. the maxBy). The state type should be the
>>> same as the records in your `events` DataStream, which seems to be a
>>> Scala case class with some nested field that requires Kryo for
>>> serialization.
>>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>>> trying to copy that field.
>>>
>>> My current guess is that the serializer used internally may have been
>>> incorrectly shared, which is probably why this exception happens randomly
>>> for you. I recall similar issues occurring before because some
>>> KryoSerializers aren't thread-safe and were incorrectly shared in Flink.
>>>
>>> I may need some help from you to be able to look at this a bit more:
>>> - Is it possible for you to disable asynchronous snapshots and run this
>>> job a bit more to see if the problem still occurs? This is mainly to
>>> eliminate my guess about incorrect serializer usage in the
>>> CopyOnWriteStateTable.
>>> - Could you let us know what the record type (case class) of your
>>> `events` DataStream looks like?
>>>
>>> Also looping in Aljoscha and Stefan here, as they would probably have
>>> more insights into this.
>>>
>>> Cheers,
>>> Gordon
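(For reference, a minimal sketch of what the record type described above might look like; the field names and the Ordered-based comparisons are assumptions, only the nesting and field types follow the description in the thread:)

    import org.joda.time.DateTime

    case class Information(source: String, category: String)    // String fields (names assumed)
    case class Coordinates(latitude: Double, longitude: Double) // two Doubles (names assumed)

    case class InstantValues(speed: Int, altitude: Int, heading: Int, time: DateTime)
        extends Ordered[InstantValues] {
      // The maxBy on the enclosing stream relies on this comparison, since nested
      // field expressions are not usable from Scala in 1.3.2 (FLINK-7629).
      override def compare(that: InstantValues): Int = this.time.compareTo(that.time)
    }

    case class RealEvent(information: Information,
                         coordinates: Coordinates,
                         instantValues: InstantValues) extends Ordered[RealEvent] {
      override def compare(that: RealEvent): Int =
        this.instantValues.compare(that.instantValues)
    }

    case class Event(id: String, event: RealEvent)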
>>>
>>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (federico.dambro...@smartlab.ws) wrote:
>>>
>>> Hi Gordon,
>>>
>>> I remembered that I had already seen this kind of exception once during
>>> the testing of the current job, and fortunately I still had the complete
>>> stack trace saved on my PC:
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>     at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>>     at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> I don't know why the stack trace is now only being printed for the first
>>> parts (handleWatermark and HeapReducingState).
>>>
>>> So it looks like something that has to do with the KryoSerializer. As a
>>> Kryo serializer I'm using JodaDateTimeSerializer, registered as follows:
>>>
>>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
>>>
>>> I hope this could help.
>>>
>>> Regards,
>>> Federico
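(A self-contained version of that registration, assuming the JodaDateTimeSerializer from the de.javakaffee kryo-serializers artifact:)

    import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.joda.time.DateTime

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Make Kryo use the dedicated Joda serializer for DateTime instead of its
    // generic FieldSerializer, which cannot handle DateTime's internals.
    env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])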
>>>
>>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <federico.dambro...@smartlab.ws>:
>>>
>>>> Hi Gordon,
>>>>
>>>> I'm currently using Flink 1.3.2 in local mode.
>>>>
>>>> If it's any help, I realized from the log that the complete task which
>>>> is failing is:
>>>>
>>>> 2017-09-29 14:17:20,354 INFO org.apache.flink.runtime.taskmanager.Task - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>>>
>>>> val events = keyedStreamByID
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>   .maxBy("time").name("latest_time").uid("latest_time")
>>>>
>>>> val activeStream = events
>>>>   // Serialization to JsValue
>>>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>>>   // Global windowing, the cause of the exception should be above
>>>>   .timeWindowAll(Time.seconds(10))
>>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>>>
>>>> val historyStream = airtrafficEvents
>>>>   // Serialization to JsValue
>>>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>>>   // Global windowing, the cause of the exception should be above
>>>>   .timeWindowAll(Time.seconds(10))
>>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>>>>
>>>> Regards,
>>>> Federico
>>>>
>>>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm looking into this. Could you let us know the Flink version in
>>>>> which the exceptions occurred?
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (federico.dambro...@smartlab.ws) wrote:
>>>>>
>>>>> Hi, I'm coming across these exceptions while running a pretty simple
>>>>> Flink job.
>>>>>
>>>>> The first one:
>>>>>
>>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>> The second one:
>>>>>
>>>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>>>     at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>> Since it looks like something is wrong in watermark processing: in my
>>>>> case, watermarks are generated in my Kafka source:
>>>>>
>>>>> val stream = env.addSource(
>>>>>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>>>>>     .setStartFromLatest()
>>>>>     .assignTimestampsAndWatermarks(
>>>>>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>>>>         def extractTimestamp(element: Event): Long =
>>>>>           element.instantValues.time.getMillis
>>>>>       })
>>>>> )
>>>>>
>>>>> These exceptions aren't really that informative per se and, from what
>>>>> I can see, the task triggering them is the following operator:
>>>>>
>>>>> val events = keyedStreamByID
>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>>>
>>>>> What could be the problem here, in your opinion? Is it not emitting
>>>>> watermarks correctly? I'm not even sure how I could reproduce these
>>>>> exceptions, since they seem to happen pretty much randomly.
>>>>>
>>>>> Thank you all,
>>>>> Federico D'Ambrosio
>>>>
>>>> --
>>>> Federico D'Ambrosio
>>>
>>> --
>>> Federico D'Ambrosio
>>
>> --
>> Federico D'Ambrosio
>
> --
> Federico D'Ambrosio

--
Federico D'Ambrosio