Hi, I have been struggling with this particular Exception for days and thought I'd ask for help here.
I am using a KeyedProcessFunction with a private lazy val state: ValueState[Feature] = { val stateDescriptor = new ValueStateDescriptor[Feature]("CollectFeatureProcessState", createTypeInformation[Feature]) getRuntimeContext.getState(stateDescriptor) } which is used in my process function as follows override def processElement(value: Feature, ctx: KeyedProcessFunction[String, Feature, Feature]#Context, out: Collector[Feature]): Unit = { val current: Feature = state.value match { case null => value case exists => combine(value, exists) } if (checkForCompleteness(current)) { out.collect(current) state.clear() } else { state.update(current) } } Feature is a protobuf class that I registered with kryo as follows (using chill-protobuf) env.getConfig.registerTypeWithKryoSerializer(classOf[Feature], classOf[ProtobufSerializer]) But I also got Exceptions with normal scala case classes wrapping this Feature class, and without the ProtobufSerializer using the standard slow Java Serializer. The exception occurs within the first minutes/seconds of starting the app and looks as follows: 2023-02-03 08:41:07,577 WARN org.apache.flink.runtime.taskmanager.Task [] - KeyedProcess -> (Map -> Sink: FeatureSignalSink, Map -> Flat Map -> Sink: FeatureStore, Sink: logsink) (2/2)#0 (1befbd4d8975833fc973fc080ea866e4) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB. at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91) at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55) at com.grab.app.functions.stream.CollectFeatureProcessFunction $.processElement(CollectFeatureProcessFunction.scala:17) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:750) Caused by: java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329) at org.apache.flink.types.StringValue.readString(StringValue.java:786) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:128) at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:34) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32) at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89) ... 16 more The exception is thrown at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55) which is this line: val current: AcornHydraeventFeature = state.value match { Did someone run into this before and/or can point me at the right direction for further investigation? Thanks a lot Clemens -- By communicating with Grab Holdings Limited and/or its subsidiaries, associate companies and jointly controlled entities (collectively, “Grab”), you are deemed to have consented to the processing of your personal data as set out in the Privacy Notice which can be viewed at https://grab.com/privacy/ <https://grab.com/privacy/> This email contains confidential information that may be privileged and is only for the intended recipient(s). If you are not the intended recipient(s), please do not disseminate, distribute or copy this email. Please notify Grab immediately if you have received this by mistake and delete this email from your system. Email transmission may not be secure or error-free as any information could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or contain viruses. Grab does not accept liability for any errors or omissions in this email that arise as a result of email transmission. All intellectual property rights in this email and any attachments shall remain vested in Grab, unless otherwise provided by law