It may also have something to do with my job's first tasks. The second task has two inputs: one is the Kafka source stream (A), and the other is a self-defined MySQL source used as a broadcast stream (B).

In A: I have a 'WatermarkReAssigner', a self-defined operator that adds an offset to its input watermark and then forwards it downstream.

In B: The parallelism is 30, but in my rich function's implementation only subtask-0 runs the MySQL query and sends out records; the other subtasks do nothing. All subtasks send max_watermark - 86400_000 as the watermark.

Since both first tasks contain some self-defined source or implementation, I do not know whether the problem has something to do with them.
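To make the setup described above concrete, here is a minimal sketch in plain Java with no Flink dependency. It is not taken from the actual job: the class name, method names, and the example offset are hypothetical, and saturating on overflow in the re-assigner is an assumption of this sketch rather than something the original operator is known to do. It only illustrates the arithmetic: A shifts its watermark by an offset, every subtask of B emits max_watermark - 86400_000, only subtask 0 of B emits records, and a two-input task follows the smaller of its two input watermarks.

```java
// Minimal sketch with hypothetical names; this is not the job's real code.
final class WatermarkSketch {
    static final long MAX_WATERMARK = Long.MAX_VALUE; // largest possible watermark value
    static final long ONE_DAY_MS = 86_400_000L;       // 24 hours in milliseconds

    /**
     * Stream A: shift the incoming watermark by an offset before forwarding it.
     * Saturating instead of overflowing is an assumption of this sketch.
     */
    static long reassign(long inputWatermark, long offsetMs) {
        try {
            return Math.addExact(inputWatermark, offsetMs);
        } catch (ArithmeticException overflow) {
            return offsetMs > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
    }

    /** Stream B: the watermark that every subtask emits. */
    static long broadcastWatermark() {
        return MAX_WATERMARK - ONE_DAY_MS;
    }

    /** Stream B: only subtask 0 runs the MySQL query and emits records. */
    static boolean emitsRecords(int subtaskIndex) {
        return subtaskIndex == 0;
    }

    /** A two-input task advances to the smaller of its two input watermarks. */
    static long combine(long watermarkA, long watermarkB) {
        return Math.min(watermarkA, watermarkB);
    }

    public static void main(String[] args) {
        long a = reassign(1_612_684_800_000L, -5_000L); // offset value is made up
        long b = broadcastWatermark();
        System.out.println("A=" + a + " B=" + b + " combined=" + combine(a, b));
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask + " emits records: " + emitsRecords(subtask));
        }
    }
}
```

Because B's watermark sits close to the maximum, the combined watermark in this sketch is driven by A alone, which is presumably the intent of sending max_watermark - 86400_000 from the broadcast side.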

On Sun, Feb 7, 2021 at 4:05 PM 赵一旦 <hinobl...@gmail.com> wrote:

> The first problem is critical, since the savepoint does not work.
> For the second problem, I changed the solution and removed the 'Map'
> based implementation before the data is passed to the second task,
> and in this case the savepoint works. The only problem is that I should
> be stopping the job, noting the savepoint path, and then restarting the
> job with that path. What happens now is: I stop the job, then the job fails
> and restarts automatically from the generated savepoint. So I do not need
> to restart the job anymore, since what it does automatically is what I
> want to do.
>
> I suspect it may also be related to the data, so I am not
> sure that I can provide an example that reproduces the problem.
>
> On Sat, Feb 6, 2021 at 12:13 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Could you provide us with a minimal working example which reproduces the
>> problem for you? This would be super helpful in figuring out the problem
>> you are experiencing. Thanks a lot for your help.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 5, 2021 at 1:03 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>
>>> Yeah, and if it is different, why does my job run normally? The problem
>>> only occurs when I stop it.
>>>
>>> On Fri, Feb 5, 2021 at 7:08 PM Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> Are you 100% sure that the jar files in the classpath (/lib folder) are
>>>> exactly the same on all machines? (It can happen quite easily in a
>>>> distributed standalone setup that some files are different)
>>>>
>>>> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>
>>>>> Flink 1.12.0; only using aligned checkpoints; standalone cluster.
>>>>>
>>>>> On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <rmetz...@apache.org> wrote:
>>>>>
>>>>>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>>>>>> which can lead to corrupted data when using UC)
>>>>>> Can you tell us a little bit about your environment?
>>>>>> (How are you deploying Flink, which state backend are you using,
>>>>>> what kind of job (I guess DataStream API))
>>>>>>
>>>>>> Somehow the process receiving the data is unable to deserialize it,
>>>>>> most likely because they are configured differently (different classpath,
>>>>>> dependency versions etc.)
>>>>>>
>>>>>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>
>>>>>>> I do not think this is a code-related problem anymore; maybe it
>>>>>>> is a bug?
>>>>>>>
>>>>>>> On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi all, I found that the failure always occurred in the second task,
>>>>>>>> after the source task. So I changed something in the first chained
>>>>>>>> task: I transform the 'Map' based class object into another normal
>>>>>>>> class object, and the problem disappeared.
>>>>>>>>
>>>>>>>> Based on the new solution, I also tried to stop and restore the job
>>>>>>>> with a savepoint (all successful).
>>>>>>>>
>>>>>>>> But I also met another problem. This problem also occurs while I
>>>>>>>> stop the job, and also occurs in the second task after the source
>>>>>>>> task. The log is below:
>>>>>>>> 2021-02-05 16:21:26
>>>>>>>> java.io.EOFException
>>>>>>>>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
>>>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:783)
>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>
>>>>>>>> It is also about serialization and deserialization, but not related
>>>>>>>> to Kryo this time.
>>>>>>>>
>>>>>>>> On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> From these snippets it is hard to tell what's going wrong.
>>>>>>>>> Could you maybe give us a minimal example with which to reproduce the
>>>>>>>>> problem? Alternatively, have you read through Flink's serializer
>>>>>>>>> documentation [1]? Have you tried to use a simple POJO instead of
>>>>>>>>> inheriting from a HashMap?
>>>>>>>>>
>>>>>>>>> The stack trace looks as if the job fails deserializing some key
>>>>>>>>> of your MapRecord map.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Some facts are possibly related to this, since another job does
>>>>>>>>>> not hit these exceptions.
>>>>>>>>>> The problem job uses a class which contains a field of class
>>>>>>>>>> MapRecord, and MapRecord is defined to extend HashMap so as to
>>>>>>>>>> accept variable JSON data.
>>>>>>>>>>
>>>>>>>>>> Class MapRecord:
>>>>>>>>>>
>>>>>>>>>> @NoArgsConstructor
>>>>>>>>>> @Slf4j
>>>>>>>>>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>>>>>>>>>     @Override
>>>>>>>>>>     public void setTimestamp(Long timestamp) {
>>>>>>>>>>         put("timestamp", timestamp);
>>>>>>>>>>         put("server_time", timestamp);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public Long getTimestamp() {
>>>>>>>>>>         try {
>>>>>>>>>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>>>>>>>>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>>>>>>>>>         } catch (Exception e) {
>>>>>>>>>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>>>>>>>>>             return 0L;
>>>>>>>>>>         }
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Class UserAccessLog:
>>>>>>>>>>
>>>>>>>>>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>>>>>>>>>     private MapRecord d; // I think this is related to the problem...
>>>>>>>>>>
>>>>>>>>>>     ... ...
>>>>>>>>>>
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Actually the exception is different every time I stop the job.
>>>>>>>>>>> Such as:
>>>>>>>>>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>>>>> The stack is as I gave above.
>>>>>>>>>>>
>>>>>>>>>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>>>>>> 2021-02-03 18:37:24
>>>>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>>>>>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>>>>>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>
>>>>>>>>>>> (3) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>
>>>>>>>>>>> ...
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> could you show us the job you are trying to resume? Is it a SQL
>>>>>>>>>>>> job or a DataStream job, for example?
>>>>>>>>>>>>
>>>>>>>>>>>> From the stack trace, it looks as if the class g^XT is not on
>>>>>>>>>>>> the class path.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I have a job whose checkpoints and savepoints all work fine.
>>>>>>>>>>>>> But if I stop the job using 'stop -p', the job fails after the
>>>>>>>>>>>>> savepoint has been generated. Here is the log:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2021-02-03 16:53:55,179 WARN org.apache.flink.runtime.taskmanager.Task [] -
>>>>>>>>>>>>> ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default PassThroughFilter[null, null) ->
>>>>>>>>>>>>> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter
>>>>>>>>>>>>> (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.
>>>>>>>>>>>>>
>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>>>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
>>>>>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
>>>>>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
>>>>>>>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>>>>>>>>>     at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>>>>     ... 22 more