Hi,

I have several implementations of my Model trait:

trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

None of them is serializable, but they are used in the state definition, so I implemented a custom serializer:

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor

class ModelSerializerKryo extends Serializer[Model] {

  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
    * <p>
    * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to
    * ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to
    * be reentrant.
    * <p>
    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a
    * serializer.
    *
    * @return May be null if {@link #getAcceptsNull()} is true.
    */
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    import ModelSerializerKryo._

    val mType = input.readLong().asInstanceOf[Int]
    val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
    * <p>
    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a
    * serializer.
    *
    * @param value May be null if {@link #getAcceptsNull()} is true.
    */
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    output.writeLong(value.getType)
    output.write(value.toBytes)
  }
}

object ModelSerializerKryo {
  private val factories = Map(
    ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}

I then added the following to configure it:

// Add custom serializer
env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])

I can see checkpoint messages in the console output, but I never hit a breakpoint in the serializer. Any suggestions?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/
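
P.S. A side note on the read() loop above, separate from the breakpoint question: -1 is a legitimate byte value (0xFF), so a takeWhile(_ != -1) loop would stop at the first such byte in the model payload. One possible alternative is to write the payload length before the bytes and read exactly that many back. The sketch below shows only the framing idea, using plain java.io streams so it runs without Kryo or Flink. The object name LengthPrefixedFraming and its two helpers are hypothetical and not part of the serializer above. In the Kryo serializer, the corresponding calls would presumably be output.writeInt / output.writeBytes and input.readInt / input.readBytes(length), but I have not verified that variant in this job.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper, for illustration only: length-prefixed framing of (type, payload).
object LengthPrefixedFraming {

  // Write the model type, then the payload length, then the payload itself.
  def write(out: DataOutputStream, modelType: Long, payload: Array[Byte]): Unit = {
    out.writeLong(modelType)
    out.writeInt(payload.length)
    out.write(payload)
  }

  // Read back exactly what write() produced; no sentinel byte is needed.
  def read(in: DataInputStream): (Long, Array[Byte]) = {
    val modelType = in.readLong()
    val length = in.readInt()
    val payload = new Array[Byte](length)
    in.readFully(payload)
    (modelType, payload)
  }

  def main(args: Array[String]): Unit = {
    // The payload deliberately contains -1, where a takeWhile(_ != -1) loop would stop.
    val payload = Array[Byte](1, -1, 2, 3)
    val buffer = new ByteArrayOutputStream()
    write(new DataOutputStream(buffer), 1L, payload)

    val (modelType, restored) =
      read(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
    assert(modelType == 1L)
    assert(restored.sameElements(payload))
    println(s"type=$modelType, bytes=${restored.mkString(",")}")
  }
}
```

The explicit length also marks where this object ends when the stream carries more data after it, which a sentinel-based loop cannot tell.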