Total job state size is in the gigabytes, but we’ve seen this with only ~500 MB 
of state for the impacted operator.
We’re not using async I/O. Just a Kafka source, a chain of operators (most of 
them stateful), and then Kafka sinks.


Peter Westermann
Director, Analytics Software Architect
Genesys Cloud - Analytics
peter.westerm...@genesys.com




From: Gabor Somogyi <gabor.g.somo...@gmail.com>
Date: Friday, February 14, 2025 at 12:25 PM
To: Peter Westermann <no.westerm...@genesys.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Issues with state corruption in RocksDB

________________________________
Is it related to state size or can happen with small states too?
Are you using async io [1]?

Some pattern would be useful, otherwise hard to do anything🤷🏻‍♂️

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/asyncio/

BR,
G


On Fri, Feb 14, 2025 at 6:17 PM Peter Westermann 
<no.westerm...@genesys.com> wrote:
Since this doesn’t happen consistently, we have never been able to reproduce it. 
We just run into it sporadically across a large number of Flink jobs (and since 
this is our production environment, we can’t export savepoints/checkpoints for 
further analysis). My guess would be that there’s a race condition somewhere 
causing this.


This may be a coincidence, but it seems that all state corruption issues occurred 
in operators that contain multiple ValueStates (or a MapState and a ValueState).
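
Purely to illustrate what that third symptom would look like if two state 
descriptors in one operator ever resolved to the same state name: this is a toy 
model, not Flink internals, and all class and variable names are made up. The 
sketch keys a fake backend by descriptor name alone, so the second write 
silently overwrites the first and the read back comes out as the wrong type:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a state store addressed purely by descriptor name (hypothetical,
// not Flink code). Two logical states accidentally sharing one name produce the
// same "value of type A read back as type B" symptom described above.
public class StateNameCollision {
    static final Map<String, Object> backend = new HashMap<>();

    static void put(String descriptorName, Object value) {
        backend.put(descriptorName, value);
    }

    @SuppressWarnings("unchecked")
    static <T> T get(String descriptorName) {
        return (T) backend.get(descriptorName);
    }

    static String demo() {
        // Two logical states that were (accidentally) given the same name.
        put("stats", Double.valueOf(1.5));   // "ValueState<Double>"
        put("stats", Map.of("k", 2L));       // "ValueState<Map<...>>" overwrites it
        try {
            Double d = get("stats");         // reads the map back as a Double
            return "read " + d;
        } catch (ClassCastException e) {
            // Analogous to "Can not set java.util.Map field ... to java.lang.Double"
            return "type mismatch";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```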


Peter Westermann




From: Gabor Somogyi <gabor.g.somo...@gmail.com>
Date: Friday, February 14, 2025 at 11:54 AM
To: Peter Westermann <no.westerm...@genesys.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Issues with state corruption in RocksDB

________________________________
BTW, do you have a bare minimum application which can reproduce the issue?

On Fri, Feb 14, 2025 at 5:07 PM Gabor Somogyi 
<gabor.g.somo...@gmail.com> wrote:
Hi Peter,

I've had a look at this, and it seems this problem or a similar one still 
exists in 1.20. Since I haven't done an in-depth analysis, I can only say this 
from the most recent comments on the ticket, like:
> Faced an issue on Flink 1.20.0

but there are more like that...

So all in all, I have the feeling that this area still has some edge case or 
race issue(s) left.

BR,
G


On Tue, Jan 7, 2025 at 4:26 PM Peter Westermann 
<no.westerm...@genesys.com> wrote:
In the past, we have infrequently had issues with corrupted timer state (see 
https://issues.apache.org/jira/browse/FLINK-23886), which resolve when we go 
back to a previous savepoint and restart the job from there. Recently (since we 
upgraded to Flink 1.19.1), we have seen more state corruption issues in 
checkpoint/savepoint data in RocksDB. Each time, the resolution has been to go 
back to a previous savepoint and replay events, after which everything was fine.
Issues we have seen:

  *   Exceptions when deserializing MapState entries, with an EOFException at 
the start of key deserialization:
j.io.EOFException: null
     at o.a.f.c.m.DataInputDeserializer.readUnsignedShort(DataInputDeserializer.java:339)
     at o.a.f.c.m.DataInputDeserializer.readUTF(DataInputDeserializer.java:251)
     at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:68)
     at c.g.a.f.o.t.s.ObservationKeySerializer.deserialize(ObservationKeySerializer.java:17)
     at o.a.f.c.s.s.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:390)
     at o.a.f.c.s.s.RocksDBMapState.access$000(RocksDBMapState.java:66)
     at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:493)
     ... 27 common frames omitted
Wrapped by: o.a.f.u.FlinkRuntimeException: Error while deserializing the user key.
     at o.a.f.c.s.s.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:496)
     at c.g.a.f.o.t.ObservationsWrapper.filterObservations(ObservationsWrapper.java:83)

  *   Same with ValueState, with an EOFException immediately when deserializing:
java.io.EOFException: null
     at o.a.f.c.m.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
     at o.a.f.t.StringValue.readString(StringValue.java:781)
     at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:73)
     at o.a.f.a.c.t.b.StringSerializer.deserialize(StringSerializer.java:31)
     at o.a.f.a.j.t.r.PojoSerializer.deserialize(PojoSerializer.java:463)
     at o.a.f.c.s.s.RocksDBValueState.value(RocksDBValueState.java:88)
     ... 23 common frames omitted

  *   Exceptions for operators with multiple ValueStates, where deserialization 
fails due to type issues:
java.lang.IllegalArgumentException: Can not set java.util.Map field 
com.genesys.analytics.flink.user.processing.window.AggregateStatusStats.durationStats 
to java.lang.Double
    at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
    at java.base/jdk.internal.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
    at java.base/jdk.internal.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81)
    at java.base/java.lang.reflect.Field.set(Field.java:799)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:465)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:88)
This one almost looks like one ValueState<A> overwrote the other ValueState<B>.
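
For what it’s worth, the EOFExceptions in the first two cases are the shape 
DataInput produces when the stored bytes are shorter than the deserializer 
expects, i.e. truncated or mis-sliced value bytes rather than a bug in the 
serializer logic itself. A minimal pure-Java sketch of that failure mode (no 
Flink involved; the key string is made up):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

// Writes a UTF string the way DataOutput does (2-byte length prefix + bytes),
// then truncates the buffer before reading it back. Reading it fails inside
// the length prefix, matching the readUTF -> readUnsignedShort -> EOFException
// shape of the stack traces above.
public class TruncatedStateDemo {
    static String demo() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF("observation-key-1"); // hypothetical map-state user key
        }
        // Simulate corrupted/short value bytes: keep only the first byte.
        byte[] truncated = Arrays.copyOf(buf.toByteArray(), 1);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(truncated))) {
            in.readUTF(); // the 2-byte length prefix itself is incomplete
            return "read ok";
        } catch (EOFException e) {
            return "EOFException while deserializing";
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```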

Our setup is Flink 1.19.1 deployed on AWS EC2 with RocksDB as the state backend 
and S3 for checkpoint/savepoint storage. We’ve had very few issues with this 
since 2017, but multiple incidents in the last few months. Is there anything 
that changed on the Flink side?


Peter Westermann


