Dear reader, I'm currently working on writing a KafkaProducer which is able to serialize a generic type using avro4s. However this serialization schema is not serializable itself. Here is my code for this:
The serialization schema: class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends SerializationSchema[IN] { override def serialize(element: IN): Array[Byte] = { val byteArray = new ByteArrayOutputStream() val avroSer = AvroOutputStream.binary[IN](byteArray) avroSer.write(element) avroSer.flush() avroSer.close() return byteArray.toByteArray } } The job code: case class Person(name : String, age : Int, address : Address) case class Address(city : String, street : String) class SimpleJob { @transient private lazy val serSchema : AvroSerializationSchema[Person] = new AvroSerializationSchema[Person]() def start() = { val testPerson = Person("Test", 100, Address("Test", "Test")) val env = StreamExecutionEnvironment.getExecutionEnvironment env. fromCollection(Seq(testPerson)). addSink(createKafkaSink()) env.execute("Flink sample job") } def createKafkaSink() : RichSinkFunction[Person] = { //set some properties val properties = new Properties() properties.put("bootstrap.servers", "127.0.0.01:9092") properties.put("zookeeper.connect", "127.0.0.1:2181") new FlinkKafkaProducer011[Person]("persons", serSchema, properties) } } The code does compile, however it gives the following error on runtime: InvalidProgramException: Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d is not serializable. I assume this means that my custom SerializationSchema is not serializable due to the use of SchemaFor, FromRecord and ToRecord. Anyone knows a solution or workaround? Thanks in advance! Wouter