This is with Flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more circumstances.
> On Mar 8, 2019, at 4:25 PM, Konstantin Knauf <konstan...@ververica.com> wrote:
>
> Hi Andrew,
>
> which Flink version do you use? This sounds a bit like
> https://issues.apache.org/jira/browse/FLINK-8836.
>
> Cheers,
>
> Konstantin
>
>> On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <arobe...@fuze.com> wrote:
>> Hello,
>>
>> I’m trying to convert some of our larger stateful computations into
>> something that aligns more closely with the Flink windowing framework, and
>> in particular to start using “event time” instead of “ingest time” as the
>> time characteristic.
>>
>> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka
>> source), and while my data is generally time-ordered, there are some
>> upstream races, so I’m attempting to assign timestamps and watermarks using
>> BoundedOutOfOrdernessTimestampExtractor with a lateness of 30 seconds. When
>> I assign timestamps directly in the Kafka sources (I’m also connecting two
>> Kafka streams here) using FlinkKafkaConsumer.assignTimestampsAndWatermarks(),
>> things work OK, but my extractor has to do a bunch of “faking”, because not
>> every record that is produced will have a valid timestamp - for example, a
>> record that can’t be parsed won’t.
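(For concreteness, a minimal Scala sketch of the setup described above, assuming Flink 1.6 and the Scala DataStream API; the Event case class, its optional timestamp field, and the in-memory stand-in source are illustrative assumptions, not the actual job.)

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative record type: `timestamp` is None when the payload could not be parsed.
case class Event(key: String, timestamp: Option[Long], payload: String)

// Periodic watermark assigner with the 30-second bound mentioned above.
class EventTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(30)) {
  // When this runs directly on the Kafka source, records without a parsed
  // timestamp have to be given a "faked" value such as this sentinel.
  override def extractTimestamp(e: Event): Long = e.timestamp.getOrElse(Long.MinValue)
}

object TimestampAssignmentSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Stand-in for the real FlinkKafkaConsumer08 source. The in-source variant
    // would instead call consumer.assignTimestampsAndWatermarks(new EventTimestampExtractor)
    // on the consumer before env.addSource(consumer).
    val events: DataStream[Event] = env.fromElements(
      Event("a", Some(1000L), "ok"),
      Event("b", None, "unparseable"))

    // Downstream variant: filter first, then assign timestamps and watermarks,
    // so the extractor only ever sees records with real timestamps.
    val withTimestamps: DataStream[Event] = events
      .filter(_.timestamp.isDefined)
      .assignTimestampsAndWatermarks(new EventTimestampExtractor)

    withTimestamps.print()
    env.execute("timestamp-assignment-sketch")
  }
}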
>> When I assign timestamps downstream, after filtering the stream down to
>> just the records that are going to be windowed, I see errors in my Flink
>> job:
>>
>> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>     at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>>     at scala.collection.immutable.List.foreach(List.scala:392)
>>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>>     ... 6 more
>>
>> I am calling aggregate() on my windows, but otherwise I see very little
>> information that I can use to dig into this issue. Can anyone give me any
>> insight into what is going wrong here? I’d much prefer assigning timestamps
>> after filtering, rather than in the Kafka source, because I can filter down
>> to only the records that I know will have timestamps.
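(Likewise, a hedged sketch of the windowed aggregate() described above, reusing the Event type and the withTimestamps stream from the previous sketch; the EventSummary accumulator and the one-minute tumbling window are made up. The TraversableSerializer and KryoSerializer frames in the trace suggest the real accumulator is a case class holding a collection whose element type falls back to Kryo.)

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative accumulator: a case class holding a collection, mirroring the
// CaseClassSerializer/TraversableSerializer shape in the stack trace.
case class EventSummary(count: Long, payloads: List[String])

// A plain AggregateFunction, standing in for the one passed to aggregate() above.
class CountingAggregate extends AggregateFunction[Event, EventSummary, EventSummary] {
  override def createAccumulator(): EventSummary = EventSummary(0L, Nil)
  override def add(e: Event, acc: EventSummary): EventSummary =
    EventSummary(acc.count + 1, e.payload :: acc.payloads)
  override def getResult(acc: EventSummary): EventSummary = acc
  override def merge(a: EventSummary, b: EventSummary): EventSummary =
    EventSummary(a.count + b.count, a.payloads ::: b.payloads)
}

object WindowedAggregateSketch {
  // Keyed, event-time tumbling window with an aggregate, applied to the
  // timestamped stream from the previous sketch.
  def summarize(withTimestamps: DataStream[Event]): DataStream[EventSummary] =
    withTimestamps
      .keyBy(_.key)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .aggregate(new CountingAggregate)
}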
>> When experimenting with the lateness in my timestamp/watermark assigner, I
>> also saw a similarly opaque exception:
>>
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
>>     at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
>>     at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
>>     at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
>>     at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
>>     at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
>>     …
>>
>> Any tips?
>>
>> Thanks,
>>
>> Andrew
>
> --
> Konstantin Knauf | Solutions Architect
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany