In the past, we occasionally had issues with corrupted timer state (see
https://issues.apache.org/jira/browse/FLINK-23886), which were resolved by
going back to a previous savepoint and restarting the job from there. Recently
(since we upgraded to Flink 1.19.1), we have seen more state corruption issues
for checkpoint/savepoint data in RocksDB. Each time, the resolution has been to
go back to a previous savepoint and replay events, after which everything was
fine.
Issues we have seen:
* Exceptions when deserializing MapState entries, with an EOFException right at
the start of key deserialization (a sketch of our key serializer's deserialize
method follows this list):
j.io.EOFException: null
    at o.a.f.c.m.DataInputDeserializer.readUnsignedShort(DataInputDeserializer.java:339)
    at o.a.f.c.m.DataInputDeserializer.readUTF(DataInputDeserializer.java:251)
    at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:68)
    at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:17)
    at o.a.f.c.s.s.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:390)
    at o.a.f.c.s.s.RocksDBMapState.access$000(RocksDBMapState.java:66)
    at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:493)
    ... 27 common frames omitted
Wrapped by: o.a.f.u.FlinkRuntimeException: Error while deserializing the user key.
    at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:496)
    at c.g.a.f.o.t.ObservationsWrapper.filterObservations(ObservationsWrapper.java:83)
* The same with ValueState, with an EOFException immediately when deserializing:
java.io.EOFException: null
    at o.a.f.c.m.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
    at o.a.f.t.StringValue.readString(StringValue.java:781)
    at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:73)
    at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:31)
    at o.a.f.a.j.t.r.PojoSerializer.deserialize(PojoSerializer.java:463)
    at o.a.f.c.s.s.RocksDBValueState.value(RocksDBValueState.java:88)
    ... 23 common frames omitted
* Exceptions for operators with multiple ValueStates, where deserialization
fails due to type issues:
java.lang.IllegalArgumentException: Can not set java.util.Map field com.genesys.analytics.flink.user.processing.window.AggregateStatusStats.durationStats to java.lang.Double
    at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
    at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
    at java.base/jdk.internal.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81)
    at java.base/java.lang.reflect.Field.set(Field.java:799)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:465)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:88)
This one almost looks as if one ValueState<A> overwrote the other
ValueState<B>; how the operator registers its states is sketched below.
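Regarding the first exception: this is roughly what our
ObservationKeySerializer's deserialize does (a minimal sketch; the fields read
after the initial UTF string are illustrative):

import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;

// Part of our custom TypeSerializer<ObservationKey>; only the deserialize
// path is shown, and the fields after the UTF string are illustrative.
@Override
public ObservationKey deserialize(DataInputView source) throws IOException {
    // readUTF begins by reading an unsigned short length prefix; on empty
    // or truncated key bytes this is where the EOFException is thrown
    // (readUTF -> readUnsignedShort, matching the trace above).
    String observationId = source.readUTF();
    long timestamp = source.readLong();
    return new ObservationKey(observationId, timestamp);
}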
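And for the third exception, a minimal sketch of how the affected operator
registers its two value states (apart from AggregateStatusStats, the class,
state, and type names here are illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: Event and Result stand in for our actual types.
public class StatusStatsFunction extends KeyedProcessFunction<String, Event, Result> {

    private transient ValueState<AggregateStatusStats> statsState;
    private transient ValueState<Double> durationState;

    @Override
    public void open(Configuration parameters) {
        // Each state has a distinct descriptor name, so a read of one state
        // should never return bytes written for the other.
        statsState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("status-stats", AggregateStatusStats.class));
        durationState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("duration", Double.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out)
            throws Exception {
        // The IllegalArgumentException above surfaces on this read, as if
        // the stored bytes were a serialized Double rather than the POJO.
        AggregateStatusStats stats = statsState.value();
        // ... update stats and durationState, emit results ...
    }
}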
Our setup is Flink 1.19.1 deployed on AWS EC2, with RocksDB as the state
backend and S3 for checkpoint/savepoint storage (the relevant configuration is
sketched below). We've had very few issues with this setup since 2017 but have
hit multiple incidents in the last few months. Is there anything that changed
on the Flink side?
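For reference, the relevant parts of our configuration look roughly like this
(a simplified sketch with placeholder bucket name and paths):

# flink-conf.yaml (Flink 1.19): state backend and checkpoint storage
state.backend.type: rocksdb
state.checkpoints.dir: s3://our-bucket/checkpoints
state.savepoints.dir: s3://our-bucket/savepoints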
Peter Westermann
Director, Analytics Software Architect
Genesys Cloud - Analytics
[email protected]