I agree, it looks like one of the two mentioned issues.
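
As a rough illustration of the KryoSerializer duplication issue quoted below (FLINK-8836), here is a toy sketch. It is not Flink's or Kryo's actual code: the names DuplicateSketch, StatefulSerializer, Holder, shallowDuplicate and deepDuplicate are hypothetical. It only shows the assumed failure mode, where a duplicate that copies the registry but not the registered serializer instances leaves two "independent" copies sharing the same mutable object.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only; all names here are hypothetical and not part of Flink or Kryo.
class DuplicateSketch {

    /** Stand-in for a serializer that keeps mutable scratch state (not thread-safe). */
    static final class StatefulSerializer {
        int scratch; // would be corrupted if two threads used the same instance

        StatefulSerializer copy() {
            return new StatefulSerializer();
        }
    }

    /** Stand-in for a serializer that holds a registry of default serializers. */
    static final class Holder {
        final Map<String, StatefulSerializer> defaults = new LinkedHashMap<>();

        /** Copies the map but reuses the registered instances (the assumed buggy shape). */
        Holder shallowDuplicate() {
            Holder copy = new Holder();
            copy.defaults.putAll(defaults);
            return copy;
        }

        /** Copies the map and every registered instance (the assumed fixed shape). */
        Holder deepDuplicate() {
            Holder copy = new Holder();
            defaults.forEach((type, ser) -> copy.defaults.put(type, ser.copy()));
            return copy;
        }
    }

    /** Returns true if any registered serializer instance is shared between a and b. */
    static boolean sharesInstances(Holder a, Holder b) {
        for (Map.Entry<String, StatefulSerializer> e : a.defaults.entrySet()) {
            if (e.getValue() == b.defaults.get(e.getKey())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Holder original = new Holder();
        original.defaults.put("SensorStateEvent", new StatefulSerializer());

        System.out.println("shallow shares: "
                + sharesInstances(original, original.shallowDuplicate()));
        System.out.println("deep shares: "
                + sharesInstances(original, original.deepDuplicate()));
    }
}
```

If something like the shallow variant is what happens in 1.4.2, two threads could end up mutating the same serializer state. That would fit the seemingly random timing and the varying exceptions in the traces below.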

> On 25.05.2018 at 06:15, sihua zhou <summerle...@163.com> wrote:
> 
> Hi Gordon,
> 
> I don't think this is caused by 
> https://issues.apache.org/jira/browse/FLINK-9263. The bug in FLINK-9263 
> should only affect Operator State, but in this case the exceptions are 
> thrown from HeapValueState, which is a type of Keyed State.
> 
> Best, Sihua
> 
> 
> 
> On 05/25/2018 11:48, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> Hi,
> 
> FYI, this is the JIRA ticket for the issue: 
> https://issues.apache.org/jira/browse/FLINK-8836
> Yes, the fix seems to be included only in 1.5.0 (to be released) and 1.4.3 
> (there has been no discussion on releasing that yet).
> 
> Could the reported issue also have been caused by 
> https://issues.apache.org/jira/browse/FLINK-9263 (which has a fix included 
> in 1.5.0)?
> 
> Cheers,
> Gordon
> On 25 May 2018 at 11:39:03 AM, sihua zhou (summerle...@163.com) wrote:
> 
>> Hi,
>> this looks like the bug "duplicating a KryoSerializer does not duplicate 
>> registered default serializers", which has been fixed on the branches 
>> master, 1.5.0, and 1.4.x. Unfortunately, the fix is not included in 1.4.2 
>> (because the bug was discovered after the 1.4.2 release). @Stefan, please 
>> correct me if I'm wrong.
>> 
>> 
>> Best, Sihua
>> 
>> On 05/25/2018 05:55, Ya-Te Wong <yate.w...@gmail.com> wrote:
>> Hello,
>> 
>> We're using Flink version 1.4.2.
>> Our Flink job runs well most of the time, but sometimes we see 
>> exceptions in the Kryo serializer.
>> The timing of the exceptions seems random.
>> Sometimes we don't see any exceptions for 5 days; sometimes we get 
>> exceptions within hours.
>> I have captured the stack traces from the last 3 times the exceptions 
>> occurred. They are not exactly the same. The commonality is that our code in 
>> onTimer() is triggered as part of watermark handling. Our code then tries to 
>> read a value from the copy-on-write state table, and eventually an exception 
>> occurs in Kryo.
>> 
>> Has anyone seen something like that before?
>> 
>> Thanks,
>> 
>> 
>> 
>> [ Exception #1 ]
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output 
>> watermark:
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
>> at 
>> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.esotericsoftware.kryo.KryoException: 
>> java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
>> Serialization trace:
>> timestamp (com.mycompany.datascience.datatypes.SensorStateEvent)
>> at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at 
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>> at 
>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>> at 
>> com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
>> at 
>> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
>> at 
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>> at 
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
>> ... 7 more
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 3, Size: 0
>> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>> at java.util.ArrayList.set(ArrayList.java:444)
>> at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:524)
>> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>> at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>> ... 27 more
>> 
>> 
>> 
>> [ Exception #2 ]
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output 
>> watermark:
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
>> at 
>> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
>> SerialNumber6H0G025
>> Serialization trace:
>> sensorState (com.mycompany.datascience.datatypes.SensorStateEvent)
>> at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>> at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> at 
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at 
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>> at 
>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>> at 
>> com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
>> at 
>> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
>> at 
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>> at 
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
>> ... 7 more
>> Caused by: java.lang.ClassNotFoundException: SerialNumber6H0G025
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at 
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>> ... 30 more
>> 
>> 
>> 
>> [ Exception #3 ]
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output 
>> watermark:
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:400)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:252)
>> at 
>> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at 
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
>> at 
>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>> at 
>> com.mycompany.datascience.scala.pushnotifier.operators.UserStateSensorStateJoin.onTimer(UserStateSensorStateJoin.scala:145)
>> at 
>> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
>> at 
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>> at 
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:912)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler2.handleWatermark(StreamTwoInputProcessor.java:397)
>> ... 7 more
>> 
>> 
>> 
>> 
>> 
