Hi, I have been struggling with this particular Exception for days and
thought I'd ask for help here.

I am using a KeyedProcessFunction with a

  private lazy val state: ValueState[Feature] = {
    val stateDescriptor = new
ValueStateDescriptor[Feature]("CollectFeatureProcessState",
createTypeInformation[Feature])
    getRuntimeContext.getState(stateDescriptor)
  }


which is used in my process function as follows

  override def processElement(value: Feature, ctx:
KeyedProcessFunction[String, Feature, Feature]#Context, out:
Collector[Feature]): Unit = {
    val current: Feature = state.value match {
      case null => value
      case exists => combine(value, exists)
    }
    if (checkForCompleteness(current)) {
      out.collect(current)
      state.clear()
    } else {
      state.update(current)
    }
  }


Feature is a protobuf class that I registered with kryo as follows (using
chill-protobuf)

env.getConfig.registerTypeWithKryoSerializer(classOf[Feature],
classOf[ProtobufSerializer])

But I also got Exceptions with normal scala case classes wrapping this
Feature class, and without the ProtobufSerializer using the standard slow
Java Serializer.
The exception occurs within the first minutes/seconds of starting the app
and looks as follows:

2023-02-03 08:41:07,577 WARN  org.apache.flink.runtime.taskmanager.Task
               [] - KeyedProcess -> (Map -> Sink: FeatureSignalSink, Map ->
Flat Map -> Sink: FeatureStore, Sink: logsink) (2/2)#0
(1befbd4d8975833fc973fc080ea866e4) switched from RUNNING to FAILED with
failure cause: org.apache.flink.util.FlinkRuntimeException: Error while
retrieving data from RocksDB.
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
at
com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
at com.grab.app.functions.stream.CollectFeatureProcessFunction
$.processElement(CollectFeatureProcessFunction.scala:17)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.EOFException
at
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
at org.apache.flink.types.StringValue.readString(StringValue.java:786)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:128)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:34)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
... 16 more

The exception is thrown at
com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
which is this line:

val current: AcornHydraeventFeature = state.value match {


Did someone run into this before and/or can point me at the right direction
for further investigation?

Thanks a lot
Clemens

-- 


By communicating with Grab Holdings Limited and/or its subsidiaries, 
associate companies and jointly controlled entities (collectively, “Grab”), 
you are deemed to have consented to the processing of your personal data as 
set out in the Privacy Notice which can be viewed at 
https://grab.com/privacy/  <https://grab.com/privacy/>


 This email 
contains confidential information that may be privileged and is only for 
the intended recipient(s). If you are not the intended recipient(s), please 
do not disseminate, distribute or copy this email. Please notify Grab 
immediately if you have received this by mistake and delete this email from 
your system. Email transmission may not be secure or error-free as any 
information could be intercepted, corrupted, lost, destroyed, delayed or 
incomplete, or contain viruses. Grab does not accept liability for any 
errors or omissions in this email that arise as a result of email 
transmission. All intellectual property rights in this email and any 
attachments shall remain vested in Grab, unless otherwise provided by law

Reply via email to