Hi all, I found that the failure always occurs in the second task, the one
right after the source task. So in the first chained task I now convert
the Map-based class object into an ordinary class object, and the
problem disappeared.
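
For anyone who wants to try the same workaround, the conversion can be
sketched roughly as follows. This is a minimal sketch, not the actual job
code: PlainRecord, fromMap and the "uid" key are hypothetical names, and
only the "timestamp" / "server_time" keys come from the MapRecord class
quoted later in this thread. The idea is that the first chained task copies
the needed keys into a class that follows Flink's POJO rules (public class,
public no-argument constructor, fields reachable through getters and
setters), so the record handed to the next task is no longer a HashMap
subclass.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; this is an illustration, not the real job code.
final class MapToPojoSketch {

    /** Plain POJO: public class, public no-arg constructor, getters and setters. */
    public static class PlainRecord {
        private long timestamp;
        private String uid; // "uid" is an assumed field, chosen only for the example

        public PlainRecord() {}

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public String getUid() { return uid; }
        public void setUid(String uid) { this.uid = uid; }
    }

    /** Copies the needed keys out of the map; meant to run in the first chained task. */
    static PlainRecord fromMap(Map<Object, Object> map) {
        PlainRecord r = new PlainRecord();
        // Same fallback order as MapRecord.getTimestamp(): "timestamp", then "server_time".
        Object ts = map.getOrDefault("timestamp", map.getOrDefault("server_time", 0L));
        r.setTimestamp(ts instanceof Number ? ((Number) ts).longValue() : 0L);
        Object uid = map.get("uid");
        r.setUid(uid == null ? null : uid.toString());
        return r;
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new HashMap<>();
        raw.put("server_time", 1612340000000L);
        raw.put("uid", 42);
        PlainRecord r = fromMap(raw);
        System.out.println(r.getTimestamp() + " " + r.getUid());
    }
}
```

In a DataStream job, fromMap would typically be called from a map function
placed directly after the source, so that only PlainRecord objects cross the
network to the second task.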

With the new solution, I also tried stopping the job with a savepoint and
restoring from it, and both succeeded.

However, I then hit another problem. It also occurs when I stop the job,
and also in the second task after the source task. The log is below:
2021-02-05 16:21:26
java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
    at org.apache.flink.types.StringValue.readString(StringValue.java:783)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)

It is again a serialization/deserialization problem, but this time it is
not related to Kryo.
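
As an illustration of what this kind of exception means in general: an
EOFException inside a string deserializer says that the reader expected more
bytes than the buffer contained. The sketch below is a plain-JDK analogy
using DataOutputStream/DataInputStream; it is not Flink's serializer code,
and it does not show why the buffer is short in this job. TruncatedReadSketch
and readHitsEof are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Plain-JDK analogy only; hypothetical names, not Flink code.
final class TruncatedReadSketch {

    /** Writes one string, drops the last `cut` bytes, and reports whether reading it back hits EOF. */
    static boolean readHitsEof(String value, int cut) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(value); // 2-byte length prefix followed by the encoded characters
            out.flush();

            byte[] bytes = buf.toByteArray();
            byte[] truncated = Arrays.copyOf(bytes, Math.max(0, bytes.length - cut));

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(truncated));
            in.readUTF(); // the length prefix promises more bytes than a truncated buffer holds
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readHitsEof("server_time", 0)); // complete buffer
        System.out.println(readHitsEof("server_time", 3)); // last three bytes missing
    }
}
```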


On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <trohrm...@apache.org> wrote:

> From these snippets it is hard to tell what's going wrong. Could you maybe
> give us a minimal example with which to reproduce the problem?
> Alternatively, have you read through Flink's serializer documentation [1]?
> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>
> The stack trace looks as if the job fails deserializing some key of your
> MapRecord map.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>
> Cheers,
> Till
>
> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>
>> Here are some facts that may be related, since another job does not run
>> into these exceptions.
>> The failing job uses a class that contains a field of type MapRecord,
>> and MapRecord is defined to extend HashMap so that it can accept variable
>> JSON data.
>>
>> Class MapRecord:
>>
>> @NoArgsConstructor
>> @Slf4j
>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>     @Override
>>     public void setTimestamp(Long timestamp) {
>>         put("timestamp", timestamp);
>>         put("server_time", timestamp);
>>     }
>>
>>     @Override
>>     public Long getTimestamp() {
>>         try {
>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>         } catch (Exception e) {
>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>             return 0L;
>>         }
>>     }
>> }
>>
>> Class UserAccessLog:
>>
>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>     private MapRecord d;  // I think this is related to the problem...
>>
>>     ... ...
>>
>> }
>>
>>
>> On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>
>>> Actually, the exception is different every time I stop the job.
>>> For example:
>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>> The stack trace is the one I gave earlier.
>>>
>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>> 2021-02-03 18:37:24
>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver
>>> .getReadObject(MapReferenceResolver.java:42)
>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>>> MapSerializer.java:135)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>>> MapSerializer.java:21)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>>> .deserialize(KryoSerializer.java:346)
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>> .deserialize(PojoSerializer.java:411)
>>>     at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>     at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>     at org.apache.flink.runtime.plugable.
>>> NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate
>>> .java:55)
>>>     at org.apache.flink.runtime.io.network.api.serialization.
>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:145)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:67)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>>> .processInput(StreamTwoInputProcessor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:372)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:186)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:575)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:539)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> (3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered
>>> class ID: 96
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>>> ID: 96
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
>>> DefaultClassResolver.java:119)
>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>>> MapSerializer.java:135)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>>> MapSerializer.java:21)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>>> .deserialize(KryoSerializer.java:346)
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>> .deserialize(PojoSerializer.java:411)
>>>     at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>     at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>     at org.apache.flink.runtime.plugable.
>>> NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate
>>> .java:55)
>>>     at org.apache.flink.runtime.io.network.api.serialization.
>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:145)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:67)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>>> .processInput(StreamTwoInputProcessor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:372)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:186)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:575)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:539)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> ...
>>>
>>> On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> could you show us the job you are trying to resume? Is it a SQL job or
>>>> a DataStream job, for example?
>>>>
>>>> From the stack trace, it looks as if the class g^XT is not on the class
>>>> path.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>
>>>>> I have a job whose checkpoints and savepoints all work fine.
>>>>> But if I stop the job using 'stop -p', the job fails after the
>>>>> savepoint has been generated. Here is the log:
>>>>>
>>>>> 2021-02-03 16:53:55,179 WARN
>>>>>  org.apache.flink.runtime.taskmanager.Task                    [] -
>>>>> ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default
>>>>> PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor ->
>>>>> ual_ft_uid_subid_EmptyUidFilter (17/30)#0
>>>>> (46abce5d1148b56094726d442df2fd9c) switched
>>>>> from RUNNING to FAILED.
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>         at
>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>> [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>> [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>> ~[?:1.8.0_251]
>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>> ~[?:1.8.0_251]
>>>>>         at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>> ~[?:1.8.0_251]
>>>>>         at
>>>>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>         at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>         at
>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>         ... 22 more
>>>>>
>>>>>
