Hi,

I tried to read Avro (RecordSchema) data from Kafka using the flink-kafka
connector, but I ran into a problem. The following exception is thrown at
program startup:

Caused by: java.io.NotSerializableException:
org.apache.avro.Schema$RecordSchema
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

I know that RecordSchema is not serializable, so the exception itself makes
sense, but how can I register a serializer for RecordSchema?
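Would the usual workaround, keeping only the schema JSON string (which is
serializable) in a field and re-parsing it lazily into a transient field on
the worker side, be the right approach here? Below is a minimal,
self-contained sketch of that pattern, just to show what I mean (FakeSchema
is a hypothetical stand-in for the non-serializable
org.apache.avro.Schema; in real code schema() would call
new Schema.Parser().parse(schemaJson)):

```java
import java.io.*;

public class LazySchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    // stand-in for org.apache.avro.Schema, which is NOT serializable
    static class FakeSchema {
        final String json;
        FakeSchema(String json) { this.json = json; }
    }

    private final String schemaJson;      // survives Java serialization
    private transient FakeSchema schema;  // rebuilt lazily after deserialization

    public LazySchemaHolder(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    public FakeSchema schema() {
        if (schema == null) {
            // real code: schema = new Schema.Parser().parse(schemaJson);
            schema = new FakeSchema(schemaJson);
        }
        return schema;
    }

    public static void main(String[] args) throws Exception {
        LazySchemaHolder holder = new LazySchemaHolder("{\"type\":\"record\"}");

        // round-trip through Java serialization, like Flink does
        // when it ships the user function to the workers
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(holder);
        oos.flush();

        LazySchemaHolder copy = (LazySchemaHolder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(copy.schema().json); // prints {"type":"record"}
    }
}
```

The point is that only the String field is written by
ObjectOutputStream; the transient Schema never has to be serializable.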

My Flink initialization:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.addSource(new KafkaSource<String>("localhost:2181", "neverwinter",
        new MyDeserializer())).print();

The deserializer:

public class MyDeserializer implements DeserializationSchema<String>,
        SerializationSchema<String, byte[]> {

    private static final long serialVersionUID = -8314881700393464119L;
    private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();

    private Schema _schema;

    public MyDeserializer() {
        System.out.println("Creating MyDeserializer");
        Schema.Parser parser = new Schema.Parser();
        try {
            InputStream is = getClass().getResourceAsStream("/avro_schema.json");
            if (is != null) {
                _schema = parser.parse(is);
            } else {
                System.out.println("Unable to load schema file!");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }

    public String deserialize(byte[] message) {
        String data = null;
        try {
            DatumReader<GenericRecord> reader =
                    new GenericDatumReader<GenericRecord>(_schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
            GenericRecord result = reader.read(null, decoder);
            AvroKafkaData ad = new AvroKafkaData(
                    (Integer) result.get("id"),
                    (Integer) result.get("random"),
                    String.valueOf(result.get("data")));
            data = ad.toString();
            System.out.println("Read kafka data: " + data);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return data;
    }

    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    public byte[] serialize(String element) {
        System.out.println("Serializing element = " + element);
        byte[] data = null;
        try {
            // element is the JSON form of the record:
            // decode it with a JSON decoder, then re-encode as Avro binary
            DatumReader<GenericRecord> reader =
                    new GenericDatumReader<GenericRecord>(_schema);
            Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, element);
            GenericRecord r = reader.read(null, decoder);

            GenericDatumWriter<GenericRecord> writer =
                    new GenericDatumWriter<GenericRecord>(_schema);
            ByteArrayOutputStream stream = new ByteArrayOutputStream();
            BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
            writer.write(r, binaryEncoder);
            binaryEncoder.flush();
            data = stream.toByteArray();
            IOUtils.closeStream(stream);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return data;
    }
}

Unfortunately, as far as I can see, only the constructor of MyDeserializer is called.

Could somebody suggest something?

Thanks,

Ferenc



