Thanks for the reply, but I am not using it for managed state, but rather for the raw state. In my implementation I have the following:

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double]{

  // The managed keyed state, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _

  // The raw state - https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel : Option[Model] = None
  var newModel : Option[Model] = None

Here currentModel and newModel are instances of the trait for which I implemented the serializer.

According to the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state

“Raw State is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state’s data structures and sees only the raw bytes.”

So I assumed that I need to provide a serializer for this.
Am I missing something?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> ---------- Forwarded message ----------
> From: Chesnay Schepler <ches...@apache.org>
> Date: Wed, Jul 19, 2017 at 1:34 PM
> Subject: Re: Custom Kryo serializer
> To: user@flink.apache.org
>
> Hello,
>
> I assume you're passing the class of your serializer in a StateDescriptor constructor.
>
> If so, you could add a breakpoint in StateDescriptor#initializeSerializerUnlessSet,
> and check what typeInfo is created and which serializer is created as a result.
>
> One thing you could try right away is registering your serializer for the Model implementations,
> instead of the trait.
>
> Regards,
> Chesnay
>
>
> On 14.07.2017 15:50, Boris Lublinsky wrote:
>> Hi
>> I have several implementations of my Model trait,
>>
>> trait Model {
>>   def score(input : AnyVal) : AnyVal
>>   def cleanup() : Unit
>>   def toBytes() : Array[Byte]
>>   def getType : Long
>> }
>>
>> none of them is serializable, but they are used in the state definition.
>> So I implemented a custom serializer
>>
>> import com.esotericsoftware.kryo.io.{Input, Output}
>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>>
>>
>> class ModelSerializerKryo extends Serializer[Model]{
>>
>>   super.setAcceptsNull(false)
>>   super.setImmutable(true)
>>
>>   /** Reads bytes and returns a new object of the specified concrete type.
>>    * <p>
>>    * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to
>>    * ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to
>>    * be reentrant.
>>    * <p>
>>    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a
>>    * serializer.
>>    *
>>    * @return May be null if {@link #getAcceptsNull()} is true. */
>>
>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>>
>>     import ModelSerializerKryo._
>>
>>     val mType = input.readLong().asInstanceOf[Int]
>>     val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
>>     factories.get(mType) match {
>>       case Some(factory) => factory.restore(bytes)
>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>     }
>>   }
>>
>>   /** Writes the bytes for the object to the output.
>>    * <p>
>>    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a
>>    * serializer.
>>    *
>>    * @param value May be null if {@link #getAcceptsNull()} is true. */
>>
>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>     output.writeLong(value.getType)
>>     output.write(value.toBytes)
>>   }
>> }
>>
>> object ModelSerializerKryo{
>>   private val factories = Map(ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>> }
>> And added the following
>>
>> // Add custom serializer
>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>>
>> To configure it.
>> I can see checkpoint messages at the output console, but I never hit a breakpoint in the serializer.
>> Any suggestions?
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
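
One aside on the quoted read method, separate from the question of why the serializer is never invoked: takeWhile(_ != -1) treats the byte value -1 (0xFF) as an end marker, yet that value can occur inside a serialized model, so the payload could be cut short. A common alternative is to write the payload length before the bytes. Below is a minimal sketch of that framing, using plain java.io streams rather than Kryo's Input and Output so that it runs on its own. ModelFraming, writeModel, and readModel are hypothetical names introduced for illustration; they are not part of the code in this thread.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper illustrating length-prefixed framing; not from the original thread.
object ModelFraming {

  // Layout: model type (8 bytes), payload length (4 bytes), then the payload itself.
  def writeModel(modelType: Long, payload: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeLong(modelType)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    buffer.toByteArray
  }

  // Reads back exactly `length` bytes, so a payload byte of -1 is not mistaken for an end marker.
  def readModel(bytes: Array[Byte]): (Long, Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val modelType = in.readLong()
    val length = in.readInt()
    val payload = new Array[Byte](length)
    in.readFully(payload)
    (modelType, payload)
  }
}
```

For example, ModelFraming.readModel(ModelFraming.writeModel(1L, Array[Byte](1, -1, 2))) yields the model type together with all three payload bytes, including the -1. The same layout could presumably be applied inside a Kryo serializer by writing an int length before the model bytes, but that variant is an assumption and is not tested here.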