Hi Andrew,

Generally, this looks like a concurrency problem.

Are you using asynchronous checkpointing? If so, could you check whether this
issue also occurs with synchronous checkpointing? There have been recent
reports of a possible problem with some Kryo types.

Can you set the logging level to DEBUG? In that case, some checks are enabled
in the Kryo serializer that verify whether the KryoSerializer is really being
accessed concurrently.
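
To illustrate what such a check looks for, here is a minimal sketch in plain
Java. It is not the KryoSerializer code: the class ExclusiveAccessGuard and
its methods are hypothetical names made up for this example. The idea is only
that the guard remembers which thread is currently inside the guarded section
and fails loudly if a second thread enters before the first has left.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of a concurrent-access check; not Flink code. */
final class ExclusiveAccessGuard {

    // Thread currently inside the guarded section, or null if nobody is.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    /** Call on entry to the guarded section. Non-reentrant by design. */
    void enter() {
        Thread current = Thread.currentThread();
        if (!owner.compareAndSet(null, current)) {
            throw new IllegalStateException(
                "Concurrent access: held by " + owner.get() + ", entered by " + current);
        }
    }

    /** Call on exit; only the owning thread releases the guard. */
    void exit() {
        owner.compareAndSet(Thread.currentThread(), null);
    }

    /** Small demonstration: a second thread enters while the caller holds the guard. */
    static boolean detectsSecondThread() {
        ExclusiveAccessGuard guard = new ExclusiveAccessGuard();
        AtomicBoolean detected = new AtomicBoolean(false);
        guard.enter();
        try {
            Thread intruder = new Thread(() -> {
                try {
                    guard.enter();
                    guard.exit();
                } catch (IllegalStateException e) {
                    detected.set(true); // the check fired
                }
            });
            intruder.start();
            intruder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            guard.exit();
        }
        return detected.get();
    }
}
```

With a check of this kind, a race shows up as an explicit error at the point
of the second access instead of as an ArrayIndexOutOfBoundsException deep
inside Kryo.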

Are you using any Scala types, in particular collections or "Try"?

Cheers,

Konstantin

On Sat, Mar 9, 2019 at 6:22 AM Andrew Roberts <arobe...@fuze.com> wrote:

> This is with flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more
> circumstances.
>
> On Mar 8, 2019, at 4:25 PM, Konstantin Knauf <konstan...@ververica.com>
> wrote:
>
> Hi Andrew,
>
> Which Flink version do you use? This sounds a bit like
> https://issues.apache.org/jira/browse/FLINK-8836.
>
> Cheers,
>
> Konstantin
>
> On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <arobe...@fuze.com> wrote:
>
>> Hello,
>>
>> I’m trying to convert some of our larger stateful computations into
>> something that aligns more with the Flink windowing framework, and
>> particularly, start using “event time” instead of “ingest time” as the time
>> characteristic.
>>
>> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka
>> source), and while my data is generally time-ordered, there are some
>> upstream races, so I’m attempting to assign timestamps and watermarks using
>> BoundedOutOfOrdernessTimestampExtractor, and a lateness of 30 seconds. When
>> I assign timestamps directly in the Kafka sources (I’m also connecting two
>> Kafka streams here) using
>> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work ok, but my
>> extractor has to do a bunch of “faking” because not every record that is
>> produced will have a valid timestamp - for example, a record that can’t be
>> parsed won’t.
>>
>> When I assign timestamps downstream, after filtering the stream down to
>> just records that are going to be windowed, I see errors in my Flink job:
>>
>> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>>         at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>>         at scala.collection.immutable.List.foreach(List.scala:392)
>>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>>         at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>>         ... 6 more
>>
>> I am calling aggregate() on my windows, but otherwise I see very little
>> information that I can use to dig into this issue. Can anyone give me any
>> insight into what is going wrong here? I’d much prefer assigning timestamps
>> after filtering, rather than in the Kafka source, because I can filter down
>> to only records that I know will have timestamps.
>>
>> When experimenting with the lateness in my timestamp/watermark assigner,
>> I also saw a similarly opaque exception:
>>
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
>>         at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
>>         at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
>>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
>>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
>>         at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
>>         …
>>
>>
>> Any tips?
>>
>>
>> Thanks,
>>
>> Andrew
>> --
>> *Confidentiality Notice: The information contained in this e-mail and any
>> attachments may be confidential. If you are not an intended recipient, you
>> are hereby notified that any dissemination, distribution or copying of this
>> e-mail is strictly prohibited. If you have received this e-mail in error,
>> please notify the sender and permanently delete the e-mail and any
>> attachments immediately. You should not retain, copy or use this e-mail or
>> any attachment for any purpose, nor disclose all or any part of the
>> contents to any other person. Thank you.*
>>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
>


