Hi Andrew,

Which Flink version do you use? This sounds a bit like
https://issues.apache.org/jira/browse/FLINK-8836.

Cheers,

Konstantin

On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <arobe...@fuze.com> wrote:

> Hello,
>
> I’m trying to convert some of our larger stateful computations into
> something that aligns more closely with the Flink windowing framework, and
> in particular to start using “event time” instead of “ingestion time” as the
> time characteristic.
>
> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka
> source), and while my data is generally time-ordered, there are some
> upstream races, so I’m attempting to assign timestamps and watermarks using
> BoundedOutOfOrdernessTimestampExtractor with a lateness of 30 seconds. When
> I assign timestamps directly in the Kafka sources (I’m also connecting two
> Kafka streams here) using
> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work OK, but my
> extractor has to do a bunch of “faking” because not every record that is
> produced will have a valid timestamp; for example, a record that can’t be
> parsed won’t.
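To make the 30-second bound concrete: as I understand it, BoundedOutOfOrdernessTimestampExtractor tracks the highest timestamp seen so far and emits a watermark equal to that maximum minus the configured bound, never letting the watermark move backwards. The sketch below is a plain-Java model of that rule only (no Flink dependency). The class and method names are hypothetical and are not Flink's API.

```java
/**
 * Hypothetical plain-Java model (not Flink's API) of a
 * bounded-out-of-orderness watermark: max timestamp seen minus a fixed bound.
 */
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that (max - bound) cannot underflow before any event arrives.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an event timestamp and returns it unchanged. */
    long onEvent(long timestampMs) {
        if (timestampMs > currentMaxTimestamp) {
            currentMaxTimestamp = timestampMs;
        }
        return timestampMs;
    }

    /** Current watermark; it only ever moves forward. */
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    /** An event is behind the watermark if its timestamp is at or before it. */
    boolean isLate(long timestampMs) {
        return timestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel m = new BoundedOutOfOrdernessModel(30_000L);
        m.onEvent(100_000L);
        m.onEvent(95_000L);   // 5 s out of order, within the 30 s bound
        m.onEvent(140_000L);
        System.out.println(m.currentWatermark()); // 110000
        System.out.println(m.isLate(105_000L));   // true
        System.out.println(m.isLate(120_000L));   // false
    }
}
```

After events at 100 s, 95 s and 140 s, the watermark sits at 110 s. A further event stamped 105 s would therefore be behind it, while one stamped 120 s would not.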
>
> When I assign timestamps downstream, after filtering the stream down to
> just records that are going to be windowed, I see errors in my Flink job:
>
> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>         at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>         at scala.collection.immutable.List.foreach(List.scala:392)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>         at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>         ... 6 more
>
> I am calling aggregate() on my windows, but otherwise I see very little
> information that I can use to dig into this issue. Can anyone give me any
> insight into what is going wrong here? I’d much prefer assigning timestamps
> after filtering, rather than in the Kafka source, because I can filter down
> to only records that I know will have timestamps.
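The "filter first, then assign" approach described above can be sketched in plain Java (no Flink dependency). The idea is to parse each record, drop anything without a usable timestamp, and only hand the survivors to the timestamp/watermark step, so the extractor never has to invent a value. The `epochMillis,payload` record format and all class and method names here are hypothetical assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/**
 * Hypothetical sketch: keep only records whose timestamp parses, so a
 * downstream timestamp assigner never sees a record without one.
 * The "epochMillis,payload" line format is an assumption.
 */
final class FilterThenAssign {
    /** Returns the leading epoch-millis field, or empty if the record cannot be parsed. */
    static OptionalLong parseTimestamp(String record) {
        int comma = record.indexOf(',');
        if (comma <= 0) {
            return OptionalLong.empty(); // no comma, or an empty timestamp field
        }
        try {
            return OptionalLong.of(Long.parseLong(record.substring(0, comma)));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    /** Timestamps of the parseable records, in arrival order. */
    static List<Long> timestampsOfValidRecords(List<String> records) {
        List<Long> out = new ArrayList<>();
        for (String r : records) {
            OptionalLong ts = parseTimestamp(r);
            if (ts.isPresent()) {
                out.add(ts.getAsLong());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("100000,a", "garbage", "95000,b", ",c", "140000,d");
        System.out.println(timestampsOfValidRecords(raw)); // [100000, 95000, 140000]
    }
}
```

In a Flink job, the same split would correspond to a filter (or flatMap) ahead of the timestamp/watermark assigner, which is the ordering the question is asking about.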
>
> When experimenting with the lateness in my timestamp/watermark assigner, I
> also saw a similarly opaque exception:
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
>         at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
>         at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
>         at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
>         …
>
>
> Any tips?
>
>
> Thanks,
>
> Andrew


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525

<https://www.ververica.com/>


Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
