Hi Wouter,

you can try to make the SerializationSchema serializable by overriding
Java's serialization methods writeObject() and readObject(), similarly to
what Flink's AvroRowSerializationSchema [1] does.

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
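
For illustration, a minimal sketch of that pattern (the NonSerializableHelper
stand-in and all names here are hypothetical, not avro4s's or Flink's actual
API): mark the non-serializable member @transient and rebuild it in
readObject(), so Java serialization never tries to write it.

```scala
import java.io._

// Hypothetical stand-in for a non-serializable helper such as an
// avro4s type class instance (for illustration only).
class NonSerializableHelper {
  def encode(s: String): Array[Byte] = s.getBytes("UTF-8")
}

// The pattern: the helper is @transient, and readObject() recreates it
// after the default deserialization of the remaining fields.
class MySerializationSchema extends Serializable {
  @transient private var helper: NonSerializableHelper = new NonSerializableHelper

  def serialize(element: String): Array[Byte] = helper.encode(element)

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject() // writes only the non-transient fields
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    helper = new NonSerializableHelper // rebuild the helper on the task side
  }
}
```

This way the non-serializable instance never crosses the wire; each task
manager reconstructs it locally after deserialization.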

2018-05-02 16:34 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:

> Hi,
>
> My Scala knowledge is very limited (and my knowledge of Scala's
> serialization is non-existent), but one way or another you have to make
> your SerializationSchema serializable. If this is indeed the problem,
> maybe a better place to ask this question is Stack Overflow or a
> Scala-specific mailing list/board (unless someone else from the Flink
> community can provide an answer)?
>
> Piotrek
>
> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
> So, I'm still struggling with this issue. I dived a bit more into the
> problem and I'm pretty sure the cause is that I have to (implicitly)
> pass the SchemaFor and ToRecord instances to my serialization schema
> (otherwise I can't make it generic). However, those classes aren't
> serializable, and of course I can't annotate them transient nor make
> them lazy vals, which gives me the current issue.
>
> I hope someone has some leads for me.
>
> Thanks!
>
> Op do 26 apr. 2018 om 23:03 schreef Wouter Zorgdrager <
> zorgdrag...@gmail.com>:
>
>> Hi Bill,
>>
>> Thanks for your answer. However, this proposal isn't going to solve my
>> issue: the problem is that the context bounds I need in order to
>> serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't
>> serializable classes. This results in Flink not being able to serialize
>> the KafkaProducer, which fails the whole job.
>>
>> Thanks,
>> Wouter
>>
>> Op do 26 apr. 2018 om 00:42 schreef Nortman, Bill <
>> william.nort...@pimco.com>:
>>
>>> The first things I would try: make sure your classes Person and
>>> Address have getters and setters and a no-argument constructor.
>>>
>>>
>>>
>>> *From:* Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>>> *Sent:* Wednesday, April 25, 2018 7:17 AM
>>> *To:* user@flink.apache.org
>>> *Subject:* KafkaProducer with generic (Avro) serialization schema
>>>
>>>
>>>
>>> Dear reader,
>>>
>>>
>>>
>>> I'm currently working on a KafkaProducer which is able to serialize a
>>> generic type using avro4s.
>>> However, this serialization schema is not itself serializable. Here is
>>> my code:
>>>
>>>
>>>
>>> The serialization schema:
>>>
>>> class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord]
>>>     extends SerializationSchema[IN] {
>>>
>>>   override def serialize(element: IN): Array[Byte] = {
>>>     val byteArray = new ByteArrayOutputStream()
>>>     val avroSer = AvroOutputStream.binary[IN](byteArray)
>>>     avroSer.write(element)
>>>     avroSer.flush()
>>>     avroSer.close()
>>>     byteArray.toByteArray
>>>   }
>>> }
>>>
>>>
>>>
>>> The job code:
>>>
>>> case class Person(name: String, age: Int, address: Address)
>>> case class Address(city: String, street: String)
>>>
>>> class SimpleJob {
>>>
>>>   @transient
>>>   private lazy val serSchema: AvroSerializationSchema[Person] =
>>>     new AvroSerializationSchema[Person]()
>>>
>>>   def start() = {
>>>     val testPerson = Person("Test", 100, Address("Test", "Test"))
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>>     env.
>>>       fromCollection(Seq(testPerson)).
>>>       addSink(createKafkaSink())
>>>
>>>     env.execute("Flink sample job")
>>>   }
>>>
>>>   def createKafkaSink(): RichSinkFunction[Person] = {
>>>     // set some properties
>>>     val properties = new Properties()
>>>     properties.put("bootstrap.servers", "127.0.0.1:9092")
>>>     properties.put("zookeeper.connect", "127.0.0.1:2181")
>>>
>>>     new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>>>   }
>>> }
>>>
>>>
>>>
>>> The code compiles, but at runtime it fails with the following error:
>>> InvalidProgramException: Object org.apache.flink.streaming.util.
>>> serialization.KeyedSerializationSchemaWrapper@639c2c1d is not
>>> serializable.
>>>
>>>
>>>
>>> I assume this means that my custom SerializationSchema is not
>>> serializable due to the use of SchemaFor, FromRecord and ToRecord.
>>>
>>> Anyone knows a solution or workaround?
>>>
>>>
>>>
>>> Thanks in advance!
>>>
>>> Wouter
>>>
>>>
>>
>