Si Chen created FLINK-22608: ------------------------------- Summary: Flink Kryo deserialize read wrong bytes Key: FLINK-22608 URL: https://issues.apache.org/jira/browse/FLINK-22608 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Affects Versions: 1.12.0 Reporter: Si Chen
In flink program, I use ValueState to save my state. The state stores pojo. But my pojo used kryo serializer. As the program run some time, I add a field in pojo. Then recovery the program with checkpoint. I found the value of the field incorrect. Then I read the source code I found {code:java} //代码占位符 org.apache.flink.runtime.state.heap.HeapRestoreOperation#readStateHandleStateData private void readStateHandleStateData( FSDataInputStream fsDataInputStream, DataInputViewStreamWrapper inView, KeyGroupRangeOffsets keyGroupOffsets, Map<Integer, StateMetaInfoSnapshot> kvStatesById, int numStates, int readVersion, boolean isCompressed) throws IOException { final StreamCompressionDecorator streamCompressionDecorator = isCompressed ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) { int keyGroupIndex = groupOffset.f0; long offset = groupOffset.f1; // Check that restored key groups all belong to the backend. Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The key group must belong to the backend."); fsDataInputStream.seek(offset); int writtenKeyGroupIndex = inView.readInt(); Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex, "Unexpected key-group in restore."); try (InputStream kgCompressionInStream = streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) { readKeyGroupStateData( kgCompressionInStream, kvStatesById, keyGroupIndex, numStates, readVersion); } } } {code} my state keyGroupIndex is 81, and keyGroupOffset is 3572. And the next keyGroupOffset is 3611. So my state offset rang is 3572 to 3611. But when I add new field in pojo. Kryo will read more bytes in the next keyGroup. -- This message was sent by Atlassian Jira (v8.3.4#803005)