Hello,
I assume you're passing the class of your serializer in a
StateDescriptor constructor.
If so, you could add a breakpoint in
Statedescriptor#initializeSerializerUnlessSet,
and check what typeInfo is created and which serializer is created as a
result.
One thing you could try right away is registering your serializer for
the Model implementations,
instead of the trait.
Regards,
Chesnay
On 14.07.2017 15:50, Boris Lublinsky wrote:
Hi
I have several implementations of my Model trait,
trait Model {
def score(input :AnyVal) :AnyVal def cleanup() :Unit def toBytes() :
Array[Byte]
def getType :Long }
neither one of them are serializable, but are used in the state
definition.
So I implemented custom serializer
import com.esotericsoftware.kryo.io
<http://com.esotericsoftware.kryo.io>.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor
class ModelSerializerKryoextends Serializer[Model]{
super.setAcceptsNull(false)
super.setImmutable(true)
/** Reads bytes and returns a new object of the specified concrete
type. * <p> * Before Kryo can be used to read child objects, {@link
Kryo#reference(Object)} must be called with the parent object to *
ensure it can be referenced by the child objects. Any serializer that
uses {@link Kryo} to read a child object may need to * be reentrant. *
<p> * This method should not be called directly, instead this
serializer can be passed to {@link Kryo} read methods that accept a *
serialier. * * @return May be null if { @link #getAcceptsNull()} is
true. */ override def read(kryo: Kryo, input: Input, `type`:Class[Model]): Model = {
import ModelSerializerKryo._
val mType = input.readLong().asInstanceOf[Int]
val bytes =Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
factories.get(mType)match {
case Some(factory) => factory.restore(bytes)
case _ =>throw new Exception(s"Unknown model type $mTypeto restore")
}
}
/** Writes the bytes for the object to the output. * <p> * This method
should not be called directly, instead this serializer can be passed
to {@link Kryo} write methods that accept a * serialier. * * @param
value May be null if { @link #getAcceptsNull()} is true. */ override
def write(kryo: Kryo, output: Output, value: Model):Unit = {
output.writeLong(value.getType)
output.write(value.toBytes)
}
}
object ModelSerializerKryo{
private val factories =Map(ModelDescriptor.ModelType.PMML.value -> PMMLModel,
ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}
And added the following
// Add custom serializer env.getConfig.addDefaultKryoSerializer(classOf[Model],
classOf[ModelSerializerKryo])
To configure it.
I can see checkpoint messages at the output console, but I never hist
a break point in serializer.
Any suggestions?
Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/