So, I'm still struggling with this issue. I dug a bit deeper into the
problem, and I'm pretty sure the cause is that I have to (implicitly)
pass the SchemaFor and ToRecord instances to my serialization schema
(otherwise I can't make it generic). However, those classes aren't
serializable, and of course I can't annotate them as @transient or turn
them into lazy vals, which leaves me with the current issue.
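
The serializability constraint described above can be reproduced without
Flink or avro4s. The following is a minimal sketch that uses only Java
serialization from the standard library. Encoder, Schema, EagerSchema,
LazySchema, PersonSchema and Encoders are hypothetical names standing in for
the avro4s type classes and Flink's SerializationSchema, and Person is
simplified. It contrasts a schema that keeps its type class instance as a
field with one possible workaround, where a concrete subclass resolves the
instance in a @transient lazy val after deserialization. Whether this carries
over to the real job has not been verified here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
  NotSerializableException, ObjectInputStream, ObjectOutputStream}

object SerializationSketch {

  // Hypothetical stand-in for a non-serializable type class (e.g. ToRecord).
  trait Encoder[T] { def encode(value: T): Array[Byte] }

  // Hypothetical stand-in for Flink's SerializationSchema (must be Serializable).
  trait Schema[T] extends Serializable { def serialize(element: T): Array[Byte] }

  // Mirrors the failing approach: the context bound is kept as a hidden,
  // non-transient field, so Java serialization tries to write it.
  class EagerSchema[T: Encoder] extends Schema[T] {
    override def serialize(element: T): Array[Byte] =
      implicitly[Encoder[T]].encode(element)
  }

  // Possible workaround: the generic base class stores no instance at all.
  abstract class LazySchema[T] extends Schema[T] {
    protected def encoder: Encoder[T]
    override def serialize(element: T): Array[Byte] = encoder.encode(element)
  }

  case class Person(name: String, age: Int)

  object Encoders {
    // Deliberately NOT Serializable, like the instances in the original post.
    implicit val personEncoder: Encoder[Person] = new Encoder[Person] {
      def encode(p: Person): Array[Byte] = s"${p.name},${p.age}".getBytes("UTF-8")
    }
  }

  // The concrete subclass looks the instance up lazily; @transient keeps it
  // out of the serialized form, so it is resolved again after deserialization.
  class PersonSchema extends LazySchema[Person] {
    @transient protected lazy val encoder: Encoder[Person] = Encoders.personEncoder
  }

  // Serializes and deserializes a value, roughly what happens when a
  // function object is shipped to a worker.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    import Encoders._
    val eagerFails =
      try { roundTrip(new EagerSchema[Person]); false }
      catch { case _: NotSerializableException => true }
    println(s"eager schema fails to serialize: $eagerFails")

    val restored = roundTrip(new PersonSchema)
    println(new String(restored.serialize(Person("Test", 100)), "UTF-8"))
  }
}
```

The trade-off in this sketch is that each element type needs its own small
concrete subclass, so the schema is only generic in its base class. In the
real job the lazy val would presumably have to summon the avro4s instances
for the concrete type instead of the hand-written encoder used here.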

I hope someone has some leads for me.


On Thu, 26 Apr 2018 at 23:03, Wouter Zorgdrager wrote:

> Hi Bill,
> Thanks for your answer. However, this proposal isn't going to solve my
> issue, since the problem here is that the context bounds I need to give in
> order to serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't
> serializable classes. As a result, Flink is unable to serialize the
> KafkaProducer, which fails the whole job.
> Thanks,
> Wouter
> On Thu, 26 Apr 2018 at 00:42, Nortman, Bill wrote:
>> The first thing I would try is to check that your classes Person and
>> Address have getters and setters and a no-argument constructor.
>> *From:* Wouter Zorgdrager []
>> *Sent:* Wednesday, April 25, 2018 7:17 AM
>> *To:*
>> *Subject:* KafkaProducer with generic (Avro) serialization schema
>> Dear reader,
>> I'm currently working on writing a KafkaProducer which is able to
>> serialize a generic type using avro4s.
>> However this serialization schema is not serializable itself. Here is my
>> code for this:
>> The serialization schema:
>> class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord]
>> extends SerializationSchema[IN] {
>>   override def serialize(element: IN): Array[Byte] = {
>>     // Write the element in Avro binary form into an in-memory buffer.
>>     val byteArray = new ByteArrayOutputStream()
>>     val avroSer = AvroOutputStream.binary[IN](byteArray)
>>     avroSer.write(element)
>>     avroSer.flush()
>>     avroSer.close()
>>     byteArray.toByteArray
>>   }
>> }
>> The job code:
>> case class Person(name : String, age : Int, address : Address)
>> case class Address(city : String, street : String)
>> class SimpleJob {
>>   @transient
>>   private lazy val serSchema : AvroSerializationSchema[Person] =
>>     new AvroSerializationSchema[Person]()
>>   def start() = {
>>     val testPerson = Person("Test", 100, Address("Test", "Test"))
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.
>>       fromCollection(Seq(testPerson)).
>>       addSink(createKafkaSink())
>>     env.execute("Flink sample job")
>>   }
>>   def createKafkaSink() : RichSinkFunction[Person] = {
>>     //set some properties
>>     val properties = new Properties()
>>     properties.put("bootstrap.servers", "<>")
>>     properties.put("zookeeper.connect", "<>")
>>     new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>>   }
>> }
>> The code does compile; however, it gives the following error at
>> runtime: InvalidProgramException: Object
>> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
>> is not serializable.
>> I assume this means that my custom SerializationSchema is not
>> serializable due to the use of SchemaFor, FromRecord and ToRecord.
>> Does anyone know a solution or workaround?
>> Thanks in advance!
>> Wouter
