Hi Wouter,

You can try to make the SerializationSchema serializable by overriding Java's serialization methods writeObject() and readObject(), similar to what Flink's AvroRowSerializationSchema [1] does.
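As a rough illustration, here is a minimal, self-contained Scala sketch that uses plain Java serialization instead of Flink. `Codec`, `NaiveSchema`, `RebuildingSchema` and `SerializationCheck` are hypothetical names. `Codec` only stands in for the avro4s instances and, like them, is not Serializable. `NaiveSchema` reproduces the failure: the context bound `[IN: Codec]` becomes an implicit constructor parameter, which is kept as a field because `serialize()` uses it. `RebuildingSchema` keeps the helper in a `@transient` field, writes only a serializable description of it (a schema string) in `writeObject()`, and rebuilds it in `readObject()`. This assumes the helper can be reconstructed from such data, which is roughly how AvroRowSerializationSchema treats its Avro schema; whether the avro4s SchemaFor/ToRecord/FromRecord instances can be rebuilt that way is not something this sketch settles.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for an avro4s instance such as SchemaFor or ToRecord.
// Like those instances, it does NOT implement java.io.Serializable.
final class Codec[T](val schema: String) {
  def encode(value: T): Array[Byte] = s"$schema|$value".getBytes("UTF-8")
}

// Reproduces the problem: the context bound desugars to an implicit constructor
// parameter, which is stored as a field because serialize() uses it.
final class NaiveSchema[IN: Codec] extends java.io.Serializable {
  def serialize(element: IN): Array[Byte] = implicitly[Codec[IN]].encode(element)
}

// Workaround sketch: the helper is transient and rebuilt after deserialization.
final class RebuildingSchema[IN](initial: Codec[IN]) extends java.io.Serializable {
  // Skipped by default serialization; restored in readObject().
  @transient private var codec: Codec[IN] = initial

  def serialize(element: IN): Array[Byte] = codec.encode(element)

  // Java serialization calls this private method when writing the object.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()   // writes the (empty) set of non-transient fields
    out.writeUTF(codec.schema) // store only a serializable description
  }

  // Java serialization calls this private method when reading the object back.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    codec = new Codec[IN](in.readUTF()) // rebuild the non-serializable helper
  }
}

object SerializationCheck {
  // Round-trips an object through plain Java serialization, roughly what Flink
  // does when it checks a sink function and ships it to the task managers.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Serializing a `NaiveSchema` fails with a `NotSerializableException`, while a `RebuildingSchema` survives the round trip and encodes with its rebuilt `Codec`. The trade-off is that every non-serializable helper must be re-creatable from whatever `writeObject()` stores.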
Best,
Fabian

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java

2018-05-02 16:34 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:

> Hi,
>
> My Scala knowledge is very limited (and my knowledge of Scala serialization
> is nonexistent), but one way or another you have to make your
> SerializationSchema serializable. If this is indeed the problem, maybe a
> better place to ask this question is Stack Overflow or a Scala-specific
> mailing list/board (unless someone else from the Flink community can
> provide an answer to this problem)?
>
> Piotrek
>
> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
> So, I'm still struggling with this issue. I dug a bit deeper into the
> problem and I'm pretty sure the cause is that I have to (implicitly) pass
> the SchemaFor and ToRecord instances to my serialization schema (otherwise
> I can't make it generic). However, those classes aren't serializable, and
> of course I can't mark them @transient or make them lazy vals, which is
> what leaves me with the current issue.
>
> I hope someone has some leads for me.
>
> Thanks!
>
> On Thu 26 Apr 2018 at 23:03, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
>> Hi Bill,
>>
>> Thanks for your answer. However, this proposal isn't going to solve my
>> issue, since the problem here is that the context bounds I need in order
>> to serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't
>> serializable classes. As a result, Flink cannot serialize the
>> KafkaProducer, and the whole job fails.
>>
>> Thanks,
>> Wouter
>>
>> On Thu 26 Apr 2018 at 00:42, Nortman, Bill <william.nort...@pimco.com> wrote:
>>
>>> The first thing I would try is checking that your Person and Address
>>> classes have getters and setters and a no-argument constructor.
>>>
>>> *From:* Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>>> *Sent:* Wednesday, April 25, 2018 7:17 AM
>>> *To:* user@flink.apache.org
>>> *Subject:* KafkaProducer with generic (Avro) serialization schema
>>>
>>> Dear reader,
>>>
>>> I'm currently working on writing a KafkaProducer that is able to
>>> serialize a generic type using avro4s.
>>>
>>> However, this serialization schema is not serializable itself. Here is
>>> my code:
>>>
>>> The serialization schema:
>>>
>>> import java.io.ByteArrayOutputStream
>>>
>>> import com.sksamuel.avro4s.{AvroOutputStream, FromRecord, SchemaFor, ToRecord}
>>> import org.apache.flink.api.common.serialization.SerializationSchema
>>>
>>> class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord]
>>>     extends SerializationSchema[IN] {
>>>
>>>   override def serialize(element: IN): Array[Byte] = {
>>>     val byteArray = new ByteArrayOutputStream()
>>>     val avroSer = AvroOutputStream.binary[IN](byteArray)
>>>     avroSer.write(element)
>>>     avroSer.flush()
>>>     avroSer.close()
>>>     byteArray.toByteArray
>>>   }
>>> }
>>>
>>> The job code:
>>>
>>> import java.util.Properties
>>>
>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
>>>
>>> case class Person(name: String, age: Int, address: Address)
>>> case class Address(city: String, street: String)
>>>
>>> class SimpleJob {
>>>
>>>   @transient
>>>   private lazy val serSchema: AvroSerializationSchema[Person] =
>>>     new AvroSerializationSchema[Person]()
>>>
>>>   def start() = {
>>>     val testPerson = Person("Test", 100, Address("Test", "Test"))
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>>     env.
>>>       fromCollection(Seq(testPerson)).
>>>       addSink(createKafkaSink())
>>>
>>>     env.execute("Flink sample job")
>>>   }
>>>
>>>   def createKafkaSink(): RichSinkFunction[Person] = {
>>>     // set some properties
>>>     val properties = new Properties()
>>>     properties.put("bootstrap.servers", "127.0.0.1:9092")
>>>     properties.put("zookeeper.connect", "127.0.0.1:2181")
>>>
>>>     new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>>>   }
>>> }
>>>
>>> The code compiles, but it fails at runtime with the following error:
>>>
>>> InvalidProgramException: Object
>>> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
>>> is not serializable.
>>>
>>> I assume this means that my custom SerializationSchema is not
>>> serializable due to the use of SchemaFor, FromRecord and ToRecord.
>>>
>>> Does anyone know a solution or workaround?
>>>
>>> Thanks in advance!
>>>
>>> Wouter