In the past, we have infrequently had issues with corrupted timer state (see 
https://issues.apache.org/jira/browse/FLINK-23886), which were resolved by going 
back to a previous savepoint and restarting the job from it. Recently (since we 
upgraded to Flink 1.19.1), we have seen more state corruption issues for 
checkpoint/savepoint data in RocksDB. Each time, the resolution has been to go 
back to a previous savepoint and replay events, after which everything was fine.
Issues we have seen:

  *   Exceptions when deserializing MapState entries, with an EOFException at 
the start of key deserialization:
j.io.EOFException: null
     at o.a.f.c.m.DataInputDeserializer.readUnsignedShort(DataInputDeserializer.java:339)
     at o.a.f.c.m.DataInputDeserializer.readUTF(DataInputDeserializer.java:251)
     at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:68)
     at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:17)
     at o.a.f.c.s.s.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:390)
     at o.a.f.c.s.s.RocksDBMapState.access$000(RocksDBMapState.java:66)
     at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:493)
     ... 27 common frames omitted
Wrapped by: o.a.f.u.FlinkRuntimeException: Error while deserializing the user key.
     at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:496)
     at c.g.a.f.o.t.ObservationsWrapper.filterObservations(ObservationsWrapper.java:83)

  *   Same with ValueState, with an EOFException immediately when deserializing:
java.io.EOFException: null
     at o.a.f.c.m.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
     at o.a.f.t.StringValue.readString(StringValue.java:781)
     at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:73)
     at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:31)
     at o.a.f.a.j.t.r.PojoSerializer.deserialize(PojoSerializer.java:463)
     at o.a.f.c.s.s.RocksDBValueState.value(RocksDBValueState.java:88)
     ... 23 common frames omitted

  *   Exceptions for operators with multiple ValueStates, where deserialization 
fails due to type issues:
java.lang.IllegalArgumentException: Can not set java.util.Map field com.genesys.analytics.flink.user.processing.window.AggregateStatusStats.durationStats to java.lang.Double
     at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
     at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
     at java.base/jdk.internal.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81)
     at java.base/java.lang.reflect.Field.set(Field.java:799)
     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:465)
     at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:88)
This one almost looks as if one ValueState<A> overwrote the other ValueState<B>.
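
To make the symptoms concrete, here is a minimal sketch that reproduces the same 
exception types outside Flink. It is not Flink code: java.io.DataInputStream 
stands in for Flink's DataInputDeserializer, and the class CorruptStateSymptoms, 
its methods, and the Stats POJO are hypothetical names used only for 
illustration. It shows that an EOFException on the very first read is what empty 
or truncated value bytes produce, and that reflectively setting a Double on a 
Map-typed field fails the same way as the third trace.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class CorruptStateSymptoms {

    // Hypothetical stand-in for a state POJO with a Map-typed field.
    public static class Stats {
        public Map<String, Double> durationStats;
    }

    // readUTF first reads a 2-byte length prefix, then that many bytes.
    // Returns true if the read fails with EOFException (empty or truncated input).
    public static boolean readUtfHitsEof(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readUTF();
            return false;
        } catch (EOFException e) {
            return true;
        }
    }

    // Returns true if reflection rejects the value for the Map-typed field,
    // i.e. the bytes were decoded as a type the target field cannot hold.
    public static boolean setFieldRejects(Object value) throws ReflectiveOperationException {
        Field field = Stats.class.getField("durationStats");
        try {
            field.set(new Stats(), value);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("empty bytes hit EOF: " + readUtfHitsEof(new byte[0]));
        System.out.println("truncated bytes hit EOF: " + readUtfHitsEof(new byte[] {0, 5, 'a'}));
        System.out.println("Double rejected for Map field: " + setFieldRejects(Double.valueOf(1.0)));
        System.out.println("Map rejected for Map field: " + setFieldRejects(new HashMap<String, Double>()));
    }
}
```

This only demonstrates what the bytes handed to the serializer must have looked 
like (empty, truncated, or written for a different type). It does not say where 
in the checkpoint/restore path they became that way.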

Our setup is Flink 1.19.1 deployed on AWS EC2 with RocksDB as the state backend 
and S3 for checkpoint/savepoint storage. We’ve had very few issues with this 
since 2017, but have had multiple incidents in the last few months. Has anything 
changed on the Flink side?


Peter Westermann
Director, Analytics Software Architect
Genesys Cloud - Analytics
peter.westerm...@genesys.com