Hello, I’m trying to convert some of our larger stateful computations into something that aligns more closely with the Flink windowing framework, and in particular to start using “event time” instead of “ingestion time” as the time characteristic.
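For reference, the job setup is roughly the following (a trimmed-down sketch, not the actual code):

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._

    // Switch the job from the default time characteristic to event time.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)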
My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka source), and while my data is generally time-ordered, there are some upstream races, so I’m attempting to assign timestamps and watermarks using BoundedOutOfOrdernessTimestampExtractor with a maximum out-of-orderness of 30 seconds.

When I assign timestamps directly in the Kafka sources (I’m also connecting two Kafka streams here) using FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work OK, but my extractor has to do a bunch of “faking” because not every record that is produced will have a valid timestamp (for example, a record that can’t be parsed won’t). When I instead assign timestamps downstream, after filtering the stream down to just the records that will be windowed, I see errors in my Flink job:

java.io.IOException: Exception while applying AggregateFunction in aggregating state
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
    ... 6 more

I am calling aggregate() on my windows, but otherwise I see very little information that I can use to dig into this issue. Can anyone give me any insight into what is going wrong here? I’d much prefer assigning timestamps after filtering rather than in the Kafka source, because then I can filter down to only the records that I know will have timestamps.
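For context, the downstream-assignment variant looks roughly like the sketch below (continuing from the env above). The topic, record type, parser, and the trivial count aggregate are placeholders; the real accumulator is a larger case class with nested Scala collections, which I believe is what the CaseClassSerializer/TraversableSerializer frames in the trace are copying. I’m also assuming the Flink 1.4-style AggregateFunction whose add() returns the accumulator.

    import java.util.Properties

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    // Placeholder record type; the real one is a larger case class.
    case class Event(key: String, eventTime: Long, payload: String)

    // Hypothetical parser: records that can't be parsed get no event time.
    def parse(raw: String): Option[Event] = raw.split(',') match {
      case Array(key, ts, payload) => scala.util.Try(Event(key, ts.toLong, payload)).toOption
      case _                       => None
    }

    val props = new Properties()
    props.setProperty("zookeeper.connect", "zk:2181")     // placeholder
    props.setProperty("bootstrap.servers", "kafka:9092")  // placeholder
    props.setProperty("group.id", "my-job")               // placeholder

    // One of the two Kafka 0.8 sources; the real job connects a second stream as well.
    val events: DataStream[Event] = env
      .addSource(new FlinkKafkaConsumer08[String]("events", new SimpleStringSchema(), props))
      .flatMap(raw => parse(raw).toList)

    // Timestamps/watermarks assigned *after* filtering down to records with a usable
    // timestamp, rather than via FlinkKafkaConsumer.assignTimestampsAndWatermarks().
    val withTimestamps = events
      .filter(_.eventTime > 0)
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(30)) {
          override def extractTimestamp(e: Event): Long = e.eventTime
        })

    // Windowed aggregate() call where the IOException above is thrown.
    val aggregated = withTimestamps
      .keyBy(_.key)
      .timeWindow(Time.minutes(5))
      .aggregate(new AggregateFunction[Event, Long, Long] {
        override def createAccumulator(): Long = 0L
        override def add(value: Event, acc: Long): Long = acc + 1L
        override def getResult(acc: Long): Long = acc
        override def merge(a: Long, b: Long): Long = a + b
      })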
When experimenting with the out-of-orderness bound in my timestamp/watermark assigner, I also saw a similarly opaque exception:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
    at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
    at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
    at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
    at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
    at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
    …

Any tips?

Thanks,
Andrew