If I store the Java protobuf objects in RocksDB instead of the Scala objects, I get this stack trace:

2023-02-07 09:17:04,246 WARN  org.apache.flink.runtime.taskmanager.Task [] - KeyedProcess -> (Map -> Sink: signalSink, Map -> Flat Map -> Sink: FeatureSink, Sink: logsink) (2/2)#0 (fa4aae8fb7d2a7a94eafb36fe5470851_6760a9723a5626620871f040128bad1b_1_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:109)
    at com.grab.grabdefence.acorn.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:69)
    at com.grab.grabdefence.acorn.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectProcessFunction.scala:18)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:358)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:180)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:168)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:107)
    ... 16 more

I do not touch the Kryo serializer apart from the one registerTypeWithKryoSerializer call, and I call state.value() and update() only once each in the processElement() method. I thought these state stores were abstracted away safely enough that, as a user, I would not have to worry about the exact flush/serialization/deserialization logic, but it seems that this application breaks even though I am only using what I think is quite innocent code.

On Fri, Feb 3, 2023 at 4:52 PM Clemens Valiente <clemens.valie...@grab.com> wrote:

> Hi, I have been struggling with this particular exception for days and
> thought I'd ask for help here.
>
> I am using a KeyedProcessFunction with a
>
>   private lazy val state: ValueState[Feature] = {
>     val stateDescriptor = new ValueStateDescriptor[Feature](
>       "CollectFeatureProcessState", createTypeInformation[Feature])
>     getRuntimeContext.getState(stateDescriptor)
>   }
>
> which is used in my process function as follows:
>
>   override def processElement(
>       value: Feature,
>       ctx: KeyedProcessFunction[String, Feature, Feature]#Context,
>       out: Collector[Feature]): Unit = {
>     val current: Feature = state.value match {
>       case null => value
>       case exists => combine(value, exists)
>     }
>     if (checkForCompleteness(current)) {
>       out.collect(current)
>       state.clear()
>     } else {
>       state.update(current)
>     }
>   }
>
> Feature is a protobuf class that I registered with Kryo as follows (using
> chill-protobuf):
>
>   env.getConfig.registerTypeWithKryoSerializer(classOf[Feature], classOf[ProtobufSerializer])
>
> But I also got exceptions with normal Scala case classes wrapping this
> Feature class, and without the ProtobufSerializer, using the standard slow
> Java serializer.
> The exception occurs within the first minutes/seconds of starting the app
> and looks as follows:
>
> 2023-02-03 08:41:07,577 WARN  org.apache.flink.runtime.taskmanager.Task [] - KeyedProcess -> (Map -> Sink: FeatureSignalSink, Map -> Flat Map -> Sink: FeatureStore, Sink: logsink) (2/2)#0 (1befbd4d8975833fc973fc080ea866e4) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
>     at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
>     at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:17)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.io.EOFException
>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
>     at org.apache.flink.types.StringValue.readString(StringValue.java:786)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:128)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:34)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>     ... 16 more
>
> The exception is thrown at
> com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55),
> which is this line:
>
>   val current: AcornHydraeventFeature = state.value match {
>
> Has anyone run into this before, or can someone point me in the right
> direction for further investigation?
>
> Thanks a lot
> Clemens