Hi Andrew,

Which Flink version do you use? This sounds a bit like https://issues.apache.org/jira/browse/FLINK-8836.

Cheers,

Konstantin

On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <arobe...@fuze.com> wrote:

> Hello,
>
> I’m trying to convert some of our larger stateful computations into
> something that aligns more with the Flink windowing framework, and
> particularly, start using “event time” instead of “ingest time” as a time
> characteristic.
>
> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka
> source), and while my data is generally time-ordered, there are some
> upstream races, so I’m attempting to assign timestamps and watermarks using
> BoundedOutOfOrdernessTimestampExtractor, and a lateness of 30 seconds. When
> I assign timestamps directly in the Kafka sources (I’m also connecting two
> Kafka streams here) using
> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work ok, but my
> extractor has to do a bunch of “faking” because not every record that is
> produced will have a valid timestamp - for example, a record that can’t be
> parsed won’t.
>
> When I assign timestamps downstream, after filtering the stream down to
> just records that are going to be windowed, I see errors in my Flink job:
>
> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>     at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>     at scala.collection.immutable.List.foreach(List.scala:392)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>     ... 6 more
>
> I am calling aggregate() on my windows, but otherwise I see very little
> information that I can use to dig into this issue. Can anyone give me any
> insight into what is going wrong here? I’d much prefer assigning timestamps
> after filtering, rather than in the Kafka source, because I can filter down
> to only records that I know will have timestamps.
>
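
For readers following the "filter first, then assign timestamps" idea described above, here is a minimal plain-Java sketch of the ordering, with no Flink dependency. Everything in it is an assumption made for illustration: the `Event` class, `keepTimestamped`, and `watermarks` are hypothetical names, not Flink API, and the model computes a watermark after every record, whereas a periodic assigner in Flink emits watermarks on an interval. It only shows that once unparseable records are dropped, the assigner never needs to "fake" a timestamp, and that the watermark trails the highest timestamp seen by the configured bound. It does not reproduce or fix the Kryo exceptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of "filter first, then assign timestamps" (not Flink code). */
class FilterThenWatermarkSketch {

    /** Hypothetical record; a null timestamp models a record that could not be parsed. */
    static final class Event {
        final String key;
        final Long timestampMillis;

        Event(String key, Long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Keeps only records that carry a timestamp, i.e. the ones that will be windowed. */
    static List<Event> keepTimestamped(List<Event> input) {
        List<Event> out = new ArrayList<>();
        for (Event e : input) {
            if (e.timestampMillis != null) {
                out.add(e);
            }
        }
        return out;
    }

    /** Watermark after each record: highest timestamp seen so far minus the allowed bound. */
    static List<Long> watermarks(List<Event> events, long maxOutOfOrdernessMillis) {
        List<Long> out = new ArrayList<>();
        // Start low enough that subtracting the bound cannot underflow.
        long maxSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
        for (Event e : events) {
            maxSeen = Math.max(maxSeen, e.timestampMillis);
            out.add(maxSeen - maxOutOfOrdernessMillis);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> raw = Arrays.asList(
                new Event("a", 100_000L),
                new Event("unparseable", null),   // dropped before timestamps are assigned
                new Event("b", 90_000L),          // arrives out of order
                new Event("c", 140_000L));
        List<Event> windowed = keepTimestamped(raw);
        // 30 second bound, as in the original message.
        System.out.println(watermarks(windowed, 30_000L)); // prints [70000, 70000, 110000]
    }
}
```

The out-of-order record at 90 s does not move the watermark backwards; it stays at 70 s until a newer timestamp arrives.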
> When experimenting with the lateness in my timestamp/watermark assigner, I
> also saw a similarly opaque exception:
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
>     at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
>     at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
>     at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
>     at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
>     at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
>     …
>
> Any tips?
>
> Thanks,
>
> Andrew

--
Konstantin Knauf | Solutions Architect
<https://www.ververica.com/>