In the past, we occasionally had issues with corrupted timer state (see https://issues.apache.org/jira/browse/FLINK-23886), which were resolved by going back to a previous savepoint and restarting the job from there. Recently (since we upgraded to Flink 1.19.1), we have seen more state corruption issues for checkpoint/savepoint data in RocksDB. Each time, the resolution has been to go back to a previous savepoint and replay events, after which everything was fine. Issues we have seen:
* Exceptions when deserializing MapState entries, with an EOFException at the start of key deserialization:

  j.io.EOFException: null
    at o.a.f.c.m.DataInputDeserializer.readUnsignedShort(DataInputDeserializer.java:339)
    at o.a.f.c.m.DataInputDeserializer.readUTF(DataInputDeserializer.java:251)
    at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:68)
    at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:17)
    at o.a.f.c.s.s.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:390)
    at o.a.f.c.s.s.RocksDBMapState.access$000(RocksDBMapState.java:66)
    at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:493)
    ... 27 common frames omitted
  Wrapped by: o.a.f.u.FlinkRuntimeException: Error while deserializing the user key.
    at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:496)
    at c.g.a.f.o.t.ObservationsWrapper.filterObservations(ObservationsWrapper.java:83)

* The same with ValueState, with an EOFException immediately on deserialization:

  java.io.EOFException: null
    at o.a.f.c.m.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
    at o.a.f.t.StringValue.readString(StringValue.java:781)
    at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:73)
    at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:31)
    at o.a.f.a.j.t.r.PojoSerializer.deserialize(PojoSerializer.java:463)
    at o.a.f.c.s.s.RocksDBValueState.value(RocksDBValueState.java:88)
    ... 23 common frames omitted

* Exceptions for operators with multiple ValueStates, where deserialization fails due to type issues:

  java.lang.IllegalArgumentException: Can not set java.util.Map field com.genesys.analytics.flink.user.processing.window.AggregateStatusStats.durationStats to java.lang.Double
    at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
    at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
    at java.base/jdk.internal.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81)
    at java.base/java.lang.reflect.Field.set(Field.java:799)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:465)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:88)

  This one almost looks like one ValueState<A> overwrote the other ValueState<B>.

Our setup is Flink 1.19.1 deployed on AWS EC2 with RocksDB as the state backend and S3 for checkpoint/savepoint storage. We’ve had very few issues like this since 2017, but multiple incidents in the last few months. Is there anything that changed on the Flink side?

Peter Westermann
Director, Analytics Software Architect
Genesys Cloud - Analytics
peter.westerm...@genesys.com
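PS: To show what the first failure mode looks like in isolation, here is a minimal sketch using plain java.io streams rather than Flink's DataInputDeserializer, and a hypothetical key layout (a UTF string plus a long) standing in for whatever our ObservationKeySerializer actually writes. Since readUTF starts by reading an unsigned-short length prefix, key bytes truncated at a record boundary fail in readUnsignedShort exactly like the trace above:

```java
import java.io.*;

public class TruncatedKeyDemo {

    // Hypothetical key serialization: a UTF string followed by a long,
    // mimicking the shape of a custom TypeSerializer's serialize().
    static byte[] serialize(String id, long ts) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(id);   // writes a 2-byte length prefix, then the bytes
        out.writeLong(ts);
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize("observation-1", 42L);

        // Intact bytes round-trip cleanly.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        System.out.println(in.readUTF() + " / " + in.readLong());

        // Truncated bytes (simulating a corrupt RocksDB entry): readUTF's
        // attempt to read the 2-byte length prefix hits end-of-stream,
        // mirroring the EOFException in readUnsignedShort above.
        DataInputStream bad = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        try {
            bad.readUTF();
        } catch (EOFException e) {
            System.out.println("caught: " + e);
        }
    }
}
```

The point being: the serializer code is symmetric and correct; an EOFException at the very first read means the stored value bytes themselves are shorter than expected.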