Flink 1.12.0; only aligned checkpoints are used; standalone cluster.

Robert Metzger <rmetz...@apache.org> wrote on Fri, Feb 5, 2021 at 6:52 PM:

> Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which
> can lead to corrupted data when using UC)
> Can you tell us a little bit about your environment? (How are you
> deploying Flink, which state backend are you using, what kind of job (I
> guess DataStream API))
>
> Somehow the process receiving the data is unable to deserialize it, most
> likely because they are configured differently (different classpath,
> dependency versions etc.)
>
> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <hinobl...@gmail.com> wrote:
>
>> I do not think this is a code-related problem anymore; maybe it is
>> a bug?
>>
>> 赵一旦 <hinobl...@gmail.com> wrote on Fri, Feb 5, 2021 at 4:30 PM:
>>
>>> Hi all, I found that the failure always occurred in the second task,
>>> after the source task. So I changed the first chained task to
>>> transform the 'Map'-based class object into a plain class object, and
>>> the problem disappeared.
>>>
>>> With the new solution, I also tried stopping and restoring the job with
>>> a savepoint (all successful).
>>>
>>> But I also met another problem. It also occurs while I stop
>>> the job, and also in the second task after the source task.
>>> The log is below:
>>> 2021-02-05 16:21:26
>>> java.io.EOFException
>>>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:783)
>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> It is also about serialization and deserialization, but not related to
>>> Kryo this time.
>>>
>>>
>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 9:22 PM:
>>>
>>>> From these snippets it is hard to tell what's going wrong. Could you
>>>> maybe give us a minimal example with which to reproduce the problem?
>>>> Alternatively, have you read through Flink's serializer documentation [1]?
>>>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>>>
>>>> The stack trace looks as if the job fails deserializing some key of
>>>> your MapRecord map.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>
>>>>> Some facts are possibly related to this, since another job does not
>>>>> run into these exceptions.
>>>>> The problematic job uses a class that contains a field of class MapRecord,
>>>>> and MapRecord is defined to extend HashMap so as to accept variable JSON
>>>>> data.
>>>>>
>>>>> Class MapRecord:
>>>>>
>>>>> @NoArgsConstructor
>>>>> @Slf4j
>>>>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>>>>     @Override
>>>>>     public void setTimestamp(Long timestamp) {
>>>>>         put("timestamp", timestamp);
>>>>>         put("server_time", timestamp);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Long getTimestamp() {
>>>>>         try {
>>>>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>>>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>>>>         } catch (Exception e) {
>>>>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>>>>             return 0L;
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Class UserAccessLog:
>>>>>
>>>>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>>>>     private MapRecord d; // I think this is related to the problem...
>>>>>
>>>>>     ... ...
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> 赵一旦 <hinobl...@gmail.com> wrote on Wed, Feb 3, 2021 at 6:43 PM:
>>>>>
>>>>>> Actually the exception is different every time I stop the job.
>>>>>> Such as:
>>>>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>> The stack trace is the one I gave above.
>>>>>>
>>>>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>> 2021-02-03 18:37:24
>>>>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>> (3) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 6:28 PM:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> could you show us the job you are trying to resume?
>>>>>>> Is it a SQL job or a DataStream job, for example?
>>>>>>>
>>>>>>> From the stack trace, it looks as if the class g^XT is not on the
>>>>>>> class path.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have a job whose checkpoints and savepoints all work fine.
>>>>>>>> But if I stop the job using 'stop -p', the job fails after the
>>>>>>>> savepoint is generated. Here is the log:
>>>>>>>>
>>>>>>>> 2021-02-03 16:53:55,179 WARN org.apache.flink.runtime.taskmanager.Task [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.
>>>>>>>>
>>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
>>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>>>>     at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>     ... 22 more
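
For readers following the thread: the workaround mentioned in it (turning the Map-based object into a plain class object in the first chained task, in line with the suggestion to try a simple POJO instead of inheriting from a HashMap) could look roughly like the sketch below. It is only an illustration and not the poster's actual code. The names MapToPojoSketch, FlatAccessLog and fromMap, and the "uid" key, are hypothetical; only the "timestamp"/"server_time" fallback is taken from the MapRecord snippet quoted above. The sketch uses plain Java with no Flink dependency. The target class is a public class with a public no-argument constructor and getters/setters, which is the shape Flink's POJO rules expect.

```java
import java.util.HashMap;
import java.util.Map;

public class MapToPojoSketch {

    /** Plain class with a fixed set of fields (field names are hypothetical). */
    public static class FlatAccessLog {
        private long timestamp;
        private String uid;

        public FlatAccessLog() {} // public no-arg constructor

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public String getUid() { return uid; }
        public void setUid(String uid) { this.uid = uid; }
    }

    /**
     * Copies the known keys out of the map into the plain class.
     * The timestamp lookup mirrors the fallback in the quoted MapRecord:
     * "timestamp" first, then "server_time", then 0.
     */
    public static FlatAccessLog fromMap(Map<Object, Object> d) {
        FlatAccessLog out = new FlatAccessLog();
        Object ts = d.getOrDefault("timestamp", d.getOrDefault("server_time", 0L));
        // Non-numeric or null values fall back to 0, as in the quoted getter.
        out.setTimestamp(ts instanceof Number ? ((Number) ts).longValue() : 0L);
        Object uid = d.get("uid"); // "uid" is an assumed key, for illustration only
        out.setUid(uid == null ? null : uid.toString());
        return out;
    }

    public static void main(String[] args) {
        Map<Object, Object> d = new HashMap<>();
        d.put("server_time", 1612340000000L);
        d.put("uid", 42);
        FlatAccessLog log = fromMap(d);
        System.out.println(log.getTimestamp() + " " + log.getUid());
    }
}
```

In a DataStream job, a conversion like fromMap would typically sit in a map function chained directly to the source, so that only the plain class crosses the network to the second task. Whether that addresses the underlying cause of the failures reported in this thread is not established here.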