I'm still struggling with this issue. I dug a bit deeper, and I'm pretty sure the problem is that I have to (implicitly) pass the SchemaFor and ToRecord instances to my serialization schema; otherwise I can't make it generic. However, those classes aren't serializable, and because they come in through context bounds I can't annotate them as @transient or turn them into a lazy val, which leaves me with the current issue.
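
To make the problem concrete, here is a minimal, self-contained sketch. It does not use avro4s or Flink: Encoder is a hypothetical stand-in for the non-serializable type classes, and EagerSchema, LazySchema, PersonSchema and the simplified Person are made-up names for illustration. It shows why a context bound breaks Java serialization, and one direction that might work, assuming it is acceptable to resolve the implicits in a small concrete subclass per type instead of through context bounds.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
  NotSerializableException, ObjectInputStream, ObjectOutputStream}

object SerializableSchemaSketch {

  // Hypothetical stand-in for a type class such as SchemaFor or ToRecord.
  // It deliberately does NOT extend Serializable.
  trait Encoder[T] { def encode(value: T): Array[Byte] }

  final case class Person(name: String, age: Int)

  object Encoder {
    implicit val personEncoder: Encoder[Person] = new Encoder[Person] {
      def encode(p: Person): Array[Byte] = s"${p.name}:${p.age}".getBytes("UTF-8")
    }
  }

  // The context bound is an implicit constructor parameter. Because
  // serialize() uses it, it is kept as a field, so Java serialization fails.
  class EagerSchema[T: Encoder] extends Serializable {
    def serialize(value: T): Array[Byte] = implicitly[Encoder[T]].encode(value)
  }

  // Possible workaround: no implicit constructor parameters. The encoder is
  // created by a method and cached in a @transient lazy val, so it is skipped
  // during serialization and rebuilt on first use after deserialization.
  abstract class LazySchema[T] extends Serializable {
    protected def createEncoder(): Encoder[T]
    @transient private lazy val encoder: Encoder[T] = createEncoder()
    def serialize(value: T): Array[Byte] = encoder.encode(value)
  }

  // The implicit is resolved here, at a concrete type, inside a method body.
  class PersonSchema extends LazySchema[Person] {
    protected def createEncoder(): Encoder[Person] = implicitly[Encoder[Person]]
  }

  // Serializes and deserializes a value with plain Java serialization.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def isJavaSerializable(value: AnyRef): Boolean =
    try { roundTrip(value); true }
    catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(new EagerSchema[Person])) // context-bound version
    val restored = roundTrip(new PersonSchema)           // lazy version
    println(new String(restored.serialize(Person("Test", 100)), "UTF-8"))
  }
}
```

The trade-off is that the schema is no longer fully generic: each element type needs its own small subclass. Whether this carries over to avro4s and FlinkKafkaProducer011 is untested.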
I hope someone has some leads for me.

Thanks!

On Thu, Apr 26, 2018 at 23:03, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:

> Hi Bill,
>
> Thanks for your answer. However, this proposal isn't going to solve my
> issue, since the problem here is that the context bounds I need to give in
> order to serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't
> serializable classes. As a result, Flink is not able to serialize the
> KafkaProducer, which fails the whole job.
>
> Thanks,
> Wouter
>
> On Thu, Apr 26, 2018 at 00:42, Nortman, Bill <william.nort...@pimco.com> wrote:
>
>> The first thing I would try is to check whether your classes Person and
>> Address have getters and setters and a no-argument constructor.
>>
>> *From:* Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>> *Sent:* Wednesday, April 25, 2018 7:17 AM
>> *To:* user@flink.apache.org
>> *Subject:* KafkaProducer with generic (Avro) serialization schema
>>
>> Dear reader,
>>
>> I'm currently working on writing a KafkaProducer which is able to
>> serialize a generic type using avro4s. However, this serialization schema
>> is not serializable itself. Here is my code for this:
>>
>> The serialization schema:
>>
>>   class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord]
>>       extends SerializationSchema[IN] {
>>
>>     override def serialize(element: IN): Array[Byte] = {
>>       val byteArray = new ByteArrayOutputStream()
>>       val avroSer = AvroOutputStream.binary[IN](byteArray)
>>       avroSer.write(element)
>>       avroSer.flush()
>>       avroSer.close()
>>
>>       return byteArray.toByteArray
>>     }
>>   }
>>
>> The job code:
>>
>>   case class Person(name : String, age : Int, address : Address)
>>   case class Address(city : String, street : String)
>>
>>   class SimpleJob {
>>
>>     @transient
>>     private lazy val serSchema : AvroSerializationSchema[Person] =
>>       new AvroSerializationSchema[Person]()
>>
>>     def start() = {
>>       val testPerson = Person("Test", 100, Address("Test", "Test"))
>>
>>       val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>>       env.
>>         fromCollection(Seq(testPerson)).
>>         addSink(createKafkaSink())
>>
>>       env.execute("Flink sample job")
>>     }
>>
>>     def createKafkaSink() : RichSinkFunction[Person] = {
>>       //set some properties
>>       val properties = new Properties()
>>       properties.put("bootstrap.servers", "127.0.0.01:9092")
>>       properties.put("zookeeper.connect", "127.0.0.1:2181")
>>
>>       new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>>     }
>>
>>   }
>>
>> The code does compile, however it gives the following error at
>> runtime: InvalidProgramException: Object
>> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
>> is not serializable.
>>
>> I assume this means that my custom SerializationSchema is not
>> serializable due to the use of SchemaFor, FromRecord and ToRecord.
>>
>> Does anyone know a solution or workaround?
>>
>> Thanks in advance!
>>
>> Wouter