I think I finally found the problem; there was already another bug report for this: https://issues.apache.org/jira/browse/FLINK-7484
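Until that is fixed, the workaround is the one Federico confirmed below: disabling asynchronous snapshots. A minimal sketch of that setting, assuming the filesystem state backend on Flink 1.3.x (the checkpoint URI is a placeholder; the second constructor argument is the asynchronousSnapshots flag):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// false = snapshot the heap state backend synchronously, avoiding the
// concurrent Kryo access suspected in this thread
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false))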
> On 12. Oct 2017, at 18:22, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
>
> Hi Aljoscha,
>
> yes, just like you're guessing, without asynchronous checkpoints there has been no crash so far.
>
> Regards,
> Federico
>
> 2017-10-12 18:08 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi Federico,
>
> I'm guessing the job is still working without asynchronous snapshots? I'm very eager to figure out what is actually going wrong with asynchronous checkpoints.
>
> Best,
> Aljoscha
>
>> On 2. Oct 2017, at 11:57, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
>>
>> As a follow-up: the Flink job currently has an uptime of almost 24 hours, with no failed checkpoint or restart, whereas with async snapshots it would have already crashed 50 or so times.
>>
>> Regards,
>> Federico
>>
>> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <federico.dambro...@smartlab.ws>:
>> Thank you very much, Gordon.
>>
>> I'll try to run the job without the asynchronous snapshots first thing.
>>
>> As for the Event data type: it's a case class with 2 fields, a String ID and a composite case class (let's call it RealEvent) containing 3 fields of the following types: Information, a case class with String fields; Coordinates, a nested case class with 2 Doubles; and InstantValues, with 3 Integers and a DateTime. This DateTime field in InstantValues is the one being evaluated in the maxBy (via the compareTo implementations of InstantValues and RealEvent, because dot notation is not working in Scala as of 1.3.2, see FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>), and that was the reason I had to register the JodaDateTimeSerializer with Kryo in the first place.
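>> Roughly, the shape is the following (a sketch from memory; the field names are illustrative, not the actual ones):
>>
>> import org.joda.time.DateTime
>>
>> case class Information(source: String, category: String)
>> case class Coordinates(lat: Double, lon: Double)
>>
>> // compareTo comes from Ordered, since maxBy on the nested field via
>> // dot notation doesn't work in 1.3.2 (FLINK-7629)
>> case class InstantValues(speed: Int, altitude: Int, heading: Int, time: DateTime)
>>   extends Ordered[InstantValues] {
>>   override def compare(that: InstantValues): Int = this.time.compareTo(that.time)
>> }
>>
>> case class RealEvent(info: Information, coordinates: Coordinates, instantValues: InstantValues)
>>   extends Ordered[RealEvent] {
>>   override def compare(that: RealEvent): Int = this.instantValues.compare(that.instantValues)
>> }
>>
>> case class Event(id: String, event: RealEvent)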
>> Regards,
>> Federico
>>
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>> Hi,
>>
>> Thanks for the extra info, it was helpful (I'm not sure why your first logs didn't have the full trace, though).
>>
>> I spent some time digging through the error trace, and currently have some observations I would like to go through first:
>>
>> 1. It seems the ArrayIndexOutOfBoundsException was thrown while trying to access the state and make a copy (via serialization) in the CopyOnWriteStateTable.
>> 2. The state that caused the exception seems to be the state of the reducing window function (i.e. the maxBy). The state type should be the same as the records in your `events` DataStream, which seems to be a Scala case class with some nested field that requires Kryo for serialization.
>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying to copy that field.
>>
>> My current guess would be that the serializer used internally may have been incorrectly shared, which is probably why this exception happens randomly for you. I recall similar issues occurring before, due to the fact that some KryoSerializers aren't thread-safe and were incorrectly shared in Flink.
>>
>> I may need some help from you to be able to look at this a bit more:
>> - Is it possible that you disable asynchronous snapshots and try running this job a bit more to see if the problem still occurs? This is mainly to eliminate my guess on whether or not there is some incorrect serializer usage in the CopyOnWriteStateTable.
>> - Could you let us know what your `events` DataStream records type case class looks like?
>>
>> Also looping in Aljoscha and Stefan here, as they would probably have more insights on this.
>>
>> Cheers,
>> Gordon
>>
>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (federico.dambro...@smartlab.ws) wrote:
>>
>>> Hi Gordon,
>>>
>>> I remembered that I had already seen this kind of exception once during testing of the current job, and fortunately I still had the complete stack trace saved on my PC:
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>     at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>>     at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> I don't know why the stack trace now only gets printed for the first parts (handleWatermark and HeapReducingState).
>>>
>>> So, it looks like something that has to do with the KryoSerializer. As a KryoSerializer I'm using JodaDateTimeSerializer, registered as follows:
>>>
>>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
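>>> For reference, JodaDateTimeSerializer is the one from the de.javakaffee kryo-serializers library; in sbt the dependency would be something like the following, with the version being a guess:
>>>
>>> libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.42"
>>>
>>> Registering it by class, as above, also means each Kryo instance creates its own serializer rather than sharing a single instance.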
>>> I hope this could help.
>>>
>>> Regards,
>>> Federico
>>>
>>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <federico.dambro...@smartlab.ws>:
>>> Hi Gordon,
>>>
>>> I'm currently using Flink 1.3.2 in local mode.
>>>
>>> If it's any help, I realized from the log that the complete task which is failing is:
>>>
>>> 2017-09-29 14:17:20,354 INFO org.apache.flink.runtime.taskmanager.Task - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>>
>>> val events = keyedStreamByID
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>   .maxBy("time").name("latest_time").uid("latest_time")
>>>
>>> val activeStream = events
>>>   // Serialization to JsValue
>>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>>   // Global windowing, the cause of the exception should be above
>>>   .timeWindowAll(Time.seconds(10))
>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>>
>>> val historyStream = events
>>>   // Serialization to JsValue
>>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>>   // Global windowing, the cause of the exception should be above
>>>   .timeWindowAll(Time.seconds(10))
>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
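>>> (keyedStreamByID is just the Kafka source stream keyed by the event's String ID, i.e. something along the lines of the following, with the field name being illustrative:
>>>
>>> val keyedStreamByID = stream.keyBy(_.id)
>>> )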
>>> Regards,
>>> Federico
>>>
>>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>>> Hi,
>>>
>>> I'm looking into this. Could you let us know the Flink version in which the exceptions occurred?
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (federico.dambro...@smartlab.ws) wrote:
>>>
>>>> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>>>>
>>>> First one:
>>>>
>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>> The second one:
>>>>
>>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>>     at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>> Since it looks like something is wrong in watermark processing: in my case, watermarks are generated in the Kafka source:
>>>>
>>>> val stream = env.addSource(
>>>>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>>>>     .setStartFromLatest()
>>>>     .assignTimestampsAndWatermarks(
>>>>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>>>         override def extractTimestamp(element: Event): Long =
>>>>           element.instantValues.time.getMillis
>>>>       })
>>>> )
>>>>
>>>> These exceptions aren't really that informative per se and, from what I can see, the task triggering them is the following operator:
>>>>
>>>> val events = keyedStreamByID
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>   .maxBy("time").name("latest_time").uid("latest_time")
>>>>
>>>> What could be the problem here, in your opinion? Is it not emitting watermarks correctly? I'm not even sure how I could reproduce these exceptions, since they seem to happen pretty much randomly.
>>>>
>>>> Thank you all,
>>>> Federico D'Ambrosio