Dear reader,

I'm currently writing a Kafka producer that serializes a generic type using
avro4s. However, the serialization schema itself turns out not to be
serializable. Here is my code:

The serialization schema:

import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroOutputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.api.common.serialization.SerializationSchema

class AvroSerializationSchema[IN: SchemaFor: FromRecord: ToRecord]
  extends SerializationSchema[IN] {

  override def serialize(element: IN): Array[Byte] = {
    // Write the element as Avro binary into an in-memory buffer.
    val byteArray = new ByteArrayOutputStream()
    val avroSer = AvroOutputStream.binary[IN](byteArray)
    avroSer.write(element)
    avroSer.flush()
    avroSer.close()

    byteArray.toByteArray
  }
}

The job code:

import java.util.Properties

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

case class Person(name: String, age: Int, address: Address)
case class Address(city: String, street: String)

class SimpleJob {

  @transient
  private lazy val serSchema: AvroSerializationSchema[Person] =
    new AvroSerializationSchema[Person]()

  def start(): Unit = {
    val testPerson = Person("Test", 100, Address("Test", "Test"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.
      fromCollection(Seq(testPerson)).
      addSink(createKafkaSink())

    env.execute("Flink sample job")
  }


  def createKafkaSink(): RichSinkFunction[Person] = {
    // Producer configuration. A Kafka 0.11 producer does not actually need
    // zookeeper.connect; only the old 0.8 consumer connector did.
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("zookeeper.connect", "127.0.0.1:2181")

    new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }

}

The code compiles, but at runtime it fails with: InvalidProgramException: Object
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
is not serializable.

I assume this means that my custom SerializationSchema is not serializable
because of its use of SchemaFor, FromRecord and ToRecord.
Does anyone know a solution or workaround?
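
My current suspicion is that the context bounds desugar into implicit
constructor parameters, so the macro-derived SchemaFor/FromRecord/ToRecord
instances become fields of the schema object and are shipped along with it,
and those instances are not Serializable. One direction I am considering is
to drop the type parameter so that avro4s derives its instances at the call
site inside serialize, leaving the schema class itself stateless. A rough,
untested sketch (the class name is mine; imports assume Flink 1.4+ and
avro4s 1.x):

import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.AvroOutputStream
import org.apache.flink.api.common.serialization.SerializationSchema

// Monomorphic variant: no typeclass instances are stored as fields, so the
// class itself carries no non-serializable state.
class PersonSerializationSchema extends SerializationSchema[Person] {

  override def serialize(element: Person): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    // avro4s derives its typeclass instances implicitly at this call site,
    // as local values created per call (caching them in a @transient lazy
    // val should presumably also work).
    val avroSer = AvroOutputStream.binary[Person](out)
    avroSer.write(element)
    avroSer.flush()
    avroSer.close()
    out.toByteArray
  }
}

Is that the right direction, or is there a cleaner way to keep the schema
generic?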

Thanks in advance!
Wouter
