Hi all, I found that the failure always occurs in the second task, the one right after the source task. So, in the first chained task, I converted the 'Map'-based class object into an object of an ordinary (non-Map) class, and the problem disappeared.
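The workaround above can be sketched as a conversion step. This is a minimal, dependency-free sketch and not the poster's actual code: `PlainAccessLog`, `fromMap`, and the `uid` key are hypothetical names, since the real record fields are not shown in the thread. In the job, the conversion would run inside the first chained operator (for example in a `MapFunction`), so that the record sent over the network to the second task is the plain class rather than the `HashMap` subclass. The Flink wiring is left out so the example compiles with the JDK alone.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain replacement for the HashMap-based record. It follows
// Flink's POJO rules (public class, public no-arg constructor, public fields),
// so Flink should be able to pick its POJO serializer instead of Kryo for it.
public class PlainAccessLog {
    public long timestamp;
    public String uid; // hypothetical key; the real fields are not in the thread

    public PlainAccessLog() {}

    // Copies the known keys out of the map-based record. The timestamp lookup
    // mirrors MapRecord.getTimestamp(): "timestamp", then "server_time", then 0.
    public static PlainAccessLog fromMap(Map<Object, Object> m) {
        PlainAccessLog out = new PlainAccessLog();
        Object ts = m.containsKey("timestamp") ? m.get("timestamp") : m.get("server_time");
        // Null or non-numeric values fall back to 0, like the catch block in MapRecord.
        out.timestamp = (ts instanceof Number) ? ((Number) ts).longValue() : 0L;
        Object uid = m.get("uid");
        out.uid = (uid == null) ? null : uid.toString();
        return out;
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("server_time", 1612513286000L);
        raw.put("uid", "u-42");
        PlainAccessLog log = fromMap(raw);
        System.out.println(log.uid + " " + log.timestamp); // prints "u-42 1612513286000"
    }
}
```

Keeping the conversion in the first chained operator means only the plain class has to be serialized across the task boundary where the failures were observed.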
Based on this new solution, I also tried stopping the job with a savepoint and restoring it (all successful). However, I ran into another problem. It also occurs when I stop the job, and again in the second task after the source task. The log is below:

2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is also a serialization/deserialization problem, but this time it is not related to Kryo.

Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 9:22 PM:

> From these snippets it is hard to tell what's going wrong. Could you maybe
> give us a minimal example with which to reproduce the problem?
> Alternatively, have you read through Flink's serializer documentation [1]?
> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>
> The stack trace looks as if the job fails deserializing some key of your
> MapRecord map.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>
> Cheers,
> Till
>
> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>
>> Some facts that are possibly related, since another job does not run into
>> these exceptions:
>> The problematic job uses a class that contains a field of class MapRecord,
>> and MapRecord is defined to extend HashMap so that it can accept variable
>> JSON data.
>>
>> Class MapRecord:
>>
>> @NoArgsConstructor
>> @Slf4j
>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>     @Override
>>     public void setTimestamp(Long timestamp) {
>>         put("timestamp", timestamp);
>>         put("server_time", timestamp);
>>     }
>>
>>     @Override
>>     public Long getTimestamp() {
>>         try {
>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>         } catch (Exception e) {
>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>             return 0L;
>>         }
>>     }
>> }
>>
>> Class UserAccessLog:
>>
>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>     private MapRecord d; // I think this is related to the problem...
>>
>>     ... ...
>>
>> }
>>
>>
>> 赵一旦 <hinobl...@gmail.com> wrote on Wed, Feb 3, 2021 at 6:43 PM:
>>
>>> Actually, the exception is different every time I stop the job.
>>> For example:
>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>> The stack trace is the one I gave above.
>>>
>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>> 2021-02-03 18:37:24
>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> (3) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> ...
>>>
>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 6:28 PM:
>>>
>>>> Hi,
>>>>
>>>> could you show us the job you are trying to resume? Is it a SQL job or
>>>> a DataStream job, for example?
>>>>
>>>> From the stack trace, it looks as if the class g^XT is not on the class
>>>> path.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>
>>>>> I have a job whose checkpoints and savepoints all work fine.
>>>>> But if I stop the job using 'stop -p', the job fails after the savepoint
>>>>> has been generated. Here is the log:
>>>>>
>>>>> 2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task [] -
>>>>> ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default
>>>>> PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor ->
>>>>> ual_ft_uid_subid_EmptyUidFilter (17/30)#0
>>>>> (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>     at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>     ... 22 more
>>>>>
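Regarding Till's suggestion to read Flink's serializer documentation and to use a simple POJO instead of inheriting from `HashMap`: the traces above all show `KryoSerializer` frames nested under `PojoSerializer`, which suggests the `MapRecord` field is handled by Kryo rather than as a POJO. One way to see why is to check a class against the POJO rules in that documentation: the class is public, it has a public no-argument constructor, and every non-static, non-transient field is either public or reachable through a getter and setter. The checker below is a rough, hypothetical approximation using plain JDK reflection; Flink's own type analysis is more detailed and may differ in edge cases. `PojoRuleCheck`, `GoodPojo`, and `MapBased` are illustrative names only.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Rough approximation of the POJO rules described in Flink's type
// serialization docs. Not Flink's real analysis; a sketch for illustration.
public class PojoRuleCheck {

    // Should pass: public class, public no-arg constructor, and each field
    // either public or exposed through getX()/setX().
    public static class GoodPojo {
        public long ts;
        private String uid;
        public GoodPojo() {}
        public String getUid() { return uid; }
        public void setUid(String uid) { this.uid = uid; }
    }

    // Shaped like MapRecord: it inherits HashMap's internal non-public,
    // non-transient fields (e.g. threshold in OpenJDK), which have no accessors.
    public static class MapBased extends HashMap<Object, Object> {
        public MapBased() {}
    }

    // True if c exposes a public method with this name (case-insensitive)
    // and parameter count, including inherited public methods.
    static boolean hasPublicMethod(Class<?> c, String name, int paramCount) {
        for (Method m : c.getMethods()) {
            if (m.getName().equalsIgnoreCase(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    static List<String> violations(Class<?> c) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(c.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            if (!Modifier.isPublic(c.getDeclaredConstructor().getModifiers())) {
                problems.add("no-arg constructor is not public");
            }
        } catch (NoSuchMethodException e) {
            problems.add("no no-arg constructor");
        }
        // Walk the whole hierarchy: inherited fields are part of the object too.
        for (Class<?> k = c; k != null && k != Object.class; k = k.getSuperclass()) {
            for (Field f : k.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                    continue; // static and transient fields are skipped; public ones are fine
                }
                String n = f.getName();
                boolean getter = hasPublicMethod(c, "get" + n, 0) || hasPublicMethod(c, "is" + n, 0);
                boolean setter = hasPublicMethod(c, "set" + n, 1);
                if (!getter || !setter) {
                    problems.add("field " + k.getSimpleName() + "." + n + " has no public getter/setter");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println("GoodPojo: " + violations(GoodPojo.class));
        System.out.println("MapBased: " + violations(MapBased.class));
    }
}
```

Running `main` should report no violations for `GoodPojo` and one or more inherited-field violations for `MapBased`; the exact field names depend on the JDK's `HashMap` internals, so the output is not fixed across JDK versions.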