This is with Flink 1.6.4. I was previously on 1.6.2 and saw Kryo issues in many 
more circumstances. 

> On Mar 8, 2019, at 4:25 PM, Konstantin Knauf <konstan...@ververica.com> wrote:
> 
> Hi Andrew, 
> 
> Which Flink version are you using? This sounds a bit like 
> https://issues.apache.org/jira/browse/FLINK-8836.
> 
> Cheers, 
> 
> Konstantin
> 
>> On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <arobe...@fuze.com> wrote:
>> Hello,
>> 
>> I’m trying to convert some of our larger stateful computations into 
>> something that aligns more closely with the Flink windowing framework and, 
>> in particular, to start using “event time” instead of “ingestion time” as 
>> the time characteristic.
>> 
>> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka 
>> source). While it is generally time-ordered, there are some upstream 
>> races, so I’m attempting to assign timestamps and watermarks using 
>> BoundedOutOfOrdernessTimestampExtractor with a maximum out-of-orderness 
>> (lateness) of 30 seconds. When I assign timestamps directly in the Kafka 
>> sources (I’m also connecting two Kafka streams here) using 
>> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work OK, but my 
>> extractor has to do a bunch of “faking” because not every record that is 
>> produced will have a valid timestamp. For example, a record that can’t be 
>> parsed won’t have one.
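
For readers unfamiliar with the bounded out-of-orderness scheme mentioned above, here is a minimal stdlib-only sketch of the idea: the watermark trails the largest event timestamp seen so far by a fixed bound. The class and method names (`BoundedOutOfOrdernessSketch`, `observe`, `currentWatermark`, `isBehindWatermark`) are hypothetical and chosen for illustration; this is not Flink's BoundedOutOfOrdernessTimestampExtractor and it says nothing about the Kryo errors reported in this thread.

```java
/** Illustrative only: a simplified model of bounded-out-of-orderness watermarking. */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeenMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Offset the start value so the subtraction in currentWatermark() cannot underflow.
        this.maxTimestampSeenMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records one event timestamp (epoch millis) and returns it unchanged. */
    long observe(long eventTimestampMs) {
        maxTimestampSeenMs = Math.max(maxTimestampSeenMs, eventTimestampMs);
        return eventTimestampMs;
    }

    /** The watermark trails the largest timestamp seen by the configured bound. */
    long currentWatermark() {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    /** True if a record with this timestamp is at or behind the current watermark. */
    boolean isBehindWatermark(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }
}
```

With a bound of 30 seconds, a record may arrive up to 30 seconds behind the newest record seen so far and still be ahead of the watermark. A record with no parseable timestamp has nothing sensible to pass to `observe`, which is why filtering such records out before assigning timestamps is attractive.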
>> 
>> When I assign timestamps downstream, after filtering the stream down to just 
>> records that are going to be windowed, I see errors in my Flink job:
>> 
>> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>>         at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>>         at scala.collection.immutable.List.foreach(List.scala:392)
>>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>>         at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>>         ... 6 more
>> 
>> I am calling aggregate() on my windows, but otherwise I see very little 
>> information that I can use to dig into this issue. Can anyone give me any 
>> insight into what is going wrong here? I’d much prefer assigning timestamps 
>> after filtering, rather than in the Kafka source, because I can filter down 
>> to only records that I know will have timestamps.
>> 
>> When experimenting with the lateness in my timestamp/watermark assigner, I 
>> also saw a similarly opaque exception:
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
>>         at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
>>         at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
>>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
>>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
>>         at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
>>         …
>> 
>> 
>> Any tips?
>> 
>> 
>> Thanks,
>> 
>> Andrew
>> -- 
>> *Confidentiality Notice: The information contained in this e-mail and any
>> attachments may be confidential. If you are not an intended recipient, you
>> are hereby notified that any dissemination, distribution or copying of this
>> e-mail is strictly prohibited. If you have received this e-mail in error,
>> please notify the sender and permanently delete the e-mail and any
>> attachments immediately. You should not retain, copy or use this e-mail or
>> any attachment for any purpose, nor disclose all or any part of the
>> contents to any other person. Thank you.*
> 
> 
> -- 
> Konstantin Knauf | Solutions Architect
