[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702651#comment-17702651 ]
John commented on FLINK-31356:
------------------------------

I found the cause through debugging. My old POJO did not have a parameterless constructor, so the system used *KryoSerializer* to serialize it. After I added the parameterless constructor, the system switched to *PojoSerializer*, but the restored *_subclassSerializerCache_* still records that my POJO should be deserialized with {*}KryoSerializer{*}, which causes the problem.
{code:java}
TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
    // A cached entry always wins; a serializer is only created on a cache miss.
    TypeSerializer<?> result = subclassSerializerCache.get(subclass);
    if (result == null) {
        result = createSubclassSerializer(subclass);
        subclassSerializerCache.put(subclass, result);
    }
    return result;
}
{code}
Restoring works normally once I remove this cache entry during restore, as in the patched constructor below. I hope the maintainers can fix this properly later.
{code:java}
PojoSerializer(
        Class<T> clazz,
        Field[] fields,
        TypeSerializer<Object>[] fieldSerializers,
        LinkedHashMap<Class<?>, Integer> registeredClasses,
        TypeSerializer<?>[] registeredSerializers,
        Map<Class<?>, TypeSerializer<?>> subclassSerializerCache,
        ExecutionConfig executionConfig) {
    this.clazz = checkNotNull(clazz);
    this.fields = checkNotNull(fields);
    this.numFields = fields.length;
    this.fieldSerializers = checkNotNull(fieldSerializers);
    this.registeredClasses = checkNotNull(registeredClasses);
    this.registeredSerializers = checkNotNull(registeredSerializers);
    this.subclassSerializerCache = checkNotNull(subclassSerializerCache);
    // Workaround: drop the stale cache entry for the affected class.
    this.subclassSerializerCache.entrySet().removeIf(next -> "xxx".equals(next.getKey().getName()));
    this.executionConfig = checkNotNull(executionConfig);
    this.cl = Thread.currentThread().getContextClassLoader();
}
{code}

> Serialize garbled characters at checkpoint
> ------------------------------------------
>
>                 Key: FLINK-31356
>                 URL: https://issues.apache.org/jira/browse/FLINK-31356
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.13.6
>            Reporter: John
>            Priority: Major
>
>
> {panel:title=The last checkpoint of the program was successful}
> 2023-03-07 08:33:16,085 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 8b5720a4a40f50b995c97c6fe5b93079.
> 2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 849 ms).
> 2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 39126 as completed for source Source: kafkaDataStream.
> 2023-03-07 08:36:10,444 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to FAILED on container_e38_1676011848026_0012_01_000002 @ xxxx (dataPort=44633).
> java.lang.RuntimeException: Writing records to JDBC failed.
>     at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153) ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
> {panel}
> {panel:title=But from this checkpoint restore, it can't be decoded}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     ... 10 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     ... 10 more
> Caused by: java.io.UTFDataFormatException: malformed input around byte 32
>     at java.io.DataInputStream.readUTF(DataInputStream.java:656) ~[?:1.8.0_201]
>     at java.io.DataInputStream.readUTF(DataInputStream.java:564) ~[?:1.8.0_201]
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:379) ~[flink-core-1.13.6.jar:1.13.6]
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:155) ~[flink-core-1.13.6.jar:1.13.6]
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43) ~[flink-core-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:258) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:220) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:166) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:62) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> {panel}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
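
The stale-cache effect described in the comment above can be illustrated in isolation. The following is only a minimal sketch, not Flink code: `SerializerKind`, `analyze`, and `Event` are hypothetical stand-ins, and the "public no-arg constructor means POJO" rule is a deliberate simplification of Flink's real type analysis. It shows that a memoizing lookup of the same shape as `getSubclassSerializer` keeps returning a restored entry until that entry is removed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class StaleSerializerCacheSketch {

    /** Hypothetical stand-in for a serializer; only its kind matters here. */
    public enum SerializerKind { KRYO, POJO }

    /** Memoizing lookup with the same shape as getSubclassSerializer in the comment. */
    public static SerializerKind getSubclassSerializer(
            Map<Class<?>, SerializerKind> cache,
            Class<?> subclass,
            Function<Class<?>, SerializerKind> create) {
        SerializerKind result = cache.get(subclass);
        if (result == null) {
            // Only reached on a cache miss; a restored entry is never re-checked.
            result = create.apply(subclass);
            cache.put(subclass, result);
        }
        return result;
    }

    /** Simplified assumption: a public no-arg constructor makes the class a POJO. */
    public static SerializerKind analyze(Class<?> clazz) {
        try {
            clazz.getConstructor();
            return SerializerKind.POJO;
        } catch (NoSuchMethodException e) {
            return SerializerKind.KRYO;
        }
    }

    /** Hypothetical user type after the change: it now has a no-arg constructor. */
    public static class Event {
        public String name;
        public Event() {}
    }

    public static void main(String[] args) {
        // Cache as restored from a snapshot taken before the constructor was added.
        Map<Class<?>, SerializerKind> restored = new HashMap<>();
        restored.put(Event.class, SerializerKind.KRYO);

        // The stale entry shadows what a fresh analysis would choose.
        System.out.println(getSubclassSerializer(
                restored, Event.class, StaleSerializerCacheSketch::analyze)); // KRYO

        // Same idea as the workaround: drop the entry for the affected class.
        restored.entrySet().removeIf(e -> Event.class.getName().equals(e.getKey().getName()));
        System.out.println(getSubclassSerializer(
                restored, Event.class, StaleSerializerCacheSketch::analyze)); // POJO
    }
}
```

Removing the entry by class name mirrors the `removeIf` call in the patched constructor; whether that is safe for a given job depends on which serializer actually wrote the state being restored.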