Hi,
yes, the Avro Schema class is not serializable. Flink ships your
MyDeserializer instance to the cluster using Java serialization, and that
fails on the "_schema" field.

Can you make the "_schema" field "transient" and initialize it lazily, the
first time serialize() or deserialize() is called?
That way the schema is created on the cluster, so there is no need to
transfer it over the network.
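
Here is a rough sketch of the transient + lazy-init pattern. To keep it
runnable on its own it uses only java.io: FakeSchema is a made-up stand-in
for org.apache.avro.Schema, LazyDeserializer stands in for your
MyDeserializer, and roundTrip() only imitates what happens when the object is
shipped to the cluster. In your class the lazy getter would parse
/avro_schema.json with Schema.Parser instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazySchemaDemo {

    // Hypothetical stand-in for org.apache.avro.Schema: deliberately NOT Serializable.
    static final class FakeSchema {
        final String name;
        FakeSchema(String name) { this.name = name; }
    }

    // Hypothetical stand-in for MyDeserializer.
    static final class LazyDeserializer implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: Java serialization skips this field, so it is null on the receiving side.
        private transient FakeSchema schema;

        // Creates the schema on first use. The real class would parse the schema file here.
        private FakeSchema schema() {
            if (schema == null) {
                schema = new FakeSchema("neverwinter");
            }
            return schema;
        }

        public String deserialize(byte[] message) {
            return schema().name + ":" + message.length;
        }

        boolean schemaLoaded() { return schema != null; }
    }

    // Serializes and deserializes the object, imitating the transfer to a worker.
    static LazyDeserializer roundTrip(LazyDeserializer d) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(d);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyDeserializer) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazyDeserializer original = new LazyDeserializer();
        original.deserialize(new byte[3]);               // schema gets created locally
        LazyDeserializer copy = roundTrip(original);     // no NotSerializableException
        System.out.println(copy.schemaLoaded());         // schema was not transferred
        System.out.println(copy.deserialize(new byte[3]));
        System.out.println(copy.schemaLoaded());         // re-created on first use
    }
}
```

The constructor no longer needs to touch the schema at all; each copy of the
object builds its own on first use.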


I think Flink's own serialization stack should also be able to handle Avro
types with Kafka. I'm trying to get the required tooling into Flink
0.10-SNAPSHOT.

Let me know if you need more help.


Best,
Robert




On Sat, Aug 29, 2015 at 11:38 AM, Ferenc Turi <turifs...@gmail.com> wrote:

> Hi,
>
> I tried to read avro (RecordSchema) data from Kafka using the flink-kafka
> connector but I have problems:
>
> The exception at program startup says:
>
> Caused by: java.io.NotSerializableException:
> org.apache.avro.Schema$RecordSchema
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
> I know RecordSchema is not serializable, so that is expected, but how do I
> add a serializer for RecordSchema?
>
> My Flink initialization:
>
> LocalStreamEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
> env.addSource(new KafkaSource("localhost:2181", "neverwinter", new
> MyDeserializer())).print();
>
> The deserializer:
>
> public class MyDeserializer implements DeserializationSchema<String>,
> SerializationSchema<String, byte[]>{
>     private static final long serialVersionUID = -8314881700393464119L;
>     private static final EncoderFactory avroEncoderFactory =
> EncoderFactory.get();
>     private Schema _schema;
>
>     public MyDeserializer(){
>         System.out.println("Creating MyDeserializer");
>         Schema.Parser parser = new Schema.Parser();
>         try {
>             InputStream is =
> getClass().getResourceAsStream("/avro_schema.json");
>             if (is != null){
>                 _schema = parser.parse(is);
>             }else{
>                 System.out.println("Unable to load schema file!");
>             }
>         } catch (IOException e) {
>             e.printStackTrace();
>             throw new RuntimeException(e);
>         }
>     }
>
>     public TypeInformation<String> getProducedType() {
>         return TypeExtractor.getForClass(String.class);
>     }
>
>     public String deserialize(byte[] message) {
>         String data = null;
>         try {
>             DatumReader<GenericRecord> reader = new
> GenericDatumReader<GenericRecord>(_schema);
>             Decoder decoder = DecoderFactory.get().binaryDecoder(message,
> null);
>             GenericRecord result = reader.read(null, decoder);
>             AvroKafkaData ad = new
> AvroKafkaData((Integer)result.get("id"),(Integer)result.get("random"),String.valueOf(result.get("data")));
>             data = ad.toString();
>             System.out.println("Read kafka data: " + data);
>         } catch (IOException e) {
>             throw new RuntimeException(e);
>         }
>         return data;
>     }
>
>     public boolean isEndOfStream(String nextElement) {
>         return false;
>     }
>
>     public byte[] serialize(String element) {
>         System.out.println("Serializing element = " + element);
>         byte[] data = null;
>         try {
>             GenericDatumWriter writer = new
> GenericDatumWriter(_schema);
>
>             ByteArrayOutputStream stream = new ByteArrayOutputStream();
>
>             DatumReader<GenericRecord> reader=new
> GenericDatumReader<GenericRecord>(_schema);
>             Decoder decoder=DecoderFactory.get().jsonDecoder(_schema,
> element);
>
>             GenericRecord r=reader.read(null,decoder);
>
>             BinaryEncoder binaryEncoder =
> avroEncoderFactory.binaryEncoder(stream, null);
>
>             writer.write(r, binaryEncoder);
>             binaryEncoder.flush();
>             IOUtils.closeStream(stream);
>
>             data = stream.toByteArray();
>         } catch (IOException e) {
>             throw new RuntimeException(e);
>         }
>         return data;
>     }
>
> }
>
> Unfortunately, as far as I can see, only the constructor of MyDeserializer
> is called.
>
> Could somebody suggest something?
>
> Thanks,
>
> Ferenc
>
>
>
> --
> Kind Regards,
>
> Ferenc
>
>
>
