Total job state size is in the gigabytes, but we’ve seen this with only ~500 MB of state for the impacted operator. We’re not using async I/O. The job is just a Kafka source, a chain of operators (most of them stateful), and then Kafka sinks.
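For reference, a minimal sketch of that job shape (Kafka source → keyed stateful chain → Kafka sink); brokers, topics, and the stateful step are placeholders, not the actual job:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SketchJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events-in")
                .setGroupId("sketch-job")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events-out")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .keyBy(v -> v)
                .process(new CountingFunction())  // stateful keyed operator, state lives in RocksDB
                .sinkTo(sink);

        env.execute("sketch-job");
    }

    /** Trivial stateful step standing in for the real chain of stateful operators. */
    static class CountingFunction extends KeyedProcessFunction<String, String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            Long current = count.value();
            long next = current == null ? 1L : current + 1L;
            count.update(next);
            out.collect(value + "#" + next);
        }
    }
}

The real chain has more stateful operators than this, but the shape is the same.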
Peter Westermann
Director, Analytics Software Architect
Genesys Cloud - Analytics
peter.westerm...@genesys.com

From: Gabor Somogyi <gabor.g.somo...@gmail.com>
Date: Friday, February 14, 2025 at 12:25 PM
To: Peter Westermann <no.westerm...@genesys.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Issues with state corruption in RocksDB

Is it related to state size, or can it happen with small states too? Are you using async I/O [1]?

Some pattern would be useful, otherwise it's hard to do anything 🤷🏻‍♂️

[1] https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/asyncio/

BR,
G

On Fri, Feb 14, 2025 at 6:17 PM Peter Westermann <no.westerm...@genesys.com> wrote:

Since this doesn't happen consistently, we have never been able to reproduce it; we just run into it sporadically across a large number of different Flink jobs (and since this is in our production environment, we can't export savepoints/checkpoints for further analysis). My guess would be that there's a race condition somewhere that may cause this. This may be coincidence, but it seems that all state corruption issues occurred in operators that contain multiple ValueStates (or a MapState and a ValueState).

Peter Westermann
Director, Analytics Software Architect
Genesys Cloud - Analytics
peter.westerm...@genesys.com

From: Gabor Somogyi <gabor.g.somo...@gmail.com>
Date: Friday, February 14, 2025 at 11:54 AM
To: Peter Westermann <no.westerm...@genesys.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Issues with state corruption in RocksDB

BTW, do you have a bare-minimum application which can reproduce the issue?

On Fri, Feb 14, 2025 at 5:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Hi Peter,

I've had a look at this, and it seems like this problem, or a similar one, still exists in 1.20. Since I've not done an in-depth analysis, I can only say this from taking a look at the last comments, like:

> Faced an issue on Flink 1.20.0

but there are more of such... So all in all I have the feeling that this area still has some edge case or race issue(s) left.

BR,
G
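For concreteness, the pattern mentioned above (a keyed operator that registers several state handles, e.g. a MapState plus a ValueState) looks roughly like the following minimal sketch; class and state names are placeholders, not the actual job:

import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Shape of the operators where the corruption was observed: more than one
 *  keyed state handle registered on the same operator. */
public class MultiStateFunction extends KeyedProcessFunction<String, String, String> {

    private transient MapState<String, Long> perItemCounts;
    private transient ValueState<String> lastSeen;

    @Override
    public void open(Configuration parameters) {
        perItemCounts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("per-item-counts", Types.STRING, Types.LONG));
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-seen", Types.STRING));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = perItemCounts.get(value);
        perItemCounts.put(value, current == null ? 1L : current + 1L);
        lastSeen.update(value);  // second, independent state handle on the same operator

        // Iterating MapState entries is where RocksDBMapState$RocksDBMapEntry.getKey()
        // deserializes the user key (the frame in the EOFException stack trace below).
        long total = 0;
        for (Map.Entry<String, Long> e : perItemCounts.entries()) {
            total += e.getValue();
        }
        out.collect(value + ":" + total);
    }
}

With the RocksDB backend, each state descriptor gets its own column family, so the two handles should be isolated from each other on disk.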
On Tue, Jan 7, 2025 at 4:26 PM Peter Westermann <no.westerm...@genesys.com> wrote:

In the past, we have infrequently had issues with corrupted timer state (see https://issues.apache.org/jira/browse/FLINK-23886), which resolved when we went back to a previous savepoint and restarted the job from that. Recently (since we upgraded to Flink 1.19.1), we have seen more state corruption issues for checkpoint/savepoint data in RocksDB. Each time, the resolution has been to go back to a previous savepoint and replay events, and everything was fine.

Issues we have seen:

* Exceptions when deserializing MapState entries, with an EOFException at the start of key deserialization:

java.io.EOFException: null
    at o.a.f.c.m.DataInputDeserializer.readUnsignedShort(DataInputDeserializer.java:339)
    at o.a.f.c.m.DataInputDeserializer.readUTF(DataInputDeserializer.java:251)
    at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:68)
    at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:17)
    at o.a.f.c.s.s.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:390)
    at o.a.f.c.s.s.RocksDBMapState.access$000(RocksDBMapState.java:66)
    at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:493)
    ... 27 common frames omitted
Wrapped by: o.a.f.u.FlinkRuntimeException: Error while deserializing the user key.
    at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:496)
    at c.g.a.f.o.t.ObservationsWrapper.filterObservations(ObservationsWrapper.java:83)

* The same with ValueState, with an EOFException immediately when deserializing:

java.io.EOFException: null
    at o.a.f.c.m.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
    at o.a.f.t.StringValue.readString(StringValue.java:781)
    at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:73)
    at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:31)
    at o.a.f.a.j.t.r.PojoSerializer.deserialize(PojoSerializer.java:463)
    at o.a.f.c.s.s.RocksDBValueState.value(RocksDBValueState.java:88)
    ... 23 common frames omitted

* Exceptions for operators with multiple ValueStates, where deserialization fails due to type issues:

java.lang.IllegalArgumentException: Can not set java.util.Map field com.genesys.analytics.flink.user.processing.window.AggregateStatusStats.durationStats to java.lang.Double
    at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
    at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
    at java.base/jdk.internal.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81)
    at java.base/java.lang.reflect.Field.set(Field.java:799)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:465)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:88)

This one almost looks like one ValueState<A> overwrote the other ValueState<B>.

Our setup is Flink 1.19.1 deployed on AWS EC2 with RocksDB as the state backend and S3 for checkpoint/savepoint storage. We've had very few issues with this since 2017, but multiple incidents in the last few months. Is there anything that changed on the Flink side?

Peter Westermann
Director, Analytics Software Architect
Genesys Cloud - Analytics
peter.westerm...@genesys.com
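For what it's worth, the EOFExceptions above are what DataInputDeserializer throws when the bytes returned for a state entry are shorter than the registered serializer expects. A self-contained sketch using only Flink classes (not the job's own serializers) reproduces the same failure shape:

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

/** Sketch: feeding truncated bytes to a Flink serializer fails like the stack traces above. */
public class TruncatedBytesDemo {
    public static void main(String[] args) throws Exception {
        // Serialize a String the way StringSerializer does for a String-typed state field.
        DataOutputSerializer out = new DataOutputSerializer(64);
        StringSerializer.INSTANCE.serialize("some-state-value", out);
        byte[] full = out.getCopyOfBuffer();

        // Simulate corrupted/truncated value bytes coming back for the state entry.
        byte[] truncated = new byte[full.length / 2];
        System.arraycopy(full, 0, truncated, 0, truncated.length);

        // Throws java.io.EOFException from DataInputDeserializer via StringValue.readString,
        // matching the RocksDBValueState.value() stack trace.
        StringSerializer.INSTANCE.deserialize(new DataInputDeserializer(truncated));
    }
}

This doesn't point at a root cause; it only illustrates that the symptoms match stored bytes that no longer line up with the registered serializers, rather than a bug in the serializers themselves.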