Hi,

Thanks for the suggestions.

Unfortunately I cannot make FromRecord/ToRecord/SchemaFor serializable, since those classes are out of my control. I use them from the avro4s library (https://github.com/sksamuel/avro4s). The problem here, especially with the deserializer, is that I need to convert an Avro 'GenericRecord' to a Scala case class. Avro is written in Java, which makes that a bit problematic, and therefore I need the avro4s library. Avro4s tries to verify at compile time whether the generic type is actually convertible from/to a generic record; that is why I need those context bounds.

As for @Aljoscha's workaround, I don't understand how this would solve it. Doesn't that just move the problem? If I create a factory, I still need the generic type (with context bounds) that I specify at my KafkaConsumer/DeserializationSchema.

@Fabian, I'm not sure I understand your proposal. I still need the context bounds for those compile-time macros of avro4s.

Once again, thanks for your help so far!

Regards,
Wouter

On Wed, 2 May 2018 at 16:48, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Wouter,
>
> you can try to make the SerializationSchema serializable by overriding
> Java's serialization methods writeObject() and readObject(), similar to
> what Flink's AvroRowSerializationSchema [1] does.
>
> Best, Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
>
> 2018-05-02 16:34 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> Hi,
>>
>> My Scala knowledge is very limited (and my knowledge of Scala serialization
>> is non-existent), but one way or another you have to make your
>> SerializationSchema serialisable. If this is indeed the problem, maybe a
>> better place to ask this question is Stack Overflow or some Scala-specific
>> mailing list/board (unless someone else from the Flink community
>> can provide an answer to this problem)?
>>
>> Piotrek
>>
>> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>>
>> So, I'm still struggling with this issue. I dug a bit deeper into the
>> problem and I'm pretty sure that the problem is that I have to (implicitly)
>> pass the SchemaFor and ToRecord instances to my serialization schema
>> (otherwise I can't make it generic). However, those classes aren't
>> serializable, and of course I can't annotate them @transient or make them
>> lazy vals, which gives me the current issue.
>>
>> I hope someone has some leads for me.
>>
>> Thanks!
>>
>> On Thu, 26 Apr 2018 at 23:03, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>>
>>> Hi Bill,
>>>
>>> Thanks for your answer. However, this proposal isn't going to solve my
>>> issue, since the problem here is that the context bounds I need to give in
>>> order to serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't
>>> serializable classes. As a result, Flink is not able to serialize the
>>> KafkaProducer, which fails the whole job.
>>>
>>> Thanks,
>>> Wouter
>>>
>>> On Thu, 26 Apr 2018 at 00:42, Nortman, Bill <william.nort...@pimco.com> wrote:
>>>
>>>> The first thing I would try is to check whether your classes Person and
>>>> Address have getters and setters and a no-argument constructor.
>>>>
>>>> *From:* Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>>>> *Sent:* Wednesday, April 25, 2018 7:17 AM
>>>> *To:* user@flink.apache.org
>>>> *Subject:* KafkaProducer with generic (Avro) serialization schema
>>>>
>>>> Dear reader,
>>>>
>>>> I'm currently working on writing a KafkaProducer which is able to
>>>> serialize a generic type using avro4s.
>>>>
>>>> However, this serialization schema is not serializable itself.
>>>> Here is my code for this:
>>>>
>>>> The serialization schema:
>>>>
>>>> class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord]
>>>>     extends SerializationSchema[IN] {
>>>>
>>>>   override def serialize(element: IN): Array[Byte] = {
>>>>     val byteArray = new ByteArrayOutputStream()
>>>>     val avroSer = AvroOutputStream.binary[IN](byteArray)
>>>>     avroSer.write(element)
>>>>     avroSer.flush()
>>>>     avroSer.close()
>>>>
>>>>     byteArray.toByteArray
>>>>   }
>>>> }
>>>>
>>>> The job code:
>>>>
>>>> case class Person(name : String, age : Int, address : Address)
>>>>
>>>> case class Address(city : String, street : String)
>>>>
>>>> class SimpleJob {
>>>>
>>>>   @transient
>>>>   private lazy val serSchema : AvroSerializationSchema[Person] =
>>>>     new AvroSerializationSchema[Person]()
>>>>
>>>>   def start() = {
>>>>     val testPerson = Person("Test", 100, Address("Test", "Test"))
>>>>
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>>     env
>>>>       .fromCollection(Seq(testPerson))
>>>>       .addSink(createKafkaSink())
>>>>
>>>>     env.execute("Flink sample job")
>>>>   }
>>>>
>>>>   def createKafkaSink() : RichSinkFunction[Person] = {
>>>>     // set some properties
>>>>     val properties = new Properties()
>>>>     properties.put("bootstrap.servers", "127.0.0.1:9092")
>>>>     properties.put("zookeeper.connect", "127.0.0.1:2181")
>>>>
>>>>     new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>>>>   }
>>>> }
>>>>
>>>> The code does compile; however, it gives the following error at
>>>> runtime: InvalidProgramException: Object
>>>> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
>>>> is not serializable.
>>>>
>>>> I assume this means that my custom SerializationSchema is not
>>>> serializable due to the use of SchemaFor, FromRecord and ToRecord.
>>>> Does anyone know a solution or workaround?
>>>>
>>>> Thanks in advance!
>>>>
>>>> Wouter
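Fabian's writeObject()/readObject() suggestion can be illustrated with a minimal, dependency-free sketch. It assumes the non-serializable helper can be rebuilt from some serializable description: only that description (a plain comma-separated string here) is written out, the helper lives in a @transient field, and readObject() recreates it on the receiving side, which is roughly the idea AvroRowSerializationSchema applies to its Avro schema. CsvEncoder, SimpleSerializationSchema, MapSerializationSchema and SerializationRoundTrip are hypothetical stand-ins, not Flink or avro4s API. The sketch does not show how avro4s's SchemaFor/ToRecord instances for a generic IN could be recreated on the task managers, which is the open question in this thread.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper that is deliberately NOT java.io.Serializable, standing in
// for the avro4s type-class instances or an Avro writer discussed in the thread.
final class CsvEncoder(val fieldNames: Seq[String]) {
  def encode(values: Map[String, String]): Array[Byte] =
    fieldNames.map(name => values.getOrElse(name, "")).mkString(",").getBytes("UTF-8")
}

// Hypothetical stand-in for Flink's SerializationSchema, which likewise
// extends java.io.Serializable.
trait SimpleSerializationSchema[T] extends java.io.Serializable {
  def serialize(element: T): Array[Byte]
}

// Ships only a serializable description (a comma-separated field list) and
// rebuilds the non-serializable encoder after deserialization.
class MapSerializationSchema(fieldSpec: String)
    extends SimpleSerializationSchema[Map[String, String]] {

  private var spec: String = fieldSpec

  // Skipped by Java serialization; recreated in readObject().
  @transient private var encoder: CsvEncoder = buildEncoder(spec)

  private def buildEncoder(s: String): CsvEncoder = new CsvEncoder(s.split(",").toSeq)

  override def serialize(element: Map[String, String]): Array[Byte] =
    encoder.encode(element)

  // Found reflectively by ObjectOutputStream: write only the serializable state.
  private def writeObject(out: ObjectOutputStream): Unit =
    out.writeUTF(spec)

  // Found reflectively by ObjectInputStream. The constructor does not run during
  // deserialization, so restore the state and rebuild the transient helper here.
  private def readObject(in: ObjectInputStream): Unit = {
    spec = in.readUTF()
    encoder = buildEncoder(spec)
  }
}

object SerializationRoundTrip {
  // Java-serializes a value and reads it back, approximating how Flink ships
  // user functions (and the schemas they hold) to the task managers.
  def roundTrip[A <: java.io.Serializable](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Calling SerializationRoundTrip.roundTrip(new MapSerializationSchema("name,city")) yields a copy whose serialize() still works, because the encoder is rebuilt from the string rather than serialized. Plain default serialization with a @transient field alone would leave the encoder null on the receiving side, which is why readObject() is overridden.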