The first problem is critical, since the savepoint does not work. For the second problem, I changed the solution: I removed the 'Map'-based implementation before the data is passed to the second task, and in this case the savepoint works. The only difference is in the procedure. What I intended was to stop the job, remember the savepoint path, and then restart the job with that savepoint path. What happens now is: I stop the job, the job fails, and then it restarts automatically from the generated savepoint. So I no longer need to restart the job myself, since what it does automatically is what I wanted to do anyway.
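For illustration, the change that made the savepoint work (dropping the 'Map'-based object before records leave the first chained task) might look roughly like the sketch below. FlatAccessLog and the "uid" key are hypothetical names chosen for this example, not the real job's; only the "timestamp" and "server_time" keys come from the MapRecord class quoted further down. The idea is that a class with a fixed set of fields no longer carries a HashMap field, which is the part the quoted traces show going through Kryo.

```java
import java.util.Map;

// Hypothetical flat record. For Flink to treat it as a POJO in a real job it
// would be declared public in its own file; it is package-private here only
// so the sketch stays self-contained.
class FlatAccessLog {
    public long timestamp;
    public String uid; // "uid" is an assumed key, used only for illustration

    public FlatAccessLog() {} // public no-arg constructor

    // Copies the fields of interest out of the map so that the map itself
    // never has to be serialized between tasks.
    static FlatAccessLog fromMap(Map<?, ?> d) {
        FlatAccessLog out = new FlatAccessLog();

        // Same lookup order as MapRecord.getTimestamp(): "timestamp" first,
        // then "server_time", then 0.
        Object ts = d.get("timestamp");
        if (ts == null) {
            ts = d.get("server_time");
        }
        out.timestamp = (ts instanceof Number) ? ((Number) ts).longValue() : 0L;

        Object uid = d.get("uid");
        out.uid = (uid == null) ? null : uid.toString();
        return out;
    }
}
```

In the job, a conversion like this would be called from a map step chained directly to the source, so that only the flat class crosses the network to the second task.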
I suspect it may also be related to the data, so I am not sure I can provide an example that reproduces the problem.

On Sat, Feb 6, 2021 at 12:13 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Could you provide us with a minimal working example which reproduces the
> problem for you? This would be super helpful in figuring out the problem
> you are experiencing. Thanks a lot for your help.
>
> Cheers,
> Till
>
> On Fri, Feb 5, 2021 at 1:03 PM 赵一旦 <hinobl...@gmail.com> wrote:
>
>> Yeah, and if it is different, why does my job run normally? The problem
>> only occurs when I stop it.
>>
>> On Fri, Feb 5, 2021 at 7:08 PM Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Are you 100% sure that the jar files in the classpath (/lib folder) are
>>> exactly the same on all machines? (It can happen quite easily in a
>>> distributed standalone setup that some files are different)
>>>
>>>
>>> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>>
>>>> Flink 1.12.0; only using aligned checkpoints; standalone cluster.
>>>>
>>>>
>>>>
>>>> On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <rmetz...@apache.org> wrote:
>>>>
>>>>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>>>>> which can lead to corrupted data when using UC)
>>>>> Can you tell us a little bit about your environment? (How are you
>>>>> deploying Flink, which state backend are you using, what kind of job (I
>>>>> guess DataStream API))
>>>>>
>>>>> Somehow the process receiving the data is unable to deserialize it,
>>>>> most likely because they are configured differently (different classpath,
>>>>> dependency versions etc.)
>>>>>
>>>>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>
>>>>>> I do not think this is a code-related problem anymore; maybe it is
>>>>>> a bug?
>>>>>>
>>>>>> On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all, I find that the failure always occurs in the second task,
>>>>>>> after the source task. So I did something in the first chained task: I
>>>>>>> transformed the 'Map'-based class object into another, normal class
>>>>>>> object, and the problem disappeared.
>>>>>>>
>>>>>>> Based on the new solution, I also tried to stop and restore the job
>>>>>>> with a savepoint (all successful).
>>>>>>>
>>>>>>> But I also hit another problem. It also occurs while I stop the job,
>>>>>>> and also in the second task after the source task. The log is below:
>>>>>>> 2021-02-05 16:21:26
>>>>>>> java.io.EOFException
>>>>>>>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
>>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:783)
>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> It is again about serialization and deserialization, but not related
>>>>>>> to Kryo this time.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>>> From these snippets it is hard to tell what's going wrong. Could
>>>>>>>> you maybe give us a minimal example with which to reproduce the
>>>>>>>> problem?
>>>>>>>> Alternatively, have you read through Flink's serializer documentation
>>>>>>>> [1]?
>>>>>>>> Have you tried to use a simple POJO instead of inheriting from a
>>>>>>>> HashMap?
>>>>>>>>
>>>>>>>> The stack trace looks as if the job fails deserializing some key of
>>>>>>>> your MapRecord map.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Here are some facts that are possibly related, since another job
>>>>>>>>> does not hit these exceptions.
>>>>>>>>> The problem job uses a class which contains a field of class
>>>>>>>>> MapRecord, and MapRecord is defined to extend HashMap so as to
>>>>>>>>> accept variable JSON data.
>>>>>>>>>
>>>>>>>>> Class MapRecord:
>>>>>>>>>
>>>>>>>>> @NoArgsConstructor
>>>>>>>>> @Slf4j
>>>>>>>>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>>>>>>>>     @Override
>>>>>>>>>     public void setTimestamp(Long timestamp) {
>>>>>>>>>         put("timestamp", timestamp);
>>>>>>>>>         put("server_time", timestamp);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public Long getTimestamp() {
>>>>>>>>>         try {
>>>>>>>>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>>>>>>>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>>>>>>>>         } catch (Exception e) {
>>>>>>>>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>>>>>>>>             return 0L;
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Class UserAccessLog:
>>>>>>>>>
>>>>>>>>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>>>>>>>>     private MapRecord d; // I think this is related to the problem...
>>>>>>>>>
>>>>>>>>>     ... ...
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Actually the exception is different every time I stop the job.
>>>>>>>>>> Such as:
>>>>>>>>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find
>>>>>>>>>> class: g^XT
>>>>>>>>>> The stack trace is the one I gave above.
>>>>>>>>>>
>>>>>>>>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>>>>> 2021-02-03 18:37:24
>>>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>>>>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>>>>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>
>>>>>>>>>> (3) com.esotericsoftware.kryo.KryoException: Encountered
>>>>>>>>>> unregistered class ID: 96
>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>
>>>>>>>>>> ...
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> could you show us the job you are trying to resume? Is it a SQL
>>>>>>>>>>> job or a DataStream job, for example?
>>>>>>>>>>>
>>>>>>>>>>> From the stack trace, it looks as if the class g^XT is not on
>>>>>>>>>>> the class path.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I have a job whose checkpoints and savepoints all work fine.
>>>>>>>>>>>> But if I stop the job using 'stop -p', then after the savepoint
>>>>>>>>>>>> is generated the job fails. Here is the log:
>>>>>>>>>>>>
>>>>>>>>>>>> 2021-02-03 16:53:55,179 WARN org.apache.flink.runtime.taskmanager.Task [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.
>>>>>>>>>>>>
>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
>>>>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
>>>>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
>>>>>>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>>>>>>>>     at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>     ... 22 more
>>>>>>>>>>>>