The functions/sources have an open() method that is exactly intended for
this type of initialization (constructing the Avro Schema).

You can try to subclass the Kafka source and override the open() method to
initialize the schema there. Make sure you call super.open().
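
For illustration, a minimal sketch of that approach. The constructor
arguments, the open(Configuration) signature, and the import paths below
are assumptions based on the 0.9-era KafkaSource/RichFunction API, so
treat this as a sketch rather than a drop-in implementation:

import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class SchemaInitializingKafkaSource extends KafkaSource<String> {

    // transient: rebuilt in open() on every worker, never shipped with the job
    private transient Schema schema;

    public SchemaInitializingKafkaSource(String zkAddress, String topic,
            DeserializationSchema<String> deserializer) {
        super(zkAddress, topic, deserializer);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters); // let the source set up its Kafka consumer first
        InputStream is = getClass().getResourceAsStream("/avro_schema.json");
        try {
            schema = new Schema.Parser().parse(is);
        } finally {
            is.close();
        }
    }
}

Because the field is transient, the non-serializable Schema never has to
travel with the job graph.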


Greetings,
Stephan


On Sat, Aug 29, 2015 at 11:58 AM, Robert Metzger <rmetz...@apache.org>
wrote:

> Hi,
> yes, the Avro Schema is not serializable.
>
> Can you make the "_schema" field "transient" and then lazily initialize
> the field when serialize()/deserialize() is called?
> That way, you initialize the schema on the cluster, so there is no need to
> transfer it over the network.
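>
> For illustration, a sketch of that lazy initialization (getSchema() is
> just an illustrative helper name):
>
>     private transient Schema _schema;
>
>     private Schema getSchema() {
>         // parsed lazily on the worker, after the object was deserialized
>         if (_schema == null) {
>             try (InputStream is =
>                     getClass().getResourceAsStream("/avro_schema.json")) {
>                 _schema = new Schema.Parser().parse(is);
>             } catch (IOException e) {
>                 throw new RuntimeException(e);
>             }
>         }
>         return _schema;
>     }
>
> and have serialize()/deserialize() call getSchema() instead of reading
> the field directly.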
>
>
> I think Flink's own serialization stack should also be able to handle Avro
> types with Kafka. I'm trying to get the required tooling into Flink
> 0.10-SNAPSHOT.
>
> Let me know if you need more help.
>
>
> Best,
> Robert
>
>
>
>
> On Sat, Aug 29, 2015 at 11:38 AM, Ferenc Turi <turifs...@gmail.com> wrote:
>
>> Hi,
>>
>> I tried to read Avro (RecordSchema) data from Kafka using the flink-kafka
>> connector, but I am running into problems.
>>
>> The exception at program startup says:
>>
>> Caused by: java.io.NotSerializableException:
>> org.apache.avro.Schema$RecordSchema
>>     at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>     at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>
>> I know RecordSchema is not serializable, so that part is expected, but how
>> do I add a serializer for RecordSchema?
>>
>> My Flink initialization:
>>
>> LocalStreamEnvironment env =
>>         StreamExecutionEnvironment.createLocalEnvironment();
>> env.addSource(new KafkaSource<String>("localhost:2181", "neverwinter",
>>         new MyDeserializer())).print();
>>
>> The deserializer:
>>
>> public class MyDeserializer implements DeserializationSchema<String>,
>>         SerializationSchema<String, byte[]> {
>>
>>     private static final long serialVersionUID = -8314881700393464119L;
>>     private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();
>>
>>     // Schema$RecordSchema is not serializable -- this field is what
>>     // triggers the exception above when the job is submitted
>>     private Schema _schema;
>>
>>     public MyDeserializer() {
>>         System.out.println("Creating MyDeserializer");
>>         Schema.Parser parser = new Schema.Parser();
>>         try {
>>             InputStream is = getClass().getResourceAsStream("/avro_schema.json");
>>             if (is != null) {
>>                 _schema = parser.parse(is);
>>             } else {
>>                 System.out.println("Unable to load schema file!");
>>             }
>>         } catch (IOException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>>
>>     public TypeInformation<String> getProducedType() {
>>         return TypeExtractor.getForClass(String.class);
>>     }
>>
>>     public String deserialize(byte[] message) {
>>         try {
>>             DatumReader<GenericRecord> reader =
>>                     new GenericDatumReader<GenericRecord>(_schema);
>>             Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
>>             GenericRecord result = reader.read(null, decoder);
>>             AvroKafkaData ad = new AvroKafkaData((Integer) result.get("id"),
>>                     (Integer) result.get("random"),
>>                     String.valueOf(result.get("data")));
>>             // build the value first, then log it
>>             String data = ad.toString();
>>             System.out.println("Read kafka data: " + data);
>>             return data;
>>         } catch (IOException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>>
>>     public boolean isEndOfStream(String nextElement) {
>>         return false;
>>     }
>>
>>     public byte[] serialize(String element) {
>>         System.out.println("Serializing element = " + element);
>>         try {
>>             // read the JSON-encoded record, then re-encode it as binary Avro
>>             GenericDatumWriter<GenericRecord> writer =
>>                     new GenericDatumWriter<GenericRecord>(_schema);
>>             ByteArrayOutputStream stream = new ByteArrayOutputStream();
>>
>>             DatumReader<GenericRecord> reader =
>>                     new GenericDatumReader<GenericRecord>(_schema);
>>             Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, element);
>>             GenericRecord r = reader.read(null, decoder);
>>
>>             BinaryEncoder binaryEncoder =
>>                     avroEncoderFactory.binaryEncoder(stream, null);
>>             writer.write(r, binaryEncoder);
>>             binaryEncoder.flush();
>>             IOUtils.closeStream(stream);
>>
>>             return stream.toByteArray();
>>         } catch (IOException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>> }
>>
>> Unfortunately, as far as I can see, only the constructor of MyDeserializer
>> is called.
>>
>> Could somebody suggest something?
>>
>> Thanks,
>>
>> Ferenc
>>
>>
>>
>> --
>> Kind Regards,
>>
>> Ferenc
>>
>>
>>
>
