Are you using unaligned checkpoints? (There's a known bug in 1.12.0 which can lead to corrupted data when using UC.) Can you tell us a little bit about your environment: how are you deploying Flink, which state backend are you using, and what kind of job is it (I guess DataStream API)?
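The receiving side fails if the bytes were corrupted in transit, as the UC bug above can cause, or if they were written by a serializer configured differently from the reader's. In both cases the reader decodes a layout it does not expect. The plain-Java sketch below involves no Flink code, and its class and method names are hypothetical. A reader that expects one more field than the writer produced fails with the same kind of java.io.EOFException that appears in the Feb 5 trace quoted below. It only illustrates the failure mode and is not how Flink's serializers are implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Plain-Java illustration (no Flink): writer and reader disagree on the record layout.
public class SchemaMismatchDemo {

    // Writer side: hypothetical layout with two fields (String uid, long timestamp).
    public static byte[] writeRecord(String uid, long timestamp) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(uid);
            out.writeLong(timestamp);
        }
        return bytes.toByteArray();
    }

    // Reader that agrees with the writer: the round trip succeeds.
    public static String readRecordAsWritten(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF() + "," + in.readLong();
        }
    }

    // Reader that expects an extra String field the writer never produced:
    // the final readUTF() runs past the end of the buffer and throws EOFException.
    public static String readRecordWithExtraField(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String uid = in.readUTF();
            long timestamp = in.readLong();
            String extra = in.readUTF();
            return uid + "," + timestamp + "," + extra;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = writeRecord("uid-1", 1612512086000L);
        System.out.println(readRecordAsWritten(data)); // uid-1,1612512086000
        try {
            readRecordWithExtraField(data);
        } catch (EOFException e) {
            System.out.println("EOFException: the reader expected more bytes than the writer produced");
        }
    }
}
```

Which exception surfaces depends on where the two sides first disagree. That fits the observation further down the thread that the failure looks different on every stop.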
Somehow the process receiving the data is unable to deserialize it, most likely because the sending and receiving sides are configured differently (different classpath, dependency versions, etc.).

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <hinobl...@gmail.com> wrote:

> I do not think this is a code-related problem anymore; maybe it is a bug?
>
> 赵一旦 <hinobl...@gmail.com> wrote on Fri, Feb 5, 2021 at 4:30 PM:
>
>> Hi all, I found that the failure always occurs in the second task, the one right after the source task. So in the first chained task I transform the 'Map'-based class object into an ordinary class object, and the problem disappeared.
>>
>> With this change in place, I also tried stopping the job with a savepoint and restoring from it (both succeeded).
>>
>> However, I then ran into another problem. It also occurs when I stop the job, and again in the second task after the source task. The log is below:
>>
>> 2021-02-05 16:21:26
>> java.io.EOFException
>>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
>>     at org.apache.flink.types.StringValue.readString(StringValue.java:783)
>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> This is also a serialization/deserialization problem, but this time it is not related to Kryo.
>>
>>
>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 9:22 PM:
>>
>>> From these snippets it is hard to tell what's going wrong. Could you maybe give us a minimal example with which to reproduce the problem? Alternatively, have you read through Flink's serializer documentation [1]? Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>>
>>> The stack trace looks as if the job fails deserializing some key of your MapRecord map.
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>
>>>> Some facts may be related to this, since another job of mine does not run into these problems.
>>>> The problematic job uses a class that contains a field of class MapRecord, and MapRecord is defined to extend HashMap so that it can hold variable JSON data.
>>>>
>>>> Class MapRecord:
>>>>
>>>> @NoArgsConstructor
>>>> @Slf4j
>>>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>>>     @Override
>>>>     public void setTimestamp(Long timestamp) {
>>>>         put("timestamp", timestamp);
>>>>         put("server_time", timestamp);
>>>>     }
>>>>
>>>>     @Override
>>>>     public Long getTimestamp() {
>>>>         try {
>>>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>>>         } catch (Exception e) {
>>>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>>>             return 0L;
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Class UserAccessLog:
>>>>
>>>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>>>     private MapRecord d; // I think this is related to the problem...
>>>>
>>>>     ... ...
>>>>
>>>> }
>>>>
>>>>
>>>> 赵一旦 <hinobl...@gmail.com> wrote on Wed, Feb 3, 2021 at 6:43 PM:
>>>>
>>>>> Actually, the exception is different every time I stop the job, for example:
>>>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>> The stack trace is the one I gave above.
>>>>>
>>>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>> 2021-02-03 18:37:24
>>>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> (3) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> ...
>>>>>
>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 6:28 PM:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Could you show us the job you are trying to resume? Is it a SQL job or a DataStream job, for example?
>>>>>>
>>>>>> From the stack trace, it looks as if the class g^XT is not on the class path.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>
>>>>>>> I have a job whose checkpoints and savepoints all work fine.
>>>>>>> But if I stop the job using 'stop -p', the job fails after the savepoint has been generated. Here is the log:
>>>>>>>
>>>>>>> 2021-02-03 16:53:55,179 WARN org.apache.flink.runtime.taskmanager.Task [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_DefaultPassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>>>     at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>     ... 22 more
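The thread suggests two changes: Till's idea of a simple POJO instead of a HashMap subclass, and the Feb 5 workaround of converting the Map-based object into an ordinary class in the first chained task. The plain-Java sketch below combines them. Flink's type-serialization docs say a class qualifies as a POJO when it is public, has a public no-argument constructor, and every non-static, non-transient field is public or has a getter and setter. Concretely typed fields such as String and long should then get Flink's built-in serializers. In every Kryo trace in this thread, the Map-based field goes through KryoSerializer inside PojoSerializer, which a flat POJO should avoid. The class name AccessLogPojo and the uid field are hypothetical. The timestamp fallback mirrors MapRecord.getTimestamp() quoted above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical flat replacement for the Map-based MapRecord field.
// Shape follows Flink's documented POJO rules: public class, public
// no-arg constructor, public (or getter/setter-backed) fields.
public class AccessLogPojo {
    public String uid;      // hypothetical field, for illustration only
    public long timestamp;  // same semantics as MapRecord.getTimestamp()

    public AccessLogPojo() {}

    // Converts one raw JSON-like map into the POJO (e.g. in the first chained
    // operator after the source). Like MapRecord.getTimestamp(): prefer
    // "timestamp", fall back to "server_time", and use 0 when the value is
    // missing, null, or not a number (this sketch does not log).
    public static AccessLogPojo fromMap(Map<Object, Object> raw) {
        AccessLogPojo pojo = new AccessLogPojo();
        Object uid = raw.get("uid");
        pojo.uid = uid == null ? null : uid.toString();
        Object ts = raw.getOrDefault("timestamp", raw.getOrDefault("server_time", 0L));
        pojo.timestamp = ts instanceof Number ? ((Number) ts).longValue() : 0L;
        return pojo;
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("uid", "u-42");
        raw.put("server_time", 1612512086000L);
        AccessLogPojo pojo = fromMap(raw);
        System.out.println(pojo.uid + " " + pojo.timestamp); // u-42 1612512086000
    }
}
```

Only the converted POJO crosses the network boundary between tasks, so the receiving task no longer has to reconstruct an open-ended map of arbitrary key and value classes.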