*@Fabian do you register any types / serializers via ExecutionConfig.registerKryoType(...) / ExecutionConfig.registerTypeWithKryoSerializer(...)?*
Nope, not at all. Our Flink job code doesn't contain the word "Kryo" anywhere. Thx for looking into it ...

--

*Fabian Wollert*
Zalando SE

E-Mail: fab...@zalando.de

On Thu, 4 Jul 2019 at 11:51, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> I quickly checked the implementation of duplicate() for both the KryoSerializer and the StreamElementSerializer (which are the only serializers involved here).
> They seem to be correct. This is especially true for the KryoSerializer: since FLINK-8836 [1] we always perform a deep copy of the KryoSerializer when duplicating it, so Kryo instances should not be shared at all across duplicates.
> This seems to rule out any duplication issues with the serializers.
>
> As a possibly relevant question, @Fabian do you register any types / serializers via ExecutionConfig.registerKryoType(...) / ExecutionConfig.registerTypeWithKryoSerializer(...)?
>
> Best,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-8836
>
> On Thu, Jul 4, 2019 at 5:29 PM Fabian Wollert <fab...@zalando.de> wrote:
>
>> No, not yet. We lack the knowledge to fully understand this. The only thing we found out is that it most probably happens in the Elasticsearch sink, because:
>> - some error messages have the sink in their stack trace.
>> - when we bump the ES node specs on AWS, the error happens less often (we haven't bumped them to super large instances yet, nor reached a state where the errors go away completely; also, this would not be the ideal fix).
>>
>> So my current assumption is that backpressure is not working correctly somewhere. But this is super vague; any other hints or support on this are highly appreciated.
>>
>> On Thu, 4 Jul 2019 at 11:26, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Any news on this? Have you found the cause of the error?
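Gordon's reasoning about duplicate() rests on a simple contract. A serializer with mutable state must hand out a fresh copy from duplicate(), which per FLINK-8836 is what KryoSerializer now does with its Kryo instance. A stateless serializer may return itself. A wrapper whose duplicate() follows the StreamElementSerializer code quoted in this thread can then return `this` whenever its nested serializer returns itself, assuming the wrapper holds no mutable state of its own. The plain-Java model below is a hypothetical sketch of that contract. SimpleSerializer, StatelessStringSerializer, RefTrackingSerializer and ElementWrapperSerializer are illustrative names, not Flink's TypeSerializer API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the duplicate() contract. These types are
 * illustrative stand-ins, not Flink's TypeSerializer API.
 */
public final class DuplicateContractDemo {

    /** Minimal serializer abstraction: only duplicate() matters here. */
    public interface SimpleSerializer<T> {
        /** Returns an instance that is safe to hand to another thread. */
        SimpleSerializer<T> duplicate();
    }

    /** Stateless: nothing mutable, so sharing one instance is safe. */
    public static final class StatelessStringSerializer implements SimpleSerializer<String> {
        @Override
        public SimpleSerializer<String> duplicate() {
            return this;
        }
    }

    /**
     * Stateful: the list stands in for mutable per-read state, loosely like a
     * table of already-read objects. Sharing it across threads would let one
     * thread's entries corrupt another's, so duplicate() returns a new instance.
     */
    public static final class RefTrackingSerializer<T> implements SimpleSerializer<T> {
        public final List<Object> seenObjects = new ArrayList<>();

        @Override
        public SimpleSerializer<T> duplicate() {
            return new RefTrackingSerializer<T>(); // fresh, unshared state
        }
    }

    /**
     * Wrapper with no mutable state of its own, mirroring the quoted
     * StreamElementSerializer.duplicate() pattern.
     */
    public static final class ElementWrapperSerializer<T> implements SimpleSerializer<T> {
        public final SimpleSerializer<T> inner;

        public ElementWrapperSerializer(SimpleSerializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public SimpleSerializer<T> duplicate() {
            SimpleSerializer<T> copy = inner.duplicate();
            // Same inner instance back means the inner serializer is stateless,
            // so the stateless wrapper can be shared as well.
            return (copy == inner) ? this : new ElementWrapperSerializer<T>(copy);
        }
    }

    public static void main(String[] args) {
        ElementWrapperSerializer<String> statelessWrapper =
                new ElementWrapperSerializer<String>(new StatelessStringSerializer());
        ElementWrapperSerializer<String> statefulWrapper =
                new ElementWrapperSerializer<String>(new RefTrackingSerializer<String>());

        System.out.println("stateless wrapper returns itself: "
                + (statelessWrapper.duplicate() == statelessWrapper));
        System.out.println("stateful wrapper returns itself: "
                + (statefulWrapper.duplicate() == statefulWrapper));
    }
}
```

With a stateless inner serializer the wrapper's duplicate() returns the same object. With the stateful one it returns a new wrapper around a new inner instance, so the two copies share no mutable state. Under the no-own-state assumption, that is why returning `this` in that pattern does not by itself cause sharing.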
>>>
>>> On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Indeed, looking at StreamElementSerializer, the duplicate() method could be buggy:
>>>>
>>>>     @Override
>>>>     public StreamElementSerializer<T> duplicate() {
>>>>         TypeSerializer<T> copy = typeSerializer.duplicate();
>>>>         return (copy == typeSerializer) ? this : new StreamElementSerializer<T>(copy);
>>>>     }
>>>>
>>>> Is it safe to return this when copy == typeSerializer ...?
>>>>
>>>> On Fri, Jun 28, 2019 at 9:51 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi Fabian,
>>>>> we had similar errors with Flink 1.3 [1][2], and the error was caused by a serializer sharing the same object across multiple threads.
>>>>> The error was not deterministic and changed from time to time.
>>>>> So maybe it could be something similar (IMHO).
>>>>>
>>>>> [1] http://codeha.us/apache-flink-users/msg02010.html
>>>>> [2] http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3ccaeluf_aic_izyw5f27knter_y6h4+nzg2cpniozqdgm+wk7...@mail.gmail.com%3e
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Fri, Jun 28, 2019 at 8:52 AM Fabian Wollert <fab...@zalando.de> wrote:
>>>>>
>>>>>> Additionally, we get these along with it all the time:
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
>>>>>> Serialization trace:
>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>>
>>>>>> or
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>>>> Serialization trace:
>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 29
>>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:131)
>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>>     ... 12 more
>>>>>>
>>>>>> On Thu, 27 Jun 2019 at 18:29, Fabian Wollert <fab...@zalando.de> wrote:
>>>>>>
>>>>>>> Hi, we have some Flink jobs (Flink version 1.7.1) consuming from a custom source and ingesting into an Elasticsearch cluster (v5.6). Recently, we have been seeing more and more exceptions like this:
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.ClassNotFoundException: com. ^
>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>>     ... 13 more
>>>>>>>
>>>>>>> or
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class:
>>>>>>> com.fasterxml.jackson.databind.node.DoubleNod
>>>>>>> com.fasterxml.jackson.databind.node.ObjectNode
>>>>>>> Serialization trace:
>>>>>>> _children (com.fasterxml.jackson.databind.node.ObjectNode)
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>> com.fasterxml.jackson.databind.node.DoubleNod
>>>>>>> com.fasterxml.jackson.databind.node.ObjectNode
>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>>     ... 19 more
>>>>>>>
>>>>>>> I guess the serialization between steps in the TaskManager fails somewhere. Unfortunately, it happens very unpredictably. My question is: has anyone seen this before?
>>>>>>> If yes, what was your approach to debugging it?
>>>>>>> Right now we mostly see this problem with high-volume event processing: it only appears when a high load is being processed. I already tried to investigate with TRACE log level, but that keeps the instance it runs on busy writing tons of logs, which slows down processing so that the exception is no longer triggered. I'm wondering whether there is another way to investigate this.
>>>>>>>
>>>>>>> Thx in advance for any hints on how to debug this.
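Fabian asks for an investigation method lighter than TRACE logging. Flavio suggested earlier in the thread that one object shared by several threads could produce nondeterministic errors like these. If that is the hypothesis to test, one low-overhead option is an ownership guard around each call on the suspect object. It costs one atomic compare-and-set per call and fails fast, naming both threads, instead of logging every record. `ConcurrentAccessGuard` below is a hypothetical sketch, not a Flink or Kryo API. Which object to guard in the actual job is an assumption you would have to choose.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical debugging aid (not a Flink or Kryo API): detects when two
 * threads are inside the same non-thread-safe object at the same time.
 * Non-reentrant: a nested enter() on the same thread is reported as well.
 */
public final class ConcurrentAccessGuard {
    // Thread currently inside the guarded section, or null if none.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    /** Claims the guarded object for the calling thread, or fails fast. */
    public void enter() {
        Thread current = Thread.currentThread();
        if (!owner.compareAndSet(null, current)) {
            Thread other = owner.get(); // may already be null if the holder just left
            throw new IllegalStateException("Concurrent access: " + current.getName()
                    + " entered while "
                    + (other == null ? "another thread" : other.getName())
                    + " was inside");
        }
    }

    /** Releases the claim; a no-op if the calling thread is not the owner. */
    public void exit() {
        owner.compareAndSet(Thread.currentThread(), null);
    }

    /** Runs one call on the guarded object between enter() and exit(). */
    public <T> T guarded(Supplier<T> action) {
        enter();
        try {
            return action.get();
        } finally {
            exit();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentAccessGuard guard = new ConcurrentAccessGuard();

        // Sequential calls from one thread pass silently.
        guard.guarded(() -> "record-1");
        guard.guarded(() -> "record-2");

        // Simulate another task thread that is still inside the object.
        CountDownLatch inside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread other = new Thread(() -> {
            guard.enter();
            inside.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                guard.exit();
            }
        }, "other-task-thread");
        other.start();
        inside.await();

        try {
            guard.guarded(() -> "record-3");
            System.out.println("no concurrent access detected");
        } catch (IllegalStateException e) {
            System.out.println("detected: " + e.getMessage());
        } finally {
            release.countDown();
            other.join();
        }
    }
}
```

Run as-is, `main()` should report the simulated conflict with both thread names. The guard only detects overlapping use of one instance. It says nothing about other causes, such as corrupted bytes on the network path.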