I agree, it looks like one of the two mentioned issues.
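
To make the first of those issues concrete, here is a minimal sketch in plain Java, based only on the title of the bug quoted below (duplicating a KryoSerializer does not duplicate registered default serializers). It does not use Flink or Kryo. The names Copier, StatefulCopier, ListCopier, and the deepDuplicate flag are hypothetical and exist only for illustration. The assumption is that a duplicate is meant to be safe to use from a different thread, which only holds if every stateful nested serializer is duplicated as well.

```java
import java.util.ArrayList;
import java.util.List;

public class SerializerDuplicationSketch {

    /** Hypothetical stand-in for a serializer that can deep-copy values. */
    interface Copier<T> {
        T copy(T value);
        Copier<T> duplicate();
    }

    /** Stateful and NOT thread-safe: every call reuses the same scratch buffer. */
    static final class StatefulCopier implements Copier<String> {
        private final StringBuilder scratch = new StringBuilder();

        @Override
        public String copy(String value) {
            scratch.setLength(0);
            scratch.append(value);
            return scratch.toString();
        }

        @Override
        public Copier<String> duplicate() {
            return new StatefulCopier(); // fresh instance with its own scratch buffer
        }
    }

    /** Copies a list element by element through a nested copier. */
    static final class ListCopier implements Copier<List<String>> {
        final Copier<String> element;
        private final boolean deepDuplicate;

        ListCopier(Copier<String> element, boolean deepDuplicate) {
            this.element = element;
            this.deepDuplicate = deepDuplicate;
        }

        @Override
        public List<String> copy(List<String> value) {
            List<String> out = new ArrayList<>(value.size());
            for (String s : value) {
                out.add(element.copy(s));
            }
            return out;
        }

        @Override
        public Copier<List<String>> duplicate() {
            // Buggy variant (deepDuplicate == false): a new wrapper is created,
            // but the stateful nested copier is still the same shared instance.
            Copier<String> nested = deepDuplicate ? element.duplicate() : element;
            return new ListCopier(nested, deepDuplicate);
        }
    }

    /** Returns true if a duplicate still shares the nested copier with the original. */
    static boolean sharesNestedState(ListCopier original) {
        ListCopier copy = (ListCopier) original.duplicate();
        return original.element == copy.element;
    }

    public static void main(String[] args) {
        System.out.println(sharesNestedState(new ListCopier(new StatefulCopier(), false))); // true
        System.out.println(sharesNestedState(new ListCopier(new StatefulCopier(), true)));  // false
    }
}
```

In the shallow variant, two threads that each hold their "own" duplicate would still mutate the same scratch buffer, which is the kind of sharing that could plausibly produce intermittent, differently shaped failures like the three traces below.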
> On 25.05.2018 at 06:15, sihua zhou <summerle...@163.com> wrote:
>
> Hi Gordon,
>
> I think this might not be caused by
> https://issues.apache.org/jira/browse/FLINK-9263. The bug in FLINK-9263
> should only affect Operator State, but in this case the exceptions are
> thrown from HeapValueState, which is a type of Keyed State.
>
> Best, Sihua
>
>
>
> On 05/25/2018 11:48, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> Hi,
>
> FYI, this is the JIRA ticket for the issue:
> https://issues.apache.org/jira/browse/FLINK-8836
> Yes, the fix seems to be included only in 1.5.0 (to be released) and 1.4.3
> (there has been no discussion on releasing that yet).
>
> Could the reported issue also have been caused by
> https://issues.apache.org/jira/browse/FLINK-9263 (which has a fix included
> in 1.5.0)?
>
> Cheers,
> Gordon
> On 25 May 2018 at 11:39:03 AM, sihua zhou (summerle...@163.com) wrote:
>
>> Hi,
>> this looks like the bug "duplicating a KryoSerializer does not duplicate
>> registered default serializers", which has been fixed on master and on the
>> 1.5.0 and 1.4.x branches. Unfortunately, the fix is not included in 1.4.2
>> (the bug was discovered after the 1.4.2 release). @Stefan, please correct
>> me if I'm wrong.
>>
>>
>> Best, Sihua
>>
>> On 05/25/2018 05:55, Ya-Te Wong <yate.w...@gmail.com> wrote:
>> Hello,
>>
>> We're using Flink version 1.4.2.
>> Our Flink job runs well most of the time, but sometimes we see
>> exceptions in the Kryo serializer.
>> The timing of the exceptions seems random: sometimes we don't see any
>> for 5 days, and sometimes we get them within hours.
>> I have captured the stack traces from the last 3 occurrences. They are not
>> exactly the same. The commonality is that our code in onTimer() is
>> triggered as part of watermark handling. Our code then tries to read from
>> the copy-on-write state table, and eventually an exception occurs in Kryo.
>>
>> Has anyone seen something like that before?
>>
>> Thanks,
>>
>>
>>
>> [ Exception #1 ]
>>
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
>> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
>> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
>> Serialization trace:
>> timestamp (com.mycompany.datascience.datatypes.SensorStateEvent)
>> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>> at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>> at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
>> at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
>> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
>> ... 7 more
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
>> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>> at java.util.ArrayList.set(ArrayList.java:444)
>> at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:524)
>> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>> ... 27 more
>>
>>
>>
>> [ Exception #2 ]
>>
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
>> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
>> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: SerialNumber6H0G025
>> Serialization trace:
>> sensorState (com.mycompany.datascience.datatypes.SensorStateEvent)
>> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>> at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>> at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
>> at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
>> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
>> ... 7 more
>> Caused by: java.lang.ClassNotFoundException: SerialNumber6H0G025
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>> ... 30 more
>>
>>
>>
>> [ Exception #3 ]
>>
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
>> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
>> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>> at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>> at com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
>> at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
>> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
>> ... 7 more
>>