Begin forwarded message:
From: Boris Lublinsky <boris.lublin...@lightbend.com>
Subject: Re: Custom Kryo serializer
Date: July 19, 2017 at 8:28:16 AM CDT
To: user@flink.apache.org, ches...@apache.org
Thanks for the reply, but I am using it for raw state, not for
managed state.
In my implementation I have the following:

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {

  // The managed keyed state, see
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _

  // The raw state, see
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel: Option[Model] = None
  var newModel: Option[Model] = None

Here currentModel and newModel are instances of the trait for which I
implemented the serializer.
According to the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
“Raw State is state that operators keep in their own data
structures. When checkpointed, they only write a sequence of bytes
into the checkpoint. Flink knows nothing about the state’s data
structures and sees only the raw bytes.”
So I assumed that I need to provide a serializer for this.
Am I missing something?
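
To make the quoted "sequence of bytes" concrete, here is a minimal sketch of turning an Option[Model] into bytes and back using only the JDK. It does not use any Flink API and does not show how raw state is wired into a checkpoint. The names BytesModel and RawModelBytes, the restore callback, and the byte layout are all assumptions made for illustration, and Model is reduced to the two members needed here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for the Model trait from this thread, reduced to
// the two members needed for serialization.
trait Model {
  def toBytes(): Array[Byte]
  def getType: Long
}

// Hypothetical implementation, used only to exercise the round trip.
final class BytesModel(val getType: Long, payload: Array[Byte]) extends Model {
  def toBytes(): Array[Byte] = payload
}

object RawModelBytes {

  // Assumed layout: 1-byte presence flag, then (if present) an 8-byte
  // type tag, a 4-byte payload length, and the payload itself.
  def write(model: Option[Model]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    model match {
      case Some(m) =>
        val bytes = m.toBytes()
        out.writeBoolean(true)
        out.writeLong(m.getType)
        out.writeInt(bytes.length)
        out.write(bytes)
      case None =>
        out.writeBoolean(false)
    }
    out.flush()
    buffer.toByteArray
  }

  // `restore` is a hypothetical factory: (type tag, payload) => Model.
  def read(data: Array[Byte], restore: (Long, Array[Byte]) => Model): Option[Model] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    if (in.readBoolean()) {
      val mType = in.readLong()
      val bytes = new Array[Byte](in.readInt())
      in.readFully(bytes) // read exactly the recorded number of bytes
      Some(restore(mType, bytes))
    } else {
      None
    }
  }
}
```

The explicit length prefix lets the reader know where the payload ends without relying on a sentinel value, which matters because any byte value, including -1, can occur inside a serialized model.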
Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/
---------- Forwarded message ----------
From: Chesnay Schepler <ches...@apache.org>
Date: Wed, Jul 19, 2017 at 1:34 PM
Subject: Re: Custom Kryo serializer
To: user@flink.apache.org
Hello,
I assume you're passing the class of your serializer in a
StateDescriptor constructor.
If so, you could add a breakpoint in
StateDescriptor#initializeSerializerUnlessSet
and check which typeInfo is created and which serializer is created
as a result.
One thing you could try right away is registering your serializer
for the Model implementations instead of the trait.
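
Registering the serializer for the implementations could look roughly like the sketch below. It assumes Flink 1.3 with its bundled Kryo 2.x on the classpath, and it assumes that PMMLModel and TensorFlowModel are concrete classes extending Model. The class bodies, the type tags 0 and 1, the stub serializer, and the RegisterPerImplementation helper are hypothetical stand-ins so that the example compiles on its own. Whether this changes which serializer Flink picks is not confirmed here.

```scala
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.flink.api.common.ExecutionConfig

// Hypothetical stand-ins for the thread's Model trait and implementations.
trait Model {
  def toBytes(): Array[Byte]
  def getType: Long
}
class PMMLModel(bytes: Array[Byte]) extends Model {
  def toBytes(): Array[Byte] = bytes
  def getType: Long = 0L // assumed type tag
}
class TensorFlowModel(bytes: Array[Byte]) extends Model {
  def toBytes(): Array[Byte] = bytes
  def getType: Long = 1L // assumed type tag
}

// Stub serializer with the same shape as the one in this thread.
class ModelSerializerKryo extends Serializer[Model] {
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    val bytes = value.toBytes()
    output.writeLong(value.getType)
    output.writeInt(bytes.length)
    output.writeBytes(bytes)
  }
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    val mType = input.readLong()
    val bytes = input.readBytes(input.readInt())
    if (mType == 0L) new PMMLModel(bytes) else new TensorFlowModel(bytes)
  }
}

object RegisterPerImplementation {
  // Registers the serializer once per concrete class rather than for the trait.
  def register(config: ExecutionConfig): Unit = {
    config.addDefaultKryoSerializer(classOf[PMMLModel], classOf[ModelSerializerKryo])
    config.addDefaultKryoSerializer(classOf[TensorFlowModel], classOf[ModelSerializerKryo])
  }
}
```

In a job this would be called as RegisterPerImplementation.register(env.getConfig) before the topology is built.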
Regards,
Chesnay
On 14.07.2017 15:50, Boris Lublinsky wrote:
Hi
I have several implementations of my Model trait,

trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

None of them is serializable, but they are used in the state
definition.
So I implemented a custom serializer:

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor

class ModelSerializerKryo extends Serializer[Model] {

  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
    * <p>
    * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)}
    * must be called with the parent object to ensure it can be referenced by the
    * child objects. Any serializer that uses {@link Kryo} to read a child object
    * may need to be reentrant.
    * <p>
    * This method should not be called directly, instead this serializer can be
    * passed to {@link Kryo} read methods that accept a serializer.
    *
    * @return May be null if {@link #getAcceptsNull()} is true.
    */
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    import ModelSerializerKryo._
    val mType = input.readLong().asInstanceOf[Int]
    // Read exactly the number of bytes recorded by write(). Polling
    // readByte() until it returns -1 does not work: Input.readByte() throws
    // at the end of the input, and -1 is also a valid payload byte.
    val bytes = input.readBytes(input.readInt())
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
    * <p>
    * This method should not be called directly, instead this serializer can be
    * passed to {@link Kryo} write methods that accept a serializer.
    *
    * @param value May be null if {@link #getAcceptsNull()} is true.
    */
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    val bytes = value.toBytes()
    output.writeLong(value.getType)
    output.writeInt(bytes.length) // length prefix consumed by read()
    output.writeBytes(bytes)
  }
}

object ModelSerializerKryo {
  private val factories = Map(
    ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}

And added the following to configure it:

// Add custom serializer
env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])

I can see checkpoint messages in the console output, but I never
hit a breakpoint in the serializer.
Any suggestions?
Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/