Hi,

When running our job, we sporadically see KryoExceptions.  I’m new to this 
area of Flink, so I’m not sure what to look for.  From my understanding, Kryo 
is the default serializer for generic types, and although using Kryo may 
carry a performance penalty, it should be able to serialize and deserialize 
all objects without fail.  Is that correct?

Another point is that our object is mutable and is modified as it passes 
through the different operators.  Could periodic checkpointing be a cause of 
the issue below?
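
To make the mutability concern concrete, here is a minimal sketch in plain 
Java (no Flink dependency) of copying a message before changing it, so that 
an instance which may be getting serialized elsewhere is never modified.  The 
class names `MessageCopyDemo` and `Message` and the helper `copyOf` are 
hypothetical; only the field name `payload` comes from the trace below, and 
its Map type is an assumption based on the MapSerializer frames.  This is not 
our actual code, and it may not be the cause of the exception.

```java
import java.util.HashMap;
import java.util.Map;

public class MessageCopyDemo {

    // Hypothetical stand-in for the real message class. The field name
    // "payload" is taken from the serialization trace; the Map type is assumed.
    static final class Message {
        final Map<String, Object> payload = new HashMap<>();
    }

    // Returns a new Message whose payload map is a separate (shallow) copy,
    // so adding or removing entries on the copy leaves the source untouched.
    static Message copyOf(Message source) {
        Message copy = new Message();
        copy.payload.putAll(source.payload);
        return copy;
    }

    public static void main(String[] args) {
        Message original = new Message();
        original.payload.put("price", 1.25);

        // An operator would modify the copy instead of the incoming instance.
        Message enriched = copyOf(original);
        enriched.payload.put("venue", "X");

        System.out.println(original.payload.size()); // prints 1
        System.out.println(enriched.payload.size()); // prints 2
    }
}
```

The copy is shallow: nested mutable values inside the map would still be 
shared and would need their own copies.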

We are currently running Flink 1.7.1.


11:02:43,075 INFO  org.apache.flink.runtime.taskmanager.Task - Window(ProcessingTimeSessionWindows(10000), ProcessingTimeTrigger, CoGroupWindowFunction) -> Flat Map -> Sink: Unnamed (1/1) (a83e88eaf06490dec8326e4d9bd0ed26) switched from RUNNING to FAILED.
TimerException{com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException: 1024
Serialization trace:
payload (com.celertech.analytics.bo.AnalyticsDataJsonMessage)}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException: 1024
Serialization trace:
payload (com.celertech.analytics.bo.AnalyticsDataJsonMessage)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:538)
        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:507)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
        at org.apache.flink.runtime.state.heap.AbstractHeapAppendingState.getInternal(AbstractHeapAppendingState.java:57)
        at org.apache.flink.runtime.state.heap.HeapListState.get(HeapListState.java:85)
        at org.apache.flink.runtime.state.heap.HeapListState.get(HeapListState.java:43)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:235)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
        ... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1024
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.getStash(IdentityObjectIntMap.java:256)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.get(IdentityObjectIntMap.java:247)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getWrittenId(MapReferenceResolver.java:28)
        at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:619)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:564)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:84)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
        ... 22 more


Regards,
Scott