Hi,

Thanks for the suggestions.

Unfortunately I cannot make FromRecord/ToRecord/SchemaFor serializable,
since those classes are out of my control; they come from the avro4s
library (https://github.com/sksamuel/avro4s). The problem, especially
on the deserializer side, is that I need to convert an Avro 'GenericRecord'
to a Scala case class. Avro is written in Java, so that's a bit problematic,
and that is why I need the avro4s library. Avro4s verifies at compile time
whether the generic type is actually convertible from/to a GenericRecord,
which is why I need those context bounds.
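To make concrete why the context bounds bite here, a minimal stand-alone sketch (the `Encoder` type class below is hypothetical and merely stands in for avro4s's SchemaFor/ToRecord/FromRecord): a context bound desugars to an implicit constructor parameter, which the compiler retains as a field, so if the type-class instance isn't Serializable, Java serialization of the schema fails.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a compile-time-derived avro4s type class.
trait Encoder[T] { def encode(t: T): Array[Byte] }

object Encoder {
  // Anonymous instance: not Serializable, like the derived avro4s instances.
  implicit val stringEncoder: Encoder[String] =
    new Encoder[String] { def encode(t: String): Array[Byte] = t.getBytes("UTF-8") }
}

// [T: Encoder] desugars to (implicit evidence$1: Encoder[T]); because the
// evidence is used in serialize(), it is retained as a field of the class.
class MySchema[T: Encoder] extends Serializable {
  def serialize(t: T): Array[Byte] = implicitly[Encoder[T]].encode(t)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val schema = new MySchema[String]
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(schema) // fails: the Encoder field is not Serializable
      println("serializable")
    } catch {
      case _: NotSerializableException => println("NotSerializableException")
    }
  }
}
```

This is exactly the shape of the InvalidProgramException further down the thread: the schema object itself is Serializable, but the derived instances it captures as fields are not.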

As for @Aljoscha's workaround, I don't understand how that would solve it;
doesn't it just move the problem? If I create a factory, I still need the
generic type (with context bounds) that I specify on my
KafkaConsumer/deserialization schema.

@Fabian I'm not sure I understand your proposal. I still need the
context bounds for those compile-time macros of avro4s.
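For what it's worth, the pattern in Fabian's reference [1] below boils down to: keep only a schema JSON string as serializable state, mark the Avro objects @transient, and rebuild them in readObject(). A hedged sketch against plain Avro's API (class and method names here are mine, not Flink's); the open question for my case is that the avro4s type classes cannot be re-derived at runtime inside readObject():

```scala
import java.io.{ByteArrayOutputStream, ObjectInputStream}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// Sketch of the custom-Java-serialization pattern used by Flink's
// AvroRowSerializationSchema: the only serializable state is the schema
// JSON string; the Avro objects are transient and rebuilt after
// deserialization on the TaskManager.
class GenericRecordSerializationSchema(schemaJson: String) extends Serializable {

  @transient private var schema: Schema = new Schema.Parser().parse(schemaJson)
  @transient private var writer: GenericDatumWriter[GenericRecord] =
    new GenericDatumWriter[GenericRecord](schema)

  def serialize(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Called by Java serialization: re-parse the schema and rebuild the
  // transient writer from the serializable JSON string.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    schema = new Schema.Parser().parse(schemaJson)
    writer = new GenericDatumWriter[GenericRecord](schema)
  }
}
```

This works when everything needed at runtime can be reconstructed from a string, which is precisely what the avro4s compile-time derivation does not allow.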

Once again, thanks for your help so far!

Regards,
Wouter



On Wed, 2 May 2018 at 16:48, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Wouter,
>
> you can try to make the SerializationSchema serializable by overriding
> Java's serialization methods writeObject() and readObject() similar as
> Flink's AvroRowSerializationSchema [1] does.
>
> Best, Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
>
> 2018-05-02 16:34 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> Hi,
>>
>> My Scala knowledge is very limited (and my Scala serialization
>> knowledge is non-existent), but one way or another you have to make your
>> SerializationSchema serializable. If this is indeed the problem, maybe a
>> better place to ask this question is Stack Overflow or a Scala-specific
>> mailing list/board (unless someone else in the Flink community
>> can provide an answer to this problem)?
>>
>> Piotrek
>>
>> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>>
>> So, I'm still struggling with this issue. I dived a bit deeper into the
>> problem and I'm pretty sure the cause is that I have to (implicitly)
>> pass the SchemaFor, FromRecord and ToRecord instances to my serialization
>> schema (otherwise I can't make it generic). However, those classes aren't
>> serializable, and of course I can't annotate them transient nor make them
>> a lazy val, which gives me the current issue.
>>
>> I hope someone has some leads for me.
>>
>> Thanks!
>>
>> On Thu, 26 Apr 2018 at 23:03, Wouter Zorgdrager <
>> zorgdrag...@gmail.com> wrote:
>>
>>> Hi Bill,
>>>
>>> Thanks for your answer. However, this proposal isn't going to solve my
>>> issue: the context bounds I need in order to serialize to Avro
>>> (SchemaFor, ToRecord and FromRecord) aren't serializable classes. This
>>> results in Flink not being able to serialize the KafkaProducer, failing
>>> the whole job.
>>>
>>> Thanks,
>>> Wouter
>>>
>>> On Thu, 26 Apr 2018 at 00:42, Nortman, Bill <
>>> william.nort...@pimco.com> wrote:
>>>
>>>> The first things I would check: do your classes Person and
>>>> Address have getters and setters and a no-argument constructor?
>>>>
>>>>
>>>>
>>>> *From:* Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>>>> *Sent:* Wednesday, April 25, 2018 7:17 AM
>>>> *To:* user@flink.apache.org
>>>> *Subject:* KafkaProducer with generic (Avro) serialization schema
>>>>
>>>>
>>>>
>>>> Dear reader,
>>>>
>>>>
>>>>
>>>> I'm currently working on writing a KafkaProducer which is able to
>>>> serialize a generic type using avro4s.
>>>>
>>>> However, this serialization schema is not itself serializable. Here is
>>>> my code:
>>>>
>>>>
>>>>
>>>> The serialization schema:
>>>>
>>>> class AvroSerializationSchema[IN: SchemaFor: FromRecord: ToRecord]
>>>>     extends SerializationSchema[IN] {
>>>>
>>>>   override def serialize(element: IN): Array[Byte] = {
>>>>     val byteArray = new ByteArrayOutputStream()
>>>>     val avroSer = AvroOutputStream.binary[IN](byteArray)
>>>>     avroSer.write(element)
>>>>     avroSer.flush()
>>>>     avroSer.close()
>>>>     byteArray.toByteArray
>>>>   }
>>>> }
>>>>
>>>>
>>>>
>>>> The job code:
>>>>
>>>> case class Person(name: String, age: Int, address: Address)
>>>> case class Address(city: String, street: String)
>>>>
>>>> class SimpleJob {
>>>>
>>>>   @transient
>>>>   private lazy val serSchema: AvroSerializationSchema[Person] =
>>>>     new AvroSerializationSchema[Person]()
>>>>
>>>>   def start() = {
>>>>     val testPerson = Person("Test", 100, Address("Test", "Test"))
>>>>
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>>     env.
>>>>       fromCollection(Seq(testPerson)).
>>>>       addSink(createKafkaSink())
>>>>
>>>>     env.execute("Flink sample job")
>>>>   }
>>>>
>>>>   def createKafkaSink(): RichSinkFunction[Person] = {
>>>>     // set some properties
>>>>     val properties = new Properties()
>>>>     properties.put("bootstrap.servers", "127.0.0.1:9092")
>>>>     properties.put("zookeeper.connect", "127.0.0.1:2181")
>>>>
>>>>     new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>>>>   }
>>>> }
>>>>
>>>>
>>>>
>>>> The code does compile, but it gives the following error at
>>>> runtime: InvalidProgramException: Object
>>>> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
>>>> is not serializable.
>>>>
>>>>
>>>>
>>>> I assume this means that my custom SerializationSchema is not
>>>> serializable due to the use of SchemaFor, FromRecord and ToRecord.
>>>>
>>>> Anyone knows a solution or workaround?
>>>>
>>>>
>>>>
>>>> Thanks in advance!
>>>>
>>>> Wouter
>>>>
>>>>
>>>
>>
>
